Avro, mail # user - Order of the schema in Union


Re: Order of the schema in Union
Scott Carey 2012-02-21, 17:39
As for why the union does not seem to match:
The union schemas are not the same as the one in the error: the one in the
error does not have a namespace. It finds "AVRO_NCP_ICM", but the union has
only "merced.AVRO_NCP_ICM" and "merced.AVRO_IVR_BY_CALLID".
The namespace and the name must both match.
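
You can check this full-name matching directly against the Schema API. Here
is a minimal sketch (the two record definitions are hypothetical stand-ins,
not your actual schemas) showing that a union looks its members up by full
name, so a bare "AVRO_NCP_ICM" does not find "merced.AVRO_NCP_ICM":

    import java.util.Arrays;
    import org.apache.avro.Schema;

    public class UnionNameCheck {
        public static void main(String[] args) {
            // Hypothetical stand-ins: the same record name with and
            // without a namespace.
            Schema qualified = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AVRO_NCP_ICM\","
                + "\"namespace\":\"merced\",\"fields\":[]}");
            Schema unqualified = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AVRO_NCP_ICM\",\"fields\":[]}");

            Schema union = Schema.createUnion(Arrays.asList(qualified));

            // Union members are resolved by full name (namespace + name).
            System.out.println(union.getIndexNamed(qualified.getFullName()));   // 0
            System.out.println(union.getIndexNamed(unqualified.getFullName())); // null
        }
    }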

Is your output schema correct?  It looks like you are setting both your
map output schema and your output schema to the Pair schema.  I suspect you
only want the Pair schema as the map output and reducer input, but cannot be
sure from the code below.

From the code below, your reducer must create Pair objects and output them,
and maybe that is related to the error.  It may also be related to the
combiner; does the error happen without it?
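
For reference, the wiring being suggested would look roughly like this. This
is a sketch only, reusing the outschema variable and JobConf from the code
below, with Pair.getPairSchema from avro-mapred:

    // Intermediate data (map output / combiner / reduce input) uses the
    // Pair schema; the final output uses the plain record schema that the
    // reducer emits.
    Schema pairSchema = Pair.getPairSchema(
        Schema.create(Schema.Type.STRING),  // key side of the pair
        outschema);                         // value side: the output record

    AvroJob.setMapOutputSchema(job, pairSchema);  // mapper -> combiner/reducer
    AvroJob.setOutputSchema(job, outschema);      // reducer -> output files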

On 2/12/12 11:01 PM, "Serge Blazhievsky" <[EMAIL PROTECTED]> wrote:

> Hi all,
>
> I am running into an interesting problem with Union. It seems that the order
> of the schemas in the union must match the order of the input paths for the
> different files.
>
> This does not look like the right behavior. The code and exception are below.
>
> The moment I change the order in the union, it works.
>
>
> Thanks
> Serge
>
>
>     public int run(String[] strings) throws Exception {
>
>         JobConf job = new JobConf();
>
>         job.setNumMapTasks(map);
>         job.setNumReduceTasks(reduce);
>
>         // Uncomment to run locally in a single process
>         job.set("mapred.job.tracker", "local");
>
>         // Read the schema from the first input file.
>         File file = new File(input);
>         DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
>         DataFileReader<GenericRecord> dataFileReader =
>             new DataFileReader<GenericRecord>(file, reader);
>         Schema s = dataFileReader.getSchema();
>
>         // Read the schema from the second input file.
>         File lfile = new File(linput);
>         DatumReader<GenericRecord> lreader = new GenericDatumReader<GenericRecord>();
>         DataFileReader<GenericRecord> ldataFileReader =
>             new DataFileReader<GenericRecord>(lfile, lreader);
>         Schema s2 = ldataFileReader.getSchema();
>
>         // Union of the two input schemas; the job only works when the
>         // order here matches the order of the input paths added below.
>         List<Schema> slist = new ArrayList<Schema>();
>         slist.add(s2);
>         slist.add(s);
>
>         System.out.println(s.toString(true));
>         System.out.println(s2.toString(true));
>
>         Schema s_union = Schema.createUnion(slist);
>         AvroJob.setInputSchema(job, s_union);
>
>         // Output record: same field names as s, all typed as strings.
>         List<Schema.Field> fields = s.getFields();
>         List<Schema.Field> outfields = new ArrayList<Schema.Field>();
>         for (Schema.Field f : fields) {
>             outfields.add(new Schema.Field(f.name(),
>                 Schema.create(Type.STRING), null, null));
>         }
>
>         Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID",
>             "AVRO_IVR_BY_CALLID", "merced", false);
>         outschema.setFields(outfields);
>
>         Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
>         Schema OUT_SCHEMA = new Pair<String, GenericRecord>("", STRING_SCHEMA,
>             new GenericData.Record(outschema), outschema).getSchema();
>
>         AvroJob.setMapOutputSchema(job, OUT_SCHEMA);
>         AvroJob.setOutputSchema(job, OUT_SCHEMA);
>
>         AvroJob.setMapperClass(job, MapImpl.class);
>         AvroJob.setCombinerClass(job, ReduceImpl.class);
>         AvroJob.setReducerClass(job, ReduceImpl.class);
>
>         // FileInputFormat.setInputPaths(job, new Path(input));
>         FileInputFormat.addInputPath(job, new Path(linput));
>         FileInputFormat.addInputPath(job, new Path(input));
>
>         // MultipleInputs.addInputPath(job, new Path(input),
>         //     AvroInputFormat.class, MapImpl.class);
>
>         FileOutputFormat.setOutputPath(job, new Path(output));
>         FileOutputFormat.setCompressOutput(job, true);