Re: Understanding the MapOutput
Hi Pedro,

It sounds like you're on the right track, but I don't really have time
to help much beyond pointing you in the right direction. Time to put
on your debugging hat :) Maybe do some testing with a small job like
"sleep -mt 1 -rt 1 -m 1 -r 1" - a sleep job with 1 mapper and 1
reducer. If I recall correctly it generates a single map output
record... otherwise you could do a "sort" of 1 line of text. Then you
can easily add debug output to diagnose what your issue is.
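
For reference, a minimal sketch of launching that sleep job from Java rather
than the shell (the SleepJob package shown here, org.apache.hadoop.examples,
is an assumption and has moved between Hadoop versions):

[code]
// Sketch, not from the original mail: run the sleep job with one mapper
// and one reducer via ToolRunner. The SleepJob package is an assumption;
// in some trees it ships in the test jar instead.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.util.ToolRunner;

public class RunSleepJob {
  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new Configuration(), new SleepJob(),
        new String[] { "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" });
    System.exit(rc);
  }
}
[/code]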

-Todd

On Fri, Nov 4, 2011 at 11:10 AM, Pedro Costa <[EMAIL PROTECTED]> wrote:
> I've looked at the MapOutputServlet class. But the problem is the following:
>
> Map output can be compressed or not. For uncompressed map output, using
> the index mechanism of the MapOutputServlet works for me: the map task
> generates a digest for each partition, and it matches the digest
> produced by the reduce.
>
> Let me explain what I've changed in my own version of the MapReduce code.
> A map task (MT) produces a digest for each partition of data it
> generates. So, if MT1 produces 2 partitions of uncompressed data, it
> produces Hash1 and Hash2.
>
> Now, when a reduce task (RT) fetches the map output, it generates another
> digest using the index mechanism of the MapOutputServlet and compares it
> with the respective digest generated by the map task.
>
> As you can see from my explanation, for uncompressed map output the
> index mechanism works well.
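>
> In code, the idea is something like the following minimal sketch (not
> from my patch; sha1OfRange is a hypothetical helper): both sides must
> digest exactly the same byte range for the hashes to match.
>
> [code]
> import java.io.InputStream;
> import java.math.BigInteger;
> import java.security.MessageDigest;
>
> // Sketch: hash exactly 'length' bytes from a stream, so that the map
> // side and the reduce side digest the same byte range.
> static String sha1OfRange(InputStream in, long length) throws Exception {
>   MessageDigest md = MessageDigest.getInstance("SHA-1");
>   byte[] buf = new byte[64 * 1024];
>   long remaining = length;
>   while (remaining > 0) {
>     int want = (int) Math.min(buf.length, remaining);
>     int got = in.read(buf, 0, want);
>     if (got == -1)
>       break;                  // stream ended early
>     md.update(buf, 0, got);   // only the bytes actually read
>     remaining -= got;
>   }
>   return new BigInteger(1, md.digest()).toString(16);
> }
> [/code]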
>
> But I've also tried to do the same with compressed map output, and it
> doesn't work. That's why I'm now trying the IFile.Reader class.
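>
> A hedged sketch of that IFile.Reader approach, digesting the
> decompressed key/value bytes rather than the raw file bytes. The
> constructor and iteration methods shown follow 0.20-era sources as I
> recall them and differ across versions; conf, fs, mapOutputFile and
> codec are placeholders, and IFile is internal to
> org.apache.hadoop.mapred, so this assumes code living in that package:
>
> [code]
> // Sketch, assumed API: iterate an IFile and digest its record bytes.
> static String digestIFile(Configuration conf, FileSystem fs,
>     Path mapOutputFile, CompressionCodec codec) throws Exception {
>   MessageDigest md = MessageDigest.getInstance("SHA-1");
>   IFile.Reader<Text, Text> reader =
>       new IFile.Reader<Text, Text>(conf, fs, mapOutputFile, codec, null);
>   DataInputBuffer key = new DataInputBuffer();
>   DataInputBuffer value = new DataInputBuffer();
>   while (reader.nextRawKey(key)) {        // iteration API varies by version
>     reader.nextRawValue(value);
>     md.update(key.getData(), 0, key.getLength());
>     md.update(value.getData(), 0, value.getLength());
>   }
>   reader.close();
>   return new BigInteger(1, md.digest()).toString(16);
> }
> [/code]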
>
> As you can see, I'm at an impasse and I don't know what to do.
>
> I will show you my code. These two methods try to generate digests on
> the map side and on the reduce side. In the end they give different
> results, and I don't know why. They are my first attempt at generating
> digests from compressed map output.
>
>
> [code]
> // This method tries to generate a digest from the compressed map output
> // on the map side, hashing the decompressed bytes.
> public synchronized String generateHash(FileSystem fs, Path filename,
>     Decompressor decompressor, int offset, int mapOutputLength) {
>   LOG.debug("Opening file: " + filename);
>
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   FSDataInputStream input = null;
>
>   try {
>     input = fs.open(filename);
>     // Position the stream at the start of this partition before
>     // wrapping it in the decompressor.
>     input.seek(offset);
>     decodec = new DecompressorStream(input, decompressor);
>     md = MessageDigest.getInstance("SHA-1");
>
>     byte[] buffer;
>     int size;
>     while (mapOutputLength > 0) {
>       // Read at most 60 KB at a time, and never more than the bytes
>       // that remain, so the digest does not include trailing garbage.
>       size = Math.min(mapOutputLength, 60 * 1024);
>       buffer = new byte[size];
>       size = decodec.read(buffer, 0, size);
>       System.out.println("read: " + size
>           + " remaining: " + mapOutputLength);
>       if (size == -1)
>         break;
>       // Update the digest with only the bytes actually read.
>       md.update(buffer, 0, size);
>       mapOutputLength -= size;
>     }
>     digest = hashIt(md);
>   } catch (NoSuchAlgorithmException e) {
>     e.printStackTrace();
>   } catch (IOException e) {
>     e.printStackTrace();
>   } finally {
>     // Closing the decompressor stream also closes the underlying input.
>     if (decodec != null) {
>       try {
>         decodec.close();
>       } catch (IOException e) {
>         e.printStackTrace();
>       }
>     } else if (input != null) {
>       try {
>         input.close();
>       } catch (IOException e) {
>         e.printStackTrace();
>       }
>     }
>   }
>
>   return digest;
> }
> [/code]
>
>
>
> [code]
> // This method tries to generate the digest from the compressed map
> // output received by the reduce.
> public synchronized String generateHash(byte[] data,
>     Decompressor decompressor, int offset, int mapOutputLength) {
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   ByteArrayInputStream bis = null;
>   try {
>     bis = new ByteArrayInputStream(data);
Todd Lipcon
Software Engineer, Cloudera