Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
MapReduce, mail # user - The problem with Hadoop and Iterative applications and merge join.

Copy link to this message
The problem with Hadoop and Iterative applications and merge join.
Kevin Burton 2011-09-14, 01:58
I was going to post this to my blog but I'm running into technical
difficulties at the moment (don't ask) so figured I'd just post it here and
see if anyone any feedback.


I recently wrote an implementation of an algorithm in in Pig which exposed
some bugs / design flaws in the Hadoop core which are addressed in other Map
Reduce implementations.

It exposed some bugs in the Pig optimizer but at the core these were
problems with Hadoop.

Specifically, if you have run one previous map reduce job, and the data is
all nicely partitioned across the cluster, your next map reduce job can not
do an efficient merge with the previous data set.

In the case of Pig it takes the previous output and does a full rejoin,
sorting the data again, shuffling the data to other notes, and re-grouping
the data.

Twister <http://www.iterativemapreduce.org/>, and Map Reduce
designed to address these problems.

Twister was specifically designed for iterative applications.

For example, if we consider K-means clustering algorithm[16], during the nth
iteration the program uses the input data set and the cluster centers
computed during the (n-1)th iteration to compute the next set of cluster
centers. To support map/reduce tasks operating with these two types of data
products we introduced a "configure" phase for map and reduce tasks, which
can be used to load (read) any static data at the map and reduce tasks.

Twister is interesting in that it's a 'micro-mapreduce' implementation
(which is meant to be a complement) and is only 11k lines of code.

Basically it consists of a controller which you use to partition your input
to N machines. The N machines then act as static partitions so that when you
do your next Map Reduce round, the data is already partitioned next to you
on disk.

This means that if you have data with little skew, it will all be nicely
balanced out across the cluster and you can do a local map-side merge join
with the data that is sitting on disk.

With Hadoop and HDFS you can't do this. The data from your stored reducer is
written to the local HDFS node which counts as one copy but then the extra
blocks just are randomly given to other nodes.

Since the storage of the partitions isn't deterministic, there is no way to
route the secondary map reduce jobs so that the reducers and the data from
the previous map reduce partition are on the same machine.

This basically means that you only have a nr_replicas / N probability that
you will be doing a local read. If your nr_replicas is 2-3 and N is like
1000 machines, the probability that you're doing a local read is very low.

The Twister paper notes that their failure mode isn't very sophisticated.
Basically they have a controller and you just run back one iteration of the
MR job:

Our approach is to save the application state of the computation between
iterations so that in the case of a failure the entire computation can be
rolled back few iterations. Supporting individual map or reduce failures
require adopting an architecture similar to Google, which will eliminate
most of the efficiencies that we have gained using Twister for iterative
MapReduce computations.

This seems like a non-starter for many applications. If your computation
requires hundreds or thousands of nodes you're going to rollback often. The
rollback probability is MTBF of a node / N so the more nodes you have the
more often your Twister jobs will be crashing and needing to restart

I don't think this design is impossible to fix though.

It seems that sharding HDFS could be the solution here.

Basically, instead of using random block balancing, you just have block
replication paired to only 2-3 nodes.

This has its drawbacks. With the random approach if you have a node fail,
you can read blocks from N-1 nodes. This means that each source node only
has to serve 1 HDFS block chunk so you never saturate source ethernet ports
or cause too high of a load on a single box.

However, with a shared approach, your HDFS reads are only going to come from
two boxes. Your other two machines in the same shard.

However, with temporary data you might just be able to avoid doing the block
replication as it might not matter. If the data will eventually be discarded
in the next iteration being down to 2 replicas might not be the end of the

Another issue is how to do your reduce jobs if these box form one partition.

Should the entire partition be on reduces job or should you run three reduce
jobs in a shard.

It might be possible to to just run three reduce jobs but replicate the data
to all nodes.

Then when the next iteration's mappers run you can read from three
partitions and do the join and final reduction then. I think the trade off
would be worth it as it might be a slight performance hit to do it this way
but a MAJOR hit to only use 1/3rd of your hardware for reducers.

<b>Map Reduce Merge</b>

The Map Reduce Merge paper goes a step beyond this and argues that we need
to stop thinking in Map Reduce and instead start to think in

We improve Map-Reduce into a new model called Map- Reduce-Merge. It adds to
Map-Reduce a Merge phase that can efficiently merge data already partitioned
and sorted (or hashed) by map and reduce modules. We also demonstrate that
this new model can express relational algebra operators as well as implement
several join algorithms.


In this new model, the map function transforms an input key/value pair
(k1,v1) into a list of intermediate key/value pairs [(k2 , v2 )]. The reduce
function aggregates the list of values [v2] associated with k2 and produces
a list of values [v3], which is also associated with k2. Note that inputs
and outputs of both functions belong to the same lineage, say α. Another
pair of map and reduce functions produce the intermediate output (k3,[v4])
from another lineage, say β. Based on keys k2