Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
HDFS >> mail # user >> Reducer not getting called


Copy link to this message
-
Reducer not getting called
Hi,

I have a SequenceFile which contains several jpeg images with (image name, image bytes) as key-value pairs. My objective is to count the no. of images by grouping them by the source, something like this :

Nikon Coolpix  100
Sony Cybershot 251
N82 100
The MR code is :

package com.hadoop.basics;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.drew.imaging.ImageMetadataReader;
import com.drew.imaging.ImageProcessingException;
import com.drew.metadata.Directory;
import com.drew.metadata.Metadata;
import com.drew.metadata.exif.ExifIFD0Directory;

public class ImageSummary extends Configured implements Tool {

            public static class ImageSourceMapper extends
                                    Mapper<Text, BytesWritable, Text, IntWritable> {

                        private static int tagId = 272;
                        private static final IntWritable one = new IntWritable(1);

                        public void map(Text imageName, BytesWritable imageBytes,
                                                Context context) throws IOException, InterruptedException {
                                    // TODO Auto-generated method stub

                                    System.out.println("In the map method, image is "
                                                            + imageName.toString());

                                    byte[] imageInBytes = imageBytes.getBytes();
                                    ByteArrayInputStream bais = new ByteArrayInputStream(imageInBytes);
                                    BufferedInputStream bis = new BufferedInputStream(bais);

                                    Metadata imageMD = null;

                                    try {
                                                imageMD = ImageMetadataReader.readMetadata(bis, true);
                                    } catch (ImageProcessingException e) {
                                                // TODO Auto-generated catch block
                                                System.out.println("Got an ImageProcessingException !");
                                                e.printStackTrace();
                                    }

                                    Directory exifIFD0Directory = imageMD
                                                            .getDirectory(ExifIFD0Directory.class);

                                    String imageSource = exifIFD0Directory.getString(tagId);

                                    System.out.println(imageName.toString() + " is taken using "
                                                            + imageSource);

                                    context.write(new Text(imageSource), one);

                                    System.out.println("Returning from the map method");
                        }
            }

            public static class ImageSourceReducer extends
                                    Reducer<Text, IntWritable, Text, IntWritable> {

                        public void reduce(Text imageSource, Iterator<IntWritable> counts,
                                                Context context) throws IOException, InterruptedException {
                                    // TODO Auto-generated method stub

                                    System.out.println("In the reduce method");

                                    int finalCount = 0;

                                    while (counts.hasNext()) {
                                                finalCount += counts.next().get();
                                    }

                                    context.write(imageSource, new IntWritable(finalCount));

                                    System.out.println("Returning from the reduce method");
                        }

            }

            public static void main(String[] args) throws Exception {
                        ToolRunner.run(new ImageSummary(), args);
            }

            @Override
            public int run(String[] args) throws Exception {
                        // TODO Auto-generated method stub

                        System.out.println("In ImageSummary.run(...)");

                        Configuration configuration = getConf();

                        Job job = new Job(configuration, "Image_Source");
                        job.setJarByClass(getClass());

                        job.setInputFormatClass(SequenceFileInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);

                        job.setMapperClass(ImageSourceMapper.class);
                        job.setCombinerClass(ImageSourceReducer.class);
                        job.setReducerClass(ImageSourceReducer.class);

                        job.setMapOutputKeyClass(Text.class);
                        job.setMapOutputValueClass(IntWritable.class);

                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);

                        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
                        TextOutputFormat.setOutputPath(job, new Path(args[1]));

                        System.out.println("Submitting job");

                        jo
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB