-
Proper use of ConsumerConnector
Tom Brown 2012-12-19, 18:49
I wish to use either the ConsumerConnector or the SimpleConsumer to read messages from all partitions across multiple brokers using a fixed number of threads (hopefully leveraging asynchronous IO for high performance). I know that the ConsumerConnector sounds like this, but the documentation was not clear about a few things. Would somebody who has used it (or knows the code) be willing to answer these questions about it:

- Does the ConsumerConnector manage connections to multiple brokers, or just a single broker?
- Does the ConsumerConnector require a thread for each partition on each broker? (If not, how many threads does it require?)
- Does the ConsumerConnector use actual asynchronous IO, or does it mimic it by using a dedicated behind-the-scenes thread (and the traditional java socket API)?

If the ConsumerConnector won't serve my purpose I think I could use the SimpleConsumer to implement a version of this, but I worry that it's not thread safe. That is, can I use the same SimpleConsumer instance on Thread-A, then on Thread-B, and then again on Thread-A (though never at the same time)?

Thanks in advance!

--Tom
-
Re: Proper use of ConsumerConnector
Joel Koshy 2012-12-19, 19:16
In general, you should use the consumer connector - unless you have a good reason to load balance and manage offsets manually (which is taken care of in the consumer connector).

> - Does the ConsumerConnector manage connections to multiple brokers,
> or just a single broker?

Multiple brokers.

> - Does the ConsumerConnector require a thread for each partition on
> each broker? (If not, how many threads does it require?)

You can specify how many streams you want - if there are more partitions than threads, then a given thread can consume from multiple partitions. If there are more threads than available partitions, there will be idle threads.

> - Does the ConsumerConnector use actual asynchronous IO, or does it
> mimic it by using a dedicated behind-the-scenes thread (and the
> traditional java socket API)?

The consumer connector uses SimpleConsumers for each broker that it connects to. These consumers fetch from each broker and insert chunks into blocking queues which the consumer iterators then dequeue.

Joel
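To picture the relationship between streams and partitions described above, here is a small Python sketch. It is a toy model, not Kafka code: the function name `assign_partitions`, the partition labels, and the contiguous-range policy are assumptions made for illustration, and the real connector's rebalancing may differ in detail.

```python
def assign_partitions(partitions, num_streams):
    """Split a sorted list of partitions into num_streams contiguous groups.

    Hypothetical helper for illustration; not part of any Kafka API.
    """
    if num_streams <= 0:
        raise ValueError("num_streams must be positive")
    partitions = sorted(partitions)
    base, extra = divmod(len(partitions), num_streams)
    assignment = []
    start = 0
    for i in range(num_streams):
        # The first `extra` streams take one additional partition.
        count = base + (1 if i < extra else 0)
        assignment.append(partitions[start:start + count])
        start += count
    return assignment


# 5 partitions, 2 streams: each stream reads several partitions.
few_streams = assign_partitions(["b1-0", "b1-1", "b2-0", "b2-1", "b2-2"], 2)

# 2 partitions, 4 streams: two streams receive nothing and sit idle.
many_streams = assign_partitions(["b1-0", "b2-0"], 4)
```

In this model the thread count is fixed by `num_streams`, independent of how many partitions or brokers exist, which matches the "fixed number of threads" requirement in the original question.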
-
Re: Proper use of ConsumerConnector
永辉 赵 2012-12-20, 09:07
Hi Joel,

> “unless you have a good reason to load balance and manage offsets manually”

In general, one consumer connector consumes more than one partition. On the client side, we want to get the offsets of all partitions for any message. If a crash happens (some messages were fetched from Kafka but the results were not flushed to disk), we can use the offset info to rewind the Kafka consumer.

Do you think this is a good reason to use SimpleConsumer rather than ConsumerConnector? I think this is a common request, so is there any existing solution?

Thanks,
Yonghui
-
Re: Proper use of ConsumerConnector
Neha Narkhede 2012-12-20, 16:56
> Do you think this is a good reason to use SimpleConsumer rather than
> ConsumerConnector?

Yes, if you want to be able to rewind to some offset, SimpleConsumer is the right API for this purpose.
-
Re: Proper use of ConsumerConnector
Joel Koshy 2012-12-20, 18:06
> “unless you have a good reason to load balance and manage offsets manually”
>
> In general one consumer connector consumes more than one partition.
> On the client side, we want to get the offsets of all partitions for any
> message. If a crash happens (some messages were fetched from Kafka but
> the results were not flushed to disk), we can use the offset info to
> rewind the Kafka consumer.
>
> Do you think this is a good reason to use SimpleConsumer rather than
> ConsumerConnector?

An alternative to using SimpleConsumer in this use case is to use the ZooKeeper consumer connector and turn off auto commit. After your consumer process is done processing a batch of messages, you can call commitOffsets. The main caveat to be aware of is that if your consumer processes batches very fast, you would write to ZooKeeper that often - so in fact setting an autocommit interval and being willing to deal with duplicates is almost equivalent. KAFKA-657 would help, I think, since once that API is available you can store your offsets anywhere you like.

Joel
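The trade-off between committing after every batch and the number of offset writes can be sketched with a toy model in Python. Nothing here is Kafka code: `FakeOffsetStore` stands in for ZooKeeper, and `consume` is a hypothetical loop that commits once per batch, so the number of store writes grows as batches get smaller.

```python
class FakeOffsetStore:
    """Stand-in for ZooKeeper offset storage (illustrative assumption)."""

    def __init__(self):
        self.committed = 0   # next offset to read after a restart
        self.writes = 0      # how many times the store was written

    def commit(self, offset):
        self.committed = offset
        self.writes += 1


def consume(log, store, batch_size, process):
    """Process `log` in batches, committing only after a batch is done."""
    position = store.committed
    while position < len(log):
        batch = log[position:position + batch_size]
        for message in batch:
            process(message)
        position += len(batch)
        store.commit(position)   # one store write per batch


log = [f"m{i}" for i in range(12)]

small = FakeOffsetStore()
consume(log, small, batch_size=1, process=lambda m: None)

large = FakeOffsetStore()
consume(log, large, batch_size=4, process=lambda m: None)
```

With a batch size of 1 the store is written once per message. With a batch size of 4 it is written a quarter as often, at the cost of re-reading up to one batch after a crash.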
-
Re: Proper use of ConsumerConnector
Neha Narkhede 2012-12-20, 18:14
> An alternative to using simpleconsumer in this use case is to use the
> zookeeper consumer connector and turn off auto commit.

Keep in mind that this works only if you don't care about controlling per-partition rewind capability. The high-level consumer will not give you control over which partitions your consumer consumes and which partitions it commits the offsets for. If you need to rewind consumption for a subset of those partitions, then ZookeeperConsumerConnector will not work for you.

Thanks,
Neha
-
Re: Proper use of ConsumerConnector
Tom Brown 2012-12-20, 19:32
In order to support rollbacks and checkpoints, there would have to be a way to both supply partition offsets to the consumer before reading, as well as retrieve partition offsets from the consumer once reading is complete.

From what I've read here, it appears that neither the ConsumerConnector nor the ZookeeperConsumerConnector has either of those capabilities. In order to finely manage offsets, only the SimpleConsumer will work.

Is that the correct interpretation?

--Tom
-
Re: Proper use of ConsumerConnector
Neha Narkhede 2012-12-20, 19:43
> Is that the correct interpretation?
Correct.
-
Re: Proper use of ConsumerConnector
永辉 赵 2012-12-21, 10:56
Thanks Neha and Joel.

My understanding about offsets is:

1. The offset stored in ZK is only used when the consumer connects again.
2. Joel's suggestion, "in fact setting an autocommit interval and being willing to deal with duplicates is almost equivalent", makes sense. But if a crash happens just after an offset is committed, then unprocessed messages in the consumer will be skipped after reconnecting.

Please correct me if I am wrong.

In ConsumerConnector, if the ConsumerIterator could return the partition offset together with each message, then we could save offsets on the client side and commit an offset only after all the messages before it are done (with autoCommit turned off). I roughly went through the code; if we use this option I need to change some code.

Another option is to use SimpleConsumer as we discussed before, but this option requires more work on the client side, since one consumer may have more than one SimpleConsumer. We would need to manage these consumers with ZK and merge the results from each of them.

I lean toward the first option.

Thanks,
Yonghui
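The rule "commit an offset only after all the messages before it are done" can be sketched as a small client-side tracker. This is a hypothetical helper, not something the ConsumerIterator provides, and it models offsets as consecutive integers, which is a simplification made for this example.

```python
class PartitionProgress:
    """Tracks which offsets of one partition have finished processing.

    Hypothetical client-side helper; offsets are modeled as consecutive
    integers starting at `start`.
    """

    def __init__(self, start=0):
        self.next_to_commit = start   # everything below this is done
        self.done = set()             # finished offsets at or above it

    def mark_done(self, offset):
        self.done.add(offset)
        # Advance over the contiguous run of finished offsets.
        while self.next_to_commit in self.done:
            self.done.remove(self.next_to_commit)
            self.next_to_commit += 1

    def committable(self):
        """Offset that is safe to commit: all earlier messages are done."""
        return self.next_to_commit


progress = PartitionProgress()
progress.mark_done(0)
progress.mark_done(2)          # offset 1 is still in flight
after_gap = progress.committable()
progress.mark_done(1)          # gap closed
after_fill = progress.committable()
```

One tracker per partition would be needed, which is why the offset has to travel with each message for this approach to work.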
-
Re: Proper use of ConsumerConnector
Neha Narkhede 2012-12-21, 17:37
> But if crash happens just after offset committed, then unprocessed
> message in consumer will be skipped after reconnected.

If the consumer crashes, you will get duplicates, not lose any data.

Thanks,
Neha
-
Re: Proper use of ConsumerConnector
Tom Brown 2012-12-21, 17:47
Does the ConsumerConnector keep track of the offsets of data downloaded from the server (and queued for consumption by the end user of the API), or does it keep track of the actual offset that has been consumed by the end user?

--Tom
-
Re: Proper use of ConsumerConnector
Jun Rao 2012-12-21, 17:57
It tracks the consumed offset, not the fetched offset.

Thanks,
Jun
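The difference between the fetched offset and the consumed offset can be illustrated with a toy model. `ToyConnector` and its method names are invented for this sketch and are not the Kafka API; the model only shows why committing the consumed offset leads to re-fetching rather than skipping messages.

```python
from collections import deque


class ToyConnector:
    """Toy model of a connector with a prefetch queue (not Kafka code)."""

    def __init__(self, log):
        self.log = log
        self.queue = deque()
        self.fetched_offset = 0    # next offset the fetcher will request
        self.consumed_offset = 0   # next offset the application will see

    def fetch(self, count):
        """Fetcher side: pull messages from the log into the queue."""
        chunk = self.log[self.fetched_offset:self.fetched_offset + count]
        self.queue.extend(chunk)
        self.fetched_offset += len(chunk)

    def next_message(self):
        """Iterator side: hand one queued message to the application."""
        message = self.queue.popleft()
        self.consumed_offset += 1
        return message

    def offset_to_commit(self):
        # Committing the consumed offset means queued-but-unread
        # messages are fetched again after a restart.
        return self.consumed_offset


connector = ToyConnector([f"m{i}" for i in range(10)])
connector.fetch(5)
first = connector.next_message()
second = connector.next_message()
```

Here five messages have been fetched but only two handed to the application, so a commit at this point records offset 2 and the three queued messages would be read again after a crash.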
-
Re: Proper use of ConsumerConnector
Yonghui Zhao 2012-12-21, 23:12
In our project we use SenseiDB to consume Kafka data. SenseiDB processes each message immediately but does not flush to disk immediately. So if SenseiDB crashes, all results that have not been flushed are lost, and we want to rewind Kafka. The offset we want to rewind to is the flush checkpoint.

In this case, we will lose some data.
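The flush-checkpoint scenario can be sketched as a toy simulation. `ToyIndex` is an invented stand-in for a store that buffers results in memory, not SenseiDB itself, and offsets are modeled as list indexes. The point is that resuming from the checkpoint, rather than from the last consumed offset, recovers the results lost in the crash.

```python
class ToyIndex:
    """Toy model of a store that buffers in memory and flushes in batches."""

    def __init__(self):
        self.on_disk = []        # survives a crash
        self.in_memory = []      # lost on a crash
        self.checkpoint = 0      # offset just past the last flushed message

    def process(self, offset, message):
        self.in_memory.append(message)

    def flush(self, next_offset):
        self.on_disk.extend(self.in_memory)
        self.in_memory = []
        self.checkpoint = next_offset

    def crash(self):
        self.in_memory = []


def run(log, index, start, stop, flush_every):
    """Consume log[start:stop], flushing after every `flush_every` messages."""
    for offset in range(start, stop):
        index.process(offset, log[offset])
        if (offset + 1) % flush_every == 0:
            index.flush(offset + 1)


log = [f"m{i}" for i in range(10)]
index = ToyIndex()

run(log, index, start=0, stop=7, flush_every=3)   # flushed through offset 5
index.crash()                                     # m6 was only in memory

# Restart from the flush checkpoint rather than the last consumed offset.
resume_from = index.checkpoint
run(log, index, start=resume_from, stop=10, flush_every=3)
index.flush(10)
```

Had the restart used offset 7 (the last consumed position), `m6` would be missing from `on_disk`, which is the data loss described above.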
-
Re: Proper use of ConsumerConnector
Tom Brown 2012-12-22, 01:44
It seems that a common thread is that while ConsumerConnector works well for the standard case, it just doesn't work for any case where manual offset management (explicit checkpoints, rollbacks, etc) is required.

If any Kafka devs are looking for a way to improve it, I think modifying it to be more modular regarding offset management would be great! You could provide an interface for loading/committing offsets, then provide a ZK implementation as the default. It would be backwards compatible, but be useful in all of the use cases where explicit offset management is required.

(of course, I know I'm just an armchair kafka dev, so there may be reasons why this won't work, or would be an extremely low priority, or...)

--Tom
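A pluggable interface for loading and committing offsets might look roughly like the following Python sketch. All names here (`OffsetStore`, `InMemoryOffsetStore`, `start_offset`, the `"events"` topic) are hypothetical and chosen for illustration; this is not an existing Kafka interface, and a ZooKeeper-backed default would simply be another subclass.

```python
from abc import ABC, abstractmethod


class OffsetStore(ABC):
    """Hypothetical interface for loading and committing offsets."""

    @abstractmethod
    def load(self, topic, partition):
        """Return the stored offset, or None if nothing was committed."""

    @abstractmethod
    def commit(self, topic, partition, offset):
        """Persist the offset for one topic partition."""


class InMemoryOffsetStore(OffsetStore):
    """Dictionary-backed store used here in place of a ZooKeeper one."""

    def __init__(self):
        self._offsets = {}

    def load(self, topic, partition):
        return self._offsets.get((topic, partition))

    def commit(self, topic, partition, offset):
        self._offsets[(topic, partition)] = offset


def start_offset(store, topic, partition, default=0):
    """Where a consumer would begin reading, given any OffsetStore."""
    stored = store.load(topic, partition)
    return default if stored is None else stored


store = InMemoryOffsetStore()
fresh = start_offset(store, "events", 0)
store.commit("events", 0, 42)
resumed = start_offset(store, "events", 0)
# Rolling back is just committing an earlier offset.
store.commit("events", 0, 17)
rolled_back = start_offset(store, "events", 0)
```

Because the consumer only talks to the abstract `OffsetStore`, checkpoints and rollbacks become a matter of which offsets the application chooses to commit and where it stores them.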
-
Re: Proper use of ConsumerConnector
Neha Narkhede 2012-12-23, 19:54
Tom,

That is a good suggestion. Some of us started thinking about re-designing the consumer client a while ago and wrote up some ideas here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

In addition to this, we have a working prototype of stage 1 of that re-design here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design

Besides this, work has started on scaling the offset storage for the consumer as part of this JIRA:
https://issues.apache.org/jira/browse/KAFKA-657

It is true that the team is currently focussed on developing and stabilizing replication, but we welcome ideas and contributions to the consumer client re-design project as well.

Thanks,
Neha