Avro >> mail # user >> AVRO threatening to ruin Christmas


AVRO threatening to ruin Christmas
Going crazy here trying to reconcile this. I found pieces of the answer,
partially implemented in 'The Definitive Guide' Avro weather M/R
example, and outlined in the Package org.apache.avro.mapred javadoc
under "For jobs whose input is an Avro data file and which use an
AvroMapper, but whose reducer is a non-Avro Reducer and whose output is
a non-Avro format:". Clearly I have misunderstood something while
attempting to follow those instructions.

The test code does not include a mapper, so the job setup is not like
what I'm trying to achieve: Avro format into the Mapper, Text out of
the Reducer. (I've eliminated attempting to use the Partitioner,
Comparator, and GroupingComparator used in the working M/R code that
reads .tsv rather than Avro.)
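For reference, here is a sketch of how I read that javadoc recipe
(Avro input, AvroMapper, plain Reducer, text output). FlowTextReducer
is a placeholder name for the non-Avro reducer I'd like to end up with,
so treat this as an outline rather than a known-working configuration:

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Sketch of the org.apache.avro.mapred recipe: Avro input, AvroMapper,
// non-Avro Reducer, non-Avro (text) output. Class and schema names are
// mine, not from the javadoc.
JobConf conf = new JobConf();

// Input side: Avro data file, read via the generic API.
AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
AvroJob.setMapperClass(conf, AvroFlowMapper.class);

// Intermediate (map output) side: still Avro, so declare its schema.
Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
AvroJob.setMapOutputSchema(conf,
        Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema));

// Output side: plain Hadoop, configured with ordinary JobConf calls
// (no AvroJob.setReducerClass / setOutputSchema here).
conf.setReducerClass(FlowTextReducer.class); // plain Reducer, not AvroReducer
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(NullWritable.class);
```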

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine, but when I try to
use it with the AvroMapper it throws the above exception.
From setup:

        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(LongPair.class);
        conf.setOutputValueClass(AvroFlowWritable.class);

        NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

        AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setReducerClass(conf, AvroFlowReducer.class);

        Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
        Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
        AvroJob.setMapOutputSchema(conf, pairSchema);
    /*
     * ------------------
     * *** Mapper ***
     * ------------------
     */

    public static class AvroFlowMapper<K>
            extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        @Override
        public void map(K datum,
                AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {
            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {
                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = Long.valueOf(afw.getDstIp());
                } else {
                    testKey = Long.valueOf(afw.getSrcIp());
                }
                collector.collect(
                        new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }

    /*
     * ------------------
     * *** Reducer ***
     * ------------------
     */

    public static class AvroFlowReducer
            extends AvroReducer<Long, AvroFlowWritable, Text> {

        @Override
        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter)
                throws IOException {
            for (AvroFlowWritable afw : values) {
                collector.collect(new Text(afw.toString()));
            }
        }
    }
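If I understand the package javadoc correctly, a non-Avro reducer
behind an Avro map output doesn't see my types directly: the key and
value arrive wrapped in AvroKey/AvroValue. So the reducer I'm aiming
for might look like this sketch (untested; FlowTextReducer is a
placeholder name, and I'm assuming the wrapper types are right):

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Sketch (untested): a plain old-API reducer consuming Avro map output.
// The Pair<Long, AvroFlowWritable> map output should arrive here as
// AvroKey<Long> / AvroValue<AvroFlowWritable>, per the package javadoc.
public static class FlowTextReducer extends MapReduceBase
        implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                           Text, NullWritable> {

    @Override
    public void reduce(AvroKey<Long> key,
            Iterator<AvroValue<AvroFlowWritable>> values,
            OutputCollector<Text, NullWritable> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            // Unwrap the Avro value and emit plain text.
            AvroFlowWritable afw = values.next().datum();
            output.collect(new Text(afw.toString()), NullWritable.get());
        }
    }
}
```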

On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
>
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
>
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
>
> -Terry
>
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
>
>     public static class HdFlowMapper extends MapReduceBase