-Re: Best practices for jobs with large Map output
Chris Douglas 2011-04-18, 07:02
I don't understand your job, but the Writable interface is just a
format for record serialization. If your mapper generates
<URI,offset,length> tuples referencing into data written in HDFS, that
is sufficient to open the stream in the reducer using the FileSystem
API. Writing an OutputFormat that interprets that tuple as a range of
bytes to read from HDFS and write to a single stream should not
diverge too far from the OutputFormats bundled with Hadoop. You might
Again, it's not clear what your goal is or what you mean by "index".
Are the input records changed before being written by the reduce? Or
is the purpose of this job only to concatenate index files? -C
On Fri, Apr 15, 2011 at 8:35 PM, Shai Erera <[EMAIL PROTECTED]> wrote:
> bq. If you can change your job to handle metadata backed by a store in HDFS
> I have two Mappers, one that works with HDFS and one with GPFS. The GPFS one
> does exactly that -- it stores the indexes in GPFS (which all Mappers and
> Reducers see, as a shared location) and outputs just the pointer to that
> location. Then, Hadoop just merges key=LongWritable and value=Text, and
> indeed it works better (the job runs ~170% faster).
> The value (index) is a collection of files, though my Writable writes them
> as a single stream, so in essence I can make them look a single file. I've
> never worked with HDFS before (only GPFS), and HDFS is a new requirement.
> Can you point out some example code / classes I should use to achieve the
> same trick? Will I need, in my Mapper, to specifically call FileSystem API
> to store the index, or is a Writable enough?
> On Fri, Apr 15, 2011 at 9:05 PM, Chris Douglas <[EMAIL PROTECTED]> wrote:
>> On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <[EMAIL PROTECTED]> wrote:
>> >> Is there absolutely no way to bypass the shuffle + sort phases? I don't
>> >> mind
>> >> writing some classes if that's what it takes ...
>> > Shuffle is an essential part of the Map to Reduce transition, it can't
>> > be 'bypassed' since a Reducer has to fetch all map-outputs to begin
>> > with. Sort/Group may be made dummy as you had done, but can't be
>> > disabled altogether AFAIK. The latter has been bought up on the lists
>> > before, if I remember right; but am not aware of an implementation
>> > alongside that could do that (just begin reducing merely partitioned,
>> > unsorted data).
>> The sort also effects the partitioning, so completely disabling the
>> sort (as above) will only work with 1 reducer.
>> If only grouping is important, then a bijective f(key) that is
>> inexpensive to sort is canonical. Though more efficient grouping
>> methods are possible, in practice this captures most of the possible
>> performance improvement.
>> If neither sorting nor grouping are important, then a comparator that
>> always asserts that its operands are equal will effect the
>> partitioning, but each reducer will receive all its records in one
>> iterator. Note also that the key portion of the record will be
>> incorrect in the old API.
>> However, as Harsh correctly points out, this doesn't appear to be the
>> bottleneck in your job. The data motion for records of tens or
>> hundreds of MB is patently inefficient, and OOMs are a regrettable but
>> relatively minor consequence. If you can change your job to handle
>> metadata backed by a store in HDFS, then your job can merge the
>> indices instead of merging GB of record data. In other words, pass a
>> reference to the record data and not the actual.
>> If the job neither sorts nor groups, what is the format for the index?
>> Instead of a reduce phase, a second, single-map job that concatenates
>> the output of the first seems better fit (assuming the goal is a
>> single file). -C