MapReduce, mail # user - Spill file compression


Re: Spill file compression
Sigurd Spieckermann 2012-11-07, 14:29
Hm, maybe I need some clarification on what exactly the combiner does. From
what I understand from "Hadoop - The Definitive Guide", there are a few
occasions when a combiner may be called before the sort&shuffle phase.

1) Once the in-memory buffer reaches the threshold it will spill out to
disk. "Before it writes to disk, the thread first divides the data into
partitions corresponding to the reducers that they will ultimately be sent
to. Within each partition, the background thread performs an in-memory sort
by key, and if there is a combiner function, it is run on the output of the
sort. Running the combiner function makes for a more compact map output, so
there is less data to write to local disk and to transfer to the reducer."
So to me, this means that the combiner at this point only operates on the
data currently held in the in-memory buffer. If the buffer holds at most n
records with k distinct keys (uniformly distributed), then the combiner
reduces the records written per spill from n to k, i.e. by a factor of
n/k. (correct?)
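
For illustration, a minimal Python sketch of a single spill as described in the quoted passage: partition the buffer, sort by key within each partition, then run the combiner on each key group. The hash partitioner, the summing combiner, and all function names are assumptions made for this example, not Hadoop's actual implementation.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def spill(buffer, num_reducers, combiner=sum):
    """Simulate one spill: partition, sort by key, combine each key group.

    `buffer` is a list of (key, value) pairs held in memory.
    Returns {partition: [(key, combined_value), ...]}.
    """
    partitions = defaultdict(list)
    for key, value in buffer:
        # Hypothetical hash partitioner; each key maps to exactly one reducer.
        partitions[hash(key) % num_reducers].append((key, value))

    spilled = {}
    for part, records in partitions.items():
        records.sort(key=itemgetter(0))  # in-memory sort by key
        spilled[part] = [
            (key, combiner(v for _, v in group))
            for key, group in groupby(records, key=itemgetter(0))
        ]
    return spilled


# n = 12 records with k = 3 distinct keys
buffer = [(i % 3, 1) for i in range(12)]
result = spill(buffer, num_reducers=2)
records_out = sum(len(recs) for recs in result.values())
```

Here 12 buffered records with 3 distinct keys leave the combiner as 3 records, so this spill holds n/k = 4 times fewer records than the buffer did.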

2) "Before the task is finished, the spill files are merged into a single
partitioned and sorted output file. [...] If there are at least three spill
files (set by the min.num.spills.for.combine property) then the combiner is
run again before the output file is written." So the number of spill files
is not affected by the use of a combiner; only their sizes usually get
reduced. Only at the end of the map task are all spill files touched
again, merged and combined. If I have k distinct keys per map task, then I
am guaranteed to have k records at the very end of the map task.
(correct?)
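
A rough sketch of the final merge described in the second quote: the sorted spill files for one partition are merged, and the combiner is applied only if the number of spills reaches a threshold (three in the quoted passage). The function name, the summing combiner, and the in-memory lists standing in for spill files are assumptions for illustration only.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def merge_spills(spills, min_spills_for_combine=3, combiner=sum):
    """Merge per-spill sorted (key, value) lists into one sorted output.

    The combiner runs only when there are at least
    `min_spills_for_combine` spills, mirroring the quoted rule.
    """
    merged = heapq.merge(*spills, key=itemgetter(0))
    if len(spills) < min_spills_for_combine:
        return list(merged)  # duplicate keys across spills remain
    return [
        (key, combiner(v for _, v in group))
        for key, group in groupby(merged, key=itemgetter(0))
    ]


# Hypothetical spill contents for a single partition, each sorted by key.
spill_a = [("a", 2), ("b", 1)]
spill_b = [("a", 1), ("c", 4)]
spill_c = [("b", 3), ("c", 1)]

two = merge_spills([spill_a, spill_b])
three = merge_spills([spill_a, spill_b, spill_c])
```

With two spills the merged output still holds four records for three keys; with three spills the combiner runs and one record per key remains. Under the quoted rule, ending up with exactly k records would only follow when the combiner actually runs at merge time.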

Is there any other occasion when the combiner may be called? Are spill
files ever touched again before the final merge?

Thanks,
Sigurd
2012/11/7 Sigurd Spieckermann <[EMAIL PROTECTED]>

> OK, I found the answer to one of my questions just now -- the location of
> the spill files and their sizes. So, there's a discrepancy between what I
> see and what you said about the compression. The total size of all spill
> files of a single task matches what I estimate them to be
> *without* compression. It seems they aren't compressed, but that's strange
> because I definitely enabled compression the way I described.
>
>
>
> 2012/11/7 Sigurd Spieckermann <[EMAIL PROTECTED]>
>
>> OK, just wanted to confirm. Maybe there is another problem then. I just
>> looked at the task logs and there were ~200 spills recorded for a single
>> task, and only afterwards was there a merge phase. In my case, 200 spills are
>> about 2GB (uncompressed). One map output record easily fits into the
>> in-memory buffer, in fact, a few records fit into it. But Hadoop decides to
>> write gigabytes of spill to disk and it seems that the disk I/O and merging
>> make everything really slow. There doesn't seem to be a
>> max.num.spills.for.combine though. Is there any typical advice for this
>> kind of situation? Also, is there a way to see the size of the compressed
>> spill files to get a better idea about the file sizes I'm dealing with?
>>
>>
>>
>> 2012/11/7 Harsh J <[EMAIL PROTECTED]>
>>
>>> Yes we do compress each spill output using the same codec as specified
>>> for map (intermediate) output compression. However, the counted bytes
>>> may be counting decompressed values of the records written, and not
>>> post-compressed ones.
>>>
>>> On Wed, Nov 7, 2012 at 6:02 PM, Sigurd Spieckermann
>>> <[EMAIL PROTECTED]> wrote:
>>> > Hi guys,
>>> >
>>> > I've encountered a situation where the ratio between "Map output bytes"
>>> > and "Map output materialized bytes" is quite huge and during the
>>> > map-phase data is spilled to disk quite a lot. This is something I'll
>>> > try to optimize, but I'm wondering if the spill files are compressed at
>>> > all. I set mapred.compress.map.output=true and
>>> > mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
>>> > and everything else seems to be working correctly. Does Hadoop actually