bharath vissapragada 2009-08-20, 15:58
Hi all,
Can anyone tell me how the MR scheduler schedules MR jobs? How does it decide where to create map tasks and how many to create? Once the map tasks are over, how does it decide to move the keys to the reducers efficiently (minimizing data movement across the network)? Is there any document available that describes this scheduling process clearly?
Kindly respond to this mail.
Thanks in advance.
Arun C Murthy 2009-08-20, 18:41
The number of maps is decided by the application. The scheduler decides where to execute them.
Once a map is done, the reduce tasks connect to the tasktracker (on the node where the map task executed) and copy the map output over HTTP.
Arun
bharath vissapragada 2009-08-21, 04:20
OK, I'll be a bit more specific.
Suppose the map outputs 100 different keys.
Consider a key "K" whose corresponding values may be on N different datanodes. Consider the datanode "D" which holds the largest number of those values. Instead of moving the values on "D" to other systems, it would be useful to bring the values from the other datanodes to "D", to minimize the data movement and also the delay. The same applies to all the other keys. How does the scheduler take care of this?
Amogh Vasekar 2009-08-21, 06:09
I'm not sure that is the case with Hadoop. I think it assigns a reduce task to whichever tasktracker is available at that instant, since a reducer polls the JT for completed maps. If it worked as you describe, a reducer wouldn't be initialized until all maps had completed, and only then would the copy phase start.
Thanks, Amogh
Arun C Murthy 2009-08-21, 06:23
Map-Reduce doesn't 'bring' values from N datanodes to the map. A map gets a single block of data to work with, N-1 other maps get the other N-1 blocks; thus multiple maps might get the key K and different values. Eventually the output of the maps i.e. K and values <V> land up at one of the reduces (based on the Partitioner). Please read some of the widely available map-reduce literature for more details.
Arun
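To make the Partitioner point concrete, here is a minimal sketch, not Hadoop code. It assumes a simple hash-modulo rule in the spirit of Hadoop's default HashPartitioner; `partition`, `shuffle`, and the sample data are hypothetical names made up for illustration. Several maps emit the same key K from different blocks, and every one of those values ends up at the same reduce.

```python
import zlib


def partition(key: str, num_reduces: int) -> int:
    """Pick a reduce index for a key using a hash-modulo rule."""
    # zlib.crc32 is used here only because it is deterministic across runs;
    # it is an assumption of this sketch, not what Hadoop itself hashes with.
    return zlib.crc32(key.encode("utf-8")) % num_reduces


def shuffle(map_outputs, num_reduces):
    """Group the (key, value) pairs of every map by destination reduce."""
    reduces = [dict() for _ in range(num_reduces)]
    for pairs in map_outputs:
        for key, value in pairs:
            target = partition(key, num_reduces)
            reduces[target].setdefault(key, []).append(value)
    return reduces


# Three maps, each fed a different block; all of them happen to emit key "K".
map_outputs = [
    [("K", 1), ("A", 7)],
    [("K", 2)],
    [("B", 5), ("K", 3)],
]
reduces = shuffle(map_outputs, num_reduces=4)
k_home = partition("K", 4)
print("values for K at its reduce:", reduces[k_home]["K"])
```

Note that the destination depends only on the key and the number of reduces, not on which node produced the most values for that key.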
bharath vissapragada 2009-08-21, 06:29
Amogh,
I think the reduce phase starts only when all the maps have completed, because it needs all the values corresponding to a particular key!
bharath vissapragada 2009-08-21, 06:33
Arun,
I am not talking about the map phase. I am talking about the reduce phase, which starts after the maps have finished.
The key "K" I am referring to in my example is one of the distinct keys that the maps output, and its corresponding values may be on any system, depending on where the maps were executed. In order to start the reduce phase on a machine, it has to copy all the values corresponding to a particular key over HTTP. I am talking about the way that is done. In that sense, am I right?
Amogh Vasekar 2009-08-21, 06:35
Yes, but the copy phase starts with the initialization for a reducer, after which it would keep polling for completed map tasks to fetch the respective outputs.
bharath vissapragada 2009-08-21, 06:41
Yes. My doubt is how the location of the reducer is selected. Is it selected arbitrarily, or is it placed on the machine that already holds the most values for that reducer's key? The latter would reduce the cost of transferring data across the network, because many of the values for that key are already on the machine where the map completed.
Harish Mallipeddi 2009-08-21, 08:13
I think what you're asking is whether a ReduceTask is scheduled on the node that holds the largest of all the map-output partitions (p1-pN) that the ReduceTask has to fetch in order to do its job. The answer is "no" - ReduceTasks are assigned arbitrarily. No such optimization is done, and I think it could really be an optimization only if one of your partitions is heavily skewed for some reason.

Also, as Amogh pointed out, the ReduceTasks start fetching their map-output partitions (the shuffle phase) as and when they hear about completed maps. So it would not be possible to schedule ReduceTasks only on nodes with the largest partitions.

--
Harish Mallipeddi
http://blog.poundbang.in
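A rough way to see why placement matters only under skew is to count how many bytes a reduce would still fetch remotely for a given placement. This is a back-of-the-envelope sketch, not scheduler code: `remote_bytes`, the node names, and the partition sizes are all hypothetical.

```python
def remote_bytes(partition_sizes, reducer_node):
    """Bytes the reduce must fetch from nodes other than its own."""
    return sum(size for node, size in partition_sizes.items()
               if node != reducer_node)


# Size of one reduce's partition on each node that ran a map (made-up numbers).
balanced = {"node1": 100, "node2": 120, "node3": 90, "node4": 110}
skewed = {"node1": 900, "node2": 40, "node3": 30, "node4": 30}

for name, sizes in (("balanced", balanced), ("skewed", skewed)):
    best_node = max(sizes, key=sizes.get)   # node holding the largest partition
    best = remote_bytes(sizes, best_node)
    arbitrary = remote_bytes(sizes, "node3")  # stand-in for an arbitrary choice
    print(f"{name}: best placement moves {best}, arbitrary moves {arbitrary}")
```

With the balanced sizes the best placement moves 300 units instead of 330, a small gain; with the skewed sizes it moves 100 instead of 970. The sizes are only known once the maps have finished, which is the second half of Harish's argument.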
Amogh Vasekar 2009-08-21, 09:20
Let me rephrase:
1. The copy phase starts after reducer initialization, which happens before all maps have completed.
2. Which mapper has the most values for a particular key won't be known until all mappers have completed. (More precisely, once some percentage of the running mappers has completed, we only know the "current" maximum-value mapper.) Also, there is no rule which says one record can go to only one reducer.
Thanks, Amogh
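The timing described in point 1 can be sketched with a toy simulation. It assumes a reduce that starts early and polls at a fixed interval for maps that have finished; `simulate_shuffle`, the map IDs, and the times are hypothetical and do not reflect the real polling protocol between the reduce, the tasktracker, and the JT.

```python
def simulate_shuffle(map_finish_times, reducer_start, poll_interval):
    """Return {map_id: time at which the reduce fetches that map's output}."""
    fetched = {}
    now = reducer_start
    while len(fetched) < len(map_finish_times):
        # On each poll, fetch the output of every map that has finished
        # and has not been fetched yet.
        for map_id, finished_at in map_finish_times.items():
            if map_id not in fetched and finished_at <= now:
                fetched[map_id] = now
        now += poll_interval
    return fetched


# Made-up finish times (in seconds) for three maps.
finish = {"m1": 10, "m2": 25, "m3": 60}
fetch_times = simulate_shuffle(finish, reducer_start=5, poll_interval=10)
print(fetch_times)
print("copy phase complete at", max(fetch_times.values()))
```

In this run two of the three outputs are copied before the last map finishes at t=60, but the copy phase as a whole cannot complete before then. The reduce's node is therefore already fixed long before the final partition sizes are known.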
zjffdu 2009-08-21
Adding some details:
1. The number of maps is determined by the block size and the InputFormat (whether or not you want the input to be split).
2. The default scheduler for Hadoop is FIFO; the Fair Scheduler and the Capacity Scheduler are the two other options, as far as I know. The JobTracker hosts the scheduler.
3. Once a map task is done, it tells its own tasktracker, and the tasktracker tells the jobtracker. The jobtracker therefore manages all the tasks, and it decides how and when to start the reduce tasks.
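Point 1 can be sketched as a simple estimate. This is a simplification that assumes one map per block for splittable input and one map per file otherwise; a real InputFormat may apply minimum and maximum split sizes and other rules. `estimate_num_maps` and the file sizes are hypothetical.

```python
MB = 1024 * 1024


def estimate_num_maps(file_sizes, block_size, splittable=True):
    """Estimate the map count: one per block if splittable, else one per file."""
    total = 0
    for size in file_sizes:
        if splittable:
            total += -(-size // block_size)  # integer ceiling division
        else:
            total += 1
    return total


files = [200 * MB, 64 * MB, 10 * MB]  # made-up input file sizes
print(estimate_num_maps(files, block_size=64 * MB))
print(estimate_num_maps(files, block_size=64 * MB, splittable=False))
```

With a 64 MB block size the three files yield 4 + 1 + 1 = 6 maps when split, and 3 maps when each file is processed whole.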
bharath vissapragada 2009-08-21, 16:57
I discussed the same doubt on the HBase forums. I am pasting the reply I got, for those who aren't subscribed to that list. Regarding optimizing the reduce phase (similar to what Harish was pointing out), I got the following reply from Ryan:

"I think people are confused about how optimal map reduces have to be. Keeping all the data super-local on each machine is not always helping you, since you have to read via a socket anyways. Going remote doesn't actually make things that much slower, since on a modern LAN ping times are < 0.1ms. If your entire cluster is hanging off a single switch, there is nearly unlimited bandwidth between all nodes (certainly much higher than any single system could push). Only once you go multi-switch then switch-locality (aka rack locality) becomes important.

Remember, Hadoop isn't about the instantaneous speed of any job, but about running jobs in a highly scalable manner that works on tens or tens of thousands of nodes. You end up blocking on single machine limits anyways, and the r=3 of HDFS helps you transcend a single machine read speed for large files. Keeping the data transfer local in this case results in lower performance."

Just FYI!
Thanks