Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
MapReduce, mail # user - New to hadoop, trying to write a customary file split


Copy link to this message
-
Re: New to hadoop, trying to write a customary file split
Steve Lewis 2011-07-11, 19:11
Look at this sample
============================================package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.hadoop.XMLTagInputFormat
 * Splitter that reads scan tags from an XML file.
 * No assumption is made about lines, but start and end tags MUST look like
 * {@code <MyTag} and {@code </MyTag>}, with no embedded spaces.
 * usually you will subclass and hard code the tag you want to split on
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    /** Shared zero-length array of this type. */
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
    /**
     * getSplits keeps emitting full-size splits only while the remaining bytes exceed
     * this multiple of the split size, so the final split may be up to 10% larger.
     */
    private static final double SPLIT_SLOP = 1.1;   // 10% slop
    /** Length, in chars, of the read buffer each MyXMLFileReader allocates. */
    public static final int BUFFER_SIZE = 4096;

    /** Bare tag name passed to the constructor. */
    private final String m_BaseTag;
    /**
     * "<" + base tag. There is no closing '>' here, presumably so that start tags
     * carrying attributes still match. TODO confirm against the reader.
     */
    private final String m_StartTag;
    /** "</" + base tag + ">". */
    private final String m_EndTag;
    /** Optional file-name suffix filter; null means every input file is accepted. */
    private String m_Extension;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";

    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    protected boolean isPathAcceptable(final Path pPath1) {
        String path = pPath1.toString().toLowerCase();
        if(path.startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension != null && path.endsWith(extension.toLowerCase()))
            return true;
        if (extension != null && path.endsWith(extension.toLowerCase() +
".gz"))
            return true;
        if (extension == null )
            return true;
        return false;
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext
context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        String fname = file.getName().toLowerCase();
        if(fname.endsWith(".gz"))
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied to insert a filter on acceptable data
     */
    @Override
    public List<InputSplit> getSplits(JobContext job
    ) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(),
getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path))   // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize,
maxSize);

                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length -
bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining,
splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
                            blkLocations[blkLocations.length -
1].getHosts()));
                }
            }
            else if (length != 0) {
                splits.add(new FileSplit(path, 0, length,
blkLocations[0].getHosts()));
            }
            else {
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
    //    LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single m_Value with the name as a m_Key
     * Value is the entire file
     * Key is the file name
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws
IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start