Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Best way to start / stop / restart a Consumer?


Copy link to this message
-
Best way to start / stop / restart a Consumer?
Re: starting/stopping - right now you would have to call shutdown and
create a new consumer connector since you can call createMessageStreams
only once per connector.

On Friday, October 11, 2013, Tanguy tlrx wrote:

> Thanks Jun,
>
> Jira issue has been filled:
> https://issues.apache.org/jira/browse/KAFKA-1083
>
> By the way, what is the recommended way to start, stop and restart a
> ConsumerConnector in the same running JMV?
>
> Thanks,
>
>
> 2013/10/10 Jun Rao <[EMAIL PROTECTED]>
>
> > Each time we create a new consumer connector, we assign a random consumer
> > id by default. You can try setting "consumer.id" to use a fixed consumer
> > id. In any case, we probably should deregister those beans when shutting
> > down the connector. Could you file a jira?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 10, 2013 at 7:55 AM, Tanguy tlrx <[EMAIL PROTECTED]> wrote:
> >
> > > It's in 0.8.
> > >
> > > The JMX names are not exactly the same, but I see 2 beans with similar
> > > names, something like:
> > >
> > >
> > >
> >
> "my_consumer_group-my_consumer-mytopic-my_consumer_hostname.thing.com-1381416134138-3573c8bf-7-FetchQueueSize"
> > >
> > > -- Tanguy
> > >
> > >
> > >
> > >
> > >
> > > 2013/10/10 Jun Rao <[EMAIL PROTECTED]>
> > >
> > > > Is that in 0.7 or 0.8? JMX won't allow a bean with the same name to
> be
> > > > registered twice. Do you see 2 beans with similar names? What are the
> > > exact
> > > > bean names?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 10, 2013 at 2:42 AM, Tanguy tlrx <[EMAIL PROTECTED]>
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > In our application, we are currently starting a Kafka Consumer with
> > the
> > > > > following lines of code:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > > streams = connector .createMessageStreams(map);
> > > > >
> > > > > Then, each KafkaStream is processed in a dedicated thread per topic
> > and
> > > > > partition, as documented here
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > >
> > > > > We need to stop (halt) and restart the consumer. Today, we just
> call:
> > > > >
> > > > > connector.shutdown()
> > > > >
> > > > > and wait for threads to terminate.
> > > > >
> > > > > To restart the consumer, we create a new connector:
> > > > >
> > > > > connector = Consumer.createJavaConsumerConnector(consumerConfig);
> > > > >
> > > > > When restarting is complete, I can see that a JMX MBean (we  use
> > > Metrics
> > > > > JMXReporter) like "ZookeeperConsumerConnector" is registered twice.
> > > This
> > > > > bean is not registered when the previous connector instance is shut
> > > down.
> > > > >
> > > > > What is the best way to stop/halt and restart a Consumer using the
> > Java
> > > > > API?
> > > > >
> > > > > Is it normal that the MBean is not unregistered at shutdown time?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > -- Tanguy
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Tanguy
> > > twitter @tlrx
> > > https://github.com/tlrx
> > >
> >
>
>
>
> --
> -- Tanguy
> twitter @tlrx
> https://github.com/tlrx
>
--
Sent from Gmail Mobile