How to define a new InputFormat with streaming?
Hi,

     My Hadoop version is 0.20.2-cdh3u3, and I want to define the new InputFormat from the Hadoop book, but I get this error:
"class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat"

The Hadoop version is 0.20, but does streaming still depend on the old mapred API?

The details:
*************************************************************************************************************************************************************
javac -classpath /usr/lib/hadoop/hadoop-core-0.20.2-cdh3u3.jar:/usr/lib/hadoop/lib/* -d class7 ./*.java
cd class7
jar uf /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar org/apache/hadoop/streaming/*.class

 hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -inputformat WholeFileInputFormat -mapper xmlmappertest.py -file xmlmappertest.py -input /user/hdfs/tarcatalog -output /user/hive/external/catalog -jobconf mapred.map.tasks=108
13/03/15 16:27:51 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
Exception in thread "main" java.lang.RuntimeException: class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat
        at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1070)
        at org.apache.hadoop.mapred.JobConf.setInputFormat(JobConf.java:609)
        at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:707)
        at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:122)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:50)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
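
For reference, the stack trace above points at a plain assignability check: StreamJob calls JobConf.setInputFormat, which (via Configuration.setClass) requires a class that implements the old org.apache.hadoop.mapred.InputFormat, while the book's class quoted below extends the new-API org.apache.hadoop.mapreduce.lib.input.FileInputFormat. A minimal, hypothetical illustration of that mismatch (ApiCheck is not from the book or from Hadoop, just a sketch):

// ApiCheck.java -- hypothetical illustration of the type mismatch behind the
// RuntimeException above; WholeFileInputFormat is the new-API class quoted below.
public class ApiCheck {
  public static void main(String[] args) {
    Class<?> bookClass = WholeFileInputFormat.class;
    // The new-API class is not assignable to the old streaming-facing interface,
    // so this prints "false" -> "class ... not org.apache.hadoop.mapred.InputFormat"
    System.out.println(
        org.apache.hadoop.mapred.InputFormat.class.isAssignableFrom(bookClass));
  }
}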
***************************** the code from the Hadoop book *****************************

WholeFileInputFormat.java
// cc WholeFileInputFormat An InputFormat for reading a whole file as a record
import java.io.IOException;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

//vv WholeFileInputFormat
public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
  }
}
//^^ WholeFileInputFormat
WholeFileRecordReader.java

// cc WholeFileRecordReader The RecordReader used by WholeFileInputFormat for reading a whole file as a record
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//vv WholeFileRecordReader
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException,
      InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }
}
//^^ WholeFileRecordReader
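
If the problem is really that streaming's -inputformat only accepts the old org.apache.hadoop.mapred.InputFormat (which is what the error message suggests), one way around it might be to port the book's class to the old API. The following is only a rough, untested sketch under that assumption; the name OldApiWholeFileInputFormat and the nested record reader are illustrative, not from the book:

OldApiWholeFileInputFormat.java

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Old-API (org.apache.hadoop.mapred) version of the book's WholeFileInputFormat,
// so that JobConf.setInputFormat would accept it.
public class OldApiWholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;  // one whole file per record, never split
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new OldApiWholeFileRecordReader((FileSplit) split, job);
  }

  // Old-API RecordReader that returns the entire file content as a single value.
  static class OldApiWholeFileRecordReader
      implements RecordReader<NullWritable, BytesWritable> {

    private final FileSplit fileSplit;
    private final JobConf conf;
    private boolean processed = false;

    OldApiWholeFileRecordReader(FileSplit fileSplit, JobConf conf) {
      this.fileSplit = fileSplit;
      this.conf = conf;
    }

    @Override
    public boolean next(NullWritable key, BytesWritable value) throws IOException {
      if (processed) {
        return false;
      }
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }

    @Override
    public NullWritable createKey() {
      return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
      return new BytesWritable();
    }

    @Override
    public long getPos() throws IOException {
      return processed ? fileSplit.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
      return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
      // nothing to close
    }
  }
}

Note that streaming ultimately hands the mapper its key/value pairs as lines of text on stdin, so passing raw BytesWritable file contents to a Python mapper may still need extra handling (for example, encoding the bytes); that part is not covered by this sketch.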