Re: AVRO threatening to ruin Christmas
Russell Jurney 2012-12-21, 18:56
Since the holidays are involved, I suggest you try Pig and the AvroStorage
UDF to load the data, and another UDF to store the data. What format are
you writing in?

Instructions for using AvroStorage with Pig 0.10 are here:
http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/
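
If tab-delimited text is what you want back out, the whole job can be a
few lines of Pig. A rough sketch (untested; jar names, versions, and paths
are placeholders for whatever you actually have):

    REGISTER piggybank.jar;
    REGISTER avro-1.7.3.jar;
    REGISTER json-simple-1.1.jar;

    flows = LOAD '/data/netflow' USING
        org.apache.pig.piggybank.storage.avro.AvroStorage();

    -- PigStorage (tab-delimited) is the default store function
    STORE flows INTO '/out/netflow-tsv';

AvroStorage reads the schema from the .avro files themselves, so you don't
have to declare it in the script.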
 On Dec 21, 2012 9:56 AM, "Terry Healy" <[EMAIL PROTECTED]> wrote:

> Going crazy here trying to reconcile this. I found pieces of this,
> partially implemented in the 'The Definitive Guide' AVRO weather M/R
> example and outlined in the org.apache.avro.mapred package docs under
> "For jobs whose input is an Avro data file and which use an AvroMapper,
> but whose reducer is a non-Avro Reducer and whose output is a non-Avro
> format:". Clearly I have misunderstood something while attempting to
> follow those instructions.
>
> The test code does not include a mapper, so the job setup is not like
> what I'm trying to achieve: AVRO format into the Mapper, Text out of the
> Reducer. (For now I've dropped the Partitioner, Comparator, and
> GroupingComparator used in the working M/R code that reads .tsv rather
> than AVRO.)
>
> The current stumbling block is "AvroFlowWritable cannot be cast to
> org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
> class. I think my existing reducer would work fine if I could use it
> with the AvroMapper, but it throws the above exception.
>
>
> From setup:
>
>         conf.setOutputFormat(TextOutputFormat.class);
>         conf.setOutputKeyClass(LongPair.class);
>         conf.setOutputValueClass(AvroFlowWritable.class);
>
>         NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);
>
>         AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
>         AvroJob.setMapperClass(conf, AvroFlowMapper.class);
>         AvroJob.setReducerClass(conf, AvroFlowReducer.class);
>
>         Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
>         Schema pairSchema = Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
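>         // (Question: since afwSchema comes from ReflectData, does the map
>         // output also need AvroJob.setMapOutputReflect(conf)? Not sure if
>         // that matters here.)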
>         AvroJob.setMapOutputSchema(conf, pairSchema);
>
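> Also, per that package doc recipe, should the reducer half be wired
> through JobConf rather than AvroJob? My reading, as an untested sketch
> (FlowTextReducer is a placeholder for a plain Hadoop Reducer; I sketch
> it below after my current classes):
>
>         // instead of AvroJob.setReducerClass(conf, AvroFlowReducer.class):
>         conf.setReducerClass(FlowTextReducer.class);
>         conf.setOutputFormat(TextOutputFormat.class);
>         conf.setOutputKeyClass(Text.class);
>         conf.setOutputValueClass(NullWritable.class);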
>
>     /*
>      * ------------------
>      * *** Mapper ***
>      * ------------------
>      */
>
>     public static class AvroFlowMapper<K>
>             extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {
>
>         Long[] keepIps;
>
>         // Configure removed
>
>         @Override
>         public void map(K datum, AvroCollector<Pair<Long,
>                 AvroFlowWritable>> collector, Reporter reporter)
>                 throws IOException {
>
>
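>             // (my understanding: with only setInputSchema configured, the
>             // datum should arrive here as a GenericRecord)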
>             GenericRecord record = (GenericRecord) datum;
>             AvroFlowWritable afw = new AvroFlowWritable(record);
>
>             if (isKeeper(afw)) {
>
>                 Long testKey;
>                 if (inKeeperIpList(afw.getSrcIp())) {
>                     testKey = new Long(afw.getDstIp());
>                 } else {
>                     testKey = new Long(afw.getSrcIp());
>                 }
>                 collector.collect(
>                         new Pair<Long, AvroFlowWritable>(testKey, afw));
>             }
>         }
>     }
>
>     /*
>      * ------------------
>      * *** Reducer ***
>      * ------------------
>      */
>
>     public static class AvroFlowReducer
>             extends AvroReducer<Long, AvroFlowWritable, Text> {
>
>
>         @Override
>         public void reduce(Long key, Iterable<AvroFlowWritable> values,
>                 AvroCollector<Text> collector, Reporter reporter)
>                 throws IOException {
>
>             for (AvroFlowWritable afw : values) {
>                 collector.collect(new Text(afw.toString()));
>             }
>         }
>    }
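>
> For comparison, here is what I think the package doc says the reducer
> should look like instead: a plain Reducer over the wrapped Avro types
> (untested sketch; FlowTextReducer matches the placeholder above, and it
> assumes the map output value deserializes back into my AvroFlowWritable):
>
>     public static class FlowTextReducer extends MapReduceBase
>             implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
>                     Text, NullWritable> {
>
>         @Override
>         public void reduce(AvroKey<Long> key,
>                 Iterator<AvroValue<AvroFlowWritable>> values,
>                 OutputCollector<Text, NullWritable> output,
>                 Reporter reporter) throws IOException {
>
>             while (values.hasNext()) {
>                 // datum() unwraps the AvroValue to recover the object
>                 AvroFlowWritable afw = values.next().datum();
>                 output.collect(new Text(afw.toString()), NullWritable.get());
>             }
>         }
>     }
>
> (Imports would be org.apache.avro.mapred.AvroKey / AvroValue and the
> org.apache.hadoop.mapred classes.)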
>
>
>
> On 12/20/2012 12:32 PM, Terry Healy wrote:
> > I'm just getting started using AVRO within Map/Reduce and trying to
> > convert some existing non-AVRO code to use AVRO input. So far the data
> > that previously was stored in tab delimited files has been converted to