S3 Consumer | Pratyush Chandra | 2012-12-27, 09:42
Hi,
I am looking for an S3-based consumer that can write all the received events to an S3 bucket (say, every minute), something similar to the Flume HDFSSink: http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

I have tried evaluating the hadoop-consumer in the contrib folder, but it seems to be meant more for offline processing: it fetches everything from offset 0 at once and replaces it in the S3 bucket.

Any help would be appreciated.

Pratyush Chandra
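To make "every minute" concrete, here is a minimal Python sketch of how received events could be grouped into one S3 object per topic per minute, loosely in the spirit of Flume's time-based HDFS paths. The key layout (`prefix/topic/YYYY/MM/DD/HHMM.log`), the helper names, and the assumption that each event carries a Unix timestamp are all hypothetical; nothing here talks to Kafka or S3.

```python
from collections import defaultdict
from datetime import datetime, timezone


def minute_key(prefix, topic, ts):
    """Hypothetical S3 key layout: one object per topic per UTC minute."""
    t = datetime.fromtimestamp(ts, tz=timezone.utc)
    return f"{prefix}/{topic}/{t:%Y/%m/%d/%H%M}.log"


def bucket_by_minute(prefix, topic, events):
    """Group (unix_ts, payload) pairs into per-minute object bodies."""
    buckets = defaultdict(list)
    for ts, payload in events:
        buckets[minute_key(prefix, topic, ts)].append(payload)
    # One newline-separated body per key, ready to be uploaded as an object.
    return {key: "\n".join(lines) for key, lines in buckets.items()}
```

Bucketing by the event's own timestamp (rather than arrival time) keeps late events in the minute they belong to, at the cost of sometimes rewriting an object that was already uploaded.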
-
Re: S3 Consumer | David Arthur | 2012-12-27, 15:08
I don't think anything like this exists in Kafka (or contrib), but it would be a useful addition! Personally, I have written this exact thing at previous jobs.

As for the Hadoop consumer, since Hadoop has a FileSystem implementation for S3, it should be possible. The Hadoop consumer works by writing out data files containing the Kafka messages alongside offset files that contain the last offset read for each partition. If it is re-consuming from zero each time you run it, it is not finding the offset files from the previous run.

Having used it a bit, I'd say the Hadoop consumer is certainly an area that could use improvement.

HTH,
David
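A minimal sketch of the resume-from-offset-files behaviour David describes, assuming a hypothetical layout with one data file and one small JSON offset file per topic/partition, and a Python list standing in for a partition (so offsets here are list indices, not real Kafka offsets). Each run reads the previous run's offset file; pointing it at a directory with no offset file is exactly the "start from zero" case.

```python
import json
import os
import tempfile  # used only by the usage example at the bottom


def offset_path(run_dir, topic, partition):
    # Hypothetical layout: one small JSON offset file per topic/partition.
    return os.path.join(run_dir, f"{topic}-{partition}.offset")


def load_offset(run_dir, topic, partition):
    """Return the next offset to read, or 0 when no offset file is found."""
    path = offset_path(run_dir, topic, partition)
    if not os.path.exists(path):
        return 0  # the "re-consuming from zero" case
    with open(path) as f:
        return json.load(f)["next_offset"]


def write_run(run_dir, topic, partition, messages, next_offset):
    """Write the data file and, next to it, the offset to resume from."""
    os.makedirs(run_dir, exist_ok=True)
    with open(os.path.join(run_dir, f"{topic}-{partition}.dat"), "w") as f:
        f.writelines(m + "\n" for m in messages)
    with open(offset_path(run_dir, topic, partition), "w") as f:
        json.dump({"next_offset": next_offset}, f)


def run_once(log, prev_dir, out_dir, topic="topic1", partition=0):
    """One job run: resume from prev_dir's offset file, write results to out_dir."""
    start = load_offset(prev_dir, topic, partition)
    batch = log[start:]  # a list stands in for the partition
    write_run(out_dir, topic, partition, batch, start + len(batch))
    return start, len(batch)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        log = [f"event-{i}" for i in range(5)]
        print(run_once(log, os.path.join(root, "none"), os.path.join(root, "run1")))  # (0, 5)
        log += ["event-5", "event-6"]
        print(run_once(log, os.path.join(root, "run1"), os.path.join(root, "run2")))  # (5, 2)
```

The important wiring is that each run's `prev_dir` is the previous run's output; if every run is pointed at the same original input directory, it never sees the updated offsets.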
-
Re: S3 Consumer | Pratyush Chandra | 2012-12-28, 13:57
I went through the source code of the Hadoop consumer in contrib. It doesn't seem to use the previous offset at all, neither in the DataGenerator nor in the MapReduce stage.

Before I go into the implementation, I can think of two ways:
1. A ConsumerConnector receiving all the messages continuously and then writing them to HDFS (in this case S3). The problem is that autocommit is handled internally, and there is no callback on offset commit that could be used to upload the file.
2. Wake up every minute, pull all the data using the simple consumer into a local file, and put it to HDFS.

So, which is the better approach?
- Listen continuously vs. in batch
- Use ConsumerConnector (where autocommit/offsets are handled internally) vs. the simple consumer (which does not use ZooKeeper, so I need to connect to each broker individually)

Pratyush
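Option 2 can be sketched as a periodic batch cycle. The property that matters is ordering: upload first, then record the new offset, so a failed upload is simply re-read on the next wake-up. `FakePartition`, `FakeStore`, and `batch_cycle` are hypothetical in-memory stand-ins for a simple-consumer fetch and an S3 (or HDFS) client, not Kafka or AWS APIs.

```python
from dataclasses import dataclass, field


@dataclass
class FakePartition:
    """Stand-in for one Kafka topic/partition; offsets are list indices."""
    messages: list = field(default_factory=list)

    def fetch(self, offset, max_messages):
        return self.messages[offset:offset + max_messages]


@dataclass
class FakeStore:
    """Stand-in for S3: maps object keys to file contents."""
    objects: dict = field(default_factory=dict)
    fail_next: bool = False

    def put(self, key, body):
        if self.fail_next:
            self.fail_next = False
            raise IOError("simulated S3 failure")
        self.objects[key] = body


def batch_cycle(partition, store, offsets, name, max_messages=1000):
    """One wake-up: read from the committed offset, upload, then commit."""
    start = offsets.get(name, 0)
    batch = partition.fetch(start, max_messages)
    if not batch:
        return 0
    key = f"{name}/{start:020d}.log"
    store.put(key, "\n".join(batch))    # raises on failure...
    offsets[name] = start + len(batch)  # ...so this commit is skipped
    return len(batch)
```

A real loop would call `batch_cycle` for each partition roughly every 60 seconds (for example with `time.sleep(60)` between rounds) and keep `offsets` somewhere durable such as ZooKeeper. Naming the object after its start offset means a retried batch overwrites the same key instead of creating a duplicate.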
-
Re: S3 Consumer | Matthew Rathbone | 2012-12-28, 14:32
So the Hadoop consumer does use the latest offset: it reads it from the 'input' directory in the record reader.

We have a heavily modified version of the Hadoop consumer that reads/writes offsets to ZooKeeper (much like the Scala consumers), and this works great.

FWIW, we also use the Hadoop consumer to write to S3 without any issues, much like any ordinary MapReduce job, and it's pretty solid. We run our job every 10-30 minutes.

Maybe also interesting: we used to use Flume (0.9), and we find the Kafka method of consuming to be much better during S3 networking issues. With Flume, if you 'push' to S3 and something goes wrong, it can fall over and you can fairly easily lose data. With the Hadoop Kafka consumer, the mapper just fails over and tries again, which is a little wasteful (you're reading the records twice) but generally great.

Matthew Rathbone
Foursquare | Software Engineer | Server Engineering Team | @rathboma
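A sketch of the at-least-once behaviour Matthew describes, assuming offsets live in ZooKeeper under a per-group path and a plain dict stands in for ZooKeeper. The path is hypothetical, modelled loosely on the layout Kafka's own consumers use, and `run_task`/`run_with_retries` are illustrative names only. Because the offset is committed only after the output is written, a task that dies in between is re-run from the old offset: records are read twice, but none are lost.

```python
def offset_znode(group, topic, partition):
    # Hypothetical ZooKeeper path for this consumer group's committed offset.
    return f"/consumers/{group}/offsets/{topic}/{partition}"


def run_task(log, zk, output, path, crash_before_commit=False):
    """One map task: read from the committed offset, write output, commit."""
    start = zk.get(path, 0)  # zk is a dict standing in for ZooKeeper
    records = log[start:]
    # Deterministic key: a retry overwrites the same object instead of duplicating it.
    output[f"{path}@{start}"] = list(records)
    if crash_before_commit:
        raise RuntimeError("simulated failure before the offset commit")
    zk[path] = start + len(records)


def run_with_retries(log, zk, output, path, failures=0, max_attempts=3):
    """Re-run the task like a re-scheduled mapper; return records read in total."""
    total_read = 0
    for attempt in range(max_attempts):
        total_read += len(log) - zk.get(path, 0)
        try:
            run_task(log, zk, output, path, crash_before_commit=attempt < failures)
            return total_read
        except RuntimeError:
            continue  # nothing was committed, so the next attempt re-reads
    raise RuntimeError("task failed on every attempt")
```

With one simulated failure, three records are read twice (six reads in total), yet the output ends up with a single copy of them and the committed offset is correct.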
-
Re: S3 Consumer | Pratyush Chandra | 2012-12-28, 15:22
Hi Matthew,
I may be doing something wrong. I cloned the code at https://github.com/apache/kafka/tree/trunk/contrib/hadoop-consumer and ran the following:

- ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
  This generates a /tmp/kafka/data/1.dat file (log line: Dump tcp://localhost:9092 atlas-topic1 0 -1 to /tmp/kafka/data/1.dat).

- ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
  It says:
    Using offset range [0, 42649]
    Connected to node tcp://localhost:9092
    beginning reading at offset 0 latest offset=42649

- Then I run it again: ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
  It says:
    Using offset range [0, 42759]
    Connected to node tcp://localhost:9092
    beginning reading at offset 0 latest offset=42759

My test.properties uses the local file system for input/output, for testing:

    kafka.etl.topic=topic1
    hdfs.default.classpath.dir=/tmp/kafka/lib
    event.count=1000
    hadoop.job.ugi=kafka,hadoop
    kafka.server.uri=tcp://localhost:9092
    input=/tmp/kafka/data
    output=/tmp/kafka/output
    kafka.request.limit=-1
    client.buffer.size=1048576
    client.so.timeout=60000

Pratyush
-
Re: S3 Consumer | Liam Stewart | 2012-12-28, 16:06
We have a tool that reads data continuously from brokers and then writes files to S3. An MR job didn't make sense for us given our current size and volume. We have one instance running right now and could add more if needed, adjusting which instance reads from which brokers/topics/... Unfortunately, part of the implementation is tied to our internals, so I can't open source it at this point. The idea is roughly:

- one simple consumer per broker/topic/partition; it reads data in batches and writes to local temp files; reads are done in parallel
- files are finalized after a given size or age (to better handle low-volume topics) and then written to S3. All uploads are done in a separate thread pool (using the AWS transfer manager) and don't block reads from Kafka unless too many files get backed up, due to either problems with S3 or upload speed
- after a file is written, the next offset to read is written to ZooKeeper

For me, part of the implementation was an exercise to get experience with ZooKeeper, which was one reason for using the lower-level API and handling offset tracking etc. myself.

Liam Stewart
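Liam's design can be sketched as two pieces: a per-partition buffer that rolls on size or age, and an uploader that runs on a thread pool, blocks the reader only when too many files are queued, and commits offsets after each upload. This is a hypothetical Python sketch, not Liam's code: `put` and `commit` are caller-supplied callbacks standing in for an S3 upload and a ZooKeeper write. One detail the description leaves open is what happens when uploads finish out of order; here the uploader commits only the contiguous prefix of finished files per partition, so a crash never skips past a file that is still in flight.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RollingBuffer:
    """Accumulates one partition's messages until the file is big or old enough."""

    def __init__(self, max_bytes, max_age_s, clock=time.monotonic):
        self.max_bytes, self.max_age_s, self.clock = max_bytes, max_age_s, clock
        self.lines, self.size, self.opened_at = [], 0, None

    def append(self, line):
        if self.opened_at is None:
            self.opened_at = self.clock()
        self.lines.append(line)
        self.size += len(line) + 1  # +1 for the newline separator

    def should_roll(self):
        if not self.lines:
            return False
        too_old = self.clock() - self.opened_at >= self.max_age_s
        return self.size >= self.max_bytes or too_old  # age handles low-volume topics

    def take(self):
        body = "\n".join(self.lines)
        self.lines, self.size, self.opened_at = [], 0, None
        return body


class Uploader:
    """Uploads finished files on a thread pool and commits offsets in order."""

    def __init__(self, put, commit, workers=4, max_pending=8):
        self.put, self.commit = put, commit
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.slots = threading.BoundedSemaphore(max_pending)  # backpressure limit
        self.lock = threading.Lock()
        self.pending = {}  # partition -> [next_offset, done] entries in submit order

    def submit(self, partition, key, body, next_offset):
        self.slots.acquire()  # blocks the reader only when too many files are queued
        entry = [next_offset, False]
        with self.lock:
            self.pending.setdefault(partition, []).append(entry)
        self.pool.submit(self._upload, partition, key, body, entry)

    def _upload(self, partition, key, body, entry):
        try:
            self.put(key, body)
            with self.lock:
                entry[1] = True
                waiting = self.pending[partition]
                # Commit only the contiguous prefix of finished uploads.
                while waiting and waiting[0][1]:
                    self.commit(partition, waiting.pop(0)[0])
        finally:
            self.slots.release()

    def close(self):
        self.pool.shutdown(wait=True)
```

Each reader (one per broker/topic/partition) would call `append` per fetched message and, whenever `should_roll()` is true, `submit(partition, key, buf.take(), next_offset)`. In this sketch a failed upload leaves its entry unfinished, so no later offset for that partition is committed; a real tool would retry the upload.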
-
Re: S3 Consumer | Russell Jurney | 2012-12-28, 23:46
Would you please contribute this to open source? What you've written has been asked for many times. FWIW, I would immediately incorporate it into my book, Agile Data.

Russell Jurney
http://datasyndrome.com
-
Re: S3 Consumer | Chetan Conikee | 2012-12-29, 06:15
Noticed this S3-based consumer project on GitHub:
https://github.com/razvan/kafka-s3-consumer