|
|
-
Re: Processing xml documents using StreamXmlRecordReader - Mohammad Tariq, 2012-06-21, 07:12
Hello Madhu,
I really appreciate your efforts. I am sorry I did not respond back. Actually, I was struggling with it, so I did not have anything to let you know. Many thanks. Regards, Mohammad Tariq On Thu, Jun 21, 2012 at 12:37 PM, madhu phatak <[EMAIL PROTECTED]> wrote: > Hi, > Jira for the new API code > https://issues.apache.org/jira/browse/HADOOP-8521 > > > On Tue, Jun 19, 2012 at 6:11 PM, madhu phatak <[EMAIL PROTECTED]> wrote: >> >> Hi, >> Yes, you have the class, but it's for the old API. >> >> Please find below the code for the classes ported to the new API. I have not >> tested the code, so try to use these classes and let me know if it's working for >> you. >> >> >> StreamInputFormat (new API) >> >> package org.apache.hadoop.streaming; >> import java.io.IOException; >> import java.lang.reflect.Constructor; >> >> import org.apache.hadoop.conf.Configuration; >> import org.apache.hadoop.fs.FSDataInputStream; >> import org.apache.hadoop.fs.FileSystem; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.Reporter; >> import org.apache.hadoop.mapreduce.InputSplit; >> import org.apache.hadoop.mapreduce.RecordReader; >> import org.apache.hadoop.mapreduce.TaskAttemptContext; >> import org.apache.hadoop.mapreduce.lib.input.FileSplit; >> import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; >> import org.apache.hadoop.streaming.StreamUtil; >> >> /** >> * Licensed to the Apache Software Foundation (ASF) under one >> * or more contributor license agreements. See the NOTICE file >> * distributed with this work for additional information >> * regarding copyright ownership. The ASF licenses this file >> * to you under the Apache License, Version 2.0 (the >> * "License"); you may not use this file except in compliance >> * with the License. 
You may obtain a copy of the License at >> * >> * http://www.apache.org/licenses/LICENSE-2.0 >> * >> * Unless required by applicable law or agreed to in writing, software >> * distributed under the License is distributed on an "AS IS" BASIS, >> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >> implied. >> * See the License for the specific language governing permissions and >> * limitations under the License. >> */ >> >> >> >> >> /** An input format that selects a RecordReader based on a JobConf >> property. >> * This should be used only for non-standard record reader such as >> * StreamXmlRecordReader. For all other standard >> * record readers, the appropriate input format classes should be used. >> */ >> public class StreamInputFormat extends KeyValueTextInputFormat { >> >> @Override >> public RecordReader<Text, Text> createRecordReader(InputSplit >> genericSplit, >> TaskAttemptContext context) throws IOException { >> >> Configuration conf = context.getConfiguration(); >> String c = conf.get("stream.recordreader.class"); >> if (c == null || c.indexOf("LineRecordReader") >= 0) { >> return super.createRecordReader(genericSplit, context); >> } >> >> // handling non-standard record reader (likely StreamXmlRecordReader) >> FileSplit split = (FileSplit) genericSplit; >> //LOG.info("getRecordReader start.....split=" + split); >> context.setStatus(split.toString()); >> context.progress(); >> >> // Open the file and seek to the start of the split >> FileSystem fs = split.getPath().getFileSystem(conf); >> FSDataInputStream in = fs.open(split.getPath()); >> >> // Factory dispatch based on available params.. 
>> Class readerClass; >> >> { >> readerClass = StreamUtil.goodClassOrNull(conf, c, null); >> if (readerClass == null) { >> throw new RuntimeException("Class not found: " + c); >> } >> } >> Constructor ctor; >> try { >> ctor = readerClass.getConstructor(new Class[] { >> FSDataInputStream.class, >> FileSplit.class, >> TaskAttemptContext.class, Configuration.class, FileSystem.class }); >> } catch (NoSuchMethodException nsm) { |