-
Extending ArrayWritable, Using Combiner and Spill Failed error
vipul sharma 2011-08-18, 20:44
I am working on a large application where I need to pass a big array of IntWritables from the mapper to the reducer. Obviously I want to use a combiner, and I am extending ArrayWritable. Here are the signatures of my mapper, reducer and IntArrayWritable:

*Mapper:*

    public class AppMapper extends Mapper<LongWritable, Text, Text, IntArrayWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // ... (computation of outkey and array_of_intwritables not shown)
            context.write(outkey, new IntArrayWritable(array_of_intwritables));
        }
    }

*Reducer:*

    public class AppReducer extends Reducer<Text, IntArrayWritable, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<IntArrayWritable> values, Context context)
                throws IOException, InterruptedException {
            // ... (computation of some_stuff not shown)
            context.write(key, new Text(some_stuff));
        }
    }

*IntArrayWritable:*

    public class IntArrayWritable extends ArrayWritable {
        public IntArrayWritable() {
            super(IntWritable.class);
        }

        public IntArrayWritable(IntWritable[] values) {
            super(IntWritable.class, values);
        }
    }

*Here is what my Job looks like:*

    private Job AppRunner(Path inputPath, Path outputPath, Configuration conf) throws IOException {
        Job job = new Job(conf);
        job.setJobName(String.format("App Job: %s => %s", inputPath, outputPath));
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntArrayWritable.class);

        job.setMapperClass(CommonNeighborsMapper.class);
        job.setCombinerClass(CommonNeighborsReducer.class);
        job.setReducerClass(CommonNeighborsReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setNumReduceTasks(28);
        return job;
    }
*Error trace:*

    java.io.IOException: Spill failed
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1070)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1051)
        at java.io.DataOutputStream.writeInt(DataOutputStream.java:183)
        at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
        at org.apache.hadoop.io.ArrayWritable.write(ArrayWritable.java:98)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:926)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.test.app.AppMapper.map(AppMapper.java:67)
        at com.test.app.neighbors.AppMapper.map(AppMapper.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
    Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class com.test.app.mr.writables.IntArrayWritable
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:175)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1042)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1363)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:54)
        at com.test.app.neighbors.AppReducer.reduce(AppReducer.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1384)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1291)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:712)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1199)
I think the error is due to using the combiner: the combiner outputs Text, but the reducer is expecting IntArrayWritable. If I remove the combiner, everything works. What am I doing wrong, and how can I get the combiner to work? Any help is greatly appreciated.

-- Vipul Sharma sharmavipul AT gmail DOT com
-
Re: Extending ArrayWritable, Using Combiner and Spill Failed error
John Armstrong 2011-08-18, 20:51
On Thu, 18 Aug 2011 13:44:22 -0700, vipul sharma <[EMAIL PROTECTED]> wrote:
> I think the error is due to using combiner. Since combiner is output data
> in Text and Reducer is expecting IntArrayWritable. If I remove combiner
> everything works. What am I doing wrong and how can I get the combiner to
> work? Any help is greatly appreciated.

It's not immediately apparent, but combiners may run "tiered". That is, the mapper output may be run through a bunch of combiners, and THEIR output may be run through another tier of combiners, and so on. Thus the combiner's input and output types must be exactly the same, namely the map output key/value types. You'll have to write a separate Combiner class that does the combining, and then have your Reducer class do the combining AND the conversion into output values.
In the absence of chain reducers, I'd suggest creating an abstract MyCombinerReducer class that implements the combining algorithm, then deriving both the Combiner and the Reducer from it so they can both call the same combining code.
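John's suggestion can be sketched as a dependency-free Java model. It is not Hadoop code, and several things are assumptions: `int[]` stands in for IntArrayWritable and `String` for Text; the names `CombinerReducerBase`, `SketchCombiner`, `SketchReducer` and `TieredCombineDemo` are hypothetical; and the combining step (concatenating the arrays for a key) is illustrative, not the poster's actual algorithm. The combiner returns the same type it consumes, so its output can be fed through another tier; only the reducer converts to the final output type.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical shared base, analogous to John's abstract MyCombinerReducer.
// OUT is the type each subclass emits.
abstract class CombinerReducerBase<OUT> {
    abstract OUT reduce(List<int[]> values);

    // Hypothetical combining step: concatenate all arrays for one key.
    // Concatenation is associative, so running it in tiers is safe.
    protected int[] combineArrays(List<int[]> values) {
        int total = 0;
        for (int[] v : values) {
            total += v.length;
        }
        int[] out = new int[total];
        int pos = 0;
        for (int[] v : values) {
            System.arraycopy(v, 0, out, pos, v.length);
            pos += v.length;
        }
        return out;
    }
}

// Combiner: int[] in, int[] out, so its output can be combined again.
class SketchCombiner extends CombinerReducerBase<int[]> {
    @Override
    int[] reduce(List<int[]> values) {
        return combineArrays(values);
    }
}

// Reducer: does the same combining, then converts to the output type.
class SketchReducer extends CombinerReducerBase<String> {
    @Override
    String reduce(List<int[]> values) {
        return Arrays.toString(combineArrays(values));
    }
}

class TieredCombineDemo {
    public static void main(String[] args) {
        SketchCombiner combiner = new SketchCombiner();
        SketchReducer reducer = new SketchReducer();
        // Tier 1: two combiner runs over different spills of the same key.
        int[] spillA = combiner.reduce(List.of(new int[] {1, 2}, new int[] {3}));
        int[] spillB = combiner.reduce(List.of(new int[] {4}));
        // Tier 2: the combiner's own output is fed back in as input.
        int[] merged = combiner.reduce(List.of(spillA, spillB));
        System.out.println(reducer.reduce(List.of(merged))); // prints [1, 2, 3, 4]
    }
}
```

In real Hadoop code the base class would itself have to extend Reducer (for example, parameterized on the output value type), so that the combiner subclass emits IntArrayWritable and the reducer subclass emits Text.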
-
Re: Extending ArrayWritable, Using Combiner and Spill Failed error
Harsh J 2011-08-19, 04:32
Yep, as John points out, your trouble is related to what your reducer/combiner is emitting, from your snippet below:
On Fri, Aug 19, 2011 at 2:14 AM, vipul sharma <[EMAIL PROTECTED]> wrote:
> @Override
> public void reduce(Text key, Iterable<IntArrayWritable> values, Context
> context) throws IOException, InterruptedException {
> context.write(key, new Text(some_stuff));

Since these emits happen on the map side (that is where the combiner runs), you lose the ArrayWritable typing there, because your value has now become Text.
-- Harsh J
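Harsh's point, that the combiner's emits land in the map-side spill output, which was set up for the declared map output value class, can be modeled in plain Java. This is not Hadoop code: `SpillBuffer`, `IntArrayValue`, `TextValue` and `SpillCheckDemo` are hypothetical stand-ins, and the exact-class check only mirrors the "wrong value class" message that `IFile$Writer.append` raised in the trace.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for IntArrayWritable and Text.
final class IntArrayValue {
    final int[] values;

    IntArrayValue(int... values) {
        this.values = values;
    }
}

final class TextValue {
    final String text;

    TextValue(String text) {
        this.text = text;
    }
}

// Hypothetical model of the map-side spill writer, created with the
// job's declared map output value class.
final class SpillBuffer {
    private final Class<?> valueClass;
    private final List<Object> records = new ArrayList<>();

    SpillBuffer(Class<?> valueClass) {
        this.valueClass = valueClass;
    }

    // Exact-class check, mirroring the "wrong value class" error in the trace.
    void append(Object value) throws IOException {
        if (value.getClass() != valueClass) {
            throw new IOException("wrong value class: " + value.getClass()
                    + " is not " + valueClass);
        }
        records.add(value);
    }

    int size() {
        return records.size();
    }
}

final class SpillCheckDemo {
    public static void main(String[] args) {
        // The job declared IntArrayWritable as the map output value class.
        SpillBuffer spill = new SpillBuffer(IntArrayValue.class);
        try {
            spill.append(new IntArrayValue(1, 2, 3));  // mapper output: accepted
            spill.append(new TextValue("some_stuff")); // combiner emitting Text: rejected
        } catch (IOException e) {
            System.out.println(e.getMessage()); // a "wrong value class" message
        }
        System.out.println(spill.size()); // prints 1
    }
}
```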
-
Re: Extending ArrayWritable, Using Combiner and Spill Failed error
Lance Norskog 2011-08-19, 05:03
Combiners are supposed to emit exactly the key and value types they receive. Hadoop should check what the combiner emits and blow up.
-- Lance Norskog [EMAIL PROTECTED]
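The rule Lance describes can also be enforced at compile time outside Hadoop. `Job.setCombinerClass` takes a `Class<? extends Reducer>`, so the compiler cannot see that a `Reducer<Text, IntArrayWritable, Text, Text>` is unsuitable as a combiner. The following is a minimal sketch using a hypothetical interface, `TypeStableCombiner`, whose method returns the same type `V` it consumes; a combiner that emits a different value type then simply fails to compile. `SumCombiner`, `CombinerTiers` and the summing logic are illustrative only, not Hadoop APIs.

```java
import java.util.List;

// Hypothetical interface (not part of Hadoop): the combined value has the
// same type V as the inputs, so the output can always be fed back in.
interface TypeStableCombiner<K, V> {
    V combine(K key, List<V> values);
}

// Illustrative combiner: sums Integer counts for a key.
// A combiner with the question's shape (IntArrayWritable in, Text out) could
// not implement TypeStableCombiner<Text, IntArrayWritable>, because returning
// Text from combine() would be a compile-time error.
class SumCombiner implements TypeStableCombiner<String, Integer> {
    @Override
    public Integer combine(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }
}

final class CombinerTiers {
    // Runs the combiner on two groups, then again on its own outputs.
    // This only type-checks because combine() returns the input type V.
    static <K, V> V combineTwice(TypeStableCombiner<K, V> combiner, K key,
                                 List<V> first, List<V> second) {
        V a = combiner.combine(key, first);
        V b = combiner.combine(key, second);
        return combiner.combine(key, List.of(a, b));
    }

    public static void main(String[] args) {
        Integer total = combineTwice(new SumCombiner(), "word", List.of(1, 2), List.of(3));
        System.out.println(total); // prints 6
    }
}
```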