Jiwei Li, 2012-11-07, 06:56
Harsh J, 2012-11-07, 14:05
Robert Evans, 2012-11-07, 15:45
Jiwei Li, 2012-11-08, 05:32
Robert Evans, 2012-11-08, 16:59
Luke Lu, 2012-11-09, 17:54
-
Shuffle phase: fine-grained control of data flow
Jiwei Li, 2012-11-07, 06:56
Dear all,
For jobs like Sort, massive amounts of network traffic occur during the shuffle phase. The simple mechanism Hadoop 1.0.4 uses to choose reduce nodes does nothing to reduce this traffic. If the JobTracker is fully aware of the location of every map output, why not take advantage of this topology knowledge?

Does anyone know where in the code base I should start developing such a feature? Many thanks.

Regards.
--
Jiwei
-
Re: Shuffle phase: fine-grained control of data flow
Harsh J, 2012-11-07, 14:05
Hi Jiwei,
In trunk (i.e. MR2), the logic for selecting and scheduling map completion events lives in the getMapCompletionEvents() method of the EventFetcher class, viewable at
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?view=markup

The EventFetcher thread is used by the Shuffle class (in the reduce package) to continually perform the shuffle. The Shuffle class is in turn used by the ReduceTask class (look in the mapred package of the same Maven module).

I guess you could start there, to see whether better selection and scheduling logic yields better results.

--
Harsh J
-
Re: Shuffle phase: fine-grained control of data flow
Robert Evans, 2012-11-07, 15:45
Jiwei,
I think you could use that knowledge to launch reducers closer to the map output, but I am not sure it would make much difference. It may even slow things down. It depends on several things:

1) Can we get enough map tasks close to one another that it will make a difference?
2) Does the reduced shuffle time offset the overhead of waiting for the map location data before launching the reducers and fetching data early?
3) Do the time savings also offset the overhead of getting the map tasks close to one another?

For #2, you might be able to deal with this by using speculative execution, launching some reduce tasks later if you see a clustering of map output. #1 will require changes to how we schedule tasks, which, depending on how well it is implemented, will affect #3 as well. Additionally, for #1, any job whose size approaches that of the cluster will practically require the map tasks to be evenly distributed around the cluster. If you can come up with a patch, I would love to see some performance numbers.

Personally, I think spending time reducing the size of the data sent to the reducers is a much bigger win. Can you use a combiner? Do you really need all of the data, or can you sample it to get a statistically significant picture of what it contains? Have you enabled compression between the maps and the reducers?

--Bobby
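To make the combiner suggestion concrete, here is a minimal plain-Java sketch (not the Hadoop API) of what a combiner does to one map task's output before the shuffle: records with the same key are aggregated locally, so fewer records cross the network. The class name and sample data are hypothetical; in a real job the combiner would be a Reducer registered with Job.setCombinerClass().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative only: simulates the effect of a word-count combiner on the
 * output of a single map task. Plain Java, not Hadoop's combiner API.
 */
public class CombinerSketch {

    /** Sums the counts of identical keys locally, as a word-count combiner would. */
    static Map<String, Long> combine(List<String> mapOutputKeys) {
        Map<String, Long> combined = new HashMap<>();
        for (String key : mapOutputKeys) {
            // Each input record stands for a (key, 1) pair emitted by the mapper.
            combined.merge(key, 1L, Long::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Hypothetical output of one word-count map task: one record per token.
        List<String> keys = List.of("a", "b", "a", "c", "a", "b");
        Map<String, Long> combined = combine(keys);
        System.out.println("records without combiner: " + keys.size());
        System.out.println("records with combiner:    " + combined.size());
    }
}
```

Running main prints 6 records without the combiner and 3 with it; the saving grows with the number of repeated keys per map task.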
-
Re: Shuffle phase: fine-grained control of data flow
Jiwei Li, 2012-11-08, 05:32
Hi Bobby,
Thank you very much for your suggestions. My whole idea is to minimize the aggregate network bandwidth consumed during the shuffle phase, that is, to keep the number of hops to a minimum when transmitting data from map nodes to reduce nodes. Usually, the Partitioner creates skew, so the JobTracker assigns different amounts of map output to the participating reduce nodes. Placing reduce tasks near the map outputs that hold the largest shares of their partitions can reduce the aggregate network bandwidth.

For #1, there is no need to schedule map tasks close to one another, since that would only congest links within the cluster. For #2, the location and size of each partition in each map output can be sent to the JobTracker while the InputSplit is being processed. Once it has collected enough such information (without necessarily waiting for the map tasks to finish), the JobTracker starts scheduling reduce tasks to fetch map output data. #3 is the same as #1.

Now the tricky part: if all map outputs are spilled to disk, network bandwidth may not be the bottleneck, because the time spent on disk seeks exceeds the time spent on data transmission. If map outputs fit in memory, then the network must be taken seriously. Also note that for evenly distributed map outputs, the current scheduling policy works just fine.

Jiwei
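The placement idea above can be sketched as choosing, for one partition, the host that minimizes aggregate bytes × hops. This is a hypothetical illustration, not JobTracker code: it assumes the bytes of the partition produced on each node are already known (for example, reported while splits are processed, as suggested above), and it uses a two-level distance of 0 for the same node, 2 for the same rack and 4 for a different rack, similar to the distance convention of Hadoop's NetworkTopology. Host and rack names are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of skew-aware reducer placement for ONE partition.
 * Assumes per-node byte counts and a node-to-rack mapping are known.
 */
public class ReducerPlacementSketch {

    /** Two-level tree distance: 0 same node, 2 same rack, 4 other rack. */
    static int distance(String a, String b, Map<String, String> rackOf) {
        if (a.equals(b)) return 0;
        return rackOf.get(a).equals(rackOf.get(b)) ? 2 : 4;
    }

    /** Aggregate bytes x hops if the reducer for this partition runs on host. */
    static long cost(String host, Map<String, Long> bytesOnNode, Map<String, String> rackOf) {
        long total = 0;
        for (Map.Entry<String, Long> e : bytesOnNode.entrySet()) {
            total += e.getValue() * distance(host, e.getKey(), rackOf);
        }
        return total;
    }

    /** Candidate host with the lowest aggregate cost (ties: first in iteration order). */
    static String bestHost(Map<String, Long> bytesOnNode, Map<String, String> rackOf) {
        String best = null;
        long bestCost = Long.MAX_VALUE;
        for (String host : rackOf.keySet()) {
            long c = cost(host, bytesOnNode, rackOf);
            if (c < bestCost) {
                bestCost = c;
                best = host;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, String> rackOf = new LinkedHashMap<>();
        rackOf.put("n1", "r1");
        rackOf.put("n2", "r1");
        rackOf.put("n3", "r2");
        rackOf.put("n4", "r2");

        // Hypothetical skewed partition: most of its bytes were produced on n3.
        Map<String, Long> bytesOnNode = new LinkedHashMap<>();
        bytesOnNode.put("n1", 10L);
        bytesOnNode.put("n2", 10L);
        bytesOnNode.put("n3", 100L);

        for (String host : rackOf.keySet()) {
            System.out.println(host + " -> " + cost(host, bytesOnNode, rackOf));
        }
        System.out.println("best: " + bestHost(bytesOnNode, rackOf));
    }
}
```

With this sample data the reducer lands on n3 (cost 80, versus 420 on n1 or n2 and 280 on n4). If all four nodes held the same number of bytes, every host would have the same cost in this topology, which matches the observation that the current policy is fine for evenly distributed output. The metric captures only network traffic, not disk seeks or job runtime.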
-
Re: Shuffle phase: fine-grained control of data flow
Robert Evans, 2012-11-08, 16:59
Jiwei,
OK, so you are specifically looking at reducing the overall network bandwidth of skewed map outputs, not all map outputs. That pretty much means #1 and #3 are off base. But as you point out, it would only really be a performance win if the data fits in memory.

It seems like an interesting idea. If the goal is to reduce bandwidth rather than improve individual job performance, then it seems more plausible. Do you have a benchmark (a GridMix run, etc.) that really taxes the network, which you could use to measure the impact such a change would have? Something like this really needs some hard numbers for a proper evaluation.

--Bobby Evans
-
Re: Shuffle phase: fine-grained control of data flow
Luke Lu, 2012-11-09, 17:54
For skewed map output, another optimization would be a "local direct shuffle" to further minimize network usage AND disk spillage (on the reducer side), i.e. schedule the reduce task on the same host as the largest map output.

__Luke
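The "local direct shuffle" rule is the same-node special case of skew-aware placement: run the reducer for a partition on the host holding the largest share of it, so that share is read locally instead of crossing the network. A minimal sketch with hypothetical host names and sizes; it ignores rack distance and does not model reducer-side spilling.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the "local direct shuffle" placement rule:
 * put the reducer on the host with the most bytes of its partition.
 */
public class LocalDirectShuffleSketch {

    /** Host holding the most bytes of the partition (ties: any maximum). */
    static String largestHost(Map<String, Long> bytesOnNode) {
        return bytesOnNode.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow();
    }

    /** Fraction of the partition read locally if the reducer runs on host. */
    static double localFraction(String host, Map<String, Long> bytesOnNode) {
        long total = bytesOnNode.values().stream().mapToLong(Long::longValue).sum();
        return total == 0 ? 0.0 : (double) bytesOnNode.getOrDefault(host, 0L) / total;
    }

    public static void main(String[] args) {
        // Hypothetical per-node byte counts for one skewed partition.
        Map<String, Long> bytesOnNode = Map.of("n1", 10L, "n2", 10L, "n3", 80L);
        String host = largestHost(bytesOnNode);
        System.out.println("run reducer on " + host
                + ", local fraction = " + localFraction(host, bytesOnNode));
    }
}
```

For the sample data the reducer goes to n3, which reads 80% of the partition locally; only the remaining 20% is fetched over the network.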