Re: OutOfMemoryError during reduce shuffle
Shivaram Lingamneni 2013-02-22, 06:56
Thanks. You are correct that this strategy does not achieve a total
sort, only a partial/local sort, since that's all the application
requires. I think the technique is sometimes referred to as "secondary
sort", and KeyFieldBasedPartitioner is sometimes used as a convenience
to implement it, but our implementation just uses HashPartitioner with
a specially designed hash function.
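To illustrate the general idea (not our actual hash function, which isn't shown in this thread), here is a minimal sketch in plain Java with no Hadoop dependency. It assumes a hypothetical key layout of "<group><TAB><rest of sort key>"; the class and method names are made up for the example. Only the group part is hashed, so all records of a group reach the same reducer, and the framework's sort orders them within that reducer. The mask-and-modulo step mirrors what HashPartitioner does with key.hashCode().

```java
// Sketch only: plain Java, no Hadoop classes. In a real job this logic would
// live in a Partitioner's getPartition(key, value, numPartitions) method.
final class GroupHashPartitionSketch {

    // ASSUMPTION: hypothetical key layout "<group>\t<rest-of-sort-key>".
    // A key without a tab is treated as being its own group.
    static String groupOf(String sortingKey) {
        int tab = sortingKey.indexOf('\t');
        return tab < 0 ? sortingKey : sortingKey.substring(0, tab);
    }

    // Hash only the group, so every record of a group maps to one partition.
    // Masking with Integer.MAX_VALUE clears the sign bit, which keeps the
    // modulo result in the range [0, numPartitions).
    static int partitionFor(String sortingKey, int numPartitions) {
        return (groupOf(sortingKey).hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"user42\t0001", "user42\t0999", "user7\t0001"};
        for (String key : keys) {
            System.out.println(key.replace('\t', '|') + " -> reducer " + partitionFor(key, 8));
        }
    }
}
```

The downside is the one described earlier in the thread: a very large group still lands on a single reducer, so this scheme cannot balance the load by itself.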
Thanks for your advice re. the options. I'll investigate further by
tweaking them. If I end up filing a Hadoop bug, I'll try and remember
to follow up here.
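For anyone tweaking the same options, a rough sketch of how the shuffle percentages translate into byte budgets for a given reducer heap. The parameter names and the 0.70 and 0.66 defaults are the ones documented in the r1.0.4 tutorial linked below. The 0.25 per-segment cap is an assumption based on a reading of the 1.x ReduceTask source and may not match every version. The class and method names are hypothetical, and this is not the framework's actual code.

```java
// Sketch only: back-of-the-envelope arithmetic, not Hadoop's implementation.
final class ShuffleBudgetSketch {

    // Documented defaults for Hadoop 1.0.4.
    static final double INPUT_BUFFER_PERCENT = 0.70; // mapred.job.shuffle.input.buffer.percent
    static final double MERGE_PERCENT = 0.66;        // mapred.job.shuffle.merge.percent

    // ASSUMPTION: fraction of the in-memory budget that a single map output
    // may occupy before it is shuffled to disk instead (taken from a reading
    // of the 1.x source, not from the documentation).
    static final double SINGLE_SEGMENT_FRACTION = 0.25;

    // Bytes of reducer heap that may hold fetched map outputs in memory.
    static long inMemoryBudget(long maxHeapBytes, double inputBufferPercent) {
        return (long) (maxHeapBytes * inputBufferPercent);
    }

    // Fill level of that budget at which an in-memory merge is started.
    static long mergeTrigger(long budgetBytes, double mergePercent) {
        return (long) (budgetBytes * mergePercent);
    }

    // Largest single map output that would still be kept in memory.
    static long maxSingleSegment(long budgetBytes) {
        return (long) (budgetBytes * SINGLE_SEGMENT_FRACTION);
    }

    public static void main(String[] args) {
        long heap = 1024L * 1024 * 1024; // e.g. a reducer JVM started with -Xmx1024m
        long budget = inMemoryBudget(heap, INPUT_BUFFER_PERCENT);
        System.out.println("in-memory shuffle budget: " + budget + " bytes");
        System.out.println("merge starts at:          " + mergeTrigger(budget, MERGE_PERCENT) + " bytes");
        System.out.println("max in-memory segment:    " + maxSingleSegment(budget) + " bytes");
    }
}
```

Lowering mapred.job.shuffle.input.buffer.percent shrinks the first number and leaves more heap for everything else in the reduce task, at the cost of more spills to disk.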
On Thu, Feb 21, 2013 at 4:43 AM, Hemanth Yamijala
<[EMAIL PROTECTED]> wrote:
> I might be going slightly tangential here. Since you mention sorting - is
> this sorting the total input? In that case, does HashPartitioner even work?
> Because the partitions would only be locally sorted - but globally
> There is a sort example in Hadoop:
> which you can look at to see how it works. Basically there is a different
> type of partitioner called TotalOrderPartitioner that it uses to achieve
> global sorting of the input.
> Regarding the configuration options being more tuning related than
> correctness related - I can't vouch for it. However, I know cases where we
> have been able to tune these options and make MR programs work on larger
> On Thu, Feb 21, 2013 at 4:33 PM, Shivaram Lingamneni
> <[EMAIL PROTECTED]> wrote:
>> Thanks very much for your helpful response!
>> I should go into some more details about this job. It's essentially a
>> use of the Hadoop framework to sort a large amount of data. The mapper
>> transforms a record to (sorting_key, record), where the sorting keys
>> are effectively unique, and the reducer is trivial, outputting the
>> record and discarding the sorting key, so the memory consumption of
>> both the map and the reduce steps is intended to be O(1).
>> However, due to the nature of the sorting, it's necessary that certain
>> sets of records appear together in the sorted output. Thus the
>> partitioner (HashPartitioner with a specially designed hash function)
>> will sometimes be forced to send a large number of records to a
>> particular reducer. This is not desirable, and it occurs only rarely,
>> but it's not feasible to prevent it from happening on a deterministic
>> basis. You could say that it creates a reliability engineering
>> My understanding of the configuration options you've linked to is that
>> they're intended for performance tuning, and that even if the defaults
>> are not optimal for a particular input, the shuffle should still
>> succeed, albeit more slowly than it could have otherwise. In
>> particular, it seems like the ShuffleRamManager class (I think
>> ReduceTask.ReduceCopier.ShuffleRamManager) is intended to prevent my
>> crash from occurring, by disallowing the in-memory shuffle from using
>> up all the JVM heap.
>> Is it possible that the continued existence of this OutOfMemoryError
>> represents a bug in ShuffleRamManager, or in some other code that is
>> intended to prevent this situation from occurring?
>> Thanks so much for your time.
>> On Wed, Feb 20, 2013 at 5:41 PM, Hemanth Yamijala
>> <[EMAIL PROTECTED]> wrote:
>> > There are a few tweaks in configuration that may help. Can you please
>> > look at
>> > http://hadoop.apache.org/docs/r1.0.4/mapred_tutorial.html#Shuffle%2FReduce+Parameters
>> > Also, since you have mentioned reducers are unbalanced, could you use a
>> > custom partitioner to balance out the outputs? Or just increase the number
>> > of reducers so the load is spread out.
>> > Thanks
>> > Hemanth
>> > On Wednesday, February 20, 2013, Shivaram Lingamneni wrote:
>> >> I'm experiencing the following crash during reduce tasks:
>> >> https://gist.github.com/slingamn/04ff3ff3412af23aa50d