|
|
valentina kroshilina 2010-01-08, 20:04
I have LongWritable/IncidentWritable key-value pairs as output from one job that I want to read as input in my second job, where IncidentWritable is a custom Writable (see code below).
How do I read an IncidentWritable in my custom RecordReader? I don't know how to convert a byte[] to an IncidentWritable.
Code I use:
public class IncidentWritable implements Writable { ...
public void write(DataOutput out) throws IOException { out.writeInt(getId()); out.writeInt(getStatus());
}
public void readFields(DataInput in) throws IOException { setId(in.readInt()); setStatus(in.readInt());
}
...
}
/**
 * Custom InputFormat from the original question: it marks every file as splittable and returns a
 * KeyLongWritableValueIncidentWritableReader built from the FileSplit and JobConf.
 *
 * NOTE(review): isSplitable(JobContext, Path) has the org.apache.hadoop.mapreduce (new API)
 * signature, while getRecordReader(...) uses org.apache.hadoop.mapred (old API) types. Which
 * FileInputFormat this extends depends on imports not shown here. Neither method carries
 * @Override, so one of them probably overrides nothing — confirm.
 * NOTE(review): the reader class posted alongside this one implements nextKeyValue(), which is
 * the new-API RecordReader shape, yet it is returned here as org.apache.hadoop.mapred.RecordReader
 * — confirm the types line up.
 * NOTE(review): the asterisks around the reader name are leftover bold markup from the mail
 * archive; the intended identifier is KeyLongWritableValueIncidentWritableReader.
 */
public class KeyLongWritableValueIncidentWritableInputFormat extends FileInputFormat<LongWritable, IncidentWritable> { protected boolean isSplitable(JobContext context, Path file) { return true; } public org.apache.hadoop.mapred.RecordReader<LongWritable, IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { return new *KeyLongWritableValueIncidentWr**itableReader*((FileSplit) inputSplit, jobConf); } }
public class KeyLongWritableValueIncidentWritableReader<LongWritable, IncidentWritable> extends RecordReader<LongWritable, IncidentWritable> { ... public synchronized boolean nextKeyValue() throws IOException { byte[] line = null; int lineLen = -1; if (lineRecordReader.nextKeyValue()) { innerValue = lineRecordReader.getCurrentValue(); line = innerValue.getBytes(); lineLen = innerValue.getLength(); } else { return false; } if (line == null) return false; if (key == null) { key = new LongWritable(); } if (value == null) { value = new IncidentWritable(); } int pos = findSeparator(line, 0, lineLen, this.separator); *setKeyValue(key, value, line, lineLen, pos);* return true; }
public static void *setKeyValue*(LongWritable key, IncidentWritable value, byte[] line, int lineLen, int pos) { if (pos == -1) { Text tmp = new Text(); tmp.set(line, 0, lineLen); *key = new LongWritable(Long.parseLong(tmp.toString()));* value = new IncidentWritable(); } else {
Text tmp = new Text(); tmp.set(line, 0, pos); key = new LongWritable(Long.parseLong(tmp.toString())); tmp.set(line, pos + 1, lineLen - pos - 1); *value = //no idea how to deserialize here* } } }
Thanks, Valentina
Jeff Zhang 2010-01-09, 20:06
Hi Valentina,
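I am not sure what your first job's OutputFormat is, but I suggest you use SequenceFileOutputFormat, which writes a SequenceFile as the intermediate data format. Your second job can then read it with SequenceFileInputFormat, which rebuilds each custom Writable by calling its readFields() method, so you never have to convert raw bytes yourself.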
Here's my example (MyType is a custom Writable type):
/**
 * Runs two chained jobs. The first job stores its (Text, MyType) output as a SequenceFile. The
 * second job reads that file back with SequenceFileInputFormat, so each MyType value is rebuilt
 * through MyType.readFields() rather than parsed from text.
 *
 * @param args args[0] = input path of job 1; args[1] = intermediate path (output of job 1 and
 *     input of job 2); args[2] = output path of job 2
 * @return 0 on success; a failing job makes JobClient.runJob throw instead of returning
 * @throws Exception if either job cannot be configured or fails
 */
@Override
public int run(String[] args) throws Exception {
  // Job 1: Text map output; final (Text, MyType) output is stored as a binary SequenceFile.
  JobConf job = new JobConf(MyReducer.class);
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReducer.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(MyType.class);
  // SequenceFile records are written via Writable.write(), so no text parsing is needed later.
  job.setOutputFormat(SequenceFileOutputFormat.class);
  JobClient.runJob(job);

  // Job 2: read job 1's SequenceFile output back as (Text, MyType) pairs.
  JobConf job2 = new JobConf(MyReducer.class);
  FileInputFormat.setInputPaths(job2, new Path(args[1]));
  FileOutputFormat.setOutputPath(job2, new Path(args[2]));
  job2.setInputFormat(SequenceFileInputFormat.class);
  job2.setMapperClass(IdentityMapper.class);
  // Map-only job: no OutputFormat is set, so records go to the default text output format,
  // which renders values with toString().
  job2.setNumReduceTasks(0);
  job2.setMapOutputKeyClass(Text.class);
  job2.setMapOutputValueClass(MyType.class);
  JobClient.runJob(job2);
  // Tool.run returns a process exit code; 0 signals success to ToolRunner callers.
  return 0;
}

/**
 * Example custom Writable holding a LongWritable and a Text. Fields are serialized in a fixed
 * order (first, then text), and readFields() reads them back in that same order.
 */
static class MyType implements Writable {

  private LongWritable first;

  private Text text;

  /** No-arg constructor; Hadoop instantiates Writables reflectively before calling readFields(). */
  public MyType() {
    first = new LongWritable();
    text = new Text();
  }

  /** Wraps the given writables; they are stored by reference, not copied. */
  public MyType(LongWritable first, Text text) {
    this.first = first;
    this.text = text;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    text.readFields(in);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    text.write(out);
  }

  /**
   * Renders the record as "first<TAB>text". Text-based output (such as the default output format
   * used by the second job above) calls toString(); without this override each value would be
   * printed as Object's default "MyType@hash" string.
   */
  @Override
  public String toString() {
    return first + "\t" + text;
  }
}
-----Original Message----- From: valentina kroshilina [mailto:[EMAIL PROTECTED]] Sent: 2010年1月8日 12:05 To: [EMAIL PROTECTED] Subject: custom InputFormat
I have LongWritable/IncidentWritable key-value pairs as output from one job that I want to read as input in my second job, where IncidentWritable is a custom Writable (see code below).
How do I read an IncidentWritable in my custom RecordReader? I don't know how to convert a byte[] to an IncidentWritable.
Code I use:
public class IncidentWritable implements Writable { ...
public void write(DataOutput out) throws IOException { out.writeInt(getId()); out.writeInt(getStatus());
}
public void readFields(DataInput in) throws IOException { setId(in.readInt()); setStatus(in.readInt());
}
...
}
/**
 * Custom InputFormat from the original question: it marks every file as splittable and returns a
 * KeyLongWritableValueIncidentWritableReader built from the FileSplit and JobConf.
 *
 * NOTE(review): isSplitable(JobContext, Path) has the org.apache.hadoop.mapreduce (new API)
 * signature, while getRecordReader(...) uses org.apache.hadoop.mapred (old API) types. Which
 * FileInputFormat this extends depends on imports not shown here. Neither method carries
 * @Override, so one of them probably overrides nothing — confirm.
 * NOTE(review): the reader class posted alongside this one implements nextKeyValue(), which is
 * the new-API RecordReader shape, yet it is returned here as org.apache.hadoop.mapred.RecordReader
 * — confirm the types line up.
 * NOTE(review): the asterisks around the reader name are leftover bold markup from the mail
 * archive; the intended identifier is KeyLongWritableValueIncidentWritableReader.
 */
public class KeyLongWritableValueIncidentWritableInputFormat extends FileInputFormat<LongWritable, IncidentWritable> { protected boolean isSplitable(JobContext context, Path file) { return true; } public org.apache.hadoop.mapred.RecordReader<LongWritable, IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { return new *KeyLongWritableValueIncidentWr**itableReader*((FileSplit) inputSplit, jobConf); } }
public class KeyLongWritableValueIncidentWritableReader<LongWritable, IncidentWritable> extends RecordReader<LongWritable, IncidentWritable> { ... public synchronized boolean nextKeyValue() throws IOException { byte[] line = null; int lineLen = -1; if (lineRecordReader.nextKeyValue()) { innerValue = lineRecordReader.getCurrentValue(); line = innerValue.getBytes(); lineLen = innerValue.getLength(); } else { return false; } if (line == null) return false; if (key == null) { key = new LongWritable(); } if (value == null) { value = new IncidentWritable(); } int pos = findSeparator(line, 0, lineLen, this.separator); *setKeyValue(key, value, line, lineLen, pos);* return true; }
public static void *setKeyValue*(LongWritable key, IncidentWritable value, byte[] line, int lineLen, int pos) { if (pos == -1) { Text tmp = new Text(); tmp.set(line, 0, lineLen); *key = new LongWritable(Long.parseLong(tmp.toString()));* value = new IncidentWritable(); } else {
Text tmp = new Text(); tmp.set(line, 0, pos); key = new LongWritable(Long.parseLong(tmp.toString())); tmp.set(line, pos + 1, lineLen - pos - 1); *value = //no idea how to deserialize here* } } }
Thanks, Valentina
valentina kroshilina 2010-01-12, 17:33
Hello Jeff,
Your suggestion solved it. Thanks a lot.
Valentina
2010/1/9 Jeff Zhang <[EMAIL PROTECTED]>
> Hi valentine, > > I am not sure what's your first job's OutputFormat. But I suggest you use > SequenceFileOutputFormat which will write to SequenceFile as the > intermediate data store format. > > Here's my example: (MyType is a custom writable type) > > @Override > public int run(String[] args) throws Exception { > JobConf job=new JobConf(MyReducer.class); > > FileInputFormat.setInputPaths(job, new Path(args[0])); > FileOutputFormat.setOutputPath(job, new Path(args[1])); > > job.setMapperClass(MyMapper.class); > job.setReducerClass(MyReducer.class); > job.setMapOutputKeyClass(Text.class); > job.setMapOutputValueClass(Text.class); > job.setOutputKeyClass(Text.class); > job.setOutputValueClass(MyType.class); > job.setOutputFormat(SequenceFileOutputFormat.class); > > JobClient.runJob(job); > > JobConf job2=new JobConf(MyReducer.class); > FileInputFormat.setInputPaths(job2, new Path(args[1])); > FileOutputFormat.setOutputPath(job2, new Path(args[2])); > > job2.setInputFormat(SequenceFileInputFormat.class); > > job2.setMapperClass(IdentityMapper.class); > job2.setNumReduceTasks(0); > job2.setMapOutputKeyClass(Text.class); > job2.setMapOutputValueClass(MyType.class); > > JobClient.runJob(job2); > return 1; > } > > > static class MyType implements Writable{ > > private LongWritable first; > > private Text text; > > public MyType(){ > first=new LongWritable(); > text=new Text(); > } > > public MyType(LongWritable first,Text text){ > this.first=first; > this.text=text; > } > > @Override > public void readFields(DataInput in) throws IOException { > first.readFields(in); > text.readFields(in); > } > > @Override > public void write(DataOutput out) throws IOException { > first.write(out); > text.write(out); > } > > } > > > > -----Original Message----- > From: valentina kroshilina [mailto:[EMAIL PROTECTED]] > Sent: 2010年1月8日 12:05 > To: [EMAIL PROTECTED] > Subject: custom InputFormat > > I have LongWritable, IncidentWritable key-value pair as output from one > job, that I 
want to read as input in my second job, where IncidentWritable > is custom Writable(see code below). > > How do I read IncidentWritable in my custom Reader? I don't know how to > convert byte[] to IncidentWritable. > > Code I use: > > public class IncidentWritable implements Writable > { > ... > > public void write(DataOutput out) throws IOException > { > out.writeInt(getId()); > out.writeInt(getStatus()); > > } > > public void readFields(DataInput in) throws IOException > { > setId(in.readInt()); > setStatus(in.readInt()); > > } > > ... > > } > > public class KeyLongWritableValueIncidentWritableInputFormat extends > FileInputFormat<LongWritable, IncidentWritable> > { > protected boolean isSplitable(JobContext context, Path file) { > return true; > } > public org.apache.hadoop.mapred.RecordReader<LongWritable, > IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit > inputSplit, JobConf jobConf, Reporter reporter) throws IOException > { > return new > *KeyLongWritableValueIncidentWr**itableReader*((FileSplit) > inputSplit, jobConf); > } > } > > public class KeyLongWritableValueIncidentWritableReader<LongWritable, valentina kroshilina
|
|