Kafka >> mail # user >> Unused streams not getting shutdown


Jason Rosenberg 2012-11-06, 00:34
Joel Koshy 2012-11-06, 03:10
Jason Rosenberg 2012-11-06, 03:35
Jason Rosenberg 2012-11-06, 04:01
Neha Narkhede 2012-11-06, 05:00
Jason Rosenberg 2012-11-06, 21:43
Joel Koshy 2012-11-07, 00:06
Re: Unused streams not getting shutdown
Filed: https://issues.apache.org/jira/browse/KAFKA-602
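
For readers following the quoted thread below, here is a minimal sketch of the failure mode, not the actual Kafka code. It assumes, as the thread describes, that each stream's iterator blocks on its own queue and exits only when a shutdown sentinel is placed on that queue. The class name, method name, and the split between "requested" and "registered" queues are hypothetical and stand in for ZookeeperConsumerConnector.shutdownCommand and the topicThreadIdAndQueues bookkeeping. The real implementation is in Scala and may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ShutdownSentinelSketch {
    // Hypothetical stand-in for ZookeeperConsumerConnector.shutdownCommand.
    static final Object SHUTDOWN = new Object();

    /**
     * Starts `requested` consumer threads, each blocking on its own queue,
     * but registers only the first `registered` queues with the simulated
     * connector. On shutdown the sentinel goes only to registered queues.
     * Returns how many threads have exited once `waitMillis` has elapsed.
     */
    static int threadsStoppedAfterShutdown(int requested, int registered, long waitMillis) {
        List<BlockingQueue<Object>> registeredQueues = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < requested; i++) {
            BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
            if (i < registered) {
                registeredQueues.add(queue); // only these will see the sentinel
            }
            Thread t = new Thread(() -> {
                try {
                    // Models the stream iterator: block until the sentinel arrives.
                    while (queue.take() != SHUTDOWN) {
                        // do some processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true); // stuck threads must not keep the JVM alive
            t.start();
            threads.add(t);
        }

        // Simulated connector shutdown: notify only the queues it knows about.
        for (BlockingQueue<Object> q : registeredQueues) {
            q.add(SHUTDOWN);
        }

        long deadline = System.currentTimeMillis() + waitMillis;
        int stopped = 0;
        for (Thread t : threads) {
            long remaining = Math.max(1, deadline - System.currentTimeMillis());
            try {
                t.join(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (!t.isAlive()) {
                stopped++;
            }
        }
        return stopped;
    }

    public static void main(String[] args) {
        // 20 threads requested, 1 queue registered: the other 19 stay blocked.
        System.out.println(threadsStoppedAfterShutdown(20, 1, 500));
        // All 20 queues registered: every thread should see the sentinel and exit.
        System.out.println(threadsStoppedAfterShutdown(20, 20, 500));
    }
}
```

In this model, any thread whose queue was never registered stays blocked in take() after shutdown, which matches the symptom reported for both the topic-filter case and the no-messages case. One possible design choice, again only under these assumptions, is to register every queue at stream creation time so that shutdown can reach all of them.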

On Tue, Nov 6, 2012 at 4:06 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> That would be a bug. Can you file a jira?
>
> Thanks,
>
> Joel
>
>
> On Tue, Nov 6, 2012 at 1:43 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
> > Ok,
> >
> > So one variation on this that does not appear to work correctly on
> > trunk: if I start up a consumer connector, never send any messages
> > to it, and then shut it down, its consumer threads never get
> > notified of the shutdown.
> >
> > It seems there's a dependency on initially receiving at least one message
> > and processing it, before the topicThreadIdAndQueues object gets
> > initialized.
> >
> > Shall I file this one?
> >
> > Jason
> >
> >
> >
> > > On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > >
> > > > Interestingly, I just checked out the latest sources and did the build,
> > > > and it produced jars for 0.7.0!  What's that about?
> > > >
> > > > Anyway, it does look like this issue is indeed fixed with the
> > > > latest trunk.
> > > >
> > > > Will there be a released version of this prior to 0.8?  Or will there be
> > > > a beta for 0.8 upcoming?
> > >
> > > Thanks,
> > >
> > > Jason
> > >
> > >
> > > > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > > >
> > > >> Hi Joel,
> > > >>
> > > >> I'd be happy to try it, but am a bit concerned about porting any other
> > > >> 0.8 API changes to get everything working (I'd rather not expend the
> > > >> effort unless there's a stable version I can port to).  Or should I just
> > > >> be able to drop latest trunk (or 0.8.x) code in place without any changes?
> > > >>
> > > >> Also, is there a ready bundled download beyond 0.7.2, or do I need to
> > > >> download sources and build everything locally?
> > >>
> > >> Jason
> > >>
> > >>
> > > >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:
> > > >>
> > > >>> Can you try the latest from trunk? This might be related to
> > > >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it
> > > >>> into 0.7.2
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Joel
> > >>>
> > >>>
> > > >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
> > > >>>
> > > >>> > (I suspect this is a bug, but thought I'd check with the group before filing)
> > > >>> >
> > > >>> > If I create consumer streams using a topic filter, and request more threads
> > > >>> > than are actually allocated (based on the current dynamic topic event
> > > >>> > watcher, etc.), the unused threads don't get the shutdown message when the
> > > >>> > connector is shut down.
> > >>> >
> > >>> > Specifically, if I have code that looks like:
> > >>> >
> > > >>> >     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
> > > >>> >     List<KafkaStream<Message>> streams =
> > > >>> >         consumerConnector.createMessageStreamsByFilter(topicRegex, consumerThreads);
> > > >>> >
> > > >>> >     ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);
> > > >>> >
> > > >>> >     for (final KafkaStream<Message> stream : streams) {
> > > >>> >       executor.submit(new Runnable() {
> > > >>> >         @Override public void run() {
> > > >>> >           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
> > > >>> >             // do some processing
> > > >>> >           }
> > > >>> >         }
> > > >>> >       });
> > > >>> >     }
> > >>> >
> > > >>> > And the number of consumerThreads is say, 20, and only 1 stream ends up
> > > >>> > receiving messages, then only that 1 stream's thread gets the
> > > >>> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
> > > >>> > iterator gets notified to exit.
> > >>> >
> > >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the
> > >>> > 'topicThreadIdAndQueues' list does not contain entries for all the
> > >>> > threadId's, depending on the current state of rebalancing, and