Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> How to use the hadoop consumer in distributed mode?

Copy link to this message
Re: How to use the hadoop consumer in distributed mode?
Does the version in contrib contain the fixes for Kafka-131? The offsets
were incorrectly computed prior to this patch.

At LinkedIn, this is what we do in a nutshell.
1. We connect to the zookeeper instance. With this we are able to discover
the topics, the brokers and the partitions of a broker.

2. For a topic we want to pull, we create files that contains the offset for
each broker and partition.  Each individual file contains a unique
broker/partition pair. This is essentially what data generator does, except
we use values from zookeeper. We take the output of the previous run of
kafka (the new offsets) and use them as the new offset files. If the old
offset doesn't exist, we set a default starting offset.

3. We run the pull hadoop job. One mapper per broker/partition pulls using
the simple consumer into hdfs (the KafkaETLRecordReader handles most of
this). We query kafka for the latest offset. The mapper fetches from the
kafka broker until the latest offset is reached.

4. We group the data by hourly partition with a reduce step.

5. The kafka hadoop job's mapper spits out new offsets for the next time we
decide to pull the data. The pull occurs at regular scheduled intervals
quite frequently.

That's the gist of it. There are a few additional modification we made to
the kafka job including the ability to handle unavailable nodes, avro schema
resolution and auditing.


On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <[EMAIL PROTECTED]> wrote:

> Is it possible that this is due to a hadoop version mismatch? Typically if
> the client jar you pick up does not match the hadoop version of your hadoop
> cluster you get EOFException.
> -Jay
> On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
> > Hello everyone :) !
> >
> > I have trouble using the Kafka hadoop consumer included in
> > contrib/hadoop-consumer and I'd like to know if/how it is used at
> LinkedIn
> > or elsewhere? I would also like if someone could confirm or correct the
> > assumptions I make below.
> >
> > Here's what I have so far:
> >
> > It works when pulling from one Kafka broker, but not when pulling from
> > many. There are two problems:
> >
> > The first problem concerns the offset files that the Map/Reduce job takes
> > as
> > its input. From what I understand, these offset files represent the
> offset
> > to start reading from on each of the Kafka brokers.
> >
> > To generate those files the first time (and thus start from offset -1),
> we
> > can go in contrib/hadoop-consumer/ and run:
> >
> > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> >
> > The problem is that this DataGenerator class can take only one Kafka
> broker
> > in its parameters (the properties file) and thus generates only one
> offset
> > file.
> >
> > The Map/Reduce job will then spawn one map task for each offset file it
> > finds in its input directory, and each of these map tasks will connect to
> a
> > different Kafka broker. Since the DataGenerator can only generate one
> > offset
> > file, the Map/Reduce job only spawns one map task which queries only one
> > Kafka broker.
> >
> > Unless my assumptions are wrong or someone else provides a nice
> alternative
> > solution, I was planning to modify the DataGenerator class so that it can
> > generate multiple offset files, but for now, as a manual work-around, I
> > just
> > duplicated the offset files and specified a different Kafka broker in
> each.
> >
> > Other than that, I am thinking perhaps a more robust solution would be to
> > have ZK-based discovery of the available brokers. Again, I'm curious to
> > find
> > out how this is done at LinkedIn or elsewhere?
> >
> > The second problem is when I run the M/R job. If I run it with the
> multiple
> > offset files I manually generated as its input, it does spawn three map
> > tasks, as expected, but it then fails with the following error:
> >
> > java.io.IOException: java.io.EOFException