-
Understanding the MapOutput
Pedro Costa 2011-11-04, 15:37
Hi,
I'm trying to understand the structure of the map output file. Here's an example of a map output file that contains 2 partitions:

[code]
<FF><FF><FF><FF>^@^@716banana banana apple banana carrot carrot apple banana 0apple carrot carrot carrot banana carrot carrot 5^N4carrot apple carrot apple apple carrot banana apple ^Mbanana apple <FF><FF><DF>|<8E><B7>
[/code]

1 - I would like to understand what the ASCII character parts are. What do they mean?

2 - What type of file is a map output? Is it a SequenceFileOutputFormat or a TextOutputFormat?

3 - I have a small program that runs independently of MR; its goal is to digest each partition and produce the corresponding hash. How do I know where each partition starts?

--
Thanks,
PSC
-
Re: Understanding the MapOutput
Todd Lipcon 2011-11-04, 16:46
Hi Pedro,
The format is called IFile. Check out the source for more info on the format - it's fairly simple. The partition starts are recorded in a separate index file next to the output file.

I don't think you'll find significant docs on this format since it's MR-internal - the code is your best resource.

-Todd

--
Todd Lipcon
Software Engineer, Cloudera
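To illustrate the index idea mentioned above, here is a minimal sketch that parses an index held in memory. It assumes each partition is described by three big-endian 8-byte longs (start offset, raw length, on-disk length). That layout is an assumption modeled on how the MR source appears to store its index records; the real file may carry extra data such as a trailing checksum, so check it against the source of your Hadoop version. The class, field, and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class PartitionIndexSketch {
    // Assumed record size: three 8-byte longs per partition.
    static final int RECORD_BYTES = 3 * Long.BYTES;

    /** One index entry; the field names are illustrative only. */
    static final class Entry {
        final long startOffset; // where the partition begins in the output file
        final long rawLength;   // assumed: length before compression
        final long partLength;  // assumed: length as stored on disk

        Entry(long startOffset, long rawLength, long partLength) {
            this.startOffset = startOffset;
            this.rawLength = rawLength;
            this.partLength = partLength;
        }
    }

    /** Parses numPartitions fixed-size records from the start of the index bytes. */
    static List<Entry> parse(byte[] index, int numPartitions) {
        if (numPartitions < 0 || index.length < (long) numPartitions * RECORD_BYTES) {
            throw new IllegalArgumentException("index too short for " + numPartitions + " partitions");
        }
        ByteBuffer buf = ByteBuffer.wrap(index); // ByteBuffer is big-endian by default
        List<Entry> entries = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            entries.add(new Entry(buf.getLong(), buf.getLong(), buf.getLong()));
        }
        return entries;
    }

    public static void main(String[] args) {
        // Build a made-up two-partition index in memory instead of reading a real file.
        ByteBuffer fake = ByteBuffer.allocate(2 * RECORD_BYTES);
        fake.putLong(0).putLong(2).putLong(6);
        fake.putLong(6).putLong(150).putLong(154);
        for (Entry e : parse(fake.array(), 2)) {
            System.out.println("start=" + e.startOffset + " raw=" + e.rawLength + " part=" + e.partLength);
        }
    }
}
```

With entries like these, an external program would know which byte range of the output file belongs to each partition.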
-
Re: Understanding the MapOutput
Pedro Costa 2011-11-04, 17:04
1 - I think that IFile.Reader can only read the whole map output file. I want to read a single partition of the map output. How can I do that? How do I set the size of a partition in the I

2 - I know that map output is composed of blocks. What is the size of a block? Is it 64MB by default?

--
Thanks,
-
Re: Understanding the MapOutput
Todd Lipcon 2011-11-04, 17:25
On Fri, Nov 4, 2011 at 10:04 AM, Pedro Costa <[EMAIL PROTECTED]> wrote:
> 1- I think that IFIle.reader can only read the whole map output file. I
> want to read a partition of the map output. How can I do that? How do I set
> the size of a partition in the I

Look at the code for MapOutputServlet - it uses the index mechanism to find a particular partition.

> 2 - I know that map output is composed by blocks. What is the size of a
> block? Is it 64MB by default?

Nope, it doesn't use blocks. That's HDFS you're thinking of.

-Todd

--
Todd Lipcon
Software Engineer, Cloudera
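The idea of using an index to locate one partition can be sketched with plain Java I/O: seek to the partition's start offset, read exactly its on-disk length, and digest those bytes. This is not the MapOutputServlet code. A local file stands in for the map output, and the `digestRange` helper, the file contents, and the offsets are all hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class RangeDigestSketch {
    /** SHA-1 (lowercase hex) of the bytes [start, start + length) of a local file. */
    static String digestRange(String path, long start, long length) {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            file.seek(start); // the offset positions the file, not the buffer
            byte[] buffer = new byte[60 * 1024];
            long remaining = length;
            while (remaining > 0) {
                int want = (int) Math.min(buffer.length, remaining);
                int got = file.read(buffer, 0, want);
                if (got == -1) {
                    throw new EOFException("range extends past end of file");
                }
                md.update(buffer, 0, got); // hash only the bytes actually read
                remaining -= got;
            }
            return toHex(md.digest());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("mapoutput", ".bin");
        Files.write(tmp, "AAAABBBBBBCC".getBytes(StandardCharsets.US_ASCII));
        // Pretend an index said: this partition starts at byte 4 and is 6 bytes long.
        System.out.println(digestRange(tmp.toString(), 4, 6));
        Files.delete(tmp);
    }
}
```

Reading by explicit byte range keeps the digest independent of how many partitions the file holds, which is the property a per-partition hash needs.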
-
Re: Understanding the MapOutput
Pedro Costa 2011-11-04, 18:10
I've looked at the MapOutputServlet class, but the problem is the following:

MapOutput can be compressed or not. With uncompressed map output, the index mechanism of the MapOutputServlet works for me: the map tasks generate a digest for each partition, and these match the digests produced by the reduce side.

Let me explain what I've changed in my own version of the MR code. A map task (MT) produces a digest for each partition of data it generates. So, if MT1 produces 2 partitions of uncompressed data, it produces Hash1 and Hash2.

Now, when a reduce task (RT) fetches the map output, it generates another digest using the index mechanism of the MapOutputServlet and compares it with the respective digest generated by the map task.

As you can see from my explanation, the index mechanism is really useful for uncompressed map output.

But I've also tried to do the same with compressed map output, and it doesn't work. That's the reason I'm now trying the IFile.Reader class.

As you can see, I'm in a big dilemma and I don't know what to do.

I will show you my code. These 2 methods try to generate digests on the map side and the reduce side. In the end they give different results, and I don't know why. These 2 methods are my first attempt to generate digests from compressed map output.

[code]
// This method tries to generate a digest from the compressed map output on the map side.
public synchronized String generateHash(FileSystem fs, Path filename, Decompressor decompressor, int offset, int mapOutputLength) {
    LOG.debug("Opening file2: " + filename);

    MessageDigest md = null;
    String digest = null;
    DecompressorStream decodec = null;
    FSDataInputStream input = null;

    try {
        input = fs.open(filename);
        decodec = new DecompressorStream(input, decompressor);
        md = MessageDigest.getInstance("SHA-1");
        System.out.println("ABC");
        byte[] buffer;
        int size;
        while (mapOutputLength > 0) {
            // Handle the case where fewer bytes remain than the default chunk size.
            // We don't want the message digest to contain trash.
            size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);
            System.out.println("mapOutputLength: " + mapOutputLength + " Size: " + size);

            if (size == 0)
                break;

            buffer = new byte[size];
            size = decodec.read(buffer, offset, size);
            System.out.println("read: " + size + "\ndata: " + new String(buffer));
            mapOutputLength -= size;

            if (size > 0)
                md.update(buffer);
            else if (size == -1)
                break;
        }
        System.out.println("DFG");
        digest = hashIt(md);
    } catch (NoSuchAlgorithmException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        if (input != null)
            try {
                input.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }

    return digest;
}
[/code]

[code]
// This method tries to generate the digest from the compressed map output sent to the reduce.
public synchronized String generateHash(byte[] data, Decompressor decompressor, int offset, int mapOutputLength) {
    MessageDigest md = null;
    String digest = null;
    DecompressorStream decodec = null;
    ByteArrayInputStream bis = null;
    try {
        bis = new ByteArrayInputStream(data);
        decodec = new DecompressorStream(bis, decompressor);
        md = MessageDigest.getInstance("SHA-1");

        int size;
        byte[] buffer;
        while (mapOutputLength > 0) {
            // Handle the case where fewer bytes remain than the default chunk size.
            // We don't want the message digest to contain trash.
            size = mapOutputLength < (60 * 1024) ? mapOutputLength : (60 * 1024);

            if (size == 0)
                break;

            buffer = new byte[size];
            decodec.read(buffer, offset, size);
            md.update(buffer);

            mapOutputLength -= size;
        }
        digest = hashIt(md);
    } catch (NoSuchAlgorithmException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        if (bis != null)
            try {
                bis.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }

    return digest;
}
[/code]

Thanks,
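A few things in the two methods above could plausibly explain the mismatch, although the thread does not confirm which one is responsible. In `read(buffer, offset, size)` the `offset` argument is an index into `buffer`, not a position in the stream, so a non-zero value either skips no input at all or overruns the freshly allocated buffer. `md.update(buffer)` hashes the whole buffer even when fewer bytes were read, and the second method ignores the return value of `read` entirely. Finally, if `mapOutputLength` is the compressed length, counting decompressed bytes against it stops the loop at the wrong point. The sketch below shows a digest loop without those issues. It uses `java.util.zip` as a stand-in for the Hadoop codec classes, and all class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class DecompressedDigestSketch {
    /** SHA-1 (lowercase hex) over every byte the stream yields until end of stream. */
    static String digest(InputStream in) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] buffer = new byte[60 * 1024];
            int n;
            while ((n = in.read(buffer, 0, buffer.length)) != -1) {
                md.update(buffer, 0, n); // only the n bytes filled by this read
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Digest of the decompressed form of the zlib data stored in data[start, start + length). */
    static String digestCompressedRange(byte[] data, int start, int length) {
        // The range selects compressed bytes; the loop in digest() runs on decompressed bytes.
        InputStream slice = new ByteArrayInputStream(data, start, length);
        return digest(new InflaterInputStream(slice));
    }

    /** Test helper: zlib-compresses raw bytes (stand-in for a real map output codec). */
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream z = new DeflaterOutputStream(out)) {
            z.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] raw = "carrot apple carrot apple".getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        String mapSide = digest(new ByteArrayInputStream(raw));
        String reduceSide = digestCompressedRange(packed, 0, packed.length);
        System.out.println(mapSide.equals(reduceSide));
    }
}
```

The design choice here is to let the decompressing stream signal the end of the partition rather than counting bytes, so the compressed length is only used to select the input range.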
-
Re: Understanding the MapOutput
Todd Lipcon 2011-11-04, 18:27
Hi Pedro,
It sounds like you're on the right track, but I don't really have time to help much beyond pointing you in the right direction. Time to put on your debugging hat :)

Maybe do some testing with a small job like "sleep -mt 1 -rt 1 -m 1 -r 1" - a sleep job with 1 mapper and 1 reducer. If I recall correctly it generates a single map output record... otherwise you could do a "sort" of 1 line of text. Then you can easily add debug output to diagnose what your issue is.

-Todd

--
Todd Lipcon
Software Engineer, Cloudera
-
Re: Understanding the MapOutput
Pedro Costa 2011-11-04, 18:29
Thank you for your help.
Thanks,