Avro, mail # user - Mapper not called


Re: Mapper not called
Harsh J 2013-08-01, 16:35
I've often found the cause of this symptom to be that the input files
lack an .avro extension. Is that true in your case? If so, can you
retry after renaming them?
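For illustration, here is a hypothetical, stdlib-only model of that check. It assumes (my reading of the old org.apache.avro.mapred API, not verified against your Avro version) that the AvroInputFormat used for Avro input skips files whose names don't end in ".avro". Later releases expose this behavior as the avro.mapred.ignore.inputs.without.extension setting. The class and method names are made up, and this is not Avro's source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the ".avro" extension filter; NOT Avro's actual code.
public class AvroExtensionFilterSketch {

    // Assumed suffix, matching the .avro extension discussed above.
    static final String EXT = ".avro";

    // Returns the input names that would be kept. When the filter is on,
    // only names ending in ".avro" survive; when off, everything is kept.
    static List<String> listInputs(List<String> names, boolean ignoreWithoutExtension) {
        if (!ignoreWithoutExtension) {
            return new ArrayList<>(names);
        }
        List<String> kept = new ArrayList<>();
        for (String name : names) {
            if (name.endsWith(EXT)) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("part-00000", "part-00001", "part-00002");
        // Filter on: no name ends in ".avro", so nothing is left to split.
        System.out.println(listInputs(inputs, true).size());  // prints 0
        // Filter off: all three names are kept.
        System.out.println(listInputs(inputs, false).size()); // prints 3
    }
}
```

If none of the names survive, there is nothing to split. That would match a job that launches 0 mappers while its single reducer still runs on empty input. It may also explain why the log still reports 10 input files, if that count is logged before the extension filter is applied.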

On Wed, Jul 31, 2013 at 1:02 AM, Anna Lahoud <[EMAIL PROTECTED]> wrote:
> I am following the directions at
> http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
> to write a job that takes Avro files as input and outputs non-Avro files. I
> created the following job. I should note that I have tried different
> orderings of the setInput/OutputPath lines, the AvroJob lines, and
> the reduce task settings. The result is always the same: the job runs with 0
> mappers and 1 reducer (which gets no data, so its output is essentially an
> empty SequenceFile). It always reports 10 input files, so that's not the
> issue. There is an @Override annotation on my map and reduce methods, so
> that's not the issue either. And I believe I have correctly followed the
> Avro input/non-Avro output instructions at the link above. Any other ideas
> would be welcome!!!
>
>
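> import java.io.IOException;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.avro.Schema;
> import org.apache.avro.Schema.Type;
> import org.apache.avro.mapred.AvroCollector;
> import org.apache.avro.mapred.AvroJob;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapred.AvroMapper;
> import org.apache.avro.mapred.AvroValue;
> import org.apache.avro.mapred.Pair;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.MapReduceBase;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reducer;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.SequenceFileOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> // MySchema and MyThings are the poster's own Avro-generated classes (not shown).
> public class MyAvroJob extends Configured implements Tool {
>
>   @Override
>   public int run(String[] args) throws Exception {
>
>     JobConf job = new JobConf(getConf(), this.getClass());
>
>     FileInputFormat.setInputPaths(job, new Path(args[0]));
>     FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>     // Map side: Avro input, intermediate output is a Pair<string, string>.
>     AvroJob.setMapperClass(job, MyAvroMapper.class);
>     AvroJob.setInputSchema(job, MySchema.SCHEMA$);
>     AvroJob.setMapOutputSchema(job,
>         Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));
>
>     // Reduce side: a plain Hadoop Reducer writing Text/Text to a SequenceFile.
>     job.setReducerClass(MyNonAvroReducer.class);
>     job.setOutputFormat(SequenceFileOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(Text.class);
>     job.setNumReduceTasks(1);
>
>     // Tool.run must return an int exit code, not a boolean.
>     return JobClient.runJob(job).isSuccessful() ? 0 : 1;
>   }
>
>   public static class MyAvroMapper
>       extends AvroMapper<MySchema, Pair<String, String>> {
>
>     @Override
>     public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
>         Reporter reporter) throws IOException {
>
>       List<MyThings> things = in.getRecords();
>       ...
>       collector.collect(new Pair<String, String>(newKey, newValue));
>     }
>   }
>
>   // Non-Avro reducer: its input key/value types are AvroKey/AvroValue.
>   public static class MyNonAvroReducer extends MapReduceBase
>       implements Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {
>
>     @Override
>     public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
>         OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
>       while (values.hasNext()) {
>         output.collect(new Text(key.datum()), new Text(values.next().datum()));
>       }
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     // Propagate the Tool's exit code to the shell.
>     System.exit(ToolRunner.run(new MyAvroJob(), args));
>   }
> }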
>
> -Anna
>
>
>
>
>
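
If the files do turn out to lack the extension, the rename itself is simple. Below is a hypothetical sketch for a local copy of the input directory using java.nio; the class name AddAvroExtension is made up. On HDFS the equivalent would be `hadoop fs -mv` per file or Hadoop's FileSystem.rename(Path, Path), neither of which this sketch uses.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local-filesystem helper: appends ".avro" to input files that lack it.
public class AddAvroExtension {

    // Renames every visible regular file in dir that does not already end in ".avro".
    // Names starting with "_" or "." (e.g. _SUCCESS, .crc files) are skipped, since
    // Hadoop's FileInputFormat treats those as hidden and does not read them anyway.
    static int renameAll(Path dir) throws IOException {
        List<Path> toRename = new ArrayList<>();
        // Collect first, then rename, so the directory is not modified mid-iteration.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path file : stream) {
                String name = file.getFileName().toString();
                boolean hidden = name.startsWith("_") || name.startsWith(".");
                if (Files.isRegularFile(file) && !hidden && !name.endsWith(".avro")) {
                    toRename.add(file);
                }
            }
        }
        for (Path file : toRename) {
            // Fails with FileAlreadyExistsException if "<name>.avro" already exists.
            Files.move(file, file.resolveSibling(file.getFileName() + ".avro"));
        }
        return toRename.size();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args[0]);
        System.out.println("Renamed " + renameAll(dir) + " file(s) in " + dir);
    }
}
```

Run it against a local directory (e.g. `java AddAvroExtension /tmp/avro-input`), copy the renamed files back to the job's input path, and rerun the job.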

--
Harsh J