Avro, mail # user - Error using Avro 1.7.4 Specific API with .mapreduce


Ramachandran, Karthik 2013-03-01, 02:58
Re: Error using Avro 1.7.4 Specific API with .mapreduce
Erick Tryzelaar 2013-03-01, 17:52
We figured it out. The actual problem was in code that was left out of the
original email. Here's the full map method:

    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        startTime.setTime(key.datum().getStartEpoch());
        // Output key: the record's start time truncated to the hour.
        avroKey.datum(DateUtils.truncate(startTime, Calendar.HOUR_OF_DAY).getTime());
        // Bug: this passes the AvroKey wrapper, not the record it holds.
        avroValue.datum(key);
        context.write(avroKey, avroValue);
    }
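
For anyone following along, here is a minimal sketch of the hourly bucketing that the map key relies on, written against the plain JDK rather than Commons Lang. It assumes getStartEpoch() returns milliseconds since the epoch, which the thread does not state, and the class and method names are hypothetical. It truncates in UTC; as far as I know DateUtils.truncate works in a calendar time zone, so results can differ in zones whose offset is not a whole number of hours.

```java
class HourBucket {
    static final long MILLIS_PER_HOUR = 60L * 60L * 1000L;

    // Rounds an epoch-millisecond timestamp down to the start of its UTC hour.
    // floorMod keeps the result correct for timestamps before 1970 as well.
    static long truncateToHour(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_HOUR);
    }

    public static void main(String[] args) {
        // 2 hours 15 minutes after the epoch falls into the bucket starting at 2 hours.
        long sample = 2 * MILLIS_PER_HOUR + 15 * 60 * 1000L;
        System.out.println(truncateToHour(sample));
    }
}
```

All records whose start time falls in the same hour get the same long key, so they should arrive at the same reduce call.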

The problem was in how we set avroValue: we passed the AvroKey wrapper
itself instead of the record it holds. The fix was to set the datum to this:

avroValue.datum(key.datum());
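
To illustrate the wrapper-versus-datum mix-up in isolation, here is a small JDK-only sketch. Wrapper and NetRecord are hypothetical stand-ins for AvroKey/AvroValue and the generated NetworkRecord class; no Avro code is involved. It also assumes the value wrapper was used as a raw type, which is one way such a call can get past the compiler; the thread does not say how avroValue was declared, and the exception reported in the original message is a different one.

```java
class DatumVsWrapper {
    // Hypothetical stand-in for a wrapper such as AvroKey or AvroValue.
    static class Wrapper<T> {
        private T datum;
        T datum() { return datum; }
        void datum(T d) { this.datum = d; }
    }

    // Hypothetical stand-in for the generated record class.
    static class NetRecord {
        final long startEpoch;
        NetRecord(long startEpoch) { this.startEpoch = startEpoch; }
    }

    // The mistake: a raw-typed wrapper accepts any object, including another wrapper.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Wrapper<NetRecord> wrapWrong(Wrapper<NetRecord> key) {
        Wrapper raw = new Wrapper();
        raw.datum(key);              // stores the wrapper, not the record
        return raw;
    }

    // The fix: unwrap first, then store the record itself.
    static Wrapper<NetRecord> wrapRight(Wrapper<NetRecord> key) {
        Wrapper<NetRecord> value = new Wrapper<>();
        value.datum(key.datum());
        return value;
    }

    // Returns true if reading the datum back as a NetRecord throws.
    static boolean failsOnRead(Wrapper<NetRecord> value) {
        try {
            NetRecord r = value.datum();   // compiler inserts a cast to NetRecord here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Wrapper<NetRecord> key = new Wrapper<>();
        key.datum(new NetRecord(0L));
        System.out.println("wrong fails on read: " + failsOnRead(wrapWrong(key)));
        System.out.println("right fails on read: " + failsOnRead(wrapRight(key)));
    }
}
```

The point of the sketch is only that the mistake surfaces later, when something reads the datum back and expects a record, not at the line where the wrong object was stored.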

-e

On Thu, Feb 28, 2013 at 6:58 PM, Ramachandran, Karthik <
[EMAIL PROTECTED]> wrote:

>  Hi,
>
>  I'm trying to write a Mapper (using the .mapreduce API) that reads an
> Avro 1.7.4 file. I'm using the specific API (via the Maven Avro plugin)
> to generate my classes from an .avdl file.
>
>  I've verified that the Avro file is generated correctly, and I can use
> the avro-to-json function in the Avro tools jar to read the file back.
>
>  *The error I'm receiving is:*
>
>  java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
>     at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
>     at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
>
>  *The driver class is as follows:*
>
> public class RollupAvroFiles extends Configured implements Tool {
>
>     @Override
>     public int run(String[] args) throws Exception {
>         Configuration conf = getConf();
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>
>         String input, output;
>         if (otherArgs.length == 2) {
>             input = otherArgs[0];
>             output = otherArgs[1];
>         } else {
>             return 1;
>         }
>
>         /** configure Job **/
>         Job job = new Job(conf, "RollupAvroFiles");
>         job.setJarByClass(RollupAvroFiles.class);
>         job.setUserClassesTakesPrecedence(true);
>         job.setNumReduceTasks(1);
>
>         FileInputFormat.setInputPaths(job, new Path(input));
>         job.setInputFormatClass(AvroKeyInputFormat.class);
>         AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);
>
>         job.setMapperClass(RollupAvroFilesMapper.class);
>         AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
>         AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);
>
>         job.setMapOutputKeyClass(AvroKey.class);
>         job.setMapOutputValueClass(AvroValue.class);
>
>         job.setReducerClass(RollupAvroFilesReducer.class);
>         AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);
>
>         job.setOutputFormatClass(AvroKeyOutputFormat.class);
>         FileOutputFormat.setOutputPath(job, new Path(output));
>
>         return job.waitForCompletion(true) ? 0 : 1;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
>         System.exit(exitCode);
>     }
>
> }
>
>  *And the Mapper class reads:*
>
> public class RollupAvroFilesMapper
>         extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>, AvroValue<NetworkRecord>> {
>
>     @Override
>     protected void setup(Context context) throws IOException, InterruptedException {
>     }
>
>     @Override
>     protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
>             throws IOException, InterruptedException {
>         NetworkRecord record = key.datum();
>         ….
>     }
> }
>
>  Any thoughts would be appreciated.