-
Unused streams not getting shutdown
Jason Rosenberg 2012-11-06, 00:34
(I suspect this is a bug, but thought I'd check with the group before filing.)
If I create consumer streams using a topic filter and request more threads than are actually allocated (based on the current dynamic topic event watcher, etc.), the unused threads don't get the shutdown message when the connector is shut down.
Specifically, if I have code that looks like:
    Whitelist topicRegex = new Whitelist("^metrics\\..*$");
    List<KafkaStream<Message>> streams =
        consumerConnector.createMessageStreamsByFilter(topicRegex, consumerThreads);

    ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);

    for (final KafkaStream<Message> stream : streams) {
      executor.submit(new Runnable() {
        @Override public void run() {
          for (MessageAndMetadata<Message> msgAndMetadata : stream) {
            // do some processing
          }
        }
      });
    }
If consumerThreads is, say, 20, and only 1 stream ends up receiving messages, then only that 1 stream's thread gets the ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream iterator gets notified to exit.
Looking at ZookeeperConsumerConnector.scala, it appears that the 'topicThreadIdAndQueues' list does not contain entries for all the thread IDs, depending on the current state of rebalancing, and thus the method sendShutdownToAllQueues() doesn't actually do what it's intended to do.
The result is that it's not possible to cleanly shut down a consumer.
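To illustrate the mechanism, here is a minimal, self-contained sketch that uses only the JDK and no Kafka classes. It assumes each stream is backed by its own blocking queue and that shutdown works by putting a sentinel on every queue the connector knows about. The names `ShutdownSentinelSketch`, `SHUTDOWN`, and `workersStopped` are hypothetical and are not taken from the Kafka source; the sketch only shows what happens to workers whose queues are missing from the registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSentinelSketch {
    // Hypothetical stand-in for ZookeeperConsumerConnector.shutdownCommand.
    static final String SHUTDOWN = "__shutdown__";

    /**
     * Starts one worker per queue, sends the sentinel only to the first
     * `registered` queues, and returns how many workers exited cleanly.
     */
    static int workersStopped(int threads, int registered) {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
        AtomicInteger cleanExits = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (BlockingQueue<String> queue : queues) {
            executor.submit(() -> {
                try {
                    // Block until the sentinel arrives, as a stream iterator would.
                    while (!SHUTDOWN.equals(queue.take())) {
                        // a real consumer would process the message here
                    }
                    cleanExits.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // forced exit, not counted
                }
            });
        }
        try {
            // Only the registered queues are notified, mirroring the report above.
            for (BlockingQueue<String> queue : queues.subList(0, registered)) {
                queue.put(SHUTDOWN);
            }
            executor.shutdown();
            executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            return cleanExits.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // interrupt any worker still blocked in take()
        }
    }

    public static void main(String[] args) {
        System.out.println("1 of 4 registered: " + workersStopped(4, 1) + " clean exits");
        System.out.println("4 of 4 registered: " + workersStopped(4, 4) + " clean exits");
    }
}
```

In this model, a worker whose queue was never registered stays blocked in `take()` until something else interrupts it, which matches the symptom of threads that never see the shutdown command.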
I am using 0.7.2.
Jason
-
Re: Unused streams not getting shutdown
Joel Koshy 2012-11-06, 03:10
Can you try the latest from trunk? This might be related to https://issues.apache.org/jira/browse/KAFKA-550 which did not make it into 0.7.2
Thanks,
Joel
-
Re: Unused streams not getting shutdown
Jason Rosenberg 2012-11-06, 03:35
Hi Joel,
I'd be happy to try it, but am a bit concerned about porting any other 0.8 API changes to get everything working (I'd rather not expend the effort unless there's a stable version I can port to). Or should I just be able to drop the latest trunk (or 0.8.x) code in place without any changes?
Also, is there a ready bundled download beyond 0.7.2, or do I need to download sources and build everything locally?
Jason
-
Re: Unused streams not getting shutdown
Jason Rosenberg 2012-11-06, 04:01
Interestingly, I just checked out the latest sources and did the build, and it produced jars for 0.7.0! What's that about?
Anyway, it does look like this issue is indeed fixed with the latest trunk.
Will there be a released version of this prior to 0.8? Or will there be a beta for 0.8 upcoming?
Thanks,
Jason
-
Re: Unused streams not getting shutdown
Neha Narkhede 2012-11-06, 05:00
The 0.7.0 seems to be a versioning bug on our side. We expect an 0.8 beta coming up soon.
Thanks,
Neha
-
Re: Unused streams not getting shutdown
Jason Rosenberg 2012-11-06, 21:43
Ok,
So one variation on this which does not appear to be working correctly on trunk: if I start up a consumer connector, but then never send any messages to it, and then shut it down, its consumer threads never get notified of the shutdown.
It seems there's a dependency on initially receiving at least one message and processing it before the topicThreadIdAndQueues object gets initialized.
Shall I file this one?
Jason
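For anyone stuck with worker threads that never receive the shutdown signal, one possible stopgap (not something proposed in this thread) is to give the executor a short grace period and then interrupt whatever is still blocked. The sketch below uses only the JDK; `ForcedShutdownSketch`, `stopWorkers`, and `demo` are hypothetical names, and the idle queues merely stand in for streams that never receive a message. Whether a real KafkaStream iterator exits cleanly on interruption is an assumption that would need to be verified against the version in use.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ForcedShutdownSketch {
    /**
     * Waits briefly for workers to finish on their own, then interrupts
     * whatever is still blocked. Returns true if the pool fully terminated.
     */
    static boolean stopWorkers(ExecutorService executor, long graceMillis) {
        executor.shutdown(); // accept no new tasks; running tasks keep going
        try {
            if (executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true; // every worker saw its shutdown signal
            }
            executor.shutdownNow(); // interrupt workers that never got one
            return executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Hypothetical demo: workers block on queues that never receive anything. */
    static boolean demo(int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            BlockingQueue<String> idle = new LinkedBlockingQueue<>();
            executor.submit(() -> {
                try {
                    idle.take(); // blocks: no message and no sentinel ever arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit on interrupt
                }
            });
        }
        return stopWorkers(executor, 200);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo(3));
    }
}
```

The two-phase approach keeps the normal path untouched: workers that do receive a shutdown signal finish on their own, and only the stragglers are interrupted.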
-
Re: Unused streams not getting shutdown
Joel Koshy 2012-11-07, 00:06
That would be a bug. Can you file a jira?
Thanks,
Joel
-
Re: Unused streams not getting shutdown
Jason Rosenberg 2012-11-07, 16:05
Filed: https://issues.apache.org/jira/browse/KAFKA-602