Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka >> mail # user >> Re: Multiple Processes Consuming from Same GroupID


Copy link to this message
-
Re: Multiple Processes Consuming from Same GroupID
That's still the consumer log, not the broker log.

Thanks,

Jun
On Wed, Sep 11, 2013 at 10:24 PM, prashant amar <[EMAIL PROTECTED]> wrote:

> From the broker log:
>
>
> INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> at
>
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at
>
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
>
>
> On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > This means the broker somehow closed the socket connection. Anything in
> the
> > broker log around the same time?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Sep 11, 2013 at 6:07 PM, prashant amar <[EMAIL PROTECTED]>
> > wrote:
> >
> > > Also noticed another issue
> > >
> > > Specified below is the current configuration
> > >
> > > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> > > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> > >
> > > Notice that I have used the same naming convention on the consumer
> group
> > > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
> > > topics.
> > >
> > > On calling the *ConsumerOffsetChecker* API, I am receiving a
> > > ClosedChannelException
> > >
> > > (Check Trace Below)
> > >
> > > Is there any namespace collision occurring here ? This issue is
> > > reproducible with the following setup above
> > >
> > >
> > > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
> > > --zkconnect localhost:2181*
> > >
> > >
> > > 2013-09-12 01:01:59,701] INFO Initiating client connection,
> > > connectString=localhost:2181 sessionTimeout=30000
> > > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45
> > > (org.apache.zookeeper.ZooKeeper)
> > > [2013-09-12 01:01:59,724] INFO Opening socket connection to server
> > > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,732] INFO Socket connection established to
> > localhost/
> > > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,741] INFO Session establishment complete on server
> > > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated
> > > timeout
> > > = 30000 (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
> > > (org.I0Itec.zkclient.ZkClient)
> > > Group           Topic                          Pid Offset
> >  logSize
> > >         Lag             Owner