
Avro, mail # user - Output from AVRO mapper


Output from AVRO mapper
Terry Healy 2012-12-20, 17:32
I'm just getting started using AVRO within Map/Reduce and am trying to
convert some existing non-AVRO code to use AVRO input. So far, the data
that was previously stored in tab-delimited files has been converted to
.avro successfully, as verified with avro-tools.
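For readers following along, a conversion like the one described can be sketched with Avro's generic API: parse each tab-delimited line into a `GenericRecord` and append it to a `DataFileWriter`, which stores the schema in the file header. The two-field schema, the field names `srcIp` and `first`, and the class `TsvToAvro` are hypothetical; the post does not show `NetflowSchema` or how the conversion was actually done.

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class TsvToAvro {
    // Hypothetical two-field schema; the real NetflowSchema is not shown in the post.
    public static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Flow\",\"fields\":["
        + "{\"name\":\"srcIp\",\"type\":\"long\"},"
        + "{\"name\":\"first\",\"type\":\"long\"}]}");

    /** Parses one tab-delimited line; assumes the column order matches the schema. */
    public static GenericRecord parse(String line) {
        String[] cols = line.split("\t");
        GenericRecord r = new GenericData.Record(SCHEMA);
        r.put("srcIp", Long.parseLong(cols[0]));
        r.put("first", Long.parseLong(cols[1]));
        return r;
    }

    /** Writes all lines as records to an Avro container file. */
    public static void write(Iterable<String> lines, File out) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, out); // the schema is stored in the file header
            for (String line : lines) {
                writer.append(parse(line));
            }
        }
    }

    /** Reads the file back with the embedded (writer) schema and counts records. */
    public static int count(File in) throws IOException {
        int n = 0;
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(in, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                reader.next();
                n++;
            }
        }
        return n;
    }
}
```

Because the schema travels with the data, a generic reader (or avro-tools) needs no separate schema file to decode the result.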

Where I'm getting hung up, going beyond my book-based examples, is
reading from AVRO (using generic records) when the mapper output is NOT
in AVRO format. I can't seem to reconcile extending AvroMapper with NOT
using AvroCollector.

Below are snippets of my working non-AVRO M/R code and my [failing]
attempt to convert it. If anyone can help me along, it would be very
much appreciated.

-Terry

<code>
Pre-Avro version (works fine with .tsv input):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    public static class HdFlowMapper extends MapReduceBase
            implements Mapper<Text, HdFlowWritable, LongPair, HdFlowWritable> {
        @Override
        public void map(Text key, HdFlowWritable value,
                OutputCollector<LongPair, HdFlowWritable> output,
                Reporter reporter) throws IOException {
            // ...
            // LongPair and HdFlowWritable are my own classes (not shown).
            LongPair outKey = new LongPair(value.getSrcIp(), value.getFirst());

            HdFlowWritable outValue = value; // pass it all through
            output.collect(outKey, outValue);
        }
    }

AVRO attempt:
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongPair.class);
        conf.setOutputValueClass(AvroFlowWritable.class);

        SCHEMA = new Schema.Parser().parse(NetflowSchema);
        AvroJob.setInputSchema(conf, SCHEMA);
        //AvroJob.setOutputSchema(conf, SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setReducerClass(conf, AvroFlowReducer.class);

....

    public static class AvroFlowMapper<K> extends AvroMapper<K, OutputCollector> {
        @Override  // IDE: "Method does not override or implement a method from a supertype"
        public void map(K datum, OutputCollector<LongPair, AvroFlowWritable> collector,
                Reporter reporter) throws IOException {
            GenericRecord record = (GenericRecord) datum;
            afw = new AvroFlowWritable(record);
            // ...
            collector.collect(outKey, afw);
        }
    }

</code>
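One possible route, sketched under assumptions: `AvroMapper<IN, OUT>` declares `map(IN, AvroCollector<OUT>, Reporter)`, so a `map(...)` that takes an `OutputCollector` cannot override it, which is what the IDE is reporting. Instead of extending `AvroMapper`, the old-API `AvroInputFormat` can feed an ordinary Hadoop `Mapper`, handing each datum over as an `AvroWrapper<GenericRecord>` key with a `NullWritable` value. The sketch wires the job with plain `JobConf` calls and sets the `AvroJob.INPUT_SCHEMA` key directly rather than going through `AvroJob.setMapperClass`; `AvroJob.setInputSchema` may also apply Avro-specific shuffle settings, which a plain mapper would not want. The class name `AvroInputPlainMapper`, the field name `srcIp`, and the `LongWritable`/`Text` output types are hypothetical stand-ins for the `LongPair`/`AvroFlowWritable` classes in the post; check the details against the Avro version in use.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AvroInputPlainMapper {

    /** Plain (non-Avro) mapper: AvroInputFormat delivers each datum as the key. */
    public static class FlowMapper extends MapReduceBase
            implements Mapper<AvroWrapper<GenericRecord>, NullWritable, LongWritable, Text> {
        private final LongWritable outKey = new LongWritable();
        private final Text outValue = new Text();

        @Override
        public void map(AvroWrapper<GenericRecord> key, NullWritable ignored,
                        OutputCollector<LongWritable, Text> output,
                        Reporter reporter) throws IOException {
            GenericRecord record = key.datum();
            // "srcIp" is a hypothetical field name; use one from your own schema.
            outKey.set(((Number) record.get("srcIp")).longValue());
            outValue.set(record.toString()); // JSON-style rendering of the record
            output.collect(outKey, outValue);
        }
    }

    /** Configures Avro input with a plain mapper and ordinary Writable outputs. */
    public static void configure(JobConf conf, Schema inputSchema) {
        conf.setInputFormat(AvroInputFormat.class);
        conf.set(AvroJob.INPUT_SCHEMA, inputSchema.toString()); // reader schema
        conf.setMapperClass(FlowMapper.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
    }
}
```

Since the map output is no longer Avro in this setup, the reducer would likewise be an ordinary Hadoop `Reducer` registered with `conf.setReducerClass(...)`; `AvroJob.setReducerClass(...)` is intended for jobs whose map output is Avro-wrapped.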
Russell Jurney 2012-12-22, 00:42
Terry Healy 2012-12-22, 18:33