Hadoop >> mail # user >> Multiple various streaming questions

Re: Multiple various streaming questions
That's weird.  I thought I responded to this, but I don't see a reply on the list (and I have only a vague recollection of whether I actually did respond)...anyway...

On Feb 3, 2011, at 6:41 PM, Allen Wittenauer wrote:

> On Feb 1, 2011, at 11:40 PM, Keith Wiley wrote:
>> I would really appreciate any help people can offer on the following matters.
>> When running a streaming job, -D, -files, -libjars, and -archives don't seem to work, but -jobconf, -file, -cacheFile, and -cacheArchive do.  With any of the first four parameters anywhere in the command I always get a "Streaming Command Failed!" error.  The last four work though.  Note that some of those parameters (-files) do work when I run a Hadoop job in the normal framework, just not when I specify the streaming jar.
> There are some issues with how the streaming jar processes the command line, especially in 0.20, in that the options need to be in the correct order.  In general, the -D's need to be *before* the rest of the streaming params.  This is what works for me:
> hadoop  \
>        jar \
>         `ls $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar` \
>        -Dmapred.reduce.tasks.speculative.execution=false \
>        -Dmapred.map.tasks.speculative.execution=false \
>        -Dmapred.job.name="oh noes aw is doing perl again" \
>        -input ${ATTEMPTIN} \
>        -output ${ATTEMPTOUT} \
>        -mapper map.pl \
>        -reducer reduce.pl  \
>        -file jobsvs-map1.pl \
>        -file jobsvs-reduce1.pl
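
The ordering rule above also seems to apply to the other generic options (-files, -libjars, -archives): they go right after the jar, before -input, -output, -mapper, and the rest. A minimal dry-run sketch of that ordering follows. It only prints the argument list, one argument per line, and does not launch anything. The function name build_streaming_cmd, the STREAMING_JAR default, and the paths are placeholders, not anything taken from the thread.

```shell
#!/bin/sh
# Dry run: prints the argument list instead of launching a job.
# STREAMING_JAR, build_streaming_cmd, and the paths are placeholders.
STREAMING_JAR="${STREAMING_JAR:-hadoop-streaming.jar}"

build_streaming_cmd() {
    input_dir=$1
    output_dir=$2
    # Generic options (-D, -files) come first, streaming options after.
    printf '%s\n' \
        hadoop jar "$STREAMING_JAR" \
        -Dmapred.reduce.tasks.speculative.execution=false \
        -Dmapred.map.tasks.speculative.execution=false \
        -files map.pl,reduce.pl \
        -input "$input_dir" \
        -output "$output_dir" \
        -mapper map.pl \
        -reducer reduce.pl
}

build_streaming_cmd /tmp/in /tmp/out
```

If the printed order looks right, the same arguments can be passed to a real invocation. Whether -files behaves this way on a particular 0.20 build is worth verifying on the cluster itself.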

I'll give that a shot today.  Thanks.  I hate deprecation warnings, they make me feel so guilty.

>> How do I force a single record (input file) to be processed by a single mapper to get maximum parallelism?

>> I don't understand exactly what that means and how to go about doing it.  In the normal Hadoop framework I have achieved this goal by setting mapred.max.split.size small enough that only one input record fits (about 6 MB).  I tried that with my streaming job via "-jobconf mapred.max.split.size=X", where X is a very low number, about the size of a single streaming input record (which in the streaming case is not 6 MB but merely ~100 bytes, just a filename referenced via -cacheFile), but it didn't work: it sent multiple records to each mapper anyway.
> What you actually want to do is set mapred.min.split.size to an extremely high value.

I agree, except that the method I described helps force parallelism.  Setting mapred.max.split.size to a size slightly larger than a single record does a very good job of forcing 1-to-1 parallelism.  Setting it to just larger than two records forces 2-to-1, etc.  It is very nice to be able to achieve perfect parallelism...but it didn't work with streaming.

I have since discovered that in the case of streaming, mapred.map.tasks is a good way to achieve this goal.  Ironically, if I recall correctly, this seemingly obvious method for setting the number of mappers did not work so well in my original nonstreaming case, which is why I resorted to the rather contrived method of calculating and setting mapred.max.split.size instead.
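
One possible explanation for the difference, offered as an assumption rather than something confirmed in this thread: streaming goes through the old org.apache.hadoop.mapred API, whose FileInputFormat appears to derive a goal size from mapred.map.tasks and never consults mapred.max.split.size, while the newer org.apache.hadoop.mapreduce API caps splits with mapred.max.split.size. The sketch below models both rules as they are commonly described. The function names are made up for illustration, and the formulas should be checked against the FileInputFormat source for the version in use.

```shell
#!/bin/sh
# Sketch of split-size selection; all function names are illustrative.
# All sizes are in bytes.

min() { if [ "$1" -lt "$2" ]; then echo "$1"; else echo "$2"; fi; }
max() { if [ "$1" -gt "$2" ]; then echo "$1"; else echo "$2"; fi; }

# Assumed old-API rule (used by streaming): the goal size comes from
# mapred.map.tasks; mapred.max.split.size plays no part.
old_api_split_size() {
    total_bytes=$1; map_tasks=$2; min_split=$3; block_size=$4
    goal=$(( total_bytes / map_tasks ))
    max "$min_split" "$(min "$goal" "$block_size")"
}

# Assumed new-API rule: mapred.max.split.size caps the split.
new_api_split_size() {
    max_split=$1; min_split=$2; block_size=$3
    max "$min_split" "$(min "$max_split" "$block_size")"
}

BLOCK=$(( 64 * 1024 * 1024 ))

# 32 filename records of ~100 bytes each, mapred.map.tasks=32
old_api_split_size 3200 32 1 "$BLOCK"

# 6 MB records, mapred.max.split.size set to 7 MB
new_api_split_size $(( 7 * 1024 * 1024 )) 1 "$BLOCK"
```

Under these assumptions the first call yields a split of about one record, which matches the observation that mapred.map.tasks works for streaming. Real splits are computed per file and aligned to record boundaries, so the numbers are approximate.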

>> Achieving 1-to-1 parallelism between map tasks, nodes, and input records is very important because my map tasks take a very long time to run, upwards of an hour.  I cannot have them queueing up on a small number of nodes while there are numerous unused nodes (task slots) available to be doing work.
> If all the task slots are in use, why would you care if they are queueing up?  Also keep in mind that if a node fails, that work will need to get re-done anyway.
Because not all slots are in use.  It's a very large cluster and it's excruciating that Hadoop partially serializes a job by piling multiple map tasks onto a single map in a queue even when the cluster is massively underutilized.  This occurs when the input records are significantly smaller than the block size (6MB vs 64MB in my case, giving me about a 32x serialization cost!!!).  To put it differently, if I let Hadoop do it its own stupid way, the job takes 32 times longer than it would if it evenly distributed the map tasks across the nodes.  Packing the input files into larger sequence files does not help with this problem.  The input splits are calculated from the individual files and thus I still get this undesirable packing effect.

Thanks a lot.  Lots of stuff to think about in your post.  I appreciate it.


Keith Wiley     [EMAIL PROTECTED]     keithwiley.com    music.keithwiley.com

"It's a fine line between meticulous and obsessive-compulsive and a slippery
rope between obsessive-compulsive and debilitatingly slow."
                                           --  Keith Wiley