

Re: Unused streams not getting shutdown
Can you try the latest from trunk? This might be related to
https://issues.apache.org/jira/browse/KAFKA-550, which did not make it into
0.7.2.

Thanks,

Joel
On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> (I suspect this is a bug, but thought I'd check with the group before filing.)
>
> If I create consumer streams using a topic filter, and request more threads
> than are actually allocated (based on the current dynamic topic event
> watcher, etc.), the unused threads don't get the shutdown message when the
> connector is shut down.
>
> Specifically, if I have code that looks like:
>
>     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
>     List<KafkaStream<Message>> streams =
>         consumerConnector.createMessageStreamsByFilter(topicRegex,
> consumerThreads);
>
>     ExecutorService executor =
>         Executors.newFixedThreadPool(consumerThreads);
>
>     for (final KafkaStream<Message> stream : streams) {
>       executor.submit(new Runnable() {
>          @Override public void run() {
>               for (MessageAndMetadata<Message> msgAndMetadata : stream) {
>                  // do some processing
>               }
>           }
>       });
>     }
>
> And the number of consumerThreads is, say, 20, and only 1 stream ends up
> receiving messages, then only that 1 stream's thread gets the
> ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
> iterator gets notified to exit.
>
> Looking at ZookeeperConsumerConnector.scala, it looks like the
> 'topicThreadIdAndQueues' list does not contain entries for all the
> thread IDs, depending on the current state of rebalancing, and thus the
> method sendShutdownToAllQueues() doesn't actually do what it's intended to
> do.
>
> The result is that it's not possible to cleanly shut down a consumer.
>
> I am using 0.7.2.
>
> Jason
>
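
The failure mode described above can be reproduced without Kafka. The following is a minimal sketch, not the actual ZookeeperConsumerConnector code: it assumes each consumer thread blocks on its own queue and exits only when it reads a shutdown sentinel, and that the sentinel is delivered only to the queues currently registered. The names ShutdownSketch, workersExited, and SHUTDOWN are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSketch {
    // Hypothetical sentinel, compared by identity; stands in for a shutdown command.
    static final Object SHUTDOWN = new Object();

    /**
     * Starts `total` workers, each blocking on its own queue, then sends the
     * sentinel only to the first `registered` queues. Returns how many workers
     * exited cleanly within a short wait.
     */
    static int workersExited(int total, int registered) {
        List<BlockingQueue<Object>> queues = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }

        ExecutorService executor = Executors.newFixedThreadPool(total);
        AtomicInteger exited = new AtomicInteger();
        for (BlockingQueue<Object> queue : queues) {
            executor.submit(() -> {
                try {
                    // Block until the sentinel arrives; anything else would be a message.
                    while (queue.take() != SHUTDOWN) {
                        // do some processing
                    }
                    exited.incrementAndGet();
                } catch (InterruptedException e) {
                    // Forced out by shutdownNow(); not counted as a clean exit.
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Only the "registered" queues are told to shut down.
        for (BlockingQueue<Object> queue : queues.subList(0, registered)) {
            queue.add(SHUTDOWN);
        }

        executor.shutdown();
        try {
            executor.awaitTermination(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int cleanExits = exited.get();
        executor.shutdownNow(); // interrupt any worker still blocked on take()
        return cleanExits;
    }

    public static void main(String[] args) {
        System.out.println("clean exits: " + workersExited(20, 1));
    }
}
```

Under these assumptions, calling workersExited(20, 1) should report a single clean exit: the other 19 workers never see the sentinel and stay blocked until they are interrupted. That matches the symptom in the report, where only the stream that was actually receiving messages gets notified to exit.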