Mapper not called
I am following the directions at
http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
to write a job that takes Avro files as input and outputs non-Avro files,
and I created the following job. I should note that I have tried different
orderings of the setInput/OutputPath lines, the AvroJob lines, and the
reduce task settings. The result is always the same: the job runs with
0 mappers and 1 reducer (which gets no data, so its output is essentially
an empty SequenceFile). The job always reports 10 input files, so that's
not the issue. There is an @Override annotation on both my map and my
reduce methods, so that's not the issue either. And I believe I have
correctly followed the Avro input/non-Avro output instructions in the link
above. Any other ideas would be welcome!
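import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyAvroJob extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    JobConf job = new JobConf(getConf(), this.getClass());

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Avro side: mapper, input schema, and map output schema.
    AvroJob.setMapperClass(job, MyAvroMapper.class);
    AvroJob.setInputSchema(job, MySchema.SCHEMA$);
    AvroJob.setMapOutputSchema(job,
        Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));

    // Non-Avro side: plain Hadoop reducer writing a SequenceFile of Text pairs.
    job.setReducerClass(MyNonAvroReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(1);

    // run() must return an int exit code: 0 on success, 1 on failure.
    return JobClient.runJob(job).isSuccessful() ? 0 : 1;
  }

  public static class MyAvroMapper extends AvroMapper<MySchema, Pair<String, String>> {

    @Override
    public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
        Reporter reporter) throws IOException {

      List<MyThings> things = in.getRecords();
      ...
      collector.collect(new Pair<String, String>(newKey, newValue));
    }
  }

  public static class MyNonAvroReducer extends MapReduceBase implements
      Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {

    @Override
    public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      // Unwrap each Avro key/value and emit it as a Text pair.
      while (values.hasNext()) {
        output.collect(new Text(key.datum()), new Text(values.next().datum()));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new MyAvroJob(), args);
  }
}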
-Anna
Harsh J 2013-08-01, 16:35