Kafka >> mail # user >> Unused streams not getting shutdown
Unused streams not getting shutdown
(I suspect this a bug, but thought I'd check with the group before filing)

If I create consumer streams using a topic filter, and request more threads
than are actually allocated (based on the current dynamic topic event
watcher, etc.), the unused threads never receive the shutdown message when
the connector is shut down.

Specifically, if I have code that looks like:

    Whitelist topicRegex = new Whitelist("^metrics\\..*$");
    List<KafkaStream<Message>> streams =
        consumerConnector.createMessageStreamsByFilter(topicRegex, consumerThreads);

    ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);

    for (final KafkaStream<Message> stream : streams) {
      executor.submit(new Runnable() {
        @Override public void run() {
          for (MessageAndMetadata<Message> msgAndMetadata : stream) {
            // do some processing
          }
        }
      });
    }

And the number of consumerThreads is say, 20, and only 1 stream ends up
receiving messages, then only that 1 stream's thread gets the
ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
iterator gets notified to exit.
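For illustration (this is a stand-alone sketch of the pattern, not Kafka's actual code), the shutdown mechanism is essentially a poison pill: each stream's queue is supposed to receive a sentinel message that tells its iterator to exit. A queue that never gets the sentinel leaves its consuming thread blocked forever, which is exactly what the unused streams hit. The names below (PoisonPillSketch, SHUTDOWN, consume) are mine:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PoisonPillSketch {
    // Stand-in for ZookeeperConsumerConnector.shutdownCommand.
    static final String SHUTDOWN = "SHUTDOWN";

    // Drains a queue until the shutdown sentinel arrives, or gives up after the
    // timeout. Returns true on a clean exit (sentinel seen), false on timeout --
    // the timeout stands in for a thread that would otherwise block forever.
    static boolean consume(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        while (true) {
            String msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) return false;          // no sentinel ever arrived
            if (SHUTDOWN.equals(msg)) return true;  // clean shutdown
            // otherwise: process the message and keep iterating
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> active = new ArrayBlockingQueue<String>(10);
        BlockingQueue<String> unused = new ArrayBlockingQueue<String>(10);

        // The sentinel only reaches the queue that is actually registered,
        // mirroring the behavior described above.
        active.put("metrics.cpu");
        active.put(SHUTDOWN);
        // 'unused' never receives the sentinel.

        System.out.println("active exits cleanly: " + consume(active, 200));
        System.out.println("unused exits cleanly: " + consume(unused, 200));
    }
}
```

Running this prints true for the active queue and false for the unused one, which is the hang in miniature.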

Looking at ZookeeperConsumerConnector.scala, it looks like the
'topicThreadIdAndQueues' list does not contain entries for all the
threadIds, depending on the current state of rebalancing, and thus the
sendShutdownToAllQueues() method doesn't actually do what its name implies.

The result is that it's not possible to cleanly shut down a consumer.

I am using 0.7.2.
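In the meantime, a blunt workaround on the application side might be to interrupt the stuck threads after the connector's shutdown has been given a grace period. This is only an assumption on my part -- I haven't verified that the 0.7.2 stream iterator propagates the interrupt cleanly -- and the class and method names here (ForcedShutdown, stop) are made up for the sketch; only the java.util.concurrent calls are real JDK API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ForcedShutdown {
    // Call after consumerConnector.shutdown() returns. Threads whose streams
    // never saw the shutdown command are still blocked in the stream iterator;
    // shutdownNow() interrupts them so the pool can actually terminate.
    static boolean stop(ExecutorService executor, long graceMs)
            throws InterruptedException {
        executor.shutdown();  // let threads that did get the command drain
        if (executor.awaitTermination(graceMs, TimeUnit.MILLISECONDS)) return true;
        executor.shutdownNow();  // interrupt the stuck iterator threads
        return executor.awaitTermination(graceMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Simulate a stream thread that never receives the shutdown command:
        executor.submit(new Runnable() {
            @Override public void run() {
                try {
                    Thread.sleep(Long.MAX_VALUE);  // blocked "iterator"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // exit on interrupt
                }
            }
        });
        System.out.println("terminated: " + stop(executor, 200));
    }
}
```

That at least reclaims the threads, but it's no substitute for sendShutdownToAllQueues() reaching every queue.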

Joel Koshy 2012-11-06, 03:10
Jason Rosenberg 2012-11-06, 03:35
Jason Rosenberg 2012-11-06, 04:01
Neha Narkhede 2012-11-06, 05:00
Jason Rosenberg 2012-11-06, 21:43
Joel Koshy 2012-11-07, 00:06
Jason Rosenberg 2012-11-07, 16:05