Cross join/product in Map/Reduce
Hi,
I am using the following code to generate a cross product in Hadoop.

package com.example.hadoopexamples.joinnew;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // All words read from the first (mapped) input file.
    private List<String> inputWords;
    // HDFS path of the second file, passed in through the job configuration.
    private String secondFilePath;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        secondFilePath = context.getConfiguration().get("secondFilePath");
        inputWords = new ArrayList<String>();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Buffer every word of the first file in memory; the pairs are emitted in cleanup().
        inputWords.addAll(getWords(value.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Stream the second file from HDFS and pair each of its words with every
        // buffered word from the first file, skipping identical words.
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fsDataInputStream = fs.open(new Path(secondFilePath));
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(fsDataInputStream));
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                for (String word : getWords(line)) {
                    for (String inputWord : inputWords) {
                        if (!inputWord.equals(word)) {
                            context.write(new Text(word + "," + inputWord), NullWritable.get());
                        }
                    }
                }
            }
        } finally {
            bufferedReader.close();
        }
    }

    // Splits a line into whitespace-separated tokens.
    private List<String> getWords(String inputLine) {
        List<String> words = new ArrayList<String>();
        StringTokenizer stringTokenizer = new StringTokenizer(inputLine);
        while (stringTokenizer.hasMoreTokens()) {
            words.add(stringTokenizer.nextToken());
        }
        return words;
    }
}
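
For illustration, with hypothetical sample data: if the first (mapped) input file contains the words a and b, and the second file contains x and y, the output should hold one line per pair, second-file word first (the NullWritable value prints nothing):

x,a
x,b
y,a
y,b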

*Driver class*
package com.example.hadoopexamples.joinnew;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinTester {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // args[0] = first input file, args[1] = second file (streamed from HDFS
        // by the mapper), args[2] = output directory.
        Configuration configuration = new Configuration();
        configuration.set("secondFilePath", args[1]);

        Job job = new Job(configuration);
        job.setJarByClass(JoinTester.class);
        job.setMapperClass(JoinMapper.class);
        // The base Reducer class acts as the identity reducer; it only sorts the pairs.
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        // The output values are NullWritable, not Text, so declare them as such.
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);
    }
}
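
For reference, a run would look something like this (crossproduct.jar is just a placeholder name; the three arguments are the mapped input file, the file streamed from HDFS, and the output directory):

hadoop jar crossproduct.jar com.example.hadoopexamples.joinnew.JoinTester firstfile.txt /user/hadoop/secondfile.txt outputdir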

It streams the second data file from HDFS inside the mapper's cleanup() method. I got this idea from this thread:
http://search-hadoop.com/m/FNqzV1DrOEp/cross+product&subj=Re+Cross+Join
Is this a best practice, or is there a better way of doing a cross product in Hadoop?

--
https://github.com/zinnia-phatak-dev/Nectar