|
|
+
Mohammad Tariq 2012-06-18, 21:19
+
madhu phatak 2012-06-19, 10:58
+
Mohammad Tariq 2012-06-19, 11:05
+
Mohammad Tariq 2012-06-19, 11:19
+
madhu phatak 2012-06-19, 12:13
+
Mohammad Tariq 2012-06-19, 12:24
+
Mohammad Tariq 2012-06-19, 12:28
-
Re: Processing xml documents using StreamXmlRecordReader — madhu phatak 2012-06-19, 12:41
Hi,
Yes, you have the class, but it's for the old API. Please find below the code for the classes ported to the new API. I have not tested the code; try these classes and let me know if it's working for you. StreamInputFormat (new API) package org.apache.hadoop.streaming; import java.io.IOException; import java.lang.reflect.Constructor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.streaming.StreamUtil; /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /** An input format that selects a RecordReader based on a Configuration property. * This should be used only for non-standard record readers such as * StreamXmlRecordReader. For all other standard * record readers, the appropriate input format classes should be used. 
*/ public class StreamInputFormat extends KeyValueTextInputFormat { @Override public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException { Configuration conf = context.getConfiguration(); String c = conf.get("stream.recordreader.class"); if (c == null || c.indexOf("LineRecordReader") >= 0) { return super.createRecordReader(genericSplit, context); } // handling non-standard record reader (likely StreamXmlRecordReader) FileSplit split = (FileSplit) genericSplit; //LOG.info("getRecordReader start.....split=" + split); context.setStatus(split.toString()); context.progress(); // Open the file and seek to the start of the split FileSystem fs = split.getPath().getFileSystem(conf); FSDataInputStream in = fs.open(split.getPath()); // Factory dispatch based on available params.. Class readerClass; { readerClass = StreamUtil.goodClassOrNull(conf, c, null); if (readerClass == null) { throw new RuntimeException("Class not found: " + c); } } Constructor ctor; try { ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class, TaskAttemptContext.class, Configuration.class, FileSystem.class }); } catch (NoSuchMethodException nsm) { throw new RuntimeException(nsm); } RecordReader<Text, Text> reader; try { reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { in, split, context, conf, fs }); } catch (Exception nsm) { throw new RuntimeException(nsm); } return reader; } } StreamXmlRecordReader /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.streaming; import java.io.*; import java.util.regex.*; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** A way to interpret XML fragments as Mapper input records. * Values are XML subtrees delimited by configurable tags. * Keys could be the value of a certain attribute in the XML subtree, * but this is left to the stream processor application. * * The name-value properties that StreamXmlRecordReader understands are: * String begin (chars marking beginning of record) * String end (chars marking end of record) * int maxrec (maximum record size) * int lookahead(maximum lookahead to sync CDATA) * boolean slowmatch */ public class StreamXmlRecordReader extends StreamBaseRecordReader { public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, TaskAttemptContext context, Configuration conf, FileSystem fs) throws IOException { super(in, split, context, conf, fs); beginMark_ = checkJobGet(CONF_NS + "begin"); endMark_ = checkJobGet(CONF_NS + "end"); +
madhu phatak 2012-06-21, 07:07
+
Mohammad Tariq 2012-06-21, 07:12
|