
Avro >> mail # user >> AVRO threatening to ruin Christmas

Going crazy here trying to reconcile this. I found links to some aspects,
partially implemented in the 'The Definitive Guide' AVRO weather M/R
example, and outlined in the Package org.apache.avro.mapred docs under
"For jobs whose input is an Avro data file and which use an AvroMapper,
but whose reducer is a non-Avro Reducer and whose output is a non-Avro
format:". Clearly I have misunderstood something while attempting to
follow those examples.
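
If I understand that package doc correctly, the wiring would look
something like the sketch below. This is untested and only my reading of
the doc; AvroFlowJob and TextFlowReducer are placeholder names, and the
schemas/classes are the ones from my code further down.

```java
// Sketch of the job wiring I *think* the package doc describes:
// Avro input -> AvroMapper, then a plain Hadoop Reducer and plain output.
JobConf conf = new JobConf(AvroFlowJob.class);   // hypothetical driver class

// Avro side: input schema plus the mapper's intermediate Pair schema.
AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
AvroJob.setMapOutputSchema(conf,
        Pair.getPairSchema(Schema.create(Schema.Type.LONG),
                ReflectData.get().getSchema(AvroFlowWritable.class)));
AvroJob.setMapperClass(conf, AvroFlowMapper.class);

// Non-Avro side: register the reducer with the plain JobConf methods,
// NOT AvroJob.setReducerClass, so reduce output is not forced through
// Avro serialization.
conf.setReducerClass(TextFlowReducer.class);     // hypothetical plain Reducer
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(NullWritable.class);
conf.setOutputFormat(TextOutputFormat.class);
```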

The test code does not include a mapper, so the job setup is not like
what I'm trying to achieve: AVRO format into the Mapper, Text out of the
Reducer. (I've eliminated the Partitioner, Comparator, and
GroupingComparator used in the working M/R code that reads .tsv rather
than AVRO.)

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine, but when I use it
with the AvroMapper it throws the above exception.

From setup:


        NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

        AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
        AvroJob.setMapperClass(conf, AvroFlowMapper.class);
        AvroJob.setReducerClass(conf, AvroFlowReducer.class);

        Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
        Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
        AvroJob.setMapOutputSchema(conf, pairSchema);
    // ------------------
    // *** Mapper ***
    // ------------------

    public static class AvroFlowMapper<K>
            extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        public void map(K datum,
                AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {
            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {
                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = new Long(afw.getDstIp());
                } else {
                    testKey = new Long(afw.getSrcIp());
                }
                collector.collect(new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }

    // ------------------
    // *** Reducer ***
    // ------------------

    public static class AvroFlowReducer
            extends AvroReducer<Long, AvroFlowWritable, Text> {

        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter)
                throws IOException {
            Iterator<AvroFlowWritable> iter = values.iterator();
            while (iter.hasNext()) {
                AvroFlowWritable afw = iter.next();
                collector.collect(new Text(afw.toString()));
            }
        }
    }
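
What I suspect I actually need, going by the package doc, is a plain
Hadoop Reducer registered via conf.setReducerClass rather than an
AvroReducer, since the output side is plain Text. A sketch of my guess
(untested; TextFlowReducer is a name I made up, and I may have the
AvroKey/AvroValue wrapping wrong):

```java
// Sketch only: with AvroJob.setMapOutputSchema set to a Pair schema,
// intermediate data should reach the reducer wrapped as
// AvroKey<Long> / AvroValue<AvroFlowWritable>, which we unwrap here.
public static class TextFlowReducer extends MapReduceBase
        implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                Text, NullWritable> {

    public void reduce(AvroKey<Long> key,
            Iterator<AvroValue<AvroFlowWritable>> values,
            OutputCollector<Text, NullWritable> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            AvroFlowWritable afw = values.next().datum(); // unwrap Avro value
            output.collect(new Text(afw.toString()), NullWritable.get());
        }
    }
}
```

This would be registered with conf.setReducerClass(TextFlowReducer.class)
and a TextOutputFormat, so the reduce output bypasses Avro serialization
entirely, which (I think) is what the package doc means by a "non-Avro
Reducer".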

On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
> -Terry
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
>     public static class HdFlowMapper extends MapReduceBase