XmlInputFormat Hadoop - MapReduce
Ranjini Rathinam 2013-12-17, 12:12
Hi,

I have attached the code. Please verify.

Please suggest. I am using Hadoop 0.20.
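For reference, the mapper's chained getChild() calls expect each field element to be nested inside the previous one, so an input record would have to look roughly like this (field names are taken from the code; the values are made up, and the real files may instead have the fields as siblings directly under <Employee>):

<Employee>
  <id>1001
    <ename>Raj
      <dept>Sales
        <sal>5000
          <location>Chennai</location>
        </sal>
      </dept>
    </ename>
  </id>
</Employee>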
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.mapreduce.lib.input.XmlInputFormat;

public class ParserDriverMain {

    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    // The code is mostly self-explanatory. You need to define the starting and
    // ending tag used to split a record out of the XML file; they are set on
    // the configuration in the following lines:
    //
    //   conf.set("xmlinput.start", "<startingTag>");
    //   conf.set("xmlinput.end", "</endingTag>");
    public static void runJob(String input, String output) throws IOException {

        Configuration conf = new Configuration();

        conf.set("xmlinput.start", "<Employee>");
        conf.set("xmlinput.end", "</Employee>");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "jobName");

        // Note: these two assignments overwrite the arguments passed in from main().
        input = "/user/hduser/Ran/";
        output = "/user/task/Sales/";

        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(ParserDriverMain.class);
        job.setMapperClass(MyParserMapper.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(XmlInputFormatNew.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);

        // Remove the output directory if it already exists so the job can be rerun.
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }

        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
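For completeness, once the three classes are compiled into a jar (the jar name here is hypothetical), the job would be launched along these lines:

hadoop jar xmlparser.jar ParserDriverMain /user/hduser/Ran/ /user/task/Sales/

Note that because runJob() overwrites both of its arguments with the hardcoded paths, whatever is passed on the command line is currently ignored.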

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;
import java.io.Reader;
import java.io.StringReader;

/**
 * Mapper that receives one XML record at a time from XmlInputFormatNew,
 * parses it with JDOM, and emits the extracted field text.
 */
public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {

        String xmlString = value1.toString();
        System.out.println("xmlString====" + xmlString);
        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);
        String value = "";
        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();

            // String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
            // String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();

            // Note: this chain only works if each element is nested inside the
            // previous one (<id> containing <ename>, <ename> containing <dept>,
            // and so on); any missing level makes getChild() return null.
            value = root.getChild("id").getChild("ename").getChild("dept")
                    .getChild("sal").getChild("location").getTextTrim();
            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
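If the five fields are actually siblings directly under <Employee> rather than nested inside one another, the chained getChild() calls above will throw a NullPointerException. A minimal sketch of the flat-layout extraction (JDOM's getChildText() returns the text of a direct child, or null if it is absent) would be:

Element root = doc.getRootElement();
String value = root.getChildText("id") + "," + root.getChildText("ename") + ","
        + root.getChildText("dept") + "," + root.getChildText("sal") + ","
        + root.getChildText("location");
context.write(NullWritable.get(), new Text(value));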
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormatNew extends TextInputFormat {

    // Configuration key names under which the driver sets the actual tags
    // (conf.set("xmlinput.start", "<Employee>") etc.); initialize() looks
    // the tag values up with conf.get(), so the literal tags do not belong here.
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
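            // The original message is truncated at this point. The remainder
            // below is a reconstruction based on the widely circulated
            // XmlInputFormat from Apache Mahout, which this class closely
            // follows; treat it as a sketch of the missing code rather than
            // the poster's exact version.
            Configuration conf = tac.getConfiguration();
            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");

            // Open the file and seek to the start of this split.
            FileSplit fileSplit = (FileSplit) is;
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(file);
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Everything between a start tag and the next end tag (inclusive)
            // becomes one record; the key is the stream position after it.
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        // Reads bytes until the given byte sequence is seen; if withinBlock is
        // true, every byte read is appended to the record buffer.
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) return false;          // end of file
                if (withinBlock) buffer.write(b);   // save to buffer
                if (b == match[i]) {                // advance the match
                    i++;
                    if (i >= match.length) return true;
                } else {
                    i = 0;
                }
                // Stop if we have passed the end of the split without matching.
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }
    }
}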