-
How to use the hadoop consumer in distributed mode? Felix Giguere Villegas 2011-10-18, 16:01
Hello everyone :) !

I am having trouble using the Kafka hadoop consumer included in contrib/hadoop-consumer, and I'd like to know if/how it is used at LinkedIn or elsewhere. I would also like it if someone could confirm or correct the assumptions I make below.

Here's what I have so far:

It works when pulling from one Kafka broker, but not when pulling from many. There are two problems:

The first problem concerns the offset files that the Map/Reduce job takes as its input. From what I understand, these offset files represent the offset to start reading from on each of the Kafka brokers.

To generate those files the first time (and thus start from offset -1), we can go in contrib/hadoop-consumer/ and run:

    ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties

The problem is that this DataGenerator class can take only one Kafka broker in its parameters (the properties file) and thus generates only one offset file.

The Map/Reduce job will then spawn one map task for each offset file it finds in its input directory, and each of these map tasks will connect to a different Kafka broker. Since the DataGenerator can only generate one offset file, the Map/Reduce job only spawns one map task, which queries only one Kafka broker.

Unless my assumptions are wrong or someone else provides a nice alternative solution, I was planning to modify the DataGenerator class so that it can generate multiple offset files. For now, as a manual work-around, I just duplicated the offset files and specified a different Kafka broker in each.

Other than that, I am thinking perhaps a more robust solution would be to have ZK-based discovery of the available brokers. Again, I'm curious to find out how this is done at LinkedIn or elsewhere.

The second problem is when I run the M/R job. If I run it with the multiple offset files I manually generated as its input, it does spawn three map tasks, as expected, but it then fails with the following error:

    java.io.IOException: java.io.EOFException
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.Child.main(Child.java:264)
    Caused by: java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:180)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
        at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
        ... 11 more

It fails before writing anything whatsoever, and it fails repeatedly for each map task until the JobTracker reaches the maximum number of failures per task and marks the job as failed.

I haven't figured this one out yet... Any help would be greatly appreciated :) !

Thanks :) !!!!

--
Felix
-
Re: How to use the hadoop consumer in distributed mode? Felix Giguere Villegas 2011-10-18, 16:04
I said that "it does spawn *three* map tasks, as expected" because I was testing this with three offset files. I meant multiple map tasks.

Anyway, thanks in advance :) !

--
Felix
-
Re: How to use the hadoop consumer in distributed mode? Jun Rao 2011-10-18, 16:36
For the offset part, there is one offset per partition. So there will be as many map tasks as the total number of partitions, not brokers.

Jun
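Jun's point can be illustrated with a small sketch. This is not code from contrib/hadoop-consumer; the function name and the cluster layout are made up for illustration. It only shows that the expected number of offset files (and therefore map tasks) follows the number of broker/partition pairs rather than the number of brokers.

```python
def offset_file_keys(partitions_per_broker):
    """Return one (broker_id, partition) pair per expected offset file / map task.

    `partitions_per_broker` is a hypothetical mapping of broker id to the
    number of partitions that broker holds for the topic being pulled.
    """
    return [
        (broker_id, partition)
        for broker_id, count in sorted(partitions_per_broker.items())
        for partition in range(count)
    ]


# Made-up layout: three brokers, two partitions each.
layout = {0: 2, 1: 2, 2: 2}
pairs = offset_file_keys(layout)
print(len(layout), "brokers ->", len(pairs), "map tasks")
```

With this layout, duplicating one offset file per broker would produce three map tasks, while one file per broker/partition pair produces six.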
-
Re: How to use the hadoop consumer in distributed mode? Felix Giguere Villegas 2011-10-18, 20:06
Ah, interesting.

So then I guess it would make sense to have as many partitions as I have nodes in my hadoop cluster, or a multiple of the number of nodes in the cluster, in order to maximize the import speed?

Also, can I have just one offset file? Or would I need to somehow generate one offset per partition?

--
Felix
-
Re: How to use the hadoop consumer in distributed mode? Jay Kreps 2011-10-18, 21:03
Is it possible that this is due to a hadoop version mismatch? Typically, if the client jar you pick up does not match the hadoop version of your hadoop cluster, you get an EOFException.

-Jay
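One way to check for the mismatch Jay describes is to compare the version reported on the machine that submits the job with the one reported on a cluster node. The sketch below is only an illustration: it assumes the first line of `hadoop version` output has the form `Hadoop <version>`, and the sample strings are made up rather than taken from this thread.

```python
def parse_hadoop_version(version_output):
    """Extract the version string from `hadoop version` output.

    Assumption: the first non-empty line looks like "Hadoop <version>".
    """
    lines = version_output.strip().splitlines()
    prefix = "Hadoop "
    if not lines or not lines[0].startswith(prefix):
        raise ValueError("unrecognized `hadoop version` output")
    return lines[0][len(prefix):].strip()


def versions_match(client_output, cluster_output):
    """True when client and cluster report exactly the same version string."""
    return parse_hadoop_version(client_output) == parse_hadoop_version(cluster_output)


# Made-up sample outputs, for illustration only.
client = "Hadoop 0.20.2\nSubversion (details omitted)"
cluster = "Hadoop 0.20.203.0\nSubversion (details omitted)"
print(versions_match(client, cluster))
```

The same comparison applies to the hadoop jars bundled on the job's classpath: they need to come from the same release as the cluster.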
-
Re: How to use the hadoop consumer in distributed mode? Richard Park 2011-10-18, 21:52
Does the version in contrib contain the fixes for Kafka-131? The offsets were incorrectly computed prior to this patch.

At LinkedIn, this is what we do in a nutshell.

1. We connect to the zookeeper instance. With this we are able to discover the topics, the brokers and the partitions of a broker.

2. For a topic we want to pull, we create files that contain the offset for each broker and partition. Each individual file contains a unique broker/partition pair. This is essentially what the data generator does, except we use values from zookeeper. We take the output of the previous run of the kafka job (the new offsets) and use them as the new offset files. If the old offset doesn't exist, we set a default starting offset.

3. We run the pull hadoop job. One mapper per broker/partition pulls using the simple consumer into hdfs (the KafkaETLRecordReader handles most of this). We query kafka for the latest offset. The mapper fetches from the kafka broker until the latest offset is reached.

4. We group the data by hourly partition with a reduce step.

5. The kafka hadoop job's mapper spits out new offsets for the next time we decide to pull the data. The pull occurs at regularly scheduled, fairly frequent intervals.

That's the gist of it. There are a few additional modifications we made to the kafka job, including the ability to handle unavailable nodes, avro schema resolution and auditing.

Thanks,
-Richard
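Steps 2 to 5 of Richard's description can be sketched roughly as follows. This is not LinkedIn's code and does not use the Kafka or Hadoop APIs: every name here is hypothetical, the `fetch` callable stands in for the simple consumer, the in-memory `log` stands in for one broker/partition, and the default starting offset of 0 is an assumption (the thread only says "a default starting offset").

```python
from collections import defaultdict
from datetime import datetime, timezone

DEFAULT_START_OFFSET = 0  # assumption, not a value given in the thread


def build_offset_entries(discovered_pairs, previous_offsets,
                         default=DEFAULT_START_OFFSET):
    """Step 2: one entry per (broker, partition), resuming from the last run."""
    return {pair: previous_offsets.get(pair, default) for pair in discovered_pairs}


def pull_partition(fetch, start_offset, latest_offset):
    """Step 3: fetch from one broker/partition until the latest offset is reached.

    `fetch(offset)` is a hypothetical callable returning (records, next_offset).
    The returned offset is what step 5 would write out for the next run.
    """
    offset, pulled = start_offset, []
    while offset < latest_offset:
        records, next_offset = fetch(offset)
        if next_offset <= offset:  # no progress: stop instead of looping forever
            break
        pulled.extend(records)
        offset = next_offset
    return pulled, offset


def hourly_bucket(epoch_seconds):
    """Step 4: key used to group records into hourly partitions (UTC)."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime("%Y/%m/%d/%H")


def group_by_hour(records):
    """Step 4: group (timestamp, payload) records by their hourly bucket."""
    buckets = defaultdict(list)
    for timestamp, payload in records:
        buckets[hourly_bucket(timestamp)].append(payload)
    return dict(buckets)


# In-memory stand-in for one partition: offsets are list indexes.
log = [(1318953600, "a"), (1318953700, "b"), (1318957200, "c")]


def fake_fetch(offset, batch=2):
    chunk = log[offset:offset + batch]
    return chunk, offset + len(chunk)


entries = build_offset_entries([(0, 0), (0, 1)], {(0, 0): 1})
records, new_offset = pull_partition(fake_fetch, entries[(0, 0)], len(log))
print(group_by_hour(records), new_offset)
```

In the real job each (broker, partition) entry would be its own offset file and its own mapper, and the grouping would happen in the reduce step; the sketch runs them in sequence only to keep the data flow visible.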
-
Re: How to use the hadoop consumer in distributed mode? Felix Giguere Villegas 2011-10-18, 22:18
Thanks for your replies, guys :)

@Jay: I thought about the Hadoop version mismatch too, because I've had the same problem before. I'll double check to make sure I have the same versions of hadoop everywhere, as the Kafka distributed cluster I was testing on is a new setup and I might have forgotten to put the hadoop jars we use in it... I'm working part-time for now, so I probably won't touch this again until next week, but I'll keep you guys posted ASAP :)

@Richard: Thanks a lot for your description. That clears up the inaccuracies in my understanding. Is there any chance you guys might release the code you use to query ZK and create appropriate offset files for each broker/partition pair? The hadoop consumer provided in the source works with the setup we get from the quickstart guide, but the process you describe seems more appropriate for production use.

Thanks again :)

--
Felix
-
Re: How to use the hadoop consumer in distributed mode?Jay Kreps 2011-10-18, 22:28
I would actually love for us to release the full ETL system we have for
Kafka/Hadoop, it is just a matter of finding the time to get this code into that shape.

The hadoop team that maintains that code is pretty busy right now, but I am hoping we can find a way.

-Jay

On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <[EMAIL PROTECTED]> wrote:

> Thanks for your replies guys :)
>
> @Jay: I thought about the Hadoop version mismatch too, because I've had the same problem before. I'll double check again to make sure I have the same versions of hadoop everywhere, as the Kafka distributed cluster I was testing on is a new setup and I might have forgotten to put the hadoop jars we use in it... I'm working part-time for now so I probably won't touch this again until next week but I'll keep you guys posted ASAP :)
>
> @Richard: Thanks a lot for your description. That clears up the inaccuracies in my understanding. Is there any chance you guys might release the code you use to query ZK and create appropriate offset files for each broker/partition pair? The hadoop consumer provided in the source works with the setup we get from the quickstart guide, but the process you describe seems more appropriate for production use.
>
> Thanks again :)
>
> --
> Felix
>
> On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <[EMAIL PROTECTED]> wrote:
>
> > Does the version in contrib contain the fixes for Kafka-131? The offsets were incorrectly computed prior to this patch.
> >
> > At LinkedIn, this is what we do in a nutshell.
> >
> > 1. We connect to the zookeeper instance. With this we are able to discover the topics, the brokers and the partitions of a broker.
> >
> > 2. For a topic we want to pull, we create files that contain the offset for each broker and partition. Each individual file contains a unique broker/partition pair. This is essentially what data generator does, except we use values from zookeeper. We take the output of the previous run of kafka (the new offsets) and use them as the new offset files. If the old offset doesn't exist, we set a default starting offset.
> >
> > 3. We run the pull hadoop job. One mapper per broker/partition pulls using the simple consumer into hdfs (the KafkaETLRecordReader handles most of this). We query kafka for the latest offset. The mapper fetches from the kafka broker until the latest offset is reached.
> >
> > 4. We group the data by hourly partition with a reduce step.
> >
> > 5. The kafka hadoop job's mapper spits out new offsets for the next time we decide to pull the data. The pull occurs at regular scheduled intervals quite frequently.
> >
> > That's the gist of it. There are a few additional modifications we made to the kafka job including the ability to handle unavailable nodes, avro schema resolution and auditing.
> >
> > Thanks,
> > -Richard
> >
> > On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <[EMAIL PROTECTED]> wrote:
> >
> > > Is it possible that this is due to a hadoop version mismatch? Typically if the client jar you pick up does not match the hadoop version of your hadoop cluster you get EOFException.
> > >
> > > -Jay
> > >
> > > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <[EMAIL PROTECTED]> wrote:
> > >
> > > > Hello everyone :) !
> > > >
> > > > I have trouble using the Kafka hadoop consumer included in contrib/hadoop-consumer and I'd like to know if/how it is used at LinkedIn or elsewhere? I would also like if someone could confirm or correct the assumptions I make below.
> > > >
> > > > Here's what I have so far:
> > > >
> > > > It works when pulling from one Kafka broker, but not when pulling from many. There are two problems:
> > > >
> > > > The first problem concerns the offset files that the Map/Reduce job takes as its input. From what I understand, these offset files represent the
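Steps 1 and 2 of Richard's description quoted above (one offset file per broker/partition pair, seeded from the previous run or from a default) can be sketched roughly as follows. This is only an illustration, not the LinkedIn code: the function name, the `(broker URI, partition)` tuple shape and the default offset value are assumptions, and the ZooKeeper discovery itself is left out (the `discovered` argument stands in for whatever it returns).

```python
from typing import Dict, Iterable, Tuple

# Hypothetical representation of a pair: (broker URI, partition number).
BrokerPartition = Tuple[str, int]


def next_offset_inputs(
    discovered: Iterable[BrokerPartition],
    previous_offsets: Dict[BrokerPartition, int],
    default_offset: int,
) -> Dict[BrokerPartition, int]:
    """Choose a starting offset for every discovered broker/partition pair.

    A pair seen in the previous run resumes from the offset that run emitted;
    a pair with no previous offset starts from default_offset.
    """
    return {
        pair: previous_offsets.get(pair, default_offset)
        for pair in discovered
    }
```

Each entry of the returned mapping would then be written out as its own offset file, so that the pull job gets one mapper per broker/partition pair. Pairs that were present in the previous run but are no longer discovered are simply dropped here, which is a design choice of this sketch rather than something stated in the thread.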
-
Re: How to use the hadoop consumer in distributed mode? Hisham Mardam-Bey 2011-10-18, 23:31
Hi folks, been following this thread, Felix and I are working together on this project, we really like Kafka and are moving it into production very soon.

Jay, question, would you guys consider releasing the code in a "not so clean state" and have the community (we would like to help) shore it up so it becomes usable by the masses or are there other issues (legal?) you have to sort out first?

Thanks!

hisham.
Hisham Mardam Bey

A: Because it messes up the order in which people normally read text.
Q: Why is top-posting such a bad thing?
A: Top-posting.
Q: What is the most annoying thing in e-mail?

-=[ Codito Ergo Sum ]=-
-
Re: How to use the hadoop consumer in distributed mode? Felix GV 2011-10-26, 20:10
Hi,

I wanted to give a little update on this topic.

I was able to make hadoop-consumer work with a kafka cluster.

What I did is:

1. I generated a .properties file for one of the kafka brokers I wanted to connect to.
2. I ran the DataGenerator program by passing the .properties file as a parameter.
3. I moved the 1.dat offset file generated in HDFS so that it has another name (so that it's not overwritten the next time I run the DataGenerator).
4. I changed the broker's address in the .properties file to the next server I wanted to connect to.
5. I repeated steps 2 to 4 for every kafka server in the cluster.
6. I then ran SimpleKafkaETLJob and it was able to spawn one map task per broker and pull all the data from each.

This is almost exactly what I was trying before, except that before, I had manually modified the .dat offset files instead of generating each one with the DataGenerator, and I think vim didn't play nice with the SEQ files or something like that... I don't know.

Anyhow, what I'm doing now is a little convoluted but at least it works... I will create a script that does all this repetitive stuff for me. Ideally, I would also like to pull the brokers list from ZK, like you guys do.

The Kafka/Hadoop ETL tools you mentioned are no doubt more mature and complete than the stuff I will create, so it would be really nice if you could release it.

I think releasing those tools would help drive the adoption of Kafka, because in the state it's in now, Kafka is not really plug and play. That is, it works (which is already better than a lot of open source projects out there ;) !) but it seems a rather important part is missing.

--
Felix
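The repetitive part of the workaround described in this message (steps 1 to 5) could be scripted along these lines. This is a minimal sketch under stated assumptions, not a tested tool: the name of the broker property is passed in by the caller because the thread does not give it, the `broker-N` file names are hypothetical, and the functions only build the properties text and the command lines without running anything. `./run-class.sh kafka.etl.impl.DataGenerator` and the `1.dat` file name come from the thread; `hadoop fs -mv` is the standard HDFS rename command.

```python
import re
from typing import Dict, List


def properties_for_broker(template: str, broker_key: str, broker_uri: str) -> str:
    """Return a copy of the properties text with broker_key set to broker_uri."""
    pattern = re.compile(
        r"^([ \t]*" + re.escape(broker_key) + r"[ \t]*=).*$", re.MULTILINE
    )
    # A function replacement keeps any backslashes in broker_uri literal.
    updated, count = pattern.subn(lambda m: m.group(1) + broker_uri, template)
    if count != 1:
        raise ValueError(f"expected exactly one '{broker_key}' entry, found {count}")
    return updated


def plan_offset_files(brokers: List[str], offsets_dir: str) -> List[Dict[str, object]]:
    """Build, for each broker, the commands for steps 2 and 3 of the workaround.

    offsets_dir is assumed to be the HDFS directory where DataGenerator
    writes 1.dat; nothing is executed here.
    """
    plan = []
    for index, broker in enumerate(brokers, start=1):
        props_file = f"broker-{index}.properties"  # hypothetical local file name
        plan.append({
            "broker": broker,
            "properties_file": props_file,
            # Step 2: generate the offset file for this broker.
            "generate": ["./run-class.sh", "kafka.etl.impl.DataGenerator", props_file],
            # Step 3: rename 1.dat so the next run does not overwrite it.
            "rename": ["hadoop", "fs", "-mv",
                       f"{offsets_dir}/1.dat",
                       f"{offsets_dir}/broker-{index}.dat"],
        })
    return plan
```

A driver script would write each `properties_for_broker(...)` result to the matching `properties_file`, run the `generate` and `rename` commands in order (for example with `subprocess.run(cmd, check=True)`), and finally launch SimpleKafkaETLJob once all offset files exist.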
-
Re: How to use the hadoop consumer in distributed mode? Richard Park 2011-10-26, 20:33
Jay and I had a talk about this and we would like to release it as soon as we can. There are a few LinkedIn-specific pieces of code that need to be abstracted out first.

We also use Avro heavily, and so much of our code is written with that in mind. It should be easy enough to abstract the Avro out, but we may release that part of the code as is.

Anyways, we're evaluating what can be released and what needs to be cleaned up, but we hope to get something out there soon.
-
Re: How to use the hadoop consumer in distributed mode? Felix GV 2011-10-26, 20:51
Awesome :) !

Do you have any timeline as to when you think you could release this? This is not to rush you, but we are going to switch some of our priorities around based on whether the answer is next week, next month, or 3 months from now or more ;)

Thanks for your great work! I think I speak for everyone when I say it's really appreciated :)

--
Felix