RE: Assignment of data splits to mappers
John Lilley 2013-06-14, 19:50
Thanks for taking the time to explain this!
I understand your point about contiguous blocks; they just aren't likely to exist. I am still curious about two things:
1) The map-per-block strategy. If we have many more blocks than containers, wouldn't there be some advantage to having fewer maps (which means fewer connections, less seeking, etc.)? Of course, increasing the block size would lead to the same thing, and contiguous data to boot, but one doesn't always know the total data size in advance.
2) The record-spanning-blocks issue. I understand that under most file formats, records *will* span blocks. But if it were simple to prevent them from spanning blocks, would that be of benefit?
From: Bertrand Dechoux [mailto:[EMAIL PROTECTED]]
Sent: Thursday, June 13, 2013 3:37 PM
To: [EMAIL PROTECTED]
Subject: Re: Assignment of data splits to mappers
The first question can be split (no pun intended) into two topics because there are actually two distinct steps. First, the InputFormat partitions the data source into InputSplits. Its implementation determines the exact logic. Then the scheduler is responsible for deciding where and when each InputSplit should be processed. But it doesn't really deal with blocks themselves. The InputSplit itself knows on which nodes its data would be local.
If there is no other choice, you (or more exactly the implementation) can choose to have several blocks per InputSplit. But of course, it opens lots of issues. The default strategy is one block per InputSplit (and thus per map task, because there is one map task per InputSplit). If you really need to put several blocks per InputSplit, the root cause might often be that the block size is not big enough. I think it is fair to assume that the 10000-block file you are referring to is not using a 512MB block size.
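To make the block-size arithmetic concrete, here is a rough sketch. It assumes the split size is chosen as max(minSize, min(maxSize, blockSize)), which is how I understand FileInputFormat to compute it, and it ignores the small slack the real implementation allows on the last split. The 64MB block size for the 10000-block file is purely an assumption for illustration, and the function names are hypothetical.

```python
MB = 1024 * 1024

def split_size(block_size, min_size=1, max_size=2**63 - 1):
    # Assumed rule: clamp the block size between the min and max split sizes.
    return max(min_size, min(max_size, block_size))

def num_splits(file_size, block_size, min_size=1, max_size=2**63 - 1):
    # One map task per split; integer ceiling division.
    size = split_size(block_size, min_size, max_size)
    return -(-file_size // size)

# Assumption: the 10000-block file was written with 64MB blocks.
file_size = 10000 * 64 * MB

print(num_splits(file_size, 64 * MB))                     # 10000
print(num_splits(file_size, 512 * MB))                    # 1250
print(num_splits(file_size, 64 * MB, min_size=640 * MB))  # 1000
```

The last call shows the "several blocks per InputSplit" case: raising the minimum split size to ten blocks yields 1000 maps, but nothing guarantees those ten blocks live on the same datanode, whereas a larger block size gives both fewer maps and locality.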
MultiFileInputFormat does build InputSplits from blocks that are unlikely to be on the same datanode. But that's a good decision given the kind of data source it has to deal with. Anyway, two 'contiguous' blocks are also very unlikely to be on the same datanode (and even less likely on the same HDD, and even less likely to be physically contiguous). The only abstraction that says which records should be stored close to one another is the block. That's why the idea is not really to optimize reads of 'contiguous' blocks on the same machine/HDD but to consider whether the block size is the right one.
HDFS and Hadoop MapReduce have been designed to work together, but there is a clean abstraction between them. HDFS does not know about records, and clients writing to HDFS (like MapReduce) rarely need to know the block boundaries explicitly. That's why the RecordReader provided by the InputFormat is responsible for interpreting the data as records. But of course, it has to know how to deal with records stored across a block boundary. It will happen. The advantage is that the record logic cannot corrupt the storage and can be selected at read time. TextInputFormat, KeyValueTextInputFormat and NLineInputFormat have different strategies, which is only possible because of this abstraction. And that's also why MapReduce can read from and write to other kinds of data storage, like HBase for example: it is not tightly coupled with HDFS. But it does also bring drawbacks.
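As an illustration of how a reader can cope with records on a boundary, here is a minimal sketch for newline-delimited text. It assumes the convention that, as far as I know, Hadoop's line reader follows: a split that does not start at offset 0 discards everything up to and including the first newline, and every split keeps reading until it has finished the line that starts at or before its end. The function name and the in-memory `data` buffer are hypothetical stand-ins for a real file stream.

```python
def read_split(data: bytes, start: int, length: int) -> list:
    """Return the lines owned by the split [start, start + length)."""
    end = start + length
    pos = start
    if start != 0:
        # The previous split owns the line that crosses into this one,
        # so skip ahead to just after the first newline.
        nl = data.find(b"\n", pos)
        if nl == -1:
            return []
        pos = nl + 1
    records = []
    # A line is owned by this split if it starts at or before `end`;
    # reading it may run past `end` into the next block.
    while pos <= end and pos < len(data):
        nl = data.find(b"\n", pos)
        if nl == -1:
            records.append(data[pos:])
            pos = len(data)
        else:
            records.append(data[pos:nl])
            pos = nl + 1
    return records

def read_all(data: bytes, split_len: int) -> list:
    out = []
    for start in range(0, len(data), split_len):
        out.extend(read_split(data, start, min(split_len, len(data) - start)))
    return out

data = b"alpha\nbravo\ncharlie\ndelta\n"
print(read_all(data, 8))  # [b'alpha', b'bravo', b'charlie', b'delta']
```

Whatever the split length, each line comes out exactly once, at the cost of one split occasionally reading a few bytes that belong to the next block.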
On Thu, Jun 13, 2013 at 7:57 PM, John Lilley <[EMAIL PROTECTED]> wrote:
When MR assigns data splits to map tasks, does it assign a set of non-contiguous blocks to one map? The reason I ask is, thinking through the problem, if I were the MR scheduler I would attempt to hand a map task a bunch of blocks that all exist on the same datanode, and then schedule the map task on that node. E.g. if I have an HDFS file with 10000 blocks and I want to create 1000 map tasks I'd like each map task to have 10 blocks, but those blocks are unlikely to be contiguous on a given datanode.
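The grouping described here can be sketched as follows. This is only a toy illustration of the idea, not how the MR scheduler or any Hadoop InputFormat actually works: the function name, the host names, and the greedy "least-loaded replica host" rule are all assumptions made for the example.

```python
from collections import defaultdict

def group_blocks_by_host(block_hosts, blocks_per_map):
    """block_hosts maps a block index to the hosts holding a replica.

    Returns a list of (host, [block, ...]) map-task assignments in which
    every block of a task has a replica on that task's host.
    """
    per_host = defaultdict(list)
    for block in sorted(block_hosts):
        # Greedy choice: the replica host with the fewest blocks so far
        # (ties broken by host name to keep the result deterministic).
        host = min(block_hosts[block], key=lambda h: (len(per_host[h]), h))
        per_host[host].append(block)
    tasks = []
    for host in sorted(per_host):
        blocks = per_host[host]
        for i in range(0, len(blocks), blocks_per_map):
            tasks.append((host, blocks[i:i + blocks_per_map]))
    return tasks

# Hypothetical replica placement for a four-block file.
placement = {0: ["a", "b"], 1: ["a", "c"], 2: ["b", "c"], 3: ["a", "b"]}
for host, blocks in group_blocks_by_host(placement, blocks_per_map=2):
    print(host, blocks)
```

With this placement, host "a" ends up with blocks 0 and 3, which shows the point of the question: the blocks grouped onto one node are local but not contiguous in the file.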
This is related to a question I had asked earlier, which is whether any benefit could be had by aligning data splits along block boundaries, to avoid reads that spill over from one block into the next and require another datanode connection. The answer I got was that the extra connection overhead wasn't important. The reason I bring this up again is that comments in this discussion (https://issues.apache.org/jira/browse/HADOOP-3315) imply that doing an extra seek to the beginning of the file to read a magic number on open is a significant overhead, and this looks like a similar issue to me.