Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
MapReduce >> mail # user >> Creating custom input split.l


Copy link to this message
-
Re: Creating custom input split.l
This is a custom splitter for extracting XML Tags from XML documents - to
use is subclass as follows. It assumes the document
is pretty printed with the start and end tags on separate lines.  You may
use this as an example

public class MyTagInputFormat extends XMLTagInputFormat
{
     public MyTagInputFormat() {
              super("MyTag");
     }
}

package org.systemsbiology.hadoop;

/**
 * User: steven
 * Date: 3/7/11
 */

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * org.systemsbiology.xtandem.hadoop.MzXMLInputFormat
 * Splitter that reads scan tags from a MzXML file which is
 * nice enough to put the begin and end tags on separate lines
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<"  + pBaseTag;
        m_EndTag = "</"  + pBaseTag + ">";

    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return true;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single value with the name as a key
     * Value is the entire file
     * Key is the file name
     */
    public   class MyWholeFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long end;
        private long current;
              private LineReader in;
        private Text key = null;
        private Text value = null;
        private Text buffer = new Text();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws
IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            }
            else {
                in = new LineReader(fileIn, job);
            }
            current = start;
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }

        }

        /**
         * look for a <scan tag then read until it closes
         * @return   true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            int newSize = 0;
            StringBuilder sb = new StringBuilder();
            newSize = in.readLine(buffer);
            String str = null;
            while (newSize > 0) {
                 str = buffer.toString();
                 if(str.contains(getStartTag()))
                     break;
                newSize = in.readLine(buffer);
             }
            if(newSize == 0)   {
                key = null;
                 value = null;
                 return false;

            }
             while (newSize > 0) {
                str = buffer.toString();
                 sb.append(str);
                 sb.append("\n");
                 if(str.contains(getEndTag()))
                     break;
                 newSize = in.readLine(buffer);
             }

            String s = sb.toString();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return ((float)current - start)/ (start - end) ;
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}
On Sat, Apr 9, 2011 at 1:18 AM, Harsh J <[EMAIL PROTECTED]> wrote:
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com