-
Combining MultithreadedMapper threadpool size & map.tasks.maximum
Rob Stewart 2012-02-10, 12:25
I'm looking to clarify the relationship between MultithreadedMapper.setNumberOfThreads(i) and mapreduce.tasktracker.map.tasks.maximum.
If I set:
- MultithreadedMapper.setNumberOfThreads( 4 )
- mapreduce.tasktracker.map.tasks.maximum = 1
Will 4 map tasks be executed in four separate threads within one JVM? Or is the number of threads also restricted by the map.tasks.maximum parameter?
What about if I set:
- MultithreadedMapper.setNumberOfThreads( 4 )
- mapreduce.tasktracker.map.tasks.maximum = 4
Will this mean that 4 map tasks are executed in 4 threads in one JVM, or will it mean that 4 JVMs are instantiated, each executing 4 map tasks in individual threads?
thanks,
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Harsh J 2012-02-10, 12:42
Hi Rob,
On Fri, Feb 10, 2012 at 5:55 PM, Rob Stewart <[EMAIL PROTECTED]> wrote:
> I'm looking to clarify the relationship between MultithreadedMapper.setNumberOfThreads(i) and mapreduce.tasktracker.map.tasks.maximum .
The former is an in-user-application value that controls the total number of threads to run for map() calls (inside a mapper). This is _inside_ one JVM (a task, in hadoop terms, is one complete JVM running user code).
The latter controls, at a TaskTracker level, the max total number of map-task JVMs that it can run concurrently at any given time.
> What about if I set:
> - MultithreadedMapper.setNumberOfThreads( 4 )
> - mapreduce.tasktracker.map.tasks.maximum = 4
> Will this mean that 4 map tasks are executed in 4 threads in one JVM, or will it mean that 4 JVMs be instantiated, each executing 4 map tasks in individual threads?
4 JVMs if you have 4 tasks in your Job (# of map tasks of a job is dependent on its input).
Each JVM will then run the MultithreadedMapper code, which will then run 4 threads to call your map() inside of it cause you've asked that of it.
-- Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about
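To make the relationship between the two settings concrete, here is a minimal arithmetic sketch in plain Java. It is not Hadoop code: the class and method names are hypothetical, and it only assumes what the reply above states, namely that the slot limit caps the number of map-task JVMs on a node and that each of those JVMs runs the configured number of map() threads.

```java
// Illustrative arithmetic only; MapConcurrency and its methods are
// hypothetical names and are not part of the Hadoop API.
class MapConcurrency {

    /** Map-task JVMs one TaskTracker runs at once: capped by its slots and by the tasks available. */
    static int concurrentJvms(int mapTasksMaximum, int runnableMapTasks) {
        return Math.min(mapTasksMaximum, runnableMapTasks);
    }

    /** Upper bound on map() threads on one node: JVMs times threads per MultithreadedMapper. */
    static int maxMapThreads(int mapTasksMaximum, int runnableMapTasks, int threadsPerMapper) {
        return concurrentJvms(mapTasksMaximum, runnableMapTasks) * threadsPerMapper;
    }

    public static void main(String[] args) {
        // map.tasks.maximum = 1, setNumberOfThreads(4), 4 tasks waiting:
        // one JVM at a time, running 4 map() threads.
        System.out.println(maxMapThreads(1, 4, 4));
        // map.tasks.maximum = 4, setNumberOfThreads(4), 4 tasks waiting:
        // four JVMs at a time, each running 4 map() threads.
        System.out.println(maxMapThreads(4, 4, 4));
    }
}
```

The point of the sketch is that the two values multiply rather than constrain each other: the slot setting never limits the threads inside a task, and the thread setting never changes how many tasks run.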
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Rob Stewart 2012-02-10, 13:02
hi Harsh,
On 10 February 2012 12:42, Harsh J <[EMAIL PROTECTED]> wrote:
> 4 JVMs if you have 4 tasks in your Job (# of map tasks of a job is dependent on its input).
> Each JVM will then run the MultithreadedMapper code, which will then run 4 threads to call your map() inside of it cause you've asked that of it.
So.. the MultithreadedMapper class splits *one* map task into N number of threads? How is this achieved? I wasn't aware that a map task could be implicitly sub-divided. I was under the (false?) impression that the purpose of MultithreadedMapper was to let N independent map tasks be forked as threads?
Also, from what you say.. if you have map.tasks.maximum = 4 and setNumberOfThreads(4), then in all, for each compute node, up to 16 threads could be forked at any one time?
I'm trying to identify the performance penalty or performance benefit of achieving node concurrency with threads, rather than multiple JVMs. I was hoping that by setting map.tasks.maximum = 1 and setNumberOfThreads( #cores ), I would achieve that. Maybe not?
thanks,
-- Rob
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Harsh J 2012-02-10, 13:33
Rob,
On Fri, Feb 10, 2012 at 6:32 PM, Rob Stewart <[EMAIL PROTECTED]> wrote:
> So.. the MultithreadedMapper class splits *one* map task into N number of threads? How is this achieved? I wasn't aware that a map task could be implicitly sub-divided implicitly? I was under the (false?) impression that the purpose of a MultithreadedMapper enabled the opportunity to send N number of independent map tasks to be forked as threads. ?
Imagine writing your own Mapper code that runs threads to do some processing when beginning the map() process. MultithreadedMapper is just an abstraction of something like that, provided for developer convenience. It has no relationship with tasks, task scheduling, or anything else higher up in the framework. Does that make it clear?
> Also, from what you say.. if you have map.tasks.maximum = 4 and setNumberOfThreads(4), then in all, for each compute node, up to 16 threads could be forked at any one time?
Yeah, you'd be running, at maximum, 4 JVMs, each with 4 threads inside it.
> I'm trying to identify the performance penalty or performance benefit of achieving node concurrency with threads, rather than multiple JVMs. I and I was hoping that setting map.tasks.maximum = 1, and setNumberOfThreads( #cores ), I would achieve that. Maybe not?
What you're missing here is that the multithreaded mapper is something that runs as part of one single map task. Each map task has a defined input split from which it reads off keys and values to pass to map() calls. With just one JVM slot, you'd end up processing only one input-chunk at a time, though with 4 threads doing map() computation, while with four slots, you may be processing 4 input-chunks (4 tasks) at the same time.
The choice between the two has to be application-sensitive. If your work were IO intensive, the slot approach would win at parallelism. Using a single slot with 4 threads when the map() computation is cheap would be a waste of time; you could instead do more IO with parallel tasks. But if your work were more CPU intensive, where each map() may take a long time to run before moving to the next, then MTMapper with a set number of threads may make more sense to use.
-- Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Rob Stewart 2012-02-10, 14:01
Harsh,
On 10 February 2012 13:33, Harsh J <[EMAIL PROTECTED]> wrote:
> What you're missing to see here is that the multithreaded mapper is something that runs as part of one single map task.
> With just one JVM slot, you'd end up processing only one input-chunk at a time, though with 4 threads doing map() computation, while with four slots, you may be processing 4 input-chunks (4 tasks) at the same time. The choice between the two has to be application-sensitive.
OK, take word count. The <k,v> to the map is <null,"foo bar lambda beta">. The canonical Hadoop program would tokenize this line of text and output <"foo",1> and so on. How would the multithreadedmapper know how to further divide this line of text into, say: [<null,"foo bar">,<null,"lambda beta">] for 2 threads to run in parallel? Can you somehow provide an additional record reader to split the input to the map task into sub-inputs for each thread?
> If your work were IO intensive, the slot approach would win at parallelism.
Are you saying here that 4 single-threaded OS processes can achieve a higher rate of OS IO than 4 threads within one OS process doing IO? (That would sound sensible if that's the case.)
> Using single slot with 4 threads when the map() computation is cheap would be a waste of time you could instead do more IO with parallel tasks.
The argument against this approach is that starting up OS processes is far more expensive than forking threads within processes. So I would have said the contrary: where map tasks are small and the input size is large, many JVMs would be instantiated throughout the system, one per task. Instead, one might speculate that reducing the number of JVMs, and replacing them with lower-latency thread forking, would improve runtime speeds?
> But if your work were more CPU intensive, where each map() may take a long time to run before moving to next, then MTMapper with a set amount of threads may make more sense to use.
OK, so are you saying:
- For CPU intensive tasks, multiple threads might help
- For IO intensive tasks, multiple OS processes achieve higher throughput than multiple threads within a smaller number of OS processes?
Thanks,
-- Rob
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Harsh J 2012-02-10, 14:20
Hello again,
On Fri, Feb 10, 2012 at 7:31 PM, Rob Stewart <[EMAIL PROTECTED]> wrote:
> OK, take word count. The <k,v> to the map is <null,"foo bar lambda beta">. The canonical Hadoop program would tokenize this line of text and output <"foo",1> and so on. How would the multithreadedmapper know how to further divide this line of text into, say: [<null,"foo bar">,<null,"lambda beta">] for 2 threads to run in parallel? Can you somehow provide an additional record reader to split the input to the map task into sub-inputs for each thread?
In MultithreadedMapper, the IO work is still single threaded, while the map() calls after the read are multithreaded. But yes, you could use a mix of CombineFileInputFormat and some custom logic to have multiple local splits per map task, and divide readers of them among your threads. But why do all this when that's what slots at the TT are for? The cost of a single map task failure with your mammoth task approach would also be higher - more work to repeat.
> Are you saying here that 4 single-threaded OS processes can achieve a higher rate of OS IO, than 4 threads within one OS process doing IO (which would sound sensible if that's the case).
Yeah, that's what I meant, but with the earlier point of "In MultithreadedMapper, the IO work is still single threaded" specifically in mind.
> The argument against this approach is that the cost starting up OS processes is far more expensive that forking threads within processes. So I would have said the contrary - where map tasks are small and input size is large, than many JVMs would be instantiated throughout the system, one per task. Instead, one might speculate that reducing the number of JVMs, replacing with lower latency thread forking would improve runtime speeds. ?
Agreed here.
The JVM startup overhead does exist, but I wouldn't think it's too high a cost overall, given the simple benefits it can provide instead. There is also JVM reuse, which makes sense to use for CPU intensive applications, so you can take advantage of the HotSpot features of the JVM as it gets reused for running tasks of the same job.
> OK, so are you saying:
> - For CPU intensive tasks, multiple threads might help
> - For IO intensive tasks, multiple OS processes achieve higher throughput than multiple threads within a smaller number of OS processes?
Yep, but also if you limit your total slots to 1 in favor of going all in on multi-threading, you won't be able to smoothly run multiple jobs at the same time. Tasks from new jobs may have to wait longer to run, while in regular slotted environments this is easier to achieve.
-- Harsh J
Customer Ops. Engineer
Cloudera | http://tiny.cloudera.com/about
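The IO-versus-CPU argument in this reply can be illustrated with a toy cost model. The sketch below is plain Java, not Hadoop code, and every formula and number in it is an assumption made for illustration: reads inside one multithreaded task are taken to be fully serialized, map() calls to parallelize perfectly, and separate slots to read in parallel with no disk contention. JVM startup, scheduling, and synchronization overheads are ignored.

```java
// Toy cost model; SlotVsThreadModel, its formulas and its inputs are
// assumptions for illustration and not measurements of any cluster.
class SlotVsThreadModel {

    /** One slot running a multithreaded mapper: a single reader feeds `threads` map() workers. */
    static double oneSlotManyThreads(long records, double readCost, double mapCost, int threads) {
        double readTime = records * readCost;          // reads happen one at a time
        double mapTime = records * mapCost / threads;  // map() calls run in parallel
        return Math.max(readTime, mapTime);            // the slower stage dominates
    }

    /** Several slots running single-threaded mappers: records divided evenly among the tasks. */
    static double manySlotsOneThread(long records, double readCost, double mapCost, int slots) {
        return records * (readCost + mapCost) / slots;
    }

    public static void main(String[] args) {
        long records = 1000;
        int parallelism = 4;
        // IO-heavy work: reading a record costs 10 units, map() costs 1.
        System.out.println("IO-heavy,  1 slot x 4 threads: " + oneSlotManyThreads(records, 10, 1, parallelism));
        System.out.println("IO-heavy,  4 slots x 1 thread: " + manySlotsOneThread(records, 10, 1, parallelism));
        // CPU-heavy work: reading costs 1 unit, map() costs 10.
        System.out.println("CPU-heavy, 1 slot x 4 threads: " + oneSlotManyThreads(records, 1, 10, parallelism));
        System.out.println("CPU-heavy, 4 slots x 1 thread: " + manySlotsOneThread(records, 1, 10, parallelism));
    }
}
```

Under these assumptions the IO-heavy case takes 10000 units with one multithreaded slot against 2750 with four slots, because the single reader is the bottleneck, while the CPU-heavy case comes out close (2500 against 2750). Real clusters will differ, so treat this only as a way to reason about which stage dominates.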
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Rob Stewart 2012-02-10, 18:30
Harsh... Oddly, this blog post has appeared within the last hour or so....
http://kickstarthadoop.blogspot.com/2012/02/enable-multiple-threads-in-mapper-aka.html
-- Rob
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Rob Stewart 2012-02-10, 18:39
Thanks, this is a lot clearer. One final question...
On 10 February 2012 14:20, Harsh J <[EMAIL PROTECTED]> wrote:
> Hello again,
> On Fri, Feb 10, 2012 at 7:31 PM, Rob Stewart <[EMAIL PROTECTED]> wrote:
>> OK, take word count. The <k,v> to the map is <null,"foo bar lambda beta">. The canonical Hadoop program would tokenize this line of text and output <"foo",1> and so on. How would the multithreadedmapper know how to further divide this line of text into, say: [<null,"foo bar">,<null,"lambda beta">] for 2 threads to run in parallel? Can you somehow provide an additional record reader to split the input to the map task into sub-inputs for each thread?
> In MultithreadedMapper, the IO work is still single threaded, while the map() calling post-read is multithreaded. But yes you could use a mix of CombineFileInputFormat and some custom logic to have multiple local splits per map task, and divide readers of them among your threads. But why do all this when thats what slots at the TT are for?
I'm still unsure how the multi-threaded mapper knows how to split the input value into chunks, one chunk for each thread. There is only one example in the Hadoop 0.23 trunk: hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java
And in that source code, there is no custom logic for local splits per map task at all. Again, going back to the word count example: given a line of text as input to a map, which comprises 6 words, I specify .setNumberOfThreads( 2 ), so ideally I'd want 3 words analysed by one thread and the other 3 by the other. Is that what would happen? i.e. I'm unsure whether the MultithreadedMapper class does the splitting of inputs to map tasks...
Regards,
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
bejoy.hadoop@... 2012-02-10, 19:00
Hi Rob
I'm the culprit who posted the blog. :) The topic was of my interest as well and I found the conversation informative and useful. Just thought of documenting the same as it could be useful for others as well in future. Hope you don't mind!..
Regards
Bejoy K S
From handheld, Please excuse typos.
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
bejoy.hadoop@... 2012-02-10, 19:15
Hi Rob,
I'll try to answer this. From my understanding, if you are using MultithreadedMapper on the word count example with TextInputFormat, and you have 2 threads and 2 lines in your input split, the RecordReader would read line 1 and give it to map thread 1, and line 2 to map thread 2. So the same map() processing would happen on these two lines in parallel. This would be the default behavior.
Regards,
Bejoy K S
From handheld, Please excuse typos.
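The behaviour described above, where the unit of parallelism is a whole record (here a line) rather than a fragment of one, can be sketched in plain Java. This is not Hadoop code and does not use MultithreadedMapper; the class and method names are hypothetical, and a thread pool stands in for the mapper's threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of record-level parallelism; WholeLineWordCount
// is a hypothetical name and this is not Hadoop code.
class WholeLineWordCount {

    /** Counts words, handing each whole line (one record) to one pool thread. */
    static Map<String, Integer> count(List<String> lines, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String line : lines) {
            // One record means one map()-style call; the line is never divided between threads.
            pool.submit(() -> {
                for (String word : line.trim().split("\\s+")) {
                    if (!word.isEmpty()) {
                        counts.merge(word, 1, Integer::sum); // atomic on ConcurrentHashMap
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map threads", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines and two threads: each line is tokenized by exactly one thread.
        Map<String, Integer> counts = count(Arrays.asList("foo bar lambda beta", "foo beta"), 2);
        System.out.println(counts);
    }
}
```

With a single six-word line and two threads, this sketch would keep one thread idle: nothing splits the line into two halves unless the map logic itself does so.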
-
Re: Combining MultithreadedMapper threadpool size & map.tasks.maximum
Raj Vishwanathan 2012-02-10, 22:39
Here is what I understand:
The RecordReader for the MTMapper takes the input split and cycles the records among the available threads. It also ensures that the map outputs are synchronized.
So what Bejoy says is what will happen for the wordcount program.
Raj
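The synchronization described in this last reply can be modelled in a few lines of plain Java. This is a simplified sketch and not the MultithreadedMapper source: all names are hypothetical, and it assumes that worker threads pull records from one shared, locked reader (rather than being dealt records in strict rotation) and that writes to the shared output are serialized in the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of a shared reader and writer guarded by a lock;
// SharedReaderSketch is a hypothetical name, not a Hadoop class.
class SharedReaderSketch {
    private final Iterator<String> reader;                  // stands in for the task's single RecordReader
    private final List<String> output = new ArrayList<>();  // stands in for the task's map output

    SharedReaderSketch(List<String> records) {
        this.reader = records.iterator();
    }

    /** Only one thread reads at a time, so each record goes to exactly one thread. */
    private synchronized String nextRecord() {
        return reader.hasNext() ? reader.next() : null;
    }

    /** Writes are serialized as well, so the shared output is not corrupted. */
    private synchronized void write(String key, int value) {
        output.add(key + "\t" + value);
    }

    /** Stand-in for the user's map(): tokenizes one whole line. */
    private void map(String line) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                write(word, 1);
            }
        }
    }

    /** Starts the worker threads, waits for them, and returns everything they emitted. */
    List<String> run(int threads) {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                String record;
                while ((record = nextRecord()) != null) {
                    map(record);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for map threads", e);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> emitted = new SharedReaderSketch(
                Arrays.asList("foo bar", "lambda beta gamma", "foo")).run(2);
        System.out.println(emitted.size() + " pairs emitted");
    }
}
```

Because threads race for the next record, the order of the emitted pairs can vary between runs; only the set of pairs is stable, which is acceptable for map output that is sorted and grouped later.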