Kafka >> mail # user >> Unused streams not getting shutdown


Jason Rosenberg 2012-11-06, 00:34
Joel Koshy 2012-11-06, 03:10
Re: Unused streams not getting shutdown
Hi Joel,

I'd be happy to try it, but I'm a bit concerned about porting any other 0.8
API changes to get everything working (I'd rather not expend the effort
unless there's a stable version I can port to).  Or should I just be able
to drop the latest trunk (or 0.8.x) code in place without any changes?

Also, is there a ready bundled download beyond 0.7.2, or do I need to download
the sources and build everything locally?

Jason

On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> Can you try the latest from trunk? This might be related to
> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it into
> 0.7.2
>
> Thanks,
>
> Joel
>
>
> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
> > (I suspect this is a bug, but thought I'd check with the group before
> > filing)
> >
> > If I create consumer streams using a topic filter, and request more
> > threads than are actually allocated (based on the current dynamic topic
> > event watcher, etc.), the unused threads don't get the shutdown message
> > when the connector is shut down.
> >
> > Specifically, if I have code that looks like:
> >
> >     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
> >     List<KafkaStream<Message>> streams =
> >         consumerConnector.createMessageStreamsByFilter(topicRegex,
> >             consumerThreads);
> >
> >     ExecutorService executor =
> >         Executors.newFixedThreadPool(consumerThreads);
> >
> >     for (final KafkaStream<Message> stream : streams) {
> >       executor.submit(new Runnable() {
> >         @Override public void run() {
> >           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
> >             // do some processing
> >           }
> >         }
> >       });
> >     }
> >
> > And if the number of consumerThreads is, say, 20, and only 1 stream ends up
> > receiving messages, then only that 1 stream's thread gets the
> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
> > iterator gets notified to exit.
> >
> > Looking at ZookeeperConsumerConnector.scala, it looks like the
> > 'topicThreadIdAndQueues' list does not contain entries for all the
> > threadIds, depending on the current state of rebalancing, and thus the
> > sendShutdownToAllQueues() method doesn't actually do what it's intended
> > to do.
> >
> > The result is that it's not possible to cleanly shut down a consumer.
> >
> > I am using 0.7.2.
> >
> > Jason
> >
>
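
A possible interim workaround for the hang described above, until the KAFKA-550 fix can be picked up: if the consumer is created with consumer.timeout.ms set, the KafkaStream iterator throws kafka.consumer.ConsumerTimeoutException whenever no message arrives within that interval instead of blocking forever, so even threads whose streams never receive the shutdownCommand can re-check a shutdown flag and exit. The sketch below is written against the 0.7.x high-level consumer API used in this thread, assuming consumer.timeout.ms behaves as documented; the FilteredConsumer class, the shutdownRequested flag, the 1000 ms timeout, and the stop() method are illustrative assumptions, not code from this thread.

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.consumer.Whitelist;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;
    import kafka.message.MessageAndMetadata;

    public class FilteredConsumer {

      private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
      private ConsumerConnector consumerConnector;
      private ExecutorService executor;

      // props must already contain the usual zookeeper/group settings.
      public void start(Properties props, int consumerThreads) {
        // With consumer.timeout.ms set, the stream iterator throws
        // ConsumerTimeoutException after 1s of inactivity instead of blocking
        // forever, so threads whose streams never see the shutdownCommand can
        // still notice the shutdown flag and exit.
        props.setProperty("consumer.timeout.ms", "1000");
        consumerConnector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Whitelist topicRegex = new Whitelist("^metrics\\..*$");
        List<KafkaStream<Message>> streams =
            consumerConnector.createMessageStreamsByFilter(topicRegex,
                consumerThreads);

        executor = Executors.newFixedThreadPool(consumerThreads);
        for (final KafkaStream<Message> stream : streams) {
          executor.submit(new Runnable() {
            @Override public void run() {
              while (!shutdownRequested.get()) {
                try {
                  for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                    // do some processing
                    if (shutdownRequested.get()) {
                      break;
                    }
                  }
                } catch (ConsumerTimeoutException e) {
                  // No message within consumer.timeout.ms; loop around and
                  // re-check the shutdown flag.
                }
              }
            }
          });
        }
      }

      public void stop() throws InterruptedException {
        shutdownRequested.set(true);
        consumerConnector.shutdown();
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
      }
    }

Calling stop() sets the flag, shuts down the connector, and then waits for the pool to drain; idle threads should exit within roughly one timeout interval even if sendShutdownToAllQueues() never reaches their queues.
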
Jason Rosenberg 2012-11-06, 04:01
Neha Narkhede 2012-11-06, 05:00
Jason Rosenberg 2012-11-06, 21:43
Joel Koshy 2012-11-07, 00:06
Jason Rosenberg 2012-11-07, 16:05