|
|
-
Re: Processing xml documents using StreamXmlRecordReadermadhu phatak 2012-06-21, 07:07
Hi,
Jira for the new API code https://issues.apache.org/jira/browse/HADOOP-8521 On Tue, Jun 19, 2012 at 6:11 PM, madhu phatak <[EMAIL PROTECTED]> wrote: > Hi, > Yes you have the class, but it's for old API. > > Please find the code below for ported classes for new API. I have not > tested the code,try to use these classes and let me know if its working for > you. > > > StreamInputFormat (new API) > > package org.apache.hadoop.streaming; > import java.io.IOException; > import java.lang.reflect.Constructor; > > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.fs.FSDataInputStream; > import org.apache.hadoop.fs.FileSystem; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.mapred.JobConf; > import org.apache.hadoop.mapred.Reporter; > import org.apache.hadoop.mapreduce.InputSplit; > import org.apache.hadoop.mapreduce.RecordReader; > import org.apache.hadoop.mapreduce.TaskAttemptContext; > import org.apache.hadoop.mapreduce.lib.input.FileSplit; > import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; > import org.apache.hadoop.streaming.StreamUtil; > > /** > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > > > > /** An input format that selects a RecordReader based on a JobConf > property. > * This should be used only for non-standard record reader such as > * StreamXmlRecordReader. For all other standard > * record readers, the appropriate input format classes should be used. > */ > public class StreamInputFormat extends KeyValueTextInputFormat { > > @Override > public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, > TaskAttemptContext context) throws IOException { > > Configuration conf = context.getConfiguration(); > String c = conf.get("stream.recordreader.class"); > if (c == null || c.indexOf("LineRecordReader") >= 0) { > return super.createRecordReader(genericSplit, context); > } > > // handling non-standard record reader (likely StreamXmlRecordReader) > FileSplit split = (FileSplit) genericSplit; > //LOG.info("getRecordReader start.....split=" + split); > context.setStatus(split.toString()); > context.progress(); > > // Open the file and seek to the start of the split > FileSystem fs = split.getPath().getFileSystem(conf); > FSDataInputStream in = fs.open(split.getPath()); > > // Factory dispatch based on available params.. > Class readerClass; > > { > readerClass = StreamUtil.goodClassOrNull(conf, c, null); > if (readerClass == null) { > throw new RuntimeException("Class not found: " + c); > } > } > Constructor ctor; > try { > ctor = readerClass.getConstructor(new Class[] { > FSDataInputStream.class, > FileSplit.class, > TaskAttemptContext.class, Configuration.class, FileSystem.class }); > } catch (NoSuchMethodException nsm) { > throw new RuntimeException(nsm); > } > > RecordReader<Text, Text> reader; > try { > reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { > in, split, > context, > conf, fs }); > } catch (Exception nsm) { > throw new RuntimeException(nsm); > } > return reader; https://github.com/zinnia-phatak-dev/Nectar |