Kafka >> mail # user >> Unused streams not getting shutdown


Re: Unused streams not getting shutdown
Hi Joel,

I'd be happy to try it, but am a bit concerned about porting any other 0.8
API changes to get everything working (I'd rather not expend the effort
unless there's a stable version I can port to).  Or should I just be able
to drop the latest trunk (or 0.8.x) code in place without any changes?

Also, is there a ready bundled download beyond 0.7.2, or do I need to
download the sources and build everything locally?

Jason

On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[EMAIL PROTECTED]> wrote:

> Can you try the latest from trunk? This might be related to
> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it into
> 0.7.2
>
> Thanks,
>
> Joel
>
>
> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
>
> > (I suspect this is a bug, but thought I'd check with the group before
> > filing)
> >
> > If I create consumer streams using a topic filter, and request more
> > threads than are actually allocated (based on the current dynamic topic
> > event watcher, etc.), the unused threads don't get the shutdown message
> > when the connector is shut down.
> >
> > Specifically, if I have code that looks like:
> >
> >     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
> >     List<KafkaStream<Message>> streams =
> >         consumerConnector.createMessageStreamsByFilter(topicRegex, consumerThreads);
> >
> >     ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);
> >
> >     for (final KafkaStream<Message> stream : streams) {
> >       executor.submit(new Runnable() {
> >         @Override public void run() {
> >           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
> >             // do some processing
> >           }
> >         }
> >       });
> >     }
> >
> > And the number of consumerThreads is, say, 20, and only 1 stream ends up
> > receiving messages, then only that 1 stream's thread gets the
> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
> > iterator gets notified to exit.
> >
> > Looking at ZookeeperConsumerConnector.scala, it looks like the
> > 'topicThreadIdAndQueues' list does not contain entries for all the
> > threadIds, depending on the current state of rebalancing, and thus the
> > method sendShutdownToAllQueues() doesn't actually do what it's intended
> > to do.
> >
> > The result is that it's not possible to cleanly shut down a consumer.
> >
> > I am using 0.7.2.
> >
> > Jason
> >
>
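
For readers following the thread, the failure mode described in the original report can be reproduced in miniature without Kafka. The sketch below uses only the JDK and is not the ZookeeperConsumerConnector code: the class name, the SHUTDOWN sentinel, and the workersThatExit helper are all hypothetical. It assumes each stream is backed by a blocking queue and that a worker leaves its loop only when it takes a sentinel from its own queue, so a worker whose queue is skipped stays blocked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSentinelSketch {
    // Hypothetical stand-in for a shutdown command placed on a stream's queue.
    static final String SHUTDOWN = "__shutdown__";

    /**
     * Starts one worker per queue, sends the sentinel to only the first
     * `notified` queues, and returns how many workers exited their loop
     * on their own within the timeout.
     */
    static int workersThatExit(int totalQueues, int notified, long timeoutMs) {
        List<BlockingQueue<String>> queues = new ArrayList<BlockingQueue<String>>();
        for (int i = 0; i < totalQueues; i++) {
            queues.add(new LinkedBlockingQueue<String>());
        }
        ExecutorService executor = Executors.newFixedThreadPool(totalQueues);
        final AtomicInteger exited = new AtomicInteger();

        for (final BlockingQueue<String> queue : queues) {
            executor.submit(new Runnable() {
                @Override public void run() {
                    try {
                        // Blocks until a message arrives; leaves only on the sentinel.
                        while (!SHUTDOWN.equals(queue.take())) {
                            // a real consumer would process the message here
                        }
                        exited.incrementAndGet();
                    } catch (InterruptedException e) {
                        // Forced out by shutdownNow(); not counted as a clean exit.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        try {
            // Models a shutdown that reaches only some of the queues.
            for (int i = 0; i < notified; i++) {
                queues.get(i).put(SHUTDOWN);
            }
            executor.shutdown();
            executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            return exited.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            // Interrupt any workers still blocked in take() so the JVM can exit.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("sentinel sent to 1 of 20 queues:  "
                + workersThatExit(20, 1, 500) + " workers exited");
        System.out.println("sentinel sent to all 20 queues:   "
                + workersThatExit(20, 20, 500) + " workers exited");
    }
}
```

In this model the clean fix is to keep a reference to every queue that was handed out and send the sentinel to all of them, rather than only to the queues that currently have an assignment. Whether that matches the change made for KAFKA-550 is not something this sketch can confirm.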