Re: Performance of direct vs indirect shuffling
Kevin Burton 2011-12-21, 08:32
Twister, another mapred impl, uses a pub/sub based system like ActiveMQ ...
Peregrine, the mapred impl I've been working on, just uses HTTP, Netty, and
async IO to do the same thing.
Note that you mention a BUFFER ... just buffering the IO is not enough.
Most jobs will be larger than memory AND you have to transfer the data ...
so use the buffer to compress and combine the data before it is sent.
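To make the combine-then-compress idea concrete, here is a minimal, hypothetical sketch (not Peregrine's actual code) of a map-side buffer that merges duplicate keys in memory and gzips the batch before it would be handed to the async sender:

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

// Hypothetical sketch: buffer map output, combine values per key,
// then compress the whole batch before it is sent over the wire.
public class ShuffleBuffer {
    private final Map<String, Long> buffer = new TreeMap<>();

    // Combine on insert: duplicate keys are merged in memory, so
    // repeated keys cost no extra buffer space or network bytes.
    public void emit(String key, long value) {
        buffer.merge(key, value, Long::sum);
    }

    // Serialize and gzip the combined batch; a real impl would hand
    // these bytes to the async HTTP/Netty sender instead of returning them.
    public byte[] drainCompressed() throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(raw))) {
            out.writeInt(buffer.size());
            for (Map.Entry<String, Long> e : buffer.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
        buffer.clear();
        return raw.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        ShuffleBuffer b = new ShuffleBuffer();
        b.emit("the", 1); b.emit("the", 1); b.emit("fox", 1);
        byte[] batch = b.drainCompressed();
        // Decompress to verify the combined counts survived the round trip.
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(batch)))) {
            int n = in.readInt(); // 2 distinct keys after combining
            for (int i = 0; i < n; i++) {
                System.out.println(in.readUTF() + "=" + in.readLong());
            }
        }
    }
}
```

The combining step is what makes the small buffer pay off: the bytes that leave the node have already been deduplicated and compressed.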
On Tue, Dec 20, 2011 at 10:07 PM, Binglin Chang <[EMAIL PROTECTED]> wrote:
>> One possible optimization I hope to look into next year is to change
>> the map output code to push the data to the local TT, which would have
>> configurable in-memory buffers.
> Has anyone ever considered a general data transfer service bundled with
> YARN? Then other applications (not just MR) could also benefit from it.
> The data transfer service would work like a real-world mail service; it
> has a simple stream-based interface: register, send, and receive.
> Mapper & Reducer: register(LocalMailService, Address(AppId, MapperId or
> ReducerId));
> Mapper: send(LocalMailService, from=Address(AppId, MapperId),
> to=Address(AppId, ReducerId), data=xxx);
> Reducer: recv(LocalMailService, from=Address(AppId, MapperId),
> to=Address(AppId, ReducerId));
> The LocalMailService manages a big buffer (configurable), so it can cache
> map outputs, or dump them to disk if there is no memory;
> The LocalMailService can start transferring data from source to dest once
> both addresses are registered;
> If source & dest are on the same machine, there is no network transfer;
> The to-address can be multiple addresses (broadcast, or like a mail
> group); this is useful for 1:N data transfers (binary/side data
> distribution), and the service can use P2P for this kind of work (much
> better than -cacheFile).
> Just an idea, if anyone is interested.
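Binglin's register/send/recv contract could be sketched as a toy in-memory service. The names below (LocalMailService, Address) just follow the email; nothing here is a real YARN API, and a real service would spill to disk when its configurable buffer fills:

```java
import java.util.*;
import java.util.concurrent.*;

// Toy, in-memory sketch of the proposed mail-style transfer service.
public class LocalMailService {
    public record Address(String appId, String taskId) {}

    private final Set<Address> registered = ConcurrentHashMap.newKeySet();
    // Buffered payloads keyed by (from, to); a real service would dump
    // to disk when the in-memory buffer is exhausted.
    private final Map<List<Address>, BlockingQueue<byte[]>> mailbox =
            new ConcurrentHashMap<>();

    public void register(Address addr) { registered.add(addr); }

    public void send(Address from, Address to, byte[] data) {
        mailbox.computeIfAbsent(List.of(from, to),
                k -> new LinkedBlockingQueue<>()).add(data);
    }

    // Delivery only proceeds once both endpoints have registered,
    // mirroring "start transferring if both addresses are registered".
    public byte[] recv(Address from, Address to) throws InterruptedException {
        if (!registered.contains(from) || !registered.contains(to)) {
            throw new IllegalStateException("both addresses must be registered");
        }
        return mailbox.computeIfAbsent(List.of(from, to),
                k -> new LinkedBlockingQueue<>()).take();
    }

    public static void main(String[] args) throws InterruptedException {
        LocalMailService svc = new LocalMailService();
        Address mapper = new Address("app-1", "mapper-0");
        Address reducer = new Address("app-1", "reducer-0");
        svc.register(mapper);
        svc.register(reducer);
        svc.send(mapper, reducer, "partition-bytes".getBytes());
        System.out.println(new String(svc.recv(mapper, reducer)));
    }
}
```

The same-machine short-circuit falls out naturally here: when sender and receiver share a LocalMailService instance, recv() reads straight from the local buffer with no network hop.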
> On Wed, Dec 21, 2011 at 10:12 AM, Kevin Burton <[EMAIL PROTECTED]>wrote:
>>> We've discussed 'push' v/s 'pull' shuffle multiple times and each time
>>> turned away due to complexities in MR1. With MRv2 (YARN) this would be much
>>> more doable.
>> Ah.... gotcha. This is what I expected as well. It would be interesting
>> to see a list of changes like this in MR1 vs MR2 to see what could
>> POTENTIALLY happen in the future should everyone get spare time.
>>> So, to really not do any disk i/o during the shuffle you'd need very
>>> large amounts of RAM...
>> Why is that? I'm not suggesting buffering it *all*, but sending it
>> directly when it is generated.
>> I think there should be a SMALL amount of buffer for combining and
>> compressing the data though. Normally something like 250-500MB per mapper,
>> but this is when running, say, a 250GB job, so this buffer is just to
>> reduce the IO sent to the remote node.
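As a back-of-the-envelope illustration of why a few hundred MB of buffer is enough: only the 250GB job size and 250-500MB buffer come from the thread; the mapper count below is an assumed value.

```java
// Rough arithmetic sketch: how many combine/compress flushes does a
// given buffer size imply per mapper? Mapper count is an assumption.
public class BufferMath {
    static long flushes(long perMapperOutputBytes, long bufferBytes) {
        // Ceiling division: one flush per full (or final partial) buffer.
        return (perMapperOutputBytes + bufferBytes - 1) / bufferBytes;
    }

    public static void main(String[] args) {
        long MB = 1L << 20, GB = 1L << 30;
        long jobBytes = 250 * GB;            // job size from the thread
        long mappers = 500;                  // assumed mapper count
        long perMapper = jobBytes / mappers; // 512MB of output per mapper
        System.out.println(flushes(perMapper, 500 * MB)); // prints 2
        System.out.println(flushes(perMapper, 4 * MB));   // prints 128
    }
}
```

With a 500MB buffer each mapper ships a couple of large, combined, compressed batches instead of over a hundred small ones, which is exactly the IO reduction being argued for.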
>>> Also, currently, the shuffle is effected by the reduce task. This has two
>>> major benefits:
>>> # The 'pull' can only be initiated after the reduce is scheduled. The
>>> 'push' model would be hampered if the reduce hasn't been started.
>> I've gone over this problem a number of times. The way I'm handling it
>> is that every map attempt is recorded and only successful maps actually
>> have their data reduced. You end up having MORE intermediate data if
>> machines are crashing, but it's only CHUNK_SIZE per crash, and even in
>> LARGE clusters with lots of crashes this won't end up being a significant
>> percentage of the data.
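A rough sketch of that overhead claim, with CHUNK_SIZE and the crash count as assumed values rather than numbers from the thread:

```java
// Back-of-the-envelope: wasted intermediate data is bounded by
// CHUNK_SIZE per failed map attempt. CHUNK_SIZE (128MB) and the crash
// count (50) are assumptions; only the 250GB job size is from the thread.
public class CrashOverhead {
    static double wastedFraction(long crashes, long chunkBytes, long jobBytes) {
        return (double) (crashes * chunkBytes) / jobBytes;
    }

    public static void main(String[] args) {
        long MB = 1L << 20, GB = 1L << 30;
        double f = wastedFraction(50, 128 * MB, 250 * GB);
        System.out.printf("wasted fraction = %.4f%n", f); // prints 0.0250
    }
}
```

Even 50 failed attempts at 128MB each only waste about 2.5% of a 250GB job's intermediate data, which is the "not a significant percentage" point above.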
>>> # The 'pull' is more resilient to failure of a single reduce. In the
>>> push model, it's harder to deal with a reduce failing after a push from
>>> the map.
>> I don't see how this would be the case ... I'm replicating all the
>> shuffle data ... so if a reducer crashes I just start up a new one.
>> There IS the problem of whether we replicate the intermediate data from
>> the reducer, but this can be a configuration option...
>>> Again, with MR2 we could experiment with push v/s pull where it makes
>>> sense (small jobs etc.). I'd love to mentor/help someone interested in
Founder/CEO Spinn3r.com <http://spinn3r.com/>
Location: *San Francisco, CA*
Skype-in: *(415) 871-0687*