Kafka, mail # user - Unused streams not getting shutdown


Jason Rosenberg 2012-11-06, 00:34
Joel Koshy 2012-11-06, 03:10
Jason Rosenberg 2012-11-06, 03:35
Jason Rosenberg 2012-11-06, 04:01
Neha Narkhede 2012-11-06, 05:00
Re: Unused streams not getting shutdown
Jason Rosenberg 2012-11-06, 21:43
Ok,

One variation on this does not appear to work correctly on trunk: if I
start up a consumer connector, never send any messages to it, and then
shut it down, its consumer threads never get notified of the shutdown.
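
Until this is fixed, one possible application-side stopgap is to bound the
wait on the consumer thread pool and interrupt whatever is still blocked.
The sketch below only illustrates that pattern with java.util.concurrent.
A plain BlockingQueue stands in for an idle stream, the names
BoundedShutdownDemo and stopWorkers are made up for illustration, and it is
an assumption (not verified here) that the real KafkaStream iterator reacts
to thread interruption the same way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownDemo {
    // Hypothetical helper: wait briefly for a clean exit, then interrupt stragglers.
    // Returns true once every worker has terminated.
    static boolean stopWorkers(ExecutorService executor, long graceMillis)
            throws InterruptedException {
        executor.shutdown();                       // accept no new tasks
        if (executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
            return true;                           // all workers exited on their own
        }
        executor.shutdownNow();                    // interrupt workers that are still blocked
        return executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a stream that never receives a message or a shutdown signal.
        final BlockingQueue<String> idleQueue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(new Runnable() {
            @Override public void run() {
                try {
                    idleQueue.take();              // blocks until something arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // treat interrupt as "exit now"
                }
            }
        });
        System.out.println("stopped: " + stopWorkers(executor, 200));
    }
}
```

In a real consumer, the connector's own shutdown would be called first, and
this would only mop up threads that never received the shutdown command.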

It seems there's a dependency on initially receiving at least one message
and processing it, before the topicThreadIdAndQueues object gets
initialized.
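
The suspected mechanism can be shown with a toy model. This is not Kafka's
code: ToyConnector, its methods, and the SHUTDOWN sentinel are hypothetical
stand-ins, and the only assumption carried over from the description above
is that a queue becomes known to the connector as a side effect of
receiving its first message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LazyRegistrationShutdownDemo {
    // Sentinel that tells a stream's consumer loop to exit (stand-in for shutdownCommand).
    static final String SHUTDOWN = "__shutdown__";

    // Hypothetical connector: only registered queues receive the sentinel.
    static class ToyConnector {
        private final List<BlockingQueue<String>> registeredQueues = new ArrayList<>();

        BlockingQueue<String> createQueue() {
            return new LinkedBlockingQueue<>();    // created, but not registered yet
        }

        // Registration happens only as a side effect of delivering a message.
        void deliver(BlockingQueue<String> queue, String message) {
            if (!registeredQueues.contains(queue)) {
                registeredQueues.add(queue);
            }
            queue.add(message);
        }

        // Sends the sentinel to every queue the connector knows about.
        void shutdown() {
            for (BlockingQueue<String> queue : registeredQueues) {
                queue.add(SHUTDOWN);
            }
        }
    }

    public static void main(String[] args) {
        ToyConnector connector = new ToyConnector();
        BlockingQueue<String> busy = connector.createQueue();
        BlockingQueue<String> idle = connector.createQueue();

        connector.deliver(busy, "metrics.cpu");    // only this queue gets registered
        connector.shutdown();

        System.out.println("busy sees shutdown: " + busy.contains(SHUTDOWN)); // true
        System.out.println("idle sees shutdown: " + idle.contains(SHUTDOWN)); // false
    }
}
```

A thread blocked on the idle queue would wait forever, which matches the
behaviour seen when the connector is shut down before any message arrives.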

Shall I file this one?

Jason

On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> Interestingly, I just checked out the latest sources, and did the build,
> and it produced jars for 0.7.0!  What's that about?
>
> Anyway, it does look like this issue is indeed fixed with the latest
> trunk.
>
> Will there be a released version of this, prior to 0.8?  Or will there be
> a beta for 0.8 upcoming?
>
> Thanks,
>
> Jason
>
>
> On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
>> Hi Joel,
>>
>> I'd be happy to try it, but am a bit concerned about porting any other
>> 0.8 api changes to get everything working (I'd rather not expend the effort
>> unless there's a stable version I can port to).  Or should I just be able
>> to drop latest trunk (or 0.8.x) code in place without any changes?
>>
>> Also, is there a ready bundled download beyond 0.7.2, or do I need to
>> download sources and build everything locally?
>>
>> Jason
>>
>>
>> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:
>>
>>> Can you try the latest from trunk? This might be related to
>>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it
>>> into 0.7.2
>>>
>>> Thanks,
>>>
>>> Joel
>>>
>>>
>>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]>
>>> wrote:
>>>
>>> > (I suspect this is a bug, but thought I'd check with the group before
>>> > filing)
>>> >
>>> > If I create consumer streams using a topic filter, and request more
>>> > threads than are actually allocated (based on the current dynamic topic
>>> > event watcher, etc.), the unused threads don't get the shutdown message
>>> > when the connector is shut down.
>>> >
>>> > Specifically, if I have code that looks like:
>>> >
>>> >     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
>>> >     List<KafkaStream<Message>> streams =
>>> >         consumerConnector.createMessageStreamsByFilter(topicRegex,
>>> >             consumerThreads);
>>> >
>>> >     ExecutorService executor =
>>> >         Executors.newFixedThreadPool(consumerThreads);
>>> >
>>> >     for (final KafkaStream<Message> stream : streams) {
>>> >       executor.submit(new Runnable() {
>>> >         @Override public void run() {
>>> >           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
>>> >             // do some processing
>>> >           }
>>> >         }
>>> >       });
>>> >     }
>>> >
>>> > And the number of consumerThreads is, say, 20, and only 1 stream ends up
>>> > receiving messages, then only that 1 stream's thread gets the
>>> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
>>> > iterator gets notified to exit.
>>> >
>>> > Looking at ZookeeperConsumerConnector.scala, it looks like the
>>> > 'topicThreadIdAndQueues' list does not contain entries for all the
>>> > thread IDs, depending on the current state of rebalancing, and thus the
>>> > method sendShutdownToAllQueues() doesn't actually do what it's intended
>>> > to do.
>>> >
>>> > The result is that it's not possible to cleanly shut down a consumer.
>>> >
>>> > I am using 0.7.2.
>>> >
>>> > Jason
>>> >
>>>
>>
>>
>
Joel Koshy 2012-11-07, 00:06
Jason Rosenberg 2012-11-07, 16:05