Re: New to hadoop, trying to write a customary file split

Look at this sample:
============================================
package org.systemsbiology.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
 * Splitter that reads scan tags from an XML file
 * No assumption is made about lines, but start and end tags MUST look like
 * <MyTag and </MyTag> with no embedded spaces
 * usually you will subclass and hard code the tag you want to split on
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
    private static final double SPLIT_SLOP = 1.1;   // 10% slop
    public static final int BUFFER_SIZE = 4096;

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;
    private String m_Extension;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
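        // note: no closing '>' on the start tag, so start tags that carry
        // attributes (e.g. <MyTag id="1">) still match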
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";

    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    protected boolean isPathAcceptable(final Path pPath1) {
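        // accept reducer outputs (part-r-*), files with the configured extension
        // (optionally gzipped), and everything when no extension is set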
        String path = pPath1.toString().toLowerCase();
        if (pPath1.getName().toLowerCase().startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension != null && path.endsWith(extension.toLowerCase()))
            return true;
        if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
            return true;
        if (extension == null )
            return true;
        return false;
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
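        // gzip streams cannot be split, so a .gz file is read as a single split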
        String fname = file.getName().toLowerCase();
        if(fname.endsWith(".gz"))
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied to insert a filter on acceptable data
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path))   // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
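                // carve off full splits while more than SPLIT_SLOP (110%) of a split
                // remains; a remainder within the 10% slop is folded into the last split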
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            }
            else if (length != 0) {
                splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
            }
            else {
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
    //    LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single m_Value, with the file name as the m_Key.
     * Value is the entire file
     * Key is the file name
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start = split.getStart();
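
The rest of the record reader is cut off in the archived message. As the class
Javadoc suggests, the usual way to use XMLTagInputFormat is to subclass it and
hard-code the tag to split on. Below is a minimal sketch of that (not part of
the original post); the tag name "scan", the subclass name, and the job wiring
are illustrative assumptions only.

public class ScanTagInputFormat extends XMLTagInputFormat {
    // no-arg constructor so Hadoop can create the format reflectively;
    // "scan" is a hypothetical tag name chosen for illustration
    public ScanTagInputFormat() {
        super("scan");   // records are delimited by <scan ... </scan>
    }
}

// Typical job wiring with the standard org.apache.hadoop.mapreduce API:
// Configuration conf = new Configuration();
// Job job = new Job(conf, "split xml on scan tags");
// job.setInputFormatClass(ScanTagInputFormat.class);
// FileInputFormat.addInputPath(job, new Path(args[0]));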