Re: multi-thread mapreduce
Thank you all. In fact, I don't expect this approach to improve
performance.
I need to process 3 different logs (each with a different format). I just want
to start processing all 3 logs at the same time, all within this one
program. That way I can give each thread a different separator, so that its
maps can handle a different log.
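
When several worker threads in one JVM are each given their own separator, it matters where the separator is stored: a static field is shared by every instance, so the value passed to the last constructor call wins. Below is a minimal plain-Java sketch of that effect, with no Hadoop involved. The names SeparatorDemo, SharedSep, OwnSep, runShared and runOwn are hypothetical and exist only for this illustration. Pattern.quote is used because String.split treats its argument as a regular expression, so a separator such as "|" would otherwise not split as intended.

```java
import java.util.regex.Pattern;

public class SeparatorDemo {

    /** Separator kept in a static field: one copy for ALL instances. */
    static class SharedSep extends Thread {
        private static String sep;
        private final String line;
        String firstField;

        SharedSep(String sep, String line) {
            SharedSep.sep = sep;   // overwrites the value set by earlier instances
            this.line = line;
        }

        @Override
        public void run() {
            firstField = line.split(Pattern.quote(sep))[0];
        }
    }

    /** Separator kept per instance: each thread sees its own value. */
    static class OwnSep extends Thread {
        private final String sep;
        private final String line;
        String firstField;

        OwnSep(String sep, String line) {
            this.sep = sep;
            this.line = line;
        }

        @Override
        public void run() {
            firstField = line.split(Pattern.quote(sep))[0];
        }
    }

    /** Builds all workers first, then starts them, as a driver would. */
    public static String[] runShared(String[] seps, String[] lines) {
        SharedSep[] workers = new SharedSep[seps.length];
        for (int i = 0; i < seps.length; i++) {
            workers[i] = new SharedSep(seps[i], lines[i]);
        }
        for (Thread t : workers) {
            t.start();
        }
        String[] out = new String[workers.length];
        for (int i = 0; i < workers.length; i++) {
            join(workers[i]);
            out[i] = workers[i].firstField;
        }
        return out;
    }

    public static String[] runOwn(String[] seps, String[] lines) {
        OwnSep[] workers = new OwnSep[seps.length];
        for (int i = 0; i < seps.length; i++) {
            workers[i] = new OwnSep(seps[i], lines[i]);
        }
        for (Thread t : workers) {
            t.start();
        }
        String[] out = new String[workers.length];
        for (int i = 0; i < workers.length; i++) {
            join(workers[i]);
            out[i] = workers[i].firstField;
        }
        return out;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] seps = {",", "|", ":"};
        String[] lines = {"a,b", "c|d", "e:f"};
        System.out.println(String.join(" ", runShared(seps, lines))); // a,b c|d e
        System.out.println(String.join(" ", runOwn(seps, lines)));    // a c e
    }
}
```

With the static field, only the third line is split correctly, because all three workers end up using ":". In Hadoop itself, map tasks normally run in task JVMs separate from the driver, so a per-job value like this is usually passed through the job Configuration rather than through a field of the driver class.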
2012/12/13 Yang <[EMAIL PROTECTED]>

> But I have run across some situations where I could benefit from
> multi-threading: if your Hadoop mapper is prone to random-access IO (such
> as looking up a TFile or HBase, which ultimately makes a network call and
> then looks into a file segment), having multiple threads could utilize the
> CPU while IO is going on.
>
>
> On Wed, Dec 12, 2012 at 10:47 AM, Harsh J <[EMAIL PROTECTED]> wrote:
>
>> Exactly - A job is already designed to be properly parallel w.r.t. its
>> input, and this would just add additional overheads of job setup and
>> scheduling. If your per-record processing requires threaded work,
>> consider using the MultithreadedMapper/Reducer classes instead.
>>
>> On Wed, Dec 12, 2012 at 10:53 PM, Yang <[EMAIL PROTECTED]> wrote:
>> > I think it won't help much, since in a Hadoop cluster, people already
>> > allocate "SLOTS" to match the number of cores, so the inherent
>> > parallelism is supposedly already exploited, since different
>> > mappers/reducers are completely independent.
>> >
>> >
>> > On Wed, Dec 12, 2012 at 2:09 AM, Yu Yang <[EMAIL PROTECTED]> wrote:
>> >>
>> >> Dears,
>> >>
>> >> I suddenly got this idea to run a mapreduce job in a multi-threaded way.
>> >> I don't know if it can work. Could anyone give me some advice?
>> >> Here is the Java code:
>> >>
>> >>
>> >> import java.io.IOException;
>> >> import org.apache.hadoop.conf.Configuration;
>> >> import org.apache.hadoop.fs.Path;
>> >> import org.apache.hadoop.io.LongWritable;
>> >> import org.apache.hadoop.io.Text;
>> >> import org.apache.hadoop.mapreduce.Job;
>> >> import org.apache.hadoop.mapreduce.Mapper;
>> >> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
>> >> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
>> >>
>> >> public class LogProcessApp extends Thread {
>> >>
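>> >>  // Configuration key used to pass the separator to the map tasks.
>> >>  private static final String SEP_KEY = "logprocess.separator";
>> >>
>> >>  private final String sep;
>> >>  private final String x2;
>> >>  private final String x3;
>> >>
>> >>  public LogProcessApp(String arg1,String arg2,String arg3){
>> >>   sep=arg1;
>> >>   x2=arg2;
>> >>   x3=arg3;
>> >>  }
>> >>
>> >>  public static class CM extends Mapper<LongWritable, Text, Text, Text>{
>> >>   private Text keyvar = new Text();
>> >>   private Text valvar = new Text();
>> >>   private String sep;
>> >>
>> >>   // Read the separator from the job configuration; a static field set
>> >>   // in the driver may not be visible in the task JVMs, and it would be
>> >>   // shared by all threads in the driver.
>> >>   @Override
>> >>   protected void setup(Context context) {
>> >>    sep = context.getConfiguration().get(SEP_KEY);
>> >>   }
>> >>
>> >>   @Override
>> >>   public void map(LongWritable key, Text value, Context context)
>> >>     throws IOException, InterruptedException {
>> >>    String line = value.toString();
>> >>    String[] data = line.split(sep);
>> >>    if (data.length < 2) {
>> >>     return; // skip malformed lines
>> >>    }
>> >>    keyvar.set(data[0]);
>> >>    valvar.set(data[1]);
>> >>    context.write(keyvar,valvar);
>> >>   }
>> >>  }
>> >>
>> >>  public void run(){
>> >>   Configuration conf = new Configuration();
>> >>   conf.set(SEP_KEY, sep);
>> >>   Job job = null;
>> >>   try {
>> >>    job = new Job(conf);
>> >>   } catch (IOException e1) {
>> >>    e1.printStackTrace();
>> >>    return; // cannot continue without a Job
>> >>   }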
>> >>
>> >>   job.setJobName("XXXJob");
>> >>   job.setJarByClass(LogProcessApp.class);
>> >>
>> >>   job.setOutputKeyClass(Text.class);
>> >>   job.setOutputValueClass(Text.class);
>> >>
>> >>   job.setMapperClass(CM.class);
>> >>
>> >>   job.setInputFormatClass(TextInputFormat.class);
>> >>   job.setOutputFormatClass(TextOutputFormat.class);
>> >>
>> >>   try {
>> >>    FileInputFormat.addInputPath(job, new Path(x2));
>> >>   } catch (IOException e) {
>> >>    // TODO Auto-generated catch block
>> >>    e.printStackTrace();
>> >>   }
>> >>   FileOutputFormat.setOutputPath(job, new Path(x3));