Re: Writing data to hbase from reducer
jamal sasha 2013-08-28, 17:57
Eh
So here is my attempt:
But it's not working :(

Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:128)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:889)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.rdf.RdfFormatter.run(RdfFormatter.java:142)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at org.rdf.RdfFormatter.main(RdfFormatter.java:177)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
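
As far as I can tell, the check that throws is FileOutputFormat.checkOutputSpecs, so my guess is that calling job.setOutputFormatClass(TextOutputFormat.class) after TableMapReduceUtil.initTableReducerJob(...) in run() below replaces the TableOutputFormat it had configured, and a file output directory then becomes mandatory. A rough sketch of the two variants I have in mind (args[1] here is only a placeholder path, not something my current code uses):

// Variant 1: HBase-only output. Let initTableReducerJob keep TableOutputFormat
// and do not call setOutputFormatClass or setOutputPath afterwards.
TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, RDFReducer.class, job);

// Variant 2: if a file-based output format is really wanted, give it a directory.
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));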

package org.random_scripts;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RdfFormatter extends Configured implements Tool {

    private static final String OUTPUT_TABLE = "edges";

    public static class RDFMapper extends Mapper<LongWritable, Text, Text, Text> {

        private static String edge = "likes";

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tag each whitespace-separated triple as an edge or a vertex record,
            // keyed by its first token (the subject).
            String token;
            if (value.toString().contains(edge)) {
                token = "@EDGE";
            } else {
                token = "@VERTEX";
            }
            String[] chunks = value.toString().split("\\s+");
            Text valued = new Text(token + " " + chunks[1] + " " + chunks[2]);
            context.write(new Text(chunks[0]), valued);
        }
    }
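
    // To make the mapper's contract concrete, here is a hypothetical, made-up
    // input line and what map() would emit for it (whitespace-separated triples
    // are assumed; the names are illustrative only):
    //
    //   input value : "alice likes bob"
    //   emitted key : "alice"
    //   emitted val : "@EDGE likes bob"      (contains "likes", so tagged as an edge)
    //
    //   input value : "alice knows carol"
    //   emitted key : "alice"
    //   emitted val : "@VERTEX knows carol"  (no "likes", so tagged as a vertex record)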

    public static class RDFReducer extends TableReducer<Text, Text, Text> {

        /**
         * For every vertex key, take each "@EDGE ..." value it received and
         * write it into the HBase output table as one cell in the "edge"
         * column family (qualifier = predicate, value = object).
         */
        private static String edge = "http://www.franz.com/simple#has-category";

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Map<String, Integer> statsSizeCounters = new HashMap<String, Integer>();
            String vertex = key.toString();

            // Each value is either an "@EDGE ..." or an "@VERTEX ..." record for this vertex.
            for (Text value : values) {

                String[] valueSplitted = value.toString().split("@EDGE");
                if (valueSplitted.length == 2) {
                    //String model = valueSplitted[0];
                    String edgeInfo = valueSplitted[1];
                    // trim() so the leading space after "@EDGE" does not produce
                    // an empty first chunk.
                    String[] edgeChunks = edgeInfo.trim().split("\\s+");

                    ImmutableBytesWritable putTable = new ImmutableBytesWritable(Bytes.toBytes("Edges")); // currently unused
                    byte[] putKey = Bytes.toBytes(vertex);
                    byte[] putFamily = Bytes.toBytes("edge");
                    Put put = new Put(putKey);
                    put.add(putFamily, Bytes.toBytes(edgeChunks[0]), Bytes.toBytes(edgeChunks[1]));
                    // TableOutputFormat ignores the key, so null is fine here.
                    context.write(null, put);
                }
            }
        }
    }
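
    // Continuing the made-up example above: the reduce() call for key "alice"
    // with value "@EDGE likes bob" would issue roughly this Put against the
    // "edges" table (illustrative names only):
    //
    //   Put put = new Put(Bytes.toBytes("alice"));
    //   put.add(Bytes.toBytes("edge"), Bytes.toBytes("likes"), Bytes.toBytes("bob"));
    //
    // i.e. one cell edge:likes = "bob" per @EDGE value received for that vertex.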
    @Override
    public int run(String[] args) throws Exception {
       //Configuration conf = new Configuration();

       Job job = new Job(getConf());
       //job.setOutputValueClass(Text.class);
       //conf.set("delimiter", "\\s+");

       job.setJarByClass(RdfFormatter.class);

       job.setMapperClass(RDFMapper.class);
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(Text.class);

       job.setInputFormatClass(TextInputFormat.class);
       FileInputFormat.addInputPath(job, new Path(args[0]));

       //job.setReducerClass(RDFReducer.class);
       TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, RDFReducer.class, job);

       // NOTE: this replaces the TableOutputFormat that initTableReducerJob just
       // configured; TextOutputFormat needs an output directory, which is
       // presumably what triggers the InvalidJobConfException above.
       job.setOutputFormatClass(TextOutputFormat.class);
       // MultipleOutputs.addNamedOutput(job, "vertext", TextOutputFormat.class, keyClass, valueClass);
       //FileOutputFormat.setOutputPath(job, new Path(args[1]));
       job.submit();
       long start = new Date().getTime();
       job.waitForCompletion(true);
       long end = new Date().getTime();
       System.out.println("Job took " + ((end-s