Kafka >> mail # dev >> Multiple Processes Consuming from Same GroupID


prashant amar 2013-09-11, 23:48
prashant amar 2013-09-12, 00:28
Neha Narkhede 2013-09-12, 00:47
prashant amar 2013-09-12, 01:07
Re: Multiple Processes Consuming from Same GroupID
This means the broker somehow closed the socket connection. Anything in the
broker log around the same time?
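
One way to narrow down the broker log, sketched under assumptions: the broker log is assumed to use the same `[yyyy-MM-dd HH:mm:ss,SSS]` timestamp prefix as the tool output quoted below, and the helper names `parse_timestamp` and `lines_near` are hypothetical. The function keeps every entry logged within a few seconds of the ClosedChannelException (01:02:00,514), together with any continuation lines such as stack frames.

```python
import re
from datetime import datetime, timedelta

# Timestamp prefix as it appears in the quoted output,
# e.g. "[2013-09-12 01:02:00,514] INFO ...". Assumed to match the broker log too.
TS_PATTERN = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def parse_timestamp(line):
    """Return the datetime at the start of a log line, or None if there is none."""
    match = TS_PATTERN.match(line)
    if match is None:
        return None
    base = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
    return base + timedelta(milliseconds=int(match.group(2)))


def lines_near(lines, when, window_seconds=5.0):
    """Return log lines stamped within window_seconds of `when`.

    Lines without a timestamp (stack frames, wrapped messages) are kept
    only if the timestamped line before them was kept.
    """
    window = timedelta(seconds=window_seconds)
    selected = []
    keep = False
    for line in lines:
        stamp = parse_timestamp(line)
        if stamp is not None:
            keep = abs(stamp - when) <= window
        if keep:
            selected.append(line.rstrip("\n"))
    return selected


# Time of the ClosedChannelException reported by the offset checker.
ERROR_TIME = datetime(2013, 9, 12, 1, 2, 0, 514000)
```

To use it, pass an open file object for the broker log (its location depends on the installation) as `lines`, for example `lines_near(open(path), ERROR_TIME)`. Note that this compares timestamps directly, so it assumes the broker and the tool ran with synchronized clocks.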

Thanks,

Jun
On Wed, Sep 11, 2013 at 6:07 PM, prashant amar <[EMAIL PROTECTED]> wrote:

> I also noticed another issue.
>
> The current configuration is:
>
> Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
>
> Note that I have used the same consumer group names for both topics,
> i.e. 'gr1' and 'gr2' are the consumer groups for both Topic1 and
> Topic2.
>
> When I run the *ConsumerOffsetChecker* tool, I get a
> ClosedChannelException (see the trace below).
>
> Is there a namespace collision occurring here? The issue is
> reproducible with the setup above.
>
>
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 --zkconnect localhost:2181
>
>
> [2013-09-12 01:01:59,701] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 (org.apache.zookeeper.ZooKeeper)
> [2013-09-12 01:01:59,724] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,732] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,741] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
> [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> Group           Topic                          Pid Offset          logSize         Lag             Owner
> gr2             pe1                            0   129985          130625          640             none
> gr2             pe1                            1   0               0               0               none
> gr2             pe2                            0   130493          130493          0               gr2_ip-XXXXXXXXXX-6c6f5d94-0
> [2013-09-12 01:02:00,514] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
>     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
>     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
>     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
>     at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>     at scala.collection.immutable.List.foreach(List.scala:45)
>     at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
>     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
>     at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>     at scala.collection.immutable.List.foreach(List.scala:45)
>     at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
>     at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> gr2             pe2                            1   0               0               0               gr2_ip-XXXXXXX-6c6f5d94-1
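
For reading the offset table quoted above, here is a minimal sketch. It assumes each row has been joined onto one line with the seven whitespace-separated columns Group, Topic, Pid, Offset, logSize, Lag and Owner; the names `PartitionStatus`, `parse_row` and `unowned` are hypothetical. It checks that Lag equals logSize minus Offset in the quoted rows and lists the partitions whose owner is reported as `none`.

```python
from typing import List, NamedTuple


class PartitionStatus(NamedTuple):
    group: str
    topic: str
    pid: int
    offset: int
    log_size: int
    lag: int
    owner: str


def parse_row(row: str) -> PartitionStatus:
    """Parse one row: Group Topic Pid Offset logSize Lag Owner."""
    group, topic, pid, offset, log_size, lag, owner = row.split()
    return PartitionStatus(group, topic, int(pid), int(offset),
                           int(log_size), int(lag), owner)


def unowned(statuses: List[PartitionStatus]) -> List[PartitionStatus]:
    """Return the partitions whose Owner column reads 'none'."""
    return [s for s in statuses if s.owner == "none"]


# Rows copied from the quoted output, each joined onto a single line.
ROWS = [
    "gr2 pe1 0 129985 130625 640 none",
    "gr2 pe1 1 0 0 0 none",
    "gr2 pe2 0 130493 130493 0 gr2_ip-XXXXXXXXXX-6c6f5d94-0",
    "gr2 pe2 1 0 0 0 gr2_ip-XXXXXXX-6c6f5d94-1",
]

statuses = [parse_row(row) for row in ROWS]

# In the quoted rows, Lag is logSize minus Offset (130625 - 129985 = 640).
assert all(s.lag == s.log_size - s.offset for s in statuses)

for s in unowned(statuses):
    print(f"{s.group} {s.topic} partition {s.pid}: lag {s.lag}, no owner")
```

In the quoted output, both partitions of topic pe1 show no owner for group gr2, while both partitions of pe2 are owned.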

 
prashant amar 2013-09-12, 05:24
prashant amar 2013-09-12, 05:29