Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Hadoop >> mail # user >> custom InputFormat


Copy link to this message
-
RE: custom InputFormat
Hi Valentina,

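I am not sure what your first job's OutputFormat is, but I suggest you use
SequenceFileOutputFormat, which writes the intermediate data as a SequenceFile.
The second job can then read it with SequenceFileInputFormat, which rebuilds
each key and value by calling its readFields method, so you never have to
convert byte[] to your Writable by hand.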

Here's my example (MyType is a custom Writable type):

/**
 * Runs two chained jobs. The first writes its (Text, MyType) output with
 * SequenceFileOutputFormat; the second reads that output back with
 * SequenceFileInputFormat, so each MyType value is rebuilt by its
 * readFields method rather than parsed from text.
 *
 * @param args args[0] = input of job 1, args[1] = intermediate directory
 *             (output of job 1 and input of job 2), args[2] = output of job 2
 * @return 0 when both jobs complete, -1 when fewer than three paths are given
 * @throws Exception if a job fails (JobClient.runJob throws an IOException
 *         when a job does not succeed)
 */
@Override
public int run(String[] args) throws Exception {
    // Report a usage error instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        System.err.println("Usage: <input> <intermediate> <output>");
        return -1;
    }

    JobConf job = new JobConf(MyReducer.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MyType.class);
    // Binary SequenceFile output keeps MyType in its Writable form for job 2.
    job.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient.runJob(job);

    JobConf job2 = new JobConf(MyReducer.class);
    FileInputFormat.setInputPaths(job2, new Path(args[1]));
    FileOutputFormat.setOutputPath(job2, new Path(args[2]));

    // Reads the (Text, MyType) records back; values are filled via readFields.
    job2.setInputFormat(SequenceFileInputFormat.class);

    job2.setMapperClass(IdentityMapper.class);
    job2.setNumReduceTasks(0);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(MyType.class);
    // Map-only job: the map output is the job output, so declare the final
    // output types as well (consistent with job 1).
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(MyType.class);

    JobClient.runJob(job2);
    // Tool.run's result is used as the process exit code; 0 means success.
    return 0;
}
/**
 * A Writable that pairs a LongWritable with a Text.
 *
 * <p>Hadoop typically creates Writables reflectively through the no-arg
 * constructor and then populates them with readFields, so that constructor
 * must allocate both fields.
 */
static class MyType implements Writable {

    private LongWritable first;

    private Text text;

    /** Creates an empty instance whose fields are later filled by readFields. */
    public MyType() {
        first = new LongWritable();
        text = new Text();
    }

    /**
     * Creates an instance that wraps the given values (they are not copied).
     *
     * @param first the numeric part
     * @param text  the text part
     * @throws NullPointerException if either argument is null; failing here
     *         is clearer than a later failure inside write()
     */
    public MyType(LongWritable first, Text text) {
        this.first = java.util.Objects.requireNonNull(first, "first");
        this.text = java.util.Objects.requireNonNull(text, "text");
    }

    /** Restores both fields in exactly the order write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        text.readFields(in);
    }

    /** Serializes first, then text. */
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        text.write(out);
    }

    /**
     * Renders the value as "first&lt;TAB&gt;text". Text-based output formats
     * such as TextOutputFormat call toString(), so without this override they
     * would emit Object's identity string instead of the data.
     */
    @Override
    public String toString() {
        return first + "\t" + text;
    }
}

-----Original Message-----
From: valentina kroshilina [mailto:[EMAIL PROTECTED]]
Sent: 2010年1月8日 12:05
To: [EMAIL PROTECTED]
Subject: custom InputFormat

I have LongWritable, IncidentWritable key-value pairs as output from one
job that I want to read as input in my second job, where IncidentWritable
is a custom Writable (see code below).

How do I read IncidentWritable in my custom Reader? I don't know how to
convert byte[] to IncidentWritable.

Code I use:

/**
 * Custom Writable holding an id and a status (other members are elided in
 * this message).
 *
 * <p>write() emits two ints via DataOutput.writeInt (4 bytes each): id first,
 * then status. That binary form is only meaningful to readFields; it is not
 * the text a TextOutputFormat would produce (that uses toString(), which is
 * not shown here).
 */
public class IncidentWritable implements Writable
{
    ...

    /** Writes id then status; readFields must read them in the same order. */
    public void write(DataOutput out) throws IOException
    {
        out.writeInt(getId());
        out.writeInt(getStatus());

    }

    /**
     * Restores id and status in the order write() emitted them.
     * NOTE(review): Hadoop usually needs a public no-arg constructor to
     * create the instance before calling this; it is not visible here,
     * so confirm that it exists.
     */
    public void readFields(DataInput in) throws IOException
    {
        setId(in.readInt());
        setStatus(in.readInt());

    }

    ...

}

public class KeyLongWritableValueIncidentWritableInputFormat    extends
FileInputFormat<LongWritable, IncidentWritable>
{
    protected boolean isSplitable(JobContext context, Path file) {
        return true;
    }
    public org.apache.hadoop.mapred.RecordReader<LongWritable,
IncidentWritable> getRecordReader(org.apache.hadoop.mapred.InputSplit
inputSplit, JobConf jobConf, Reporter reporter) throws IOException
    {
        return new
*KeyLongWritableValueIncidentWr**itableReader*((FileSplit)
inputSplit, jobConf);
    }
}

// Line-based reader that splits each text line at a separator into a
// LongWritable key and an IncidentWritable value (fragment as posted;
// fields and helpers are elided).
//
// NOTE(review): "<LongWritable, IncidentWritable>" directly after the class
// name declares two NEW type parameters that shadow the real LongWritable and
// IncidentWritable classes, so "new LongWritable()" below would try to
// instantiate a type variable and will not compile. Drop them:
//   public class KeyLongWritableValueIncidentWritableReader
//       extends RecordReader<LongWritable, IncidentWritable>
// NOTE(review): nextKeyValue() belongs to org.apache.hadoop.mapreduce.RecordReader.
// An old-API (org.apache.hadoop.mapred) InputFormat cannot return this class,
// so confirm which API is intended.
public class KeyLongWritableValueIncidentWritableReader<LongWritable,
IncidentWritable> extends RecordReader<LongWritable, IncidentWritable> {
    ...
    // Advances to the next text line from lineRecordReader, lazily creates the
    // key/value holders, and splits the line at this.separator.
    // Returns false when the underlying reader has no more lines.
    // NOTE(review): line-splitting only works if the producing job wrote text.
    // The binary bytes from IncidentWritable.write may contain newline or
    // separator bytes. To read that binary form, use SequenceFileInputFormat.
    public synchronized boolean nextKeyValue()
       throws IOException {
       byte[] line = null;
       int lineLen = -1;
       if (lineRecordReader.nextKeyValue()) {
         innerValue = lineRecordReader.getCurrentValue();
         line = innerValue.getBytes();
         // getBytes() may return a larger backing array; only lineLen bytes are valid.
         lineLen = innerValue.getLength();
       } else {
         return false;
       }
       if (line == null)
         return false;
       if (key == null) {
         key = new LongWritable();
       }
       if (value == null) {
         value = new IncidentWritable();
       }
       int pos = findSeparator(line, 0, lineLen, this.separator);
       // NOTE(review): the surrounding asterisks on the next line are email
       // bold markup, not Java; remove them before compiling.
       *setKeyValue(key, value, line, lineLen, pos);*
       return true;
     }

     // Parses the text before the separator (or the whole line when pos == -1)
     // as a long key. Long.parseLong throws NumberFormatException on
     // non-numeric text.
     // NOTE(review): Java passes object references by value, so the
     // "key = ..." and "value = ..." assignments below only rebind these local
     // parameters. The caller's key/value fields are never updated. Mutate the
     // objects instead, e.g. key.set(Long.parseLong(...)), and populate value
     // through its setters.
     // NOTE(review): the asterisks around setKeyValue and on the key
     // assignment are email bold markup; the value assignment further down is
     // left incomplete in the original message and will not compile.
     public static void *setKeyValue*(LongWritable key, IncidentWritable
value, byte[] line,
         int lineLen, int pos) {
         if (pos == -1) {
            // No separator: whole line is the key; value is a fresh, empty instance.
            Text tmp = new Text();
            tmp.set(line, 0, lineLen);
            *key = new LongWritable(Long.parseLong(tmp.toString()));*
            value = new IncidentWritable();
         } else {

            // Key is line[0, pos); the value bytes start after the separator.
            Text tmp = new Text();
            tmp.set(line, 0, pos);
            key = new LongWritable(Long.parseLong(tmp.toString()));
            tmp.set(line, pos + 1, lineLen - pos - 1);
            *value  =  //no idea how to deserialize here*
         }
     }
}

Thanks,
Valentina