Re: Understanding the MapOutput
Hi Pedro,

It sounds like you're on the right track, but I don't really have time
to help much beyond pointing you in the right direction. Time to put
on your debugging hat :) Maybe do some testing with a small job like
"sleep -mt 1 -rt 1 -m 1 -r 1" - a sleep job with 1 mapper and 1
reducer. If I recall correctly it generates a single map output
record... otherwise you could do a "sort" of 1 line of text. Then you
can easily add debug output to diagnose what your issue is.
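Something like this should do it (assuming a 0.20-style build where the sleep job lives in the test jar; adjust the jar name to whatever your build produces):

[code]
# 1 mapper, 1 reducer, each sleeping for 1 ms
hadoop jar hadoop-*-test.jar sleep -m 1 -r 1 -mt 1 -rt 1
[/code]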

-Todd

On Fri, Nov 4, 2011 at 11:10 AM, Pedro Costa <[EMAIL PROTECTED]> wrote:
> I've looked at the MapOutputServlet class. But the problem is the following:
>
> The map output can be compressed or not. For uncompressed map output,
> using the index mechanism of the MapOutputServlet works for me: the map
> task generates a digest for each partition, and it matches the digest
> produced by the reduce.
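>
> (For reference, this is roughly how the index information for one partition
> can be read. It is only a sketch on my side, not the servlet code; I'm
> assuming the usual layout of three longs per partition in the index file.)
>
> [code]
> // Sketch: read the index entry for one reduce partition from the map output
> // index file. Assumed layout: three longs per partition - start offset,
> // raw (decompressed) length, and part (on-disk) length.
> import java.io.IOException;
>
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class IndexPeek {
>   private static final int RECORD_LENGTH = 24; // 3 longs per partition
>
>   public static long[] readIndexRecord(FileSystem fs, Path indexFile,
>       int partition) throws IOException {
>     FSDataInputStream in = fs.open(indexFile);
>     try {
>       in.seek((long) partition * RECORD_LENGTH);
>       long startOffset = in.readLong();
>       long rawLength = in.readLong();   // length after decompression
>       long partLength = in.readLong();  // length as stored on disk
>       return new long[] { startOffset, rawLength, partLength };
>     } finally {
>       in.close();
>     }
>   }
> }
> [/code]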
>
> Let me explain what I've changed in my own version of the MR code. A map
> task (MT) produces a digest for each partition of data it generates. So,
> if MT1 produces 2 partitions of uncompressed data, it produces Hash1 and
> Hash2.
>
> Now, when a reduce task (RT) fetches the map output, it generates another
> digest using the index mechanism of the MapOutputServlet and compares it
> with the respective digest generated by the map task.
>
> As you can see from my explanation, for uncompressed map output the index
> mechanism is really useful.
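>
> Just to make the uncompressed case concrete, the idea is roughly the sketch
> below (simplified, not my actual patch; the class and method names are only
> placeholders):
>
> [code]
> // Sketch: SHA-1 over one uncompressed partition of the map output file,
> // using the (offset, length) pair that the index mechanism gives us.
> import java.io.IOException;
> import java.security.MessageDigest;
> import java.security.NoSuchAlgorithmException;
>
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class PartitionDigest {
>   public static String digestPartition(FileSystem fs, Path mapOutputFile,
>       long startOffset, long rawLength)
>       throws IOException, NoSuchAlgorithmException {
>     MessageDigest md = MessageDigest.getInstance("SHA-1");
>     FSDataInputStream in = fs.open(mapOutputFile);
>     try {
>       in.seek(startOffset);
>       byte[] buffer = new byte[64 * 1024];
>       long remaining = rawLength;
>       while (remaining > 0) {
>         int toRead = (int) Math.min(buffer.length, remaining);
>         int read = in.read(buffer, 0, toRead);
>         if (read < 0) {
>           break; // unexpected EOF
>         }
>         md.update(buffer, 0, read); // hash only the bytes actually read
>         remaining -= read;
>       }
>     } finally {
>       in.close();
>     }
>     // hex-encode the digest
>     StringBuilder hex = new StringBuilder();
>     for (byte b : md.digest()) {
>       hex.append(String.format("%02x", b));
>     }
>     return hex.toString();
>   }
> }
> [/code]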
>
> But I've also tried to do the same with compressed map output, and it
> doesn't work. That's the reason I'm now trying the IFile.Reader class.
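>
> Roughly what I have in mind there is the outline below. The constructor and
> the nextRawKey/nextRawValue calls are what I believe the 0.20-era
> IFile.Reader exposes; the exact signatures may differ between versions, so
> treat this only as a sketch:
>
> [code]
> // Outline: digest the decompressed records of one map output segment by
> // letting IFile.Reader handle the codec, instead of hashing raw bytes myself.
> // The IFile.Reader API details here are my assumption about the 0.20 branch.
> package org.apache.hadoop.mapred; // assumed, so IFile is visible
>
> import java.io.IOException;
> import java.security.MessageDigest;
> import java.security.NoSuchAlgorithmException;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.DataInputBuffer;
> import org.apache.hadoop.io.compress.CompressionCodec;
>
> public class IFileDigest {
>   public static String digestSegment(Configuration conf, FileSystem fs,
>       Path segmentFile, CompressionCodec codec)
>       throws IOException, NoSuchAlgorithmException {
>     MessageDigest md = MessageDigest.getInstance("SHA-1");
>     IFile.Reader<Object, Object> reader =
>         new IFile.Reader<Object, Object>(conf, fs, segmentFile, codec, null);
>     try {
>       DataInputBuffer key = new DataInputBuffer();
>       DataInputBuffer value = new DataInputBuffer();
>       while (reader.nextRawKey(key)) {
>         reader.nextRawValue(value);
>         md.update(key.getData(), 0, key.getLength());
>         md.update(value.getData(), 0, value.getLength());
>       }
>     } finally {
>       reader.close();
>     }
>     StringBuilder hex = new StringBuilder();
>     for (byte b : md.digest()) {
>       hex.append(String.format("%02x", b));
>     }
>     return hex.toString();
>   }
> }
> [/code]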
>
> As you can see, I'm in a big dilemma and I don't know what to do.
>
> I will show you my code. These two methods try to generate digests from
> the map side and the reduce side. In the end, they give different results,
> and I don't know why. These two methods are my first attempt at generating
> digests from compressed map output.
>
>
> [code]
> // This method tries to generate a digest from the compressed map output
> // on the map side.
> public synchronized String generateHash(FileSystem fs, Path filename,
>     Decompressor decompressor, int offset, int mapOutputLength) {
>   LOG.debug("Opening file2: " + filename);
>
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   FSDataInputStream input = null;
>
>   try {
>     input = fs.open(filename);
>     decodec = new DecompressorStream(input, decompressor);
>     md = MessageDigest.getInstance("SHA-1");
>     System.out.println("ABC");
>     byte[] buffer;
>     int size;
>     while (mapOutputLength > 0) {
>       // Handle the case where the bytes left to read are smaller than the
>       // default chunk size; we don't want the message digest to contain trash.
>       size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
>       System.out.println("mapOutputLength: " + mapOutputLength + " Size: " + size);
>
>       if (size == 0)
>         break;
>
>       buffer = new byte[size];
>       size = decodec.read(buffer, offset, size);
>       System.out.println("read: " + size + "\ndata: " + new String(buffer));
>       mapOutputLength -= size;
>
>       if (size > 0)
>         md.update(buffer);
>       else if (size == -1)
>         break;
>     }
>     System.out.println("DFG");
>     digest = hashIt(md);
>   } catch (NoSuchAlgorithmException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } catch (IOException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
>   } finally {
>     if (input != null)
>       try {
>         input.close();
>       } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>       }
>   }
>
>   return digest;
> }
> [/code]
>
>
>
> [code]
> // This method tries to generate the digest from the compressed map output
> // received by the reduce.
> public synchronized String generateHash(byte[] data, Decompressor decompressor,
>     int offset, int mapOutputLength) {
>   MessageDigest md = null;
>   String digest = null;
>   DecompressorStream decodec = null;
>   ByteArrayInputStream bis = null;
>   try {
>     bis = new ByteArrayInputStream(data);

Todd Lipcon
Software Engineer, Cloudera