AVRO threatening to ruin Christmas
Terry Healy 2012-12-21, 17:56
Going crazy here trying to reconcile this. I found links to some aspects,
partially implemented in the AVRO weather M/R example from 'The Definitive
Guide', and outlined in Package org.apache.avro.mapred under "For jobs whose
input is an Avro data file and which use an AvroMapper, but whose reducer is
a non-Avro Reducer and whose output is a non-Avro format:". Clearly I have
misunderstood something while attempting to follow those instructions.

The test code does not include a mapper, so the job setup is not like what
I'm trying to achieve: AVRO format into Mapper, Text out of Reducer. (I've
eliminated attempting to use the Partitioner, Comparator, and
GroupingComparator used in the working M/R code that reads .tsv rather than
AVRO.)

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my class.
I think my existing reducer would work fine if I could use it with the
AvroMapper, but as it stands I get the above exception.

From setup:

    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongPair.class);
    conf.setOutputValueClass(AvroFlowWritable.class);

    NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

    AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
    AvroJob.setMapperClass(conf, AvroFlowMapper.class);
    AvroJob.setReducerClass(conf, AvroFlowReducer.class);

    Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
    Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
    AvroJob.setMapOutputSchema(conf, pairSchema);

    /*
     * ------------------
     * *** Mapper ***
     * ------------------
     */
    public static class AvroFlowMapper<K> extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        @Override
        public void map(K datum, AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {

            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {
                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = new Long(afw.getDstIp());
                } else {
                    testKey = new Long(afw.getSrcIp());
                }
                collector.collect(new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }

    /*
     * ------------------
     * *** Reducer ***
     * ------------------
     */
    public static class AvroFlowReducer extends AvroReducer<Long, AvroFlowWritable, Text> {

        @Override
        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter) throws IOException {

            Iterator iter = values.iterator();
            while (iter.hasNext()) {
                AvroFlowWritable afw = (AvroFlowWritable) iter.next();
                // collector.collect(new Text(afw.toString()));
            }
        }
    }

On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
>
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
>
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
>
> -Terry
>
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
>
> public static class HdFlowMapper extends MapReduceBase