XmlInputFormat Hadoop - MapReduce
Hi,

I have attached the code below. Please verify it and suggest what might be wrong. I am using Hadoop 0.20.
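For reference, a single input record is assumed to look like the following (a hypothetical sample, inferred from the <Employee> start/end tags configured in the driver and the fields the mapper reads):

<Employee>
  <id>1001</id>
  <ename>John</ename>
  <dept>Sales</dept>
  <sal>50000</sal>
  <location>NY</location>
</Employee>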
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ParserDriverMain {

    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    // The code is mostly self-explanatory. You need to define the start and
    // end tags that delimit one record in the XML file, which is done in the
    // following lines:
    //
    //   conf.set("xmlinput.start", "<startingTag>");
    //   conf.set("xmlinput.end", "</endingTag>");
    public static void runJob(String input, String output) throws IOException {

        Configuration conf = new Configuration();

        conf.set("xmlinput.start", "<Employee>");
        conf.set("xmlinput.end", "</Employee>");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "jobName");

        // Note: these two lines overwrite the paths passed in as arguments;
        // remove them to use the command-line arguments instead.
        input = "/user/hduser/Ran/";
        output = "/user/task/Sales/";

        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(ParserDriverMain.class);
        job.setMapperClass(MyParserMapper.class);
        job.setNumReduceTasks(1);
        job.setInputFormatClass(XmlInputFormatNew.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);

        // Delete the output directory if it already exists so the job can run.
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }

        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ParserDriverMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
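
Assuming the classes are packed into a jar named parser.jar (a hypothetical name), the job would be launched like this; note that runJob() currently overwrites both arguments with the hardcoded paths shown above:

hadoop jar parser.jar ParserDriverMain /user/hduser/Ran/ /user/task/Sales/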

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;

/**
 * Parses one <Employee> record (delivered as the map value) with JDOM and
 * emits the extracted fields as a single comma-separated line.
 */
public class MyParserMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {

        String xmlString = value1.toString();
        System.out.println("xmlString====" + xmlString);

        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);
        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();

            // For nested tags you would chain getChild() calls, e.g.:
            //   String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();

            // <id>, <ename>, <dept>, <sal> and <location> are direct children
            // of <Employee>, so read each one from the root element rather
            // than chaining getChild() calls.
            String value = root.getChildTextTrim("id") + ","
                    + root.getChildTextTrim("ename") + ","
                    + root.getChildTextTrim("dept") + ","
                    + root.getChildTextTrim("sal") + ","
                    + root.getChildTextTrim("location");

            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(MyParserMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
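
To check the JDOM extraction independently of Hadoop, a small standalone sketch like the one below can be run against a single record (JdomParseTest is a hypothetical class name; it assumes the sample <Employee> layout above and JDOM 1.x on the classpath):

import java.io.StringReader;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;

public class JdomParseTest {
    public static void main(String[] args) throws Exception {
        String xml = "<Employee><id>1001</id><ename>John</ename>"
                + "<dept>Sales</dept><sal>50000</sal><location>NY</location></Employee>";
        Document doc = new SAXBuilder().build(new StringReader(xml));
        Element root = doc.getRootElement();
        // Same extraction as the mapper: each field is a direct child of <Employee>.
        System.out.println(root.getChildTextTrim("id") + ","
                + root.getChildTextTrim("ename") + ","
                + root.getChildTextTrim("dept") + ","
                + root.getChildTextTrim("sal") + ","
                + root.getChildTextTrim("location"));
    }
}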
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormatNew extends TextInputFormat {

    // Configuration keys under which the driver stores the start and end tags.
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        // The original message was truncated here; the rest of the reader is
        // reconstructed from the well-known Mahout XmlInputFormat it is based on.
        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) is;
            Configuration conf = tac.getConfiguration();
            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            fsin = file.getFileSystem(conf).open(file);
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        // Scans forward until the given tag is matched; when withinBlock is
        // true, the bytes read are also copied into the record buffer.
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) return false;        // end of file
                if (withinBlock) buffer.write(b); // save record bytes
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else {
                    i = 0;
                }
                // stop scanning for a start tag once past the end of the split
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }

        @Override
        public LongWritable getCurrentKey() { return key; }
        @Override
        public Text getCurrentValue() { return value; }
        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }
        @Override
        public void close() throws IOException { fsin.close(); }
    }
}
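
One point worth noting about the reconstructed reader: nextKeyValue() only begins a new record while the current position is inside the split, but readUntilMatch() with withinBlock=true will read past the split end to finish a record it has already started, so a record straddling a split boundary is still emitted exactly once.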