|
|
-
Re: Creating custom input split.lSteve Lewis 2011-04-09, 16:20
This is a custom splitter for extracting XML Tags from XML documents - to
use is subclass as follows. It assumes the document is pretty printed with the start and end tags on separate lines. You may use this as an example public class MyTagInputFormat extends XMLTagInputFormat { public MyTagInputFormat() { super("MyTag"); } } package org.systemsbiology.hadoop; /** * User: steven * Date: 3/7/11 */ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.util.*; import java.io.*; /** * org.systemsbiology.xtandem.hadoop.MzXMLInputFormat * Splitter that reads scan tags from a MzXML file which is * nice enough to put the begin and end tags on separate lines */ public class XMLTagInputFormat extends FileInputFormat<Text, Text> { public static final XMLTagInputFormat[] EMPTY_ARRAY = {}; private final String m_BaseTag; private final String m_StartTag; private final String m_EndTag; public XMLTagInputFormat(final String pBaseTag) { m_BaseTag = pBaseTag; m_StartTag = "<" + pBaseTag; m_EndTag = "</" + pBaseTag + ">"; } public String getStartTag() { return m_StartTag; } public String getBaseTag() { return m_BaseTag; } public String getEndTag() { return m_EndTag; } @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { return new MyWholeFileReader(); } @Override protected boolean isSplitable(JobContext context, Path file) { return true; } /** * Custom RecordReader which returns the entire file as a * single value with the name as a key * Value is the entire file * Key is the file name */ public class MyWholeFileReader extends RecordReader<Text, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long end; private long current; private LineReader in; private Text key = null; private Text value = null; private Text buffer = new Text(); public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { in = new LineReader(fileIn, job); } current = start; if (key == null) { key = new Text(); } key.set(split.getPath().getName()); if (value == null) { value = new Text(); } } /** * look for a <scan tag then read until it closes * @return true if there is data * @throws java.io.IOException */ public boolean nextKeyValue() throws IOException { int newSize = 0; StringBuilder sb = new StringBuilder(); newSize = in.readLine(buffer); String str = null; while (newSize > 0) { str = buffer.toString(); if(str.contains(getStartTag())) break; newSize = in.readLine(buffer); } if(newSize == 0) { key = null; value = null; return false; } while (newSize > 0) { str = buffer.toString(); sb.append(str); sb.append("\n"); if(str.contains(getEndTag())) break; newSize = in.readLine(buffer); } String s = sb.toString(); value.set(s); if (sb.length() == 0) { key = null; value = null; return false; } else { return true; } } @Override public Text getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() { return ((float)current - start)/ (start - end) ; } public synchronized void close() throws IOException { if (in != null) { in.close(); } } } } On Sat, Apr 9, 2011 at 1:18 AM, Harsh J <[EMAIL PROTECTED]> wrote: Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com |