-
AVRO threatening to ruin Christmas
Terry Healy 2012-12-21, 17:56
Going crazy here trying to reconcile this. Found links to some aspects,
partially implemented in the 'The Definitive Guide' AVRO weather M/R
example, and outlined in Package org.apache.avro.mapred under "For jobs
whose input is an Avro data file and which use an AvroMapper, but whose
reducer is a non-Avro Reducer and whose output is a non-Avro format:".
Clearly I have misunderstood something while attempting to follow those
instructions.

The test code does not include a mapper so the job setup is not like
what I'm trying to achieve: AVRO format into Mapper, Text out of
Reducer. (I've eliminated attempting to use the Partitioner, Comparator,
and GroupingComparator used in the working M/R code that reads .tsv
rather than AVRO.)

The current stumbling block is "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my
class. I think my existing reducer would work fine if I could use it
with the AvroMapper, but it throws the above exception.

From setup:

    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongPair.class);
    conf.setOutputValueClass(AvroFlowWritable.class);

    NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);

    AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
    AvroJob.setMapperClass(conf, AvroFlowMapper.class);
    AvroJob.setReducerClass(conf, AvroFlowReducer.class);

    Schema afwSchema =
        ReflectData.get().getSchema(AvroFlowWritable.class);
    Schema pairSchema =
        Pair.getPairSchema(Schema.create(Schema.Type.LONG), afwSchema);
    AvroJob.setMapOutputSchema(conf, pairSchema);

    /*
     * ------------------
     * *** Mapper ***
     * ------------------
     */

    public static class AvroFlowMapper<K>
            extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {

        Long[] keepIps;

        // Configure removed

        @Override
        public void map(K datum,
                AvroCollector<Pair<Long, AvroFlowWritable>> collector,
                Reporter reporter) throws IOException {

            GenericRecord record = (GenericRecord) datum;
            AvroFlowWritable afw = new AvroFlowWritable(record);

            if (isKeeper(afw)) {

                Long testKey;
                if (inKeeperIpList(afw.getSrcIp())) {
                    testKey = new Long(afw.getDstIp());
                } else {
                    testKey = new Long(afw.getSrcIp());
                }
                collector.collect(
                        new Pair<Long, AvroFlowWritable>(testKey, afw));
            }
        }
    }

    /*
     * ------------------
     * *** Reducer ***
     * ------------------
     */

    public static class AvroFlowReducer
            extends AvroReducer<Long, AvroFlowWritable, Text> {

        @Override
        public void reduce(Long key, Iterable<AvroFlowWritable> values,
                AvroCollector<Text> collector, Reporter reporter)
                throws IOException {

            Iterator iter = values.iterator();
            while (iter.hasNext()) {
                AvroFlowWritable afw = (AvroFlowWritable) iter.next();
                // collector.collect(new Text(afw.toString()));
            }
        }
    }

On 12/20/2012 12:32 PM, Terry Healy wrote:
> I'm just getting started using AVRO within Map/Reduce and trying to
> convert some existing non-AVRO code to use AVRO input. So far the data
> that previously was stored in tab delimited files has been converted to
> .avro successfully as checked with avro-tools.
>
> Where I'm getting hung up extending beyond my book-based examples is in
> attempting to read from AVRO (using generic records) where the mapper
> output is NOT in AVRO format. I can't seem to reconcile extending
> AvroMapper and NOT using AvroCollector.
>
> Here are snippets of code that show my non-AVRO M/R code and my
> [failing] attempts to make this change. If anyone can help me along it
> would be very much appreciated.
>
> -Terry
>
> <code>
> Pre-Avro version: (Works fine with .tsv input format)
>
> public static class HdFlowMapper extends MapReduceBase
-
Re: AVRO threatening to ruin Christmas
Russell Jurney 2012-12-21, 18:56
Since the holidays are involved, I suggest you try Pig and the AvroStorage
UDF to load the data, and another UDF to store the data. What format are
you writing in?

Instructions for using avrostorage with Pig 0.10 are here:
http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/
-
Re: AVRO threatening to ruin Christmas
Doug Cutting 2012-12-21, 19:09
On Fri, Dec 21, 2012 at 9:56 AM, Terry Healy <[EMAIL PROTECTED]> wrote:
> "For jobs
> whose input is an Avro data file and which use an AvroMapper, but whose
> reducer is a non-Avro Reducer and whose output is a non-Avro format:".
[ ...]
> public static class AvroFlowReducer extends AvroReducer

It looks to me like your reducer is an Avro reducer when in this case
you should instead subclass org.apache.hadoop.mapred.Reducer.

An example of this is in TestSequenceFileReader#testNonAvroReducer:

http://s.apache.org/testnonavroreducer

I hope this helps.

Doug
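
For reference, here is a minimal sketch of the arrangement described above: Avro data file in, AvroMapper in the middle, a plain org.apache.hadoop.mapred.Reducer writing text out. It follows the steps quoted from the org.apache.avro.mapred package documentation, but it is not Terry's code and has not been run against his data. The class name AvroInTextOut, the field name "srcIp" (assumed to be an Avro long), and the choice of a (long, string) map output instead of AvroFlowWritable are all assumptions made for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AvroInTextOut {

  /** Avro-aware mapper: reads generic records, emits (long, string) pairs. */
  public static class FlowMapper
      extends AvroMapper<GenericRecord, Pair<Long, CharSequence>> {
    @Override
    public void map(GenericRecord record,
                    AvroCollector<Pair<Long, CharSequence>> collector,
                    Reporter reporter) throws IOException {
      // "srcIp" is a hypothetical field name, assumed to hold an Avro long.
      Long key = (Long) record.get("srcIp");
      collector.collect(new Pair<Long, CharSequence>(key, record.toString()));
    }
  }

  /** Plain Hadoop reducer: Avro-wrapped key/value in, Writable types out. */
  public static class TextReducer extends MapReduceBase
      implements Reducer<AvroKey<Long>, AvroValue<CharSequence>, LongWritable, Text> {
    @Override
    public void reduce(AvroKey<Long> key,
                       Iterator<AvroValue<CharSequence>> values,
                       OutputCollector<LongWritable, Text> out,
                       Reporter reporter) throws IOException {
      while (values.hasNext()) {
        // Unwrap the Avro datum and hand ordinary Writables to the collector.
        out.collect(new LongWritable(key.datum()),
                    new Text(values.next().datum().toString()));
      }
    }
  }

  /** Usage (assumed): AvroInTextOut <input dir> <output dir> <schema.avsc> */
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(AvroInTextOut.class);
    conf.setJobName("avro-in-text-out");
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Avro side: input schema, Avro mapper, and the schema of the shuffle data.
    Schema inputSchema = new Schema.Parser().parse(new File(args[2]));
    AvroJob.setInputSchema(conf, inputSchema);
    AvroJob.setMapperClass(conf, FlowMapper.class);
    AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(
        Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.STRING)));

    // Non-Avro side: reducer and output are set on JobConf, not on AvroJob.
    conf.setReducerClass(TextReducer.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    JobClient.runJob(conf);
  }
}
```

The point of the sketch is the split in the job setup: the mapper and map output schema go through AvroJob, while the reducer, output format, and output key/value classes go through JobConf directly, and the reducer's input types are AvroKey and AvroValue rather than the bare datum types.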
-
Re: AVRO threatening to ruin Christmas
Terry Healy 2012-12-21, 19:44
Doug-

I have tried to use that Test as a reference, but being new to this I
can't reconcile the "missing" mapper or bridge the gap in my mind
between a SequenceFile input and an Avro file. So I still wind up with
the "AvroFlowWritable cannot be cast to
org.apache.avro.generic.IndexedRecord" exception.

Have a good Christmas - maybe I'll get an Avro book.

-TH