Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # user - Unused streams not getting shutdown


Copy link to this message
-
Re: Unused streams not getting shutdown
Joel Koshy 2012-11-07, 00:06
That would be a bug. Can you file a jira?

Thanks,

Joel
On Tue, Nov 6, 2012 at 1:43 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:

> Ok,
>
> So one variation on this which does not appear to be working correctly on
> trunk, is if I start up a consumer connector, but then never send any
> messages to it, and then shut it down, it's consumer threads never get
> notified of the shut down.
>
> It seems there's a dependency on initially receiving at least one message
> and processing it, before the topicThreadIdAndQueues object gets
> initialized.
>
> Shall I file this one?
>
> Jason
>
>
>
> On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
> > Interestingly, I just checked out the latest sources, and did the build,
> > and it produced jars for 0.7.0!  What's that about?
> >
> > Anyway, it does indeed look like this issue is indeed fixed with the
> > latest trunk.
> >
> > Will there be a released version of this, prior to 0.8?  Or will there be
> > a beta for 0.8 upcoming?
> >
> > Thanks,
> >
> > Jason
> >
> >
> > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <[EMAIL PROTECTED]>
> wrote:
> >
> >> Hi Joel,
> >>
> >> I'd be happy to try it, but am a bit concerned about porting any other
> >> 0.8 api changes to get everything working (I'd rather not expend the
> effort
> >> unless there's a stable version I can port to).  Or should I just be
> able
> >> to drop latest trunk (or 0.8.x) code in place without any changes?
> >>
> >> Also, is there a ready bundled download beyond 0.7.2, or do I need
> >> download sources and build everything locally?
> >>
> >> Jason
> >>
> >>
> >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:
> >>
> >>> Can you try the latest from trunk? This might be related to
> >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it
> >>> into
> >>> 0.7.2
> >>>
> >>> Thanks,
> >>>
> >>> Joel
> >>>
> >>>
> >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]>
> >>> wrote:
> >>>
> >>> > (I suspect this a bug, but thought I'd check with the group before
> >>> filing)
> >>> >
> >>> > If I create consumer streams using a topic filter, and request more
> >>> threads
> >>> > than are actually allocated (based on the current dynamic topic event
> >>> > watcher, etc.), the unused threads don't get the shutdown message,
> >>> when the
> >>> > connector is shut down.
> >>> >
> >>> > Specifically, if I have code that looks like:
> >>> >
> >>> >    Whitelist topicRegex = new Whitelist("^metrics\\..*$");
> >>> >     List<KafkaStream<Message>> streams > >>> >         consumerConnector.createMessageStreamsByFilter(topicRegex,
> >>> > consumerThreads);
> >>> >
> >>> >     ExecutorService executor > >>> > Executors.newFixedThreadPool(consumerThreads);
> >>> >
> >>> >     for (final KafkaStream<Message> stream : streams) {
> >>> >       executor.submit(new Runnable() {
> >>> >          @Override public void run() {
> >>> >               for (MessageAndMetadata<Message> msgAndMetadata :
> >>> stream) {
> >>> >                  // do some processing
> >>> >               }
> >>> >           }
> >>> >       };
> >>> >     }
> >>> >
> >>> > And the number of consumerThreads is say, 20, and only 1 stream ends
> up
> >>> > receiving messages, then only that 1 stream's thread gets the
> >>> > ZookeeperConsumerConnector.shutdownCommand, which is how the
> >>> KafkaStream
> >>> > iterator gets notified to exit.
> >>> >
> >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the
> >>> > 'topicThreadIdAndQueues' list does not contain entries for all the
> >>> > threadId's, depending on the current state of rebalancing, and thus,
> >>> the
> >>> > method, sendShutdownToAllQueues() doesn't actually do what it's
> >>> intended to
> >>> > do.
> >>> >
> >>> > The result, is that it's not possible to cleanly shutdown a consumer.
> >>> >
> >>> > I am using 0.7.2.
> >>> >
> >>> > Jason
> >>> >
> >>>
> >>
> >>
> >
>