Kafka >> mail # user >> Unused streams not getting shutdown


Re: Unused streams not getting shutdown
The 0.7.0 version string seems to be a versioning bug on our side. We expect
a 0.8 beta coming up soon.

Thanks,
Neha

On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> Interestingly, I just checked out the latest sources, and did the build,
> and it produced jars for 0.7.0!  What's that about?
>
> Anyway, it does indeed look like this issue is fixed with the latest
> trunk.
>
> Will there be a released version of this, prior to 0.8?  Or will there be a
> beta for 0.8 upcoming?
>
> Thanks,
>
> Jason
>
> On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
>> Hi Joel,
>>
>> I'd be happy to try it, but am a bit concerned about porting any other 0.8
>> api changes to get everything working (I'd rather not expend the effort
>> unless there's a stable version I can port to).  Or should I just be able
>> to drop latest trunk (or 0.8.x) code in place without any changes?
>>
>> Also, is there a ready bundled download beyond 0.7.2, or do I need to
>> download the sources and build everything locally?
>>
>> Jason
>>
>>
>> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:
>>
>>> Can you try the latest from trunk? This might be related to
>>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it
>>> into
>>> 0.7.2
>>>
>>> Thanks,
>>>
>>> Joel
>>>
>>>
>>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>>>
>>> > (I suspect this a bug, but thought I'd check with the group before
>>> filing)
>>> >
>>> > If I create consumer streams using a topic filter, and request more
>>> threads
>>> > than are actually allocated (based on the current dynamic topic event
>>> > watcher, etc.), the unused threads don't get the shutdown message, when
>>> the
>>> > connector is shut down.
>>> >
>>> > Specifically, if I have code that looks like:
>>> >
>>> >     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
>>> >     List<KafkaStream<Message>> streams =
>>> >         consumerConnector.createMessageStreamsByFilter(topicRegex,
>>> >             consumerThreads);
>>> >
>>> >     ExecutorService executor =
>>> >         Executors.newFixedThreadPool(consumerThreads);
>>> >
>>> >     for (final KafkaStream<Message> stream : streams) {
>>> >       executor.submit(new Runnable() {
>>> >         @Override public void run() {
>>> >           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
>>> >             // do some processing
>>> >           }
>>> >         }
>>> >       });
>>> >     }
>>> >
>>> > And the number of consumerThreads is say, 20, and only 1 stream ends up
>>> > receiving messages, then only that 1 stream's thread gets the
>>> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
>>> > iterator gets notified to exit.
>>> >
>>> > Looking at ZookeeperConsumerConnector.scala, it looks like the
>>> > 'topicThreadIdAndQueues' list does not contain entries for all the
>>> > threadId's, depending on the current state of rebalancing, and thus, the
>>> > method, sendShutdownToAllQueues() doesn't actually do what it's
>>> intended to
>>> > do.
>>> >
>>> > The result is that it's not possible to cleanly shut down a consumer.
>>> >
>>> > I am using 0.7.2.
>>> >
>>> > Jason
>>> >
>>>
>>
>>
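[Editor's note] The mechanism Jason describes — each KafkaStream thread blocks on a per-stream queue, and a shutdown sentinel placed on that queue is what makes the stream's iterator exit — can be illustrated with a minimal standalone sketch. This is not Kafka code: the `SHUTDOWN` sentinel string, the `runSketch` method, and the use of plain `BlockingQueue`s are stand-ins for `ZookeeperConsumerConnector.shutdownCommand` and its internal queues. The point it demonstrates is the fix the thread converges on: the sentinel must be delivered to every queue, not only to the queues that happened to receive data, or the idle threads block forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    // Hypothetical sentinel; Kafka uses an internal shutdownCommand object.
    static final String SHUTDOWN = "__shutdown__";

    // Returns true only if every consumer thread saw the sentinel and exited.
    static boolean runSketch(int consumerThreads) throws InterruptedException {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);
        CountDownLatch done = new CountDownLatch(consumerThreads);

        for (int i = 0; i < consumerThreads; i++) {
            final BlockingQueue<String> q = new LinkedBlockingQueue<>();
            queues.add(q);
            executor.submit(() -> {
                try {
                    // Stand-in for iterating a KafkaStream: block on the
                    // queue until the shutdown sentinel arrives.
                    while (!q.take().equals(SHUTDOWN)) {
                        // process message
                    }
                } catch (InterruptedException ignored) {
                    // interrupted during shutdownNow(); just exit
                } finally {
                    done.countDown();
                }
            });
        }

        // Deliver the sentinel to EVERY queue. Sending it only to queues
        // that received data reproduces the hang reported in this thread.
        for (BlockingQueue<String> q : queues) {
            q.put(SHUTDOWN);
        }

        boolean allExited = done.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return allExited;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all threads exited: " + runSketch(3));
    }
}
```

If the sentinel loop is changed to put `SHUTDOWN` on only one queue, `done.await` times out and the method returns false, which matches the behavior Jason observed with 20 threads and 1 active stream.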