Avro, mail # dev - Re: How to provide the multiple avro inputs to Two avroMappers and same mapper outputs to single avroReducer.


Harsh J 2013-03-24, 18:06
One would ideally use the MultipleInputs Hadoop MR class to achieve
this, but it lacks Avro support, like the rest of the stock Hadoop MR
classes, and there is nothing in the Avro MR libraries yet that
complements it.

From past work, I recall basing my own libraries/applications on the
same Configuration prefixes that MultipleInputs uses, to provide
schemas etc. to each input path's tagged mapper, but that was hackery
done to achieve a specific end. We should ideally add an
AvroMultipleInputs class to Avro's MR libraries to cover this gap.
Would you mind opening a JIRA to discuss the implementation further?
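As a rough illustration of the prefix-keying idea described above: each input path's schema is stored under a path-derived Configuration key at job setup, and each tagged mapper looks its own schema up at runtime. The key prefix, helper names, and the plain Map standing in for Hadoop's Configuration are all hypothetical, not a real Avro or Hadoop API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: models how an AvroMultipleInputs could key per-path schemas
// in Configuration, the way MultipleInputs keys per-path formats/mappers.
public class AvroMultipleInputsSketch {

    // Hypothetical key prefix; not an actual Avro or Hadoop property name.
    static final String SCHEMA_PREFIX = "avro.multipleinputs.schema.";

    // Job setup: register an input path together with its schema JSON.
    public static void addInputSchema(Map<String, String> conf,
                                      String inputPath, String schemaJson) {
        conf.put(SCHEMA_PREFIX + inputPath, schemaJson);
    }

    // Mapper setup: resolve the schema registered for this split's path.
    public static String schemaForPath(Map<String, String> conf, String inputPath) {
        String schema = conf.get(SCHEMA_PREFIX + inputPath);
        if (schema == null) {
            throw new IllegalArgumentException("No schema registered for " + inputPath);
        }
        return schema;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        addInputSchema(conf, "/data/users", "{\"type\":\"string\"}");
        addInputSchema(conf, "/data/events", "{\"type\":\"int\"}");
        System.out.println(schemaForPath(conf, "/data/users"));
        System.out.println(schemaForPath(conf, "/data/events"));
    }
}
```

In a real job, each tagged mapper would recover its split's input path (for example from the map.input.file property) and hand the stored JSON to a Schema.Parser.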

On Thu, Mar 21, 2013 at 4:28 PM, rajharireddy <[EMAIL PROTECTED]> wrote:
> How can I provide multiple Avro inputs to two AvroMappers and send both
> mappers' outputs to a single AvroReducer?
>
> I have written code for a single input to one AvroMapper, with that
> mapper's output going to a single AvroReducer:
>
> import java.io.File;
> import java.io.IOException;
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericData.Record;
> import org.apache.avro.mapred.AvroCollector;
> import org.apache.avro.mapred.AvroJob;
> import org.apache.avro.mapred.AvroMapper;
> import org.apache.avro.mapred.AvroReducer;
> import org.apache.avro.mapred.FsInput;
> import org.apache.avro.mapred.Pair;
> import org.apache.avro.util.Utf8;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IOUtils;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class Test extends Configured implements Tool {
>
>     static class TargetMapper extends AvroMapper<GenericRecord, Pair<Utf8, GenericRecord>> {
>         public void map(GenericRecord line, AvroCollector<Pair<Utf8, GenericRecord>> collector, Reporter reporter) throws IOException {
>             try {
>                 // Tag every record with the same key so all records reach one reduce call.
>                 collector.collect(new Pair<Utf8, GenericRecord>(new Utf8("data"), line));
>             } catch (Exception e) {
>                 System.out.println("Error message " + e.getMessage());
>             }
>         }
>     }
>
>     static class TargetReducer extends AvroReducer<Utf8, GenericRecord, GenericRecord> {
>
>         public void reduce(Utf8 key, Iterable<GenericRecord> values, AvroCollector<GenericRecord> collector, Reporter reporter) throws IOException {
>             try {
>                 Configuration conf = getConf();
>                 Path inputDir = new Path("/user/cts336339/avro");
>                 Schema output_schema = readOutputSchema(inputDir, conf);
>                 //System.out.println("output_schema " + output_schema);
>
>                 GenericRecord output = new GenericData.Record(output_schema); // Record for output
>                 System.out.println("inside for loop" + output);
>                 GenericRecord location = new GenericData.Record(output_schema.getField("Location").schema()); // Record for the Location field
>                 GenericRecord data = new GenericData.Record(output_schema.getField("Data").schema()); // Record for the Data field
>
>                 for (GenericRecord value : values) {
>                     //System.out.println("inside for loop" + value);

Harsh J