Output from AVRO mapper
Terry Healy 2012-12-20, 17:32
I'm just getting started using AVRO within Map/Reduce and am trying to convert some existing non-AVRO code to use AVRO input. So far, the data that was previously stored in tab-delimited files has been converted to .avro successfully, as checked with avro-tools.

Where I'm getting hung up, beyond my book-based examples, is reading AVRO input (using generic records) when the mapper output is NOT in AVRO format. I can't seem to reconcile extending AvroMapper with NOT using AvroCollector. Here are snippets showing my non-AVRO M/R code and my [failing] attempts to make this change. If anyone can help me along, it would be very much appreciated.

-Terry

<code>
Pre-Avro version (works fine with .tsv input format):

public static class HdFlowMapper extends MapReduceBase
    implements Mapper<Text, HdFlowWritable, LongPair, HdFlowWritable> {
  @Override
  public void map(Text key, HdFlowWritable value,
                  OutputCollector<LongPair, HdFlowWritable> output,
                  Reporter reporter) throws IOException {
    ...//
    outKey = new LongPair(value.getSrcIp(), value.getFirst());
    HdFlowWritable outValue = value; // pass it all through
    output.collect(outKey, outValue);
  }
}

AVRO attempt:

conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(LongPair.class);
conf.setOutputValueClass(AvroFlowWritable.class);

SCHEMA = new Schema.Parser().parse(NetflowSchema);
AvroJob.setInputSchema(conf, SCHEMA);
//AvroJob.setOutputSchema(conf, SCHEMA);
AvroJob.setMapperClass(conf, AvroFlowMapper.class);
AvroJob.setReducerClass(conf, AvroFlowReducer.class);
....

public static class AvroFlowMapper<K> extends AvroMapper<K, OutputCollector> {
  @Override  // ** IDE: "Method does not override or implement a method from a supertype"
  public void map(K datum, OutputCollector<LongPair, AvroFlowWritable> collector,
                  Reporter reporter) throws IOException {
    GenericRecord record = (GenericRecord) datum;
    afw = new AvroFlowWritable(record);
    // ...
    collector.collect(outKey, afw);
  }
}
</code>
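For anyone hitting the same error: in the old org.apache.avro.mapred API, AvroMapper's map method takes an AvroCollector, so a map method declared with an OutputCollector parameter cannot override it, which is what the IDE is reporting. One option the org.apache.avro.mapred package documentation describes is to skip AvroMapper and write an ordinary Hadoop Mapper over AvroInputFormat, which hands each record to the mapper as an AvroWrapper key with a NullWritable value. The sketch below is a minimal illustration under these assumptions: Avro 1.7-era org.apache.avro.mapred classes and Hadoop's old mapred API; LongWritable and Text stand in for the poster's LongPair and AvroFlowWritable (not shown in the post); the class names AvroInTextOut and FlowMapper and the field name srcIp are hypothetical. It sets the avro.input.schema property directly instead of calling AvroJob.setInputSchema, because that helper may also switch the map-output shuffle to Avro serialization, which would conflict with Writable map output; verify this against your Avro version.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AvroInTextOut {

  // Plain Hadoop mapper (not AvroMapper): AvroInputFormat supplies each Avro
  // record as the key, wrapped in AvroWrapper, with NullWritable as the value.
  public static class FlowMapper extends MapReduceBase
      implements Mapper<AvroWrapper<GenericRecord>, NullWritable, LongWritable, Text> {
    @Override
    public void map(AvroWrapper<GenericRecord> key, NullWritable value,
                    OutputCollector<LongWritable, Text> output, Reporter reporter)
        throws IOException {
      GenericRecord record = key.datum();
      // Hypothetical field name: replace "srcIp" with a long field from your schema.
      long srcIp = (Long) record.get("srcIp");
      // Emit ordinary Writables; the record's JSON-like toString() stands in for a real value type.
      output.collect(new LongWritable(srcIp), new Text(record.toString()));
    }
  }

  // Hypothetical job setup: Avro input, Writable map output, text job output.
  public static void configure(JobConf conf, Schema schema) {
    conf.setInputFormat(AvroInputFormat.class);
    // Set the reader schema without AvroJob.setInputSchema (see note above).
    conf.set(AvroJob.INPUT_SCHEMA, schema.toString());
    conf.setMapperClass(FlowMapper.class);          // plain setMapperClass, not AvroJob's
    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
  }
}
```

With Writable map output like this, the reducer would likewise need to be an ordinary Hadoop Reducer registered with conf.setReducerClass, since AvroJob.setReducerClass wires in a reducer that expects Avro-wrapped map output.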