Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
HDFS >> mail # user >> Reducer not getting called


Copy link to this message
-
Reducer not getting called
Hi,

I have a SequenceFile which contains several jpeg images with (image name, image bytes) as key-value pairs. My objective is to count the no. of images by grouping them by the source, something like this :

Nikon Coolpix  100
Sony Cybershot 251
N82 100
The MR code is :

package com.hadoop.basics;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.drew.imaging.ImageMetadataReader;
import com.drew.imaging.ImageProcessingException;
import com.drew.metadata.Directory;
import com.drew.metadata.Metadata;
import com.drew.metadata.exif.ExifIFD0Directory;

public class ImageSummary extends Configured implements Tool {

            public static class ImageSourceMapper extends
                                    Mapper<Text, BytesWritable, Text, IntWritable> {

                        private static int tagId = 272;
                        private static final IntWritable one = new IntWritable(1);

                        public void map(Text imageName, BytesWritable imageBytes,
                                                Context context) throws IOException, InterruptedException {
                                    // TODO Auto-generated method stub

                                    System.out.println("In the map method, image is "
                                                            + imageName.toString());

                                    byte[] imageInBytes = imageBytes.getBytes();
                                    ByteArrayInputStream bais = new ByteArrayInputStream(imageInBytes);
                                    BufferedInputStream bis = new BufferedInputStream(bais);

                                    Metadata imageMD = null;

                                    try {
                                                imageMD = ImageMetadataReader.readMetadata(bis, true);
                                    } catch (ImageProcessingException e) {
                                                // TODO Auto-generated catch block
                                                System.out.println("Got an ImageProcessingException !");
                                                e.printStackTrace();
                                    }

                                    Directory exifIFD0Directory = imageMD
                                                            .getDirectory(ExifIFD0Directory.class);

                                    String imageSource = exifIFD0Directory.getString(tagId);

                                    System.out.println(imageName.toString() + " is taken using "
                                                            + imageSource);

                                    context.write(new Text(imageSource), one);

                                    System.out.println("Returning from the map method");
                        }
            }

            public static class ImageSourceReducer extends
                                    Reducer<Text, IntWritable, Text, IntWritable> {

                        public void reduce(Text imageSource, Iterator<IntWritable> counts,
                                                Context context) throws IOException, InterruptedException {
                                    // TODO Auto-generated method stub

                                    System.out.println("In the reduce method");

                                    int finalCount = 0;

                                    while (counts.hasNext()) {
                                                finalCount += counts.next().get();
                                    }

                                    context.write(imageSource, new IntWritable(finalCount));

                                    System.out.println("Returning from the reduce method");
                        }

            }

            public static void main(String[] args) throws Exception {
                        ToolRunner.run(new ImageSummary(), args);
            }

            @Override
            public int run(String[] args) throws Exception {
                        // TODO Auto-generated method stub

                        System.out.println("In ImageSummary.run(...)");

                        Configuration configuration = getConf();

                        Job job = new Job(configuration, "Image_Source");
                        job.setJarByClass(getClass());

                        job.setInputFormatClass(SequenceFileInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);

                        job.setMapperClass(ImageSourceMapper.class);
                        job.setCombinerClass(ImageSourceReducer.class);
                        job.setReducerClass(ImageSourceReducer.class);

                        job.setMapOutputKeyClass(Text.class);
                        job.setMapOutputValueClass(IntWritable.class);

                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);

                        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
                        TextOutputFormat.setOutputPath(job, new Path(args[1]));

                        System.out.println("Submitting job");

                        jo