Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Hadoop, mail # dev - how to define new InputFormat with streaming?


Copy link to this message
-
how to define new InputFormat with streaming?
springring 2013-03-15, 09:18
Hi,

     My Hadoop version is 0.20.2-cdh3u3, and I want to define a new InputFormat following the example in the Hadoop book, but I get this error:
"class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat"

The Hadoop version is 0.20, but does streaming still depend on the old mapred API?

the detail:
*************************************************************************************************************************************************************
javac -classpath /usr/lib/hadoop/hadoop-core-0.20.2-cdh3u3.jar:/usr/lib/hadoop/lib/* -d class7 ./*.java
cd class7
jar uf /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar org/apache/hadoop/streaming/*.class

 hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -inputformat WholeFileInputFormat -mapper xmlmappertest.py -file xmlmappertest.py -input /user/hdfs/tarcatalog -output /user/hive/external/catalog -jobconf mapred.map.tasks=108
13/03/15 16:27:51 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
Exception in thread "main" java.lang.RuntimeException: class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat
        at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1070)
        at org.apache.hadoop.mapred.JobConf.setInputFormat(JobConf.java:609)
        at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:707)
        at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:122)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:50)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
*****************************************************************the code from hadoop book*******************************************************************

WholeFileInputFormat.java
// cc WholeFileInputFormat An InputFormat for reading a whole file as a record
importjava.io.IOException;
importorg.apache.hadoop.fs.*;
importorg.apache.hadoop.io.*;
importorg.apache.hadoop.mapreduce.InputSplit;
importorg.apache.hadoop.mapreduce.JobContext;
importorg.apache.hadoop.mapreduce.RecordReader;
importorg.apache.hadoop.mapreduce.TaskAttemptContext;
importorg.apache.hadoop.mapreduce.lib.input.*;
//vv WholeFileInputFormat
publicclassWholeFileInputFormat
    extendsFileInputFormat<NullWritable,BytesWritable>{
  
  @Override
  protectedbooleanisSplitable(JobContextcontext,Pathfile){
    returnfalse;
  }
  @Override
  publicRecordReader<NullWritable,BytesWritable>createRecordReader(
      InputSplitsplit,TaskAttemptContextcontext)throwsIOException,
      InterruptedException{
    WholeFileRecordReaderreader=newWholeFileRecordReader();
    reader.initialize(split,context);
    returnreader;
  }
}
//^^ WholeFileInputFormat
WholeFileRecordReader.java

// cc WholeFileRecordReader The RecordReader used by WholeFileInputFormat for reading a whole file as a record
importjava.io.IOException;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.FSDataInputStream;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.BytesWritable;
importorg.apache.hadoop.io.IOUtils;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.mapreduce.InputSplit;
importorg.apache.hadoop.mapreduce.RecordReader;
importorg.apache.hadoop.mapreduce.TaskAttemptContext;
importorg.apache.hadoop.mapreduce.lib.input.FileSplit;
//vv WholeFileRecordReader
classWholeFileRecordReaderextendsRecordReader<NullWritable,BytesWritable>{
  
  privateFileSplitfileSplit;
  privateConfigurationconf;
  privateBytesWritablevalue=newBytesWritable();
  privatebooleanprocessed=false;
  @Override
  publicvoidinitialize(InputSplitsplit,TaskAttemptContextcontext)
      throwsIOException,InterruptedException{
    this.fileSplit=(FileSplit)split;
    this.conf=context.getConfiguration();
  }
  
  @Override
  publicbooleannextKeyValue()throwsIOException,InterruptedException{
    if(!processed){
      byte[]contents=newbyte[(int)fileSplit.getLength()];
      Pathfile=fileSplit.getPath();
      FileSystemfs=file.getFileSystem(conf);
      FSDataInputStreamin=null;
      try{
        in=fs.open(file);
        IOUtils.readFully(in,contents,0,contents.length);
        value.set(contents,0,contents.length);
      }finally{
        IOUtils.closeStream(in);
      }
      processed=true;
      returntrue;
    }
    returnfalse;
  }
  
  @Override
  publicNullWritablegetCurrentKey()throwsIOException,InterruptedException{
    returnNullWritable.get();
  }
  @Override
  publicBytesWritablegetCurrentValue()throwsIOException,
      InterruptedException{
    returnvalue;
  }
  @Override
  publicfloatgetProgress()throwsIOException{
    returnprocessed?1.0f:0.0f;
  }
  @Override
  publicvoidclose()throwsIOException{
    // do nothing
  }
}
//^^ WholeFileRecordReader