|
Mohammad Tariq
2012-06-18, 21:19
madhu phatak
2012-06-19, 10:58
Mohammad Tariq
2012-06-19, 11:05
Mohammad Tariq
2012-06-19, 11:19
madhu phatak
2012-06-19, 12:13
Mohammad Tariq
2012-06-19, 12:24
Mohammad Tariq
2012-06-19, 12:28
madhu phatak
2012-06-19, 12:41
madhu phatak
2012-06-21, 07:07
Mohammad Tariq
2012-06-21, 07:12
|
-
Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-18, 21:19
Hello list,
Could anyone who has written MapReduce jobs to process XML documents stored in their cluster using "StreamXmlRecordReader" share their experience? Or, if you can, provide me with some pointers on that. Many thanks.

Regards,
Mohammad Tariq
-
Re: Processing xml documents using StreamXmlRecordReader
madhu phatak 2012-06-19, 10:58
Hi,
Set the following properties in the driver class:

    jobConf.set("stream.recordreader.class",
        "org.apache.hadoop.streaming.StreamXmlRecordReader");
    jobConf.set("stream.recordreader.begin", "start-tag");
    jobConf.set("stream.recordreader.end", "end-tag");
    jobConf.setInputFormat(StreamInputFormat.class);

In the Mapper, the XML record will arrive as the key, of type Text, so your mapper will look like:

    public class MyMapper<K,V> implements Mapper<Text,Text,K,V>

--
https://github.com/zinnia-phatak-dev/Nectar
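A note on what the begin/end properties mean: the record reader hands the mapper one record per chunk of input that starts at the begin marker and ends at the end marker. The following is only a minimal, Hadoop-free sketch of that idea, not the actual StreamXmlRecordReader implementation. The class name TagRecordSplitter and its split method are hypothetical, and the sketch works on an in-memory string, ignoring input splits, maximum record size, and CDATA handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of Hadoop: cuts a string into records
// delimited by a literal begin marker and a literal end marker.
class TagRecordSplitter {
    static List<String> split(String input, String begin, String end) {
        List<String> records = new ArrayList<String>();
        int pos = 0;
        while (true) {
            int start = input.indexOf(begin, pos);
            if (start < 0) {
                break;                        // no further record starts
            }
            int stop = input.indexOf(end, start + begin.length());
            if (stop < 0) {
                break;                        // unterminated record: dropped
            }
            stop += end.length();             // keep the end marker in the record
            records.add(input.substring(start, stop));
            pos = stop;                       // continue after this record
        }
        return records;
    }

    public static void main(String[] args) {
        String xml = "<root><info>a</info>\n<info>b</info></root>";
        for (String record : split(xml, "<info>", "</info>")) {
            System.out.println(record);
        }
    }
}
```

Because the markers in this sketch are matched as literal strings, a begin marker of "<info>" would not match an element written as <info id="1">.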
-
Re: Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-19, 11:05
Hello Madhu,
Thanks for the response. Actually, I was trying to use the new API (Job). Have you tried that? I was not able to set the InputFormat using the Job API.

Regards,
Mohammad Tariq
-
Re: Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-19, 11:19
My driver function looks like this:

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // TODO Auto-generated method stub

        Configuration conf = new Configuration();
        Job job = new Job();
        conf.set("stream.recordreader.class",
            "org.apache.hadoop.streaming.StreamXmlRecordReader");
        conf.set("stream.recordreader.begin", "<info>");
        conf.set("stream.recordreader.end", "</info>");
        job.setInputFormatClass(StreamInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/mapin/demo.xml"));
        FileOutputFormat.setOutputPath(job, new Path("/mapout/demo"));
        job.waitForCompletion(true);
    }

Could you please point out my mistake?

Regards,
Mohammad Tariq
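Separately from the InputFormat question, one detail in the driver above is worth noting: the properties are set on conf, but the job is created with new Job(), which builds its own Configuration, so the stream.recordreader.* values never reach the job. In the 0.20.x API, new Job(conf) also copies the configuration when the Job is constructed, so values need to be set before that point or through job.getConfiguration(). The sketch below imitates that copy-at-construction behaviour without Hadoop. MiniJob and ConfCopyDemo are hypothetical stand-ins and a plain Map plays the role of Configuration, so this is an analogy under those assumptions rather than Hadoop code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, NOT a Hadoop class: it copies its settings at
// construction time, which is the behaviour being illustrated.
class MiniJob {
    private final Map<String, String> settings;

    // Analogous to new Job(): starts from fresh, empty settings.
    MiniJob() {
        this(new HashMap<String, String>());
    }

    // Analogous to new Job(conf): takes a private copy of conf.
    MiniJob(Map<String, String> conf) {
        this.settings = new HashMap<String, String>(conf);
    }

    // Analogous to job.getConfiguration(): the job's own live settings.
    Map<String, String> getConfiguration() {
        return settings;
    }
}

class ConfCopyDemo {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<String, String>();
        MiniJob detached = new MiniJob();                 // never sees conf
        conf.put("stream.recordreader.begin", "<info>");
        MiniJob attached = new MiniJob(conf);             // sees earlier values
        conf.put("stream.recordreader.end", "</info>");   // too late for attached

        System.out.println(detached.getConfiguration().get("stream.recordreader.begin"));
        System.out.println(attached.getConfiguration().get("stream.recordreader.begin"));
        System.out.println(attached.getConfiguration().get("stream.recordreader.end"));
    }
}
```

Translated back to the driver, that would mean setting the properties first and then creating the job with new Job(conf), or setting them on job.getConfiguration() after the job exists.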
-
Re: Processing xml documents using StreamXmlRecordReader
madhu phatak 2012-06-19, 12:13
It seems StreamInputFormat has not yet been ported to the new API. That's why you are not able to set it as the InputFormatClass. You can file a JIRA for this issue.

--
https://github.com/zinnia-phatak-dev/Nectar
-
Re: Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-19, 12:24
Thanks Madhu. I'll do that.

Regards,
Mohammad Tariq
-
Re: Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-19, 12:28
But I have downloaded "hadoop-streaming-0.20.205.0.jar" and it contains the StreamXmlRecordReader.class file. That should mean it supports StreamInputFormat.

Regards,
Mohammad Tariq
-
Re: Processing xml documents using StreamXmlRecordReader
madhu phatak 2012-06-19, 12:41
Hi,
Yes, you have the class, but it's for the old API.

Please find below the classes ported to the new API. I have not tested the code; try these classes and let me know if it works for you.

StreamInputFormat (new API)

    package org.apache.hadoop.streaming;

    import java.io.IOException;
    import java.lang.reflect.Constructor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.streaming.StreamUtil;

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /** An input format that selects a RecordReader based on a JobConf property.
     *  This should be used only for non-standard record reader such as
     *  StreamXmlRecordReader. For all other standard
     *  record readers, the appropriate input format classes should be used.
     */
    public class StreamInputFormat extends KeyValueTextInputFormat {

      @Override
      public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
          TaskAttemptContext context) throws IOException {

        Configuration conf = context.getConfiguration();
        String c = conf.get("stream.recordreader.class");
        if (c == null || c.indexOf("LineRecordReader") >= 0) {
          return super.createRecordReader(genericSplit, context);
        }

        // handling non-standard record reader (likely StreamXmlRecordReader)
        FileSplit split = (FileSplit) genericSplit;
        //LOG.info("getRecordReader start.....split=" + split);
        context.setStatus(split.toString());
        context.progress();

        // Open the file and seek to the start of the split
        FileSystem fs = split.getPath().getFileSystem(conf);
        FSDataInputStream in = fs.open(split.getPath());

        // Factory dispatch based on available params..
        Class readerClass;

        {
          readerClass = StreamUtil.goodClassOrNull(conf, c, null);
          if (readerClass == null) {
            throw new RuntimeException("Class not found: " + c);
          }
        }
        Constructor ctor;
        try {
          ctor = readerClass.getConstructor(new Class[] {
              FSDataInputStream.class, FileSplit.class,
              TaskAttemptContext.class, Configuration.class, FileSystem.class });
        } catch (NoSuchMethodException nsm) {
          throw new RuntimeException(nsm);
        }

        RecordReader<Text, Text> reader;
        try {
          reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] {
              in, split, context, conf, fs });
        } catch (Exception nsm) {
          throw new RuntimeException(nsm);
        }
        return reader;
      }
    }

StreamXmlRecordReader

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.hadoop.streaming;

    import java.io.*;
    import java.util.regex.*;

    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    /** A way to interpret XML fragments as Mapper input records.
     *  Values are XML subtrees delimited by configurable tags.
     *  Keys could be the value of a certain attribute in the XML subtree,
     *  but this is left to the stream processor application.
     *
     *  The name-value properties that StreamXmlRecordReader understands are:
     *    String begin (chars marking beginning of record)
     *    String end   (chars marking end of record)
     *    int maxrec   (maximum record size)
     *    int lookahead(maximum lookahead to sync CDATA)
     *    boolean slowmatch
     */
    public class StreamXmlRecordReader extends StreamBaseRecordReader {

      public StreamXmlRecordReader(FSDataInputStream in, FileSplit split,
          TaskAttemptContext context, Configuration conf, FileSystem fs)
          throws IOException {
        super(in, split, context, conf, fs);

        beginMark_ = checkJobGet(CONF_NS + "begin");
        endMark_ = checkJobGet(CONF_NS + "end");
    [...]
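The StreamInputFormat port above picks the reader class by name and calls its constructor through reflection. A small Hadoop-free sketch of that factory pattern may make one property of it easier to see: getConstructor matches parameter types exactly. All names here (DemoReader, XmlDemoReader, ReflectiveFactory) are hypothetical and stand in for the Hadoop types.

```java
import java.lang.reflect.Constructor;

// Hypothetical reader types, NOT Hadoop classes.
interface DemoReader {
    String describe();
}

class XmlDemoReader implements DemoReader {
    private final String source;
    private final Integer limit;

    public XmlDemoReader(String source, Integer limit) {
        this.source = source;
        this.limit = limit;
    }

    public String describe() {
        return "xml:" + source + ":" + limit;
    }
}

class ReflectiveFactory {
    // Loads a class by name and invokes its public (String, Integer) constructor.
    static DemoReader create(String className, String source, Integer limit) {
        try {
            Class<?> cls = Class.forName(className);
            // The lookup succeeds only if a public constructor declares
            // exactly these parameter types, in this order.
            Constructor<?> ctor = cls.getConstructor(String.class, Integer.class);
            return (DemoReader) ctor.newInstance(source, limit);
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and failures
            // while running the constructor.
            throw new RuntimeException("Cannot create reader " + className, e);
        }
    }

    public static void main(String[] args) {
        DemoReader reader = create(XmlDemoReader.class.getName(), "demo.xml", 10);
        System.out.println(reader.describe());
    }
}
```

This matters for the posted port: StreamInputFormat looks up a constructor taking org.apache.hadoop.mapreduce.lib.input.FileSplit, while the StreamXmlRecordReader listing imports org.apache.hadoop.mapred.FileSplit. Judging only from the imports shown, those are different classes, so the lookup would probably throw NoSuchMethodException until the two agree.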
-
Re: Processing xml documents using StreamXmlRecordReader
madhu phatak 2012-06-21, 07:07
Hi,
The JIRA for the new API code: https://issues.apache.org/jira/browse/HADOOP-8521

--
https://github.com/zinnia-phatak-dev/Nectar
-
Re: Processing xml documents using StreamXmlRecordReader
Mohammad Tariq 2012-06-21, 07:12
Hello Madhu,
I really appreciate your efforts. I am sorry I did not respond. I was still struggling with it, so I had nothing to report yet. Many thanks.

Regards,
Mohammad Tariq
|