-
Error using Avro 1.7.4 Specific API with .mapreduce
Ramachandran, Karthik 2013-03-01, 02:58
Hi,
I'm trying to write a Mapper (using the .mapreduce API) that reads an Avro 1.7.4 file. I'm using the specific API (via the Avro Maven plugin) to generate my objects from an .avdl file.
I've verified that the Avro file is generated correctly and can use the avro-to-json function in the Avro tools jar to read the file back.
The error I'm receiving is:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)

The driver class is as follows:
public class RollupAvroFiles extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        String input, output;
        if (otherArgs.length == 2) {
            input = otherArgs[0];
            output = otherArgs[1];
        } else {
            return 1;
        }

        /** configure Job **/
        Job job = new Job(conf, "RollupAvroFiles");
        job.setJarByClass(RollupAvroFiles.class);
        job.setUserClassesTakesPrecedence(true);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(input));
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);

        job.setMapperClass(RollupAvroFilesMapper.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
        AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);

        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);

        job.setReducerClass(RollupAvroFilesReducer.class);
        AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);

        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
        System.exit(exitCode);
    }
}
And the Mapper class reads:

public class RollupAvroFilesMapper
        extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>, AvroValue<NetworkRecord>> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        NetworkRecord = key.datum();
        ….
    }
}
Any thoughts would be appreciated.

--
Karthik Ramachandran
In-Q-Tel
Software Developer, Lab41
Mobile: (571) 455-5576
Email: [EMAIL PROTECTED]
-
Re: Error using Avro 1.7.4 Specific API with .mapreduce
Erick Tryzelaar 2013-03-01, 17:52
We figured it out. The actual problem was excluded from the email. Here's the full mapper:
@Override
protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
    startTime.setTime(key.datum().getStartEpoch());
    avroKey.datum(DateUtils.truncate(startTime, Calendar.HOUR_OF_DAY).getTime());
    avroValue.datum(key);
    context.write(avroKey, avroValue);
}
The problem was in how we set avroValue: we passed the AvroKey wrapper itself rather than the record it holds. The fix was to set the datum like this:
avroValue.datum(key.datum());
-e
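For anyone reading this later, here is a minimal sketch of the kind of mistake described above, written with plain Java stand-ins rather than the real Avro classes. Wrapper and Record are hypothetical substitutes for AvroKey/AvroValue and NetworkRecord. The sketch also assumes that avroValue was declared as a raw type, which the thread does not show. It is not meant to reproduce the exact exception from the original message, only to show how a wrapper can be stored where its contents were intended, and how that surfaces as a ClassCastException only when the value is read back.

```java
public class DatumWrapperSketch {

    // Hypothetical stand-in for AvroKey/AvroValue: a holder whose getter and
    // setter are both named datum, as in the mapper code in this thread.
    static class Wrapper<T> {
        private T datum;

        Wrapper() { }

        Wrapper(T datum) { this.datum = datum; }

        T datum() { return datum; }

        void datum(T datum) { this.datum = datum; }
    }

    // Hypothetical stand-in for the generated NetworkRecord class.
    static class Record {
        final long startEpoch;

        Record(long startEpoch) { this.startEpoch = startEpoch; }
    }

    // Mirrors avroValue.datum(key): the wrapper itself is stored.
    // Assumption: the value holder is a raw type, so the compiler accepts it.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Wrapper<Record> wrapWrongly(Wrapper<Record> key) {
        Wrapper raw = new Wrapper();
        raw.datum(key); // compiles, but the datum is now a Wrapper, not a Record
        return raw;
    }

    // Mirrors avroValue.datum(key.datum()): unwrap first, then store the record.
    static Wrapper<Record> wrapCorrectly(Wrapper<Record> key) {
        Wrapper<Record> value = new Wrapper<Record>();
        value.datum(key.datum());
        return value;
    }

    // Reading the datum as a Record is where a wrongly set value fails.
    static long readStartEpoch(Wrapper<Record> value) {
        return value.datum().startEpoch;
    }

    public static void main(String[] args) {
        Wrapper<Record> key = new Wrapper<Record>(new Record(42L));

        System.out.println("start epoch: " + readStartEpoch(wrapCorrectly(key)));

        try {
            readStartEpoch(wrapWrongly(key));
        } catch (ClassCastException e) {
            System.out.println("wrongly set datum failed on read: " + e.getMessage());
        }
    }
}
```

If the value holder is declared with its type parameter (AvroValue<NetworkRecord> in the real mapper), a call like avroValue.datum(key) should be rejected at compile time, which makes this class of bug harder to write.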