Re: Performance of direct vs indirect shuffling
Binglin Chang 2011-12-21, 06:07
> One possible optimization I hope to look into next year is to change
> the map output code to push the data to the local TT, which would have
> configurable in-memory buffers.
Has anyone ever considered a general data transfer service bundled with
YARN? Then other applications (not just MR) could also benefit from it.
The data transfer service looks like a real-world mail service. It has a
simple interface: register, send, and receive (stream based):
Mapper & Reducer: Register(LocalMailService, Address(AppId,
Mapper: send(LocalMailService, from=Address(AppId, MapperId),
to=Address(AppId, ReducerId), data=xxx);
Reducer: recv(LocalMailService, from=Address(AppId, MapperId),
LocalMailService manages a big (configurable) buffer, so it can cache map
outputs in memory or dump them to disk when memory runs out;
LocalMailService can start transferring data from source to dest once both
addresses are registered;
If source & dest are on the same machine, there is no network transfer;
The "to" address can be multiple addresses (broadcast, or like a mail group).
This is useful for 1:N data transfers (binary/side data distribution); the
service can use P2P for this kind of work (much better than -cacheFile).
Just an idea, if anyone is interested.
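To make the proposed interface concrete, here is a minimal single-process sketch in Python. It is only an illustration under assumptions: the class and method names mirror the pseudo-calls above, but the signatures, the byte-counting buffer, and the "spilled" flag (a stand-in for dumping to disk) are hypothetical, and nothing here is an existing YARN or Hadoop API.

```python
from collections import defaultdict, deque
from typing import NamedTuple


class Address(NamedTuple):
    """Hypothetical address: an application id plus a task (mapper/reducer) id."""
    app_id: str
    task_id: str


class LocalMailService:
    """Toy stand-in for the per-node service; everything stays in one process."""

    def __init__(self, buffer_bytes=1024):
        self.buffer_bytes = buffer_bytes   # configurable in-memory budget
        self.used = 0                      # bytes currently held in memory
        self.registered = set()
        # (src, dst) -> queue of (data, in_memory); in_memory=False models a
        # chunk that would have been dumped to disk in a real service.
        self.queues = defaultdict(deque)

    def register(self, addr):
        self.registered.add(addr)

    def send(self, src, to, data):
        if src not in self.registered:
            raise KeyError(f"sender not registered: {src}")
        # "to" may be one Address or several (the 1:N "mail group" case).
        targets = [to] if isinstance(to, Address) else list(to)
        for dst in targets:
            in_memory = self.used + len(data) <= self.buffer_bytes
            if in_memory:
                self.used += len(data)
            self.queues[(src, dst)].append((data, in_memory))

    def recv(self, dst, src):
        # Transfer only happens once both ends are registered.
        if src not in self.registered or dst not in self.registered:
            raise KeyError("both addresses must be registered")
        parts = []
        queue = self.queues[(src, dst)]
        while queue:
            data, in_memory = queue.popleft()
            if in_memory:
                self.used -= len(data)     # free the buffer space
            parts.append(data)
        return b"".join(parts)
```

A mapper would call send(Address(app, "map0"), Address(app, "red0"), chunk) and the reducer would call recv(Address(app, "red0"), Address(app, "map0")). A real service would stream instead of returning one bytes object, and would move data between nodes when source and dest are on different machines.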
On Wed, Dec 21, 2011 at 10:12 AM, Kevin Burton <[EMAIL PROTECTED]> wrote:
>> We've discussed 'push' v/s 'pull' shuffle multiple times and each time
>> turned away due to complexities in MR1. With MRv2 (YARN) this would be much
>> more doable.
> Ah.... gotcha. This is what I expected as well. It would be interesting
> to see a list of changes like this in MR1 vs MR2 to see what could
> POTENTIALLY happen in the future should everyone get spare time.
>> So, to really not do any disk i/o during the shuffle you'd need very
>> large amounts of RAM...
> Why is that? I'm not suggesting buffering it *all*, but sending it directly
> when it is generated.
> I think there should be a SMALL amount of buffer for combining and
> compressing the data, though. Normally something like 250-500MB per mapper,
> but this is when running, say, a 250GB job, so this buffer is just to reduce
> the IO sent to the remote node.
>> Also, currently, the shuffle is effected by the reduce task. This has two
>> major benefits:
>> # The 'pull' can only be initiated after the reduce is scheduled. The
>> 'push' model would be hampered if the reduce hasn't been started.
> I've gone over this problem a number of times. The way I'm handling it is
> that every map attempt is recorded and only successful maps actually have
> their data reduced. You end up having MORE intermediate data if machines
> are crashing, but it's only CHUNK_SIZE per crash, and even in LARGE
> clusters with lots of crashes this won't end up being a significant
> percentage of the data.
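The bookkeeping described above (record every map attempt, reduce only the data of successful ones) could look roughly like the following Python sketch. It is a guess at the idea, not the poster's implementation: PushShuffleStore and its methods are hypothetical names, and "first committed attempt wins" is an assumed policy.

```python
from collections import defaultdict


class PushShuffleStore:
    """Receiver-side store for pushed chunks, keyed by (map id, attempt)."""

    def __init__(self):
        self.chunks = defaultdict(list)   # (map_id, attempt) -> pushed chunks
        self.committed = {}               # map_id -> attempt that succeeded

    def push(self, map_id, attempt, chunk):
        # Chunks arrive as they are generated, before the attempt's fate is known.
        self.chunks[(map_id, attempt)].append(chunk)

    def commit(self, map_id, attempt):
        # Assumed policy: the first attempt reported successful wins.
        self.committed.setdefault(map_id, attempt)

    def reducible(self):
        # Only chunks from committed attempts are handed to the reduce.
        out = []
        for map_id in sorted(self.committed):
            out.extend(self.chunks[(map_id, self.committed[map_id])])
        return out

    def wasted_chunks(self):
        # Chunks from crashed or superseded attempts: the extra intermediate data.
        return sum(
            len(chunks)
            for (map_id, attempt), chunks in self.chunks.items()
            if self.committed.get(map_id) != attempt
        )
```

If an attempt crashes after pushing one chunk and its retry succeeds, the store holds one wasted chunk for that crash, which matches the "only CHUNK_SIZE per crash" estimate as long as a crashed attempt has at most one chunk in flight.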
>> # The 'pull' is more resilient to failure of a single reduce. In the push
>> model, it's harder to deal with a reduce
>> failing after a push from the map.
> I don't see how this would be the case ... I'm replicating all the shuffle
> data ... so if a reducer crashes I just start up a new one.
> There IS the problem of whether we replicate the intermediate data from
> the reducer but this can be a configuration option...
>> Again, with MR2 we could experiment with push v/s pull where it makes
>> sense (small jobs etc.). I'd love to mentor/help someone interested in
>> putting cycles into it.
> I'm going to be doing a ton of work in this area and I'll publish it if I
> come across anything interesting.