Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # dev - Multiple Processes Consuming from Same GroupID


Copy link to this message
-
Re: Multiple Processes Consuming from Same GroupID
Jun Rao 2013-09-12, 05:12
This means the broker somehow closed the socket connection. Anything in the
broker log around the same time?

Thanks,

Jun
On Wed, Sep 11, 2013 at 6:07 PM, prashant amar <[EMAIL PROTECTED]> wrote:

> Also noticed another issue
>
> Specified below is the current configuration
>
> Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
>
> Notice that I have used the same naming convention on the consumer group
> set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
> topics.
>
> On calling the *ConsumerOffsetChecker* API, I am receiving a
> ClosedChannelException
>
> (Check Trace Below)
>
> Is there any namespace collision occurring here ? This issue is
> reproducible with the following setup above
>
>
> *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
> --zkconnect localhost:2181*
>
>
> 2013-09-12 01:01:59,701] INFO Initiating client connection,
> connectString=localhost:2181 sessionTimeout=30000
> watcher=org.I0Itec.zkclient.ZkClient@3af0ce45
> (org.apache.zookeeper.ZooKeeper)
> [2013-09-12 01:01:59,724] INFO Opening socket connection to server
> localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,732] INFO Socket connection established to localhost/
> 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,741] INFO Session establishment complete on server
> localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated
> timeout
> = 30000 (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> Group           Topic                          Pid Offset          logSize
>         Lag             Owner
> gr2             pe1                            0   129985          130625
>        640             none
> gr2             pe1                            1   0               0
>         0               none
> gr2             pe2                            0   130493          130493
>        0               gr2_ip-XXXXXXXXXX-6c6f5d94-0
> [2013-09-12 01:02:00,514] INFO Reconnect due to socket error:
>  (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
>  at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
>  at
>
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
>  at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
>  at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
>  at
>
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
>  at
>
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at
>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>  at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
>  at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> gr2             pe2                            1   0               0
>         0               gr2_ip-XXXXXXX-6c6f5d94-1