Cross join/product in Map/Reduce
Hi,
I am using the following code to generate a cross product in Hadoop.

package com.example.hadoopexamples.joinnew;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private List<String> inputWords;
    private String secondFilePath;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Read the HDFS path of the second input file from the job configuration.
        secondFilePath = context.getConfiguration().get("secondFilePath");
        inputWords = new ArrayList<String>();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Buffer every word from this mapper's input split in memory.
        inputWords.addAll(getWords(value.toString()));
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Stream the second file from HDFS and pair each of its words
        // with every buffered word from the first file.
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fsDataInputStream = fs.open(new Path(secondFilePath));
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(fsDataInputStream));
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                for (String word : getWords(line)) {
                    for (String inputWord : inputWords) {
                        // Skip pairs of identical words.
                        if (!inputWord.equals(word)) {
                            context.write(new Text(word + "," + inputWord),
                                    NullWritable.get());
                        }
                    }
                }
            }
        } finally {
            bufferedReader.close();
        }
    }

    private List<String> getWords(String inputLine) {
        // Split a line into whitespace-delimited tokens.
        List<String> words = new ArrayList<String>();
        StringTokenizer stringTokenizer = new StringTokenizer(inputLine);
        while (stringTokenizer.hasMoreTokens()) {
            words.add(stringTokenizer.nextToken());
        }
        return words;
    }
}
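
For example, with "a b" in the mapper's input split and "x y" in the second file, cleanup() emits the pairs x,a x,b y,a y,b (identical words are skipped).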

*Driver class*
package com.example.hadoopexamples.joinnew;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JoinTester {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        // Make the path of the second input file available to the mappers.
        configuration.set("secondFilePath", args[1]);

        Job job = new Job(configuration);
        job.setJarByClass(JoinTester.class);
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        // The identity Reducer passes the NullWritable values straight through.
        job.setOutputValueClass(NullWritable.class);
        job.setReducerClass(Reducer.class);
        //job.setOutputValueGroupingComparator(FirstComparator.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.waitForCompletion(true);
    }
}
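
A hypothetical invocation (the jar name and HDFS paths are placeholders, not from the original post): args[0] is the input read by the mappers, args[1] the file streamed in cleanup(), and args[2] the output directory.

hadoop jar cross-join.jar com.example.hadoopexamples.joinnew.JoinTester \
    /user/hduser/words1.txt /user/hduser/words2.txt /user/hduser/cross-output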

It streams the second data file from HDFS; I got the idea from this thread:
http://search-hadoop.com/m/FNqzV1DrOEp/cross+product&subj=Re+Cross+Join
Is this a best practice, or is there a better way of doing a cross product in Hadoop?
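
For comparison, one alternative I have seen is to ship the smaller file to every task node with the DistributedCache instead of re-opening it from HDFS in each mapper. A rough, untested sketch of what I mean (imports org.apache.hadoop.filecache.DistributedCache and java.io.FileReader; everything else as above):

// In the driver, before submitting the job:
DistributedCache.addCacheFile(new Path(args[1]).toUri(), job.getConfiguration());

// In JoinMapper.setup(), open the node-local copy instead of going through HDFS:
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
BufferedReader bufferedReader = new BufferedReader(
        new FileReader(cacheFiles[0].toString()));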

--
https://github.com/zinnia-phatak-dev/Nectar