Re: Pig with CombinedFileInputFormat & CombineFileRecordReader (Not working Pig 0.8)
Don't use CombineFileInputFormat / CombineFileRecordReader. Just let Pig do its
thing.
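
Roughly: a loader can return a plain InputFormat and let pig.splitCombination /
pig.maxCombinedSplitSize do the combining. A minimal sketch only, with made-up
names (SimpleLineLoader is illustrative, not code from this thread):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SimpleLineLoader extends LoadFunc {

    private RecordReader reader;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // No CombineFileInputFormat here: Pig itself combines the resulting
        // splits up to pig.maxCombinedSplitSize when pig.splitCombination=true.
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;  // end of this (possibly combined) split
            }
            Text line = (Text) reader.getCurrentValue();
            return tupleFactory.newTuple(line.toString());
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}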
On Wed, Sep 18, 2013 at 9:08 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[EMAIL PROTECTED]> wrote:

> I tried this
> http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files
>
> Test Job Details
> Input: 7 files * 51 MB each
>
> Job counters (FileSystemCounters group):
>
>   Counter              Map           Reduce    Total
>   FILE_BYTES_READ      92            23        115
>   HDFS_BYTES_READ      360,235,092   0         360,235,092
>   FILE_BYTES_WRITTEN   116,558       116,349   232,907
>   HDFS_BYTES_WRITTEN   0             9         9
>
> From the job conf:
> pig.maxCombinedSplitSize 205847916
> pig.splitCombination true
> mapred.max.split.size 205847916 (tried both with and without this set)
>
> The map task logs these lines for the combined split:
> Total Split Length:360233853
> Total Split Paths Length:7
>
> With 360 MB of input data and the above conf there should have been two
> splits. However, there was only one split and all 7 files were read by a
> single map task.
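>
> The split math expected here, roughly:
>
>   360,233,853 total split bytes / 205,847,916 bytes per combined split = 1.75  ->  2 splits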
>
>
> In my loader, prepareToRead does not use the PigSplit:
>
>         @Override
>         public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
>             reader = (CombineFileRecordReader) arg0;
>         }
>
> Any suggestions?
>
>
> On Wed, Sep 18, 2013 at 8:49 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[EMAIL PROTECTED]>
> wrote:
>
> > I am facing an issue with a large number of small files (51 MB each). A
> > typical M/R job in my environment takes at least 2000 such files, which
> > results in 2000 map tasks. Hence I thought of using CombineFileInputFormat
> > to reduce the problem, and have written a custom implementation of
> > CombineFileInputFormat along with a CombineFileRecordReader. Below is a
> > skeleton of the code.
> >
> >
> > public class CustomInputFormat extends CombineFileInputFormat<IntWritable, RecordHolder> {
> >
> >     @Override
> >     public RecordReader<IntWritable, RecordHolder> createRecordReader(final InputSplit split,
> >             final TaskAttemptContext taskContext) throws IOException {
> >         final CombineFileSplit fSplit = (CombineFileSplit) split;
> >         return new CombineFileRecordReader<IntWritable, RecordHolder>(fSplit, taskContext,
> >                 (Class) CustomRecordReader.class);
> >     }
> >
> >     public static class CustomRecordReader extends RecordReader<IntWritable, RecordHolder> {
> >
> >         private IntWritable key = null;
> >         private RecordHolder value = null;
> >
> >         public CustomRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
> >                 throws IOException {
> >             FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
> >                     split.getLength(idx), split.getLocations());
> >             Path path = fileSplit.getPath();
> >         }
> >
> >         @Override
> >         public void close() throws IOException {
> >         }
> >
> >         @Override
> >         public IntWritable getCurrentKey() throws IOException, InterruptedException {
> >             return key;
> >         }
> >
> >         @Override
> >         public RecordHolder getCurrentValue() throws IOException, InterruptedException {
> >             return value;
> >         }
> >
> >         @Override
> >         public float getProgress() throws IOException, InterruptedException {
> >             return 0;
> >         }
> >
> >         @Override
> >         public void initialize(InputSplit arg0, TaskAttemptContext arg1)
> >                 throws IOException, InterruptedException {
> >         }
> >
> >         @Override
> >         public boolean nextKeyValue() throws IOException, InterruptedException {
> >             boolean dataPresent = false;
> >             if (currentIterator.hasNext()) {
> >                 Object nextRecord = currentIterator.next();
> >                 key = new IntWritable(recordPosition++);
> >                 value = new RecordHolder(nextRecord);
> >                 dataPresent = true;
> >             } else {
> >                 LOGGER.info("Reached end of File:" + dataPresent);
> >             }
> >             return dataPresent;
> >         }
> >     }
> > }
> >
> > Within setLocation() of the loader I specified the following configurations:
> > job.getConfiguration().set("mapred.max.split.size", "205847916");
> > job.getConfiguration().set("mapreduce.job.max.split.locations", "5");
> >
> > I used the above InputFormat in my custom Pig loader. I ran a Pig script to
> > load data using the custom loader and dump it.