MapReduce, mail # user - All datanodes are bad IOException when trying to implement multithreading serialization


RE: All datanodes are bad IOException when trying to implement multithreading serialization
java8964 java8964 2013-09-30, 21:35
I'm not sure exactly what you are trying to do, but it seems that memory is your bottleneck and you think you have spare CPU, so you want to use multiple threads to make use of it?
You can start multiple threads in your mapper if you think your mapper logic is very CPU-intensive and you want to speed it up that way. But reading the next split from the current mapper doesn't sound like a good idea. Why do you want to do that? What happens if that split has been allocated to another mapper task?
If you have more CPU resources than memory resources in the cluster, it just means your cluster's resources are not well balanced. If you cannot fix that at the physical level, leave it as is.
If you think it makes sense to use multiple threads in the mapper logic, go ahead, but consume only the current split. If you think the split is too small for the current mapper, change the block size of the files for this kind of mapper. In HDFS, the block size is set per file. You can set it yourself.
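One way to read the advice above is to keep a single thread reading the current split and hand the records to worker threads for the CPU-heavy part. The following is a minimal sketch in plain Java, not Hadoop code: `SplitWorkers`, `processSplit`, and `expensiveWork` are hypothetical names, and the `Iterable<String>` stands in for the records of one split.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class SplitWorkers {
    // Unique sentinel object telling a worker that no more records will arrive.
    private static final String POISON = new String("__END_OF_SPLIT__");

    /** Reads records on the calling thread and processes them on `workers` threads. */
    public static long processSplit(Iterable<String> records, int workers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        AtomicLong total = new AtomicLong();
        Thread[] pool = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(() -> {
                try {
                    // Identity comparison is intentional: only the sentinel stops a worker.
                    for (String rec = queue.take(); rec != POISON; rec = queue.take()) {
                        total.addAndGet(expensiveWork(rec));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool[i].start();
        }
        try {
            for (String rec : records) {
                queue.put(rec); // only this thread touches the record source
            }
            for (int i = 0; i < workers; i++) {
                queue.put(POISON); // one sentinel per worker
            }
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    // Hypothetical stand-in for CPU-heavy map logic.
    static long expensiveWork(String record) {
        return record.length();
    }
}
```

The bounded queue keeps memory use predictable: the reader blocks when the workers fall behind instead of buffering the whole split.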
Yong

From: [EMAIL PROTECTED]
Date: Sun, 29 Sep 2013 21:12:40 -0500
Subject: Re: All datanodes are bad IOException when trying to implement multithreading serialization
To: [EMAIL PROTECTED]

Thanks Sonal, Felix. I have looked into the combined file format before.
The problem I am trying to solve here is that I want to reduce the number of mappers running concurrently on a single node. Normally, on a machine with 8 GB of RAM and 8 cores, I need to run 8 JVMs (mappers) to exploit all 8 cores. However, this limits the heap size of each JVM (mapper) to 1 GB. I want to be able to run 2-4 JVMs (mappers) concurrently and still use the 8 cores (this would allow me to set the heap size of each JVM to 2-4 GB). The additional heap memory is important for my application. This means that I need to use multithreading within a mapper to use more than 1 core per JVM.
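The trade-off described above is simple arithmetic, sketched here in plain Java. `MapperSizing` and its methods are hypothetical names, and the numbers are idealized in the same way as the message: they ignore memory needed by the OS and the Hadoop daemons.

```java
public class MapperSizing {
    /** Heap available per mapper JVM when ramMb is shared evenly across jvms. */
    public static int heapPerJvmMb(int ramMb, int jvms) {
        return ramMb / jvms;
    }

    /** Threads each JVM needs to keep all cores busy (rounded up). */
    public static int threadsPerJvm(int cores, int jvms) {
        return (cores + jvms - 1) / jvms;
    }

    public static void main(String[] args) {
        int ramMb = 8 * 1024; // 8 GB node, as in the message
        int cores = 8;
        for (int jvms : new int[] {8, 4, 2}) {
            System.out.printf("%d JVMs: %d MB heap, %d thread(s) each%n",
                    jvms, heapPerJvmMb(ramMb, jvms), threadsPerJvm(cores, jvms));
        }
    }
}
```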
The modification I made was an attempt to have a single mapper read key/value pairs from the same input split concurrently. As a result, I could process the input split using two or three threads working on different portions of the input split.
Sorry if I did not make this clear in the previous email. I have written my own implementation of Mapper's run method to accomplish this, but I also need LineRecordReader to read from the input split concurrently. That's why I modified LineRecordReader in the way I attached.

 java.io.IOException: Unexpected checksum mismatch while writing blk_-8687605593694081588_1014 from /192.168.102.99:40057
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.verifyChunks(BlockReceiver.java:221)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:447)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:532)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:398)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:107)
        at java.lang.Thread.run(Thread.java:619)

I suspect this might be related to threading in HDFS. Maybe I can't read a file in HDFS in a multithreaded fashion (for example, one thread reading from the beginning and another from the middle of the file)?
Any suggestions?
Thanks a lot!
Yunming
On Sun, Sep 29, 2013 at 8:58 PM, Felix Chern <[EMAIL PROTECTED]> wrote:

The number of mappers is usually the same as the number of files you feed to the job.
To reduce the number you can use CombineFileInputFormat.
I recently wrote an article about it. You can take a look if this fits your needs.
http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
Felix
On Sep 29, 2013, at 6:45 PM, yunming zhang <[EMAIL PROTECTED]> wrote:
I am actually trying to reduce the number of mappers because my application takes up a lot of memory (on the order of 1-2 GB of RAM per mapper). I want to be able to use only a few mappers but still maintain good CPU utilization through multithreading within a single mapper. MultithreadedMapper doesn't work because it duplicates in-memory data structures.
Thanks
Yunming

On Sun, Sep 29, 2013 at 6:59 PM, Sonal Goyal <[EMAIL PROTECTED]> wrote:
Wouldn't you rather just change your split size so that you can have more mappers work on your input? What else are you doing in the mappers?
On Sep 30, 2013, at 2:22 AM, yunming zhang <[EMAIL PROTECTED]> wrote:

Hi,
I was playing with the Hadoop code, trying to have a single Mapper read an input split using multiple threads. I am getting an "All datanodes are bad" IOException, and I am not sure what the issue is.
The reason for this work is that I suspect my computation was slow because it takes too long to create the Text() objects from the input split using a single thread. I tried to modify LineRecordReader (since I am mostly using TextInputFormat) to provide additional methods for retrieving lines from the input split: getCurrentKey2(), getCurrentValue2(), and nextKeyValue2(). I created a second FSDataInputStream and a second LineReader object for getCurrentKey2() and getCurrentValue2() to read from. Essentially, I am trying to open the input split twice with different start points (one at the very beginning, the other in the middle of the split) so that two threads can read from the input split in parallel.
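A start point in the middle of a split will usually fall inside a line, so each reader needs a rule for which lines it owns. The sketch below is plain Java over a byte array, not the Hadoop classes. It uses one common convention: a reader that does not start at offset 0 discards everything up to the next newline, and every reader finishes the last line that starts at or before its end offset. To my understanding this is also how the stock LineRecordReader treats adjacent splits, but take that as an assumption. `SubSplitLines` and `readRange` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SubSplitLines {
    /** Returns the lines owned by the byte range [start, end) of data. */
    public static List<String> readRange(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // The (possibly partial) line at `start` belongs to the previous range.
            pos = skipLine(data, pos);
        }
        List<String> lines = new ArrayList<>();
        // "<=" so a line starting exactly at `end` is read here; the next range discards it.
        while (pos <= end && pos < data.length) {
            int next = skipLine(data, pos);
            int lineEnd = (next > pos && data[next - 1] == '\n') ? next - 1 : next;
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    /** Index just past the next '\n' at or after `from`, or data.length if there is none. */
    private static int skipLine(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }
}
```

With this rule, reading `[0, mid)` and `[mid, length)` returns every line exactly once wherever `mid` falls, so the two threads would neither duplicate nor drop a record.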
In the org.apache.hadoop.mapreduce.Mapper.run() method, I modified it to read simultaneously using getCurrentKey() and getCurrentKey2() from Thread 1 and Thread 2 (both threads running at the same time):

      Thread 1:
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
      Thread 2:
        while (context.nextKeyValue2()) {
            map(context.getCurrentKey2(), context.getCurrentValue2(), context);
            //System.out.println("two iter");
        }
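Both loops pass the same context to map(), so anything the map function emits goes through one shared object from two threads. Whether that object is safe for concurrent use is not established in this thread. A cautious sketch, assuming it is not, serializes the writes. This is plain Java, not the Hadoop API; `SharedOutput` and `runTwoThreads` are hypothetical names, and the list is a stand-in for the task's output.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedOutput {
    // Stand-in for the task's output; an ArrayList is not thread-safe on its own.
    private final List<String> sink = new ArrayList<>();

    /** Serializes writers so only one thread touches the sink at a time. */
    public synchronized void write(String key, String value) {
        sink.add(key + "\t" + value);
    }

    public synchronized int size() {
        return sink.size();
    }

    /** Runs two threads over the two halves of lines, both writing through write(). */
    public static int runTwoThreads(List<String> lines) {
        SharedOutput out = new SharedOutput();
        int mid = lines.size() / 2;
        Thread t1 = new Thread(() ->
                lines.subList(0, mid).forEach(l -> out.write(l, "1")));
        Thread t2 = new Thread(() ->
                lines.subList(mid, lines.size()).forEach(l -> out.write(l, "2")));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out.size();
    }
}
```

Without the `synchronized` keyword on `write`, concurrent `add` calls on the `ArrayList` could lose or corrupt entries, which is the kind of failure the lock is meant to rule out.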
Howev