Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Multi-threaded consumer question


Copy link to this message
-
Multi-threaded consumer question
I am writing an app that needs to distribute incoming messages to multiple topics and have those topics read by multiple threads per process in the following way:

First, one process writes messages into many different topics (there is a mapping function that defines which message goes to which topic).

Next, a bank of machines is each running a consumer process.  Each consumer process reads a subset of the topics.  Each topic gets read by two different machines (for redundancy), but the exact mix of topics and machines may vary and in principle could be adjusted on the fly (although in my test all that is happening is that the topics are being spun up one at a time until the process is happy that it is reading from all the topics it cares about).

I am creating a separate ConsumerConnector for each topic, and then creating a separate KafkaStream map from each ConsumerConnector, selecting the stream and starting up an iterator.

Do I need to manage a single, shared ConsumerConnector?  My consumer configs defined a groupid of 'katta-group' but I'm not sure what impact that has.

It appears from the logic that each iterator spins up and drains whatever messages were in the topic when the iterator started.  My logs show that iterators do even overlap, one iterator continuing to read messages while a later, different topic iterator starts reading as well.

However, once all topics are caught up and the consumer process quieces (all iterators presumably waiting for input), if I then run the message generation process to push more data into topics, I get a burst of log entries in the consumer as follows:
2012-11-27 13:16:06,489 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 begin rebalancing consumer katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 try #0

2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared all relevant queues for this fetcher

2012-11-27 13:16:06,507 INFO consumer.ConsumerIterator - Clearing the current data chunk for this consumer iterator

2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared the data chunks in all the consumer message iterators

2012-11-27 13:16:06,507 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Committing all offsets after clearing the fetcher queues

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Releasing partition ownership

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Consumer katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 rebalancing the following partitions: List(0-0) for topic v1-farsi-0 with consumers: List(katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0)

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0 attempting to claim partition 0-0

2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0 successfully owned partition 0-0 for topic v1-farsi-0

2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Updating the cache

2012-11-27 13:16:06,538 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Consumer katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 selected partitions : v1-farsi-0:0-0: fetched offset = 0: consumed offset = 0

2012-11-27 13:16:06,544 INFO consumer.ZookeeperConsumerConnector - katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 end rebalancing consumer katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 try #0

2012-11-27 13:16:06,544 INFO consumer.FetcherRunnable - FetchRunnable-0 start fetching topic: v1-farsi-0 part: 0 offset: 0 from 192.168.19.203:9092

But I get no messages read.  It looks like there are about 500 new messages written to a variety of topics.  If I try this experiment again, pushing another batch of messages, I get the same burt of rebalancing log entries (but with a different topic from the one listed above).  It appears that there is at least one message written to the topic that is mentioned in the rebalancing log entries, but there are no log entries indicating that the iterator for that topic did anything.

Any ideas?
Bob Jervis | Senior Architect

[cid:[EMAIL PROTECTED]8E20]<http://www.visibletechnologies.com/>
Seattle | Boston | New York | London
Phone: 425.957.6075 | Fax: 781.404.5711

Follow Visibly Intelligent Blog<http://www.visibletechnologies.com/blog/>

[cid:[EMAIL PROTECTED]8E20]<http://twitter.com/visible>[cid:[EMAIL PROTECTED]8E20]<http://www.facebook.com/Visible.Technologies> [cid:[EMAIL PROTECTED]8E20] <http://www.linkedin.com/company/visible-technologies>