Kafka >> mail # user >> Consumer Lag increasing but offset not moving.


Ramakrishna Mallya 2013-08-14, 23:30
Re: Consumer Lag increasing but offset not moving.
So those are the logs for one consumer, right? How many consumer
instances do you have? Can you run the offset checker tool repeatedly,
at an interval greater than the offset commit interval, and verify that
the offsets are not moving while the log sizes are growing?

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F
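
The comparison suggested above can be sketched in a few lines of Java. This is only an illustration, not part of Kafka or of the offset checker tool: the class `StalledPartitionCheck`, the `Reading` type, the partition labels, and the numbers are all hypothetical, and the sketch assumes the consumer offset and log size for each partition have already been copied out of two offset checker runs taken more than one commit interval apart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StalledPartitionCheck {

    /** One reading for a partition: committed consumer offset and broker log size. */
    static final class Reading {
        final long consumerOffset;
        final long logSize;

        Reading(long consumerOffset, long logSize) {
            this.consumerOffset = consumerOffset;
            this.logSize = logSize;
        }

        /** Lag as the difference between log size and committed offset. */
        long lag() {
            return logSize - consumerOffset;
        }
    }

    /** Stalled: the log grew between readings but the committed offset did not advance. */
    static boolean isStalled(Reading earlier, Reading later) {
        return later.logSize > earlier.logSize
                && later.consumerOffset <= earlier.consumerOffset;
    }

    /** Returns the labels of partitions present in both snapshots that look stalled. */
    static List<String> stalledPartitions(Map<String, Reading> earlier,
                                          Map<String, Reading> later) {
        List<String> stalled = new ArrayList<>();
        for (Map.Entry<String, Reading> e : earlier.entrySet()) {
            Reading after = later.get(e.getKey());
            if (after != null && isStalled(e.getValue(), after)) {
                stalled.add(e.getKey());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        // Made-up numbers, keyed by a hypothetical partition label.
        Map<String, Reading> first = new LinkedHashMap<>();
        first.put("0-0", new Reading(1000L, 1500L));
        first.put("1-0", new Reading(2000L, 2100L));

        Map<String, Reading> second = new LinkedHashMap<>();
        second.put("0-0", new Reading(1000L, 1900L)); // log grew, offset did not move
        second.put("1-0", new Reading(2080L, 2200L)); // offset advanced

        System.out.println("Stalled partitions: " + stalledPartitions(first, second));
    }
}
```

A partition is flagged only when the log size grows and the committed offset stays put; a partition with no new messages is not reported, since an unchanged offset is expected there.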

Thanks,

Joel

On Wed, Aug 14, 2013 at 4:29 PM, Ramakrishna Mallya
<[EMAIL PROTECTED]> wrote:
> Hi,
>
> We are using Kafka 0.7.2 with ZooKeeper 3.4.5.
>
> Our cluster configuration:
> 3 brokers on 3 different machines. Each broker machine has a zookeeper
> instance running as well.
>
> We are currently facing a problem with messages not being consumed by the
> Kafka high-level consumers. This is a dedicated topic that receives a very
> high number of messages. The consumer had no full GCs.
>
> API called:
> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
>
> We already handle exceptions at the consumer level, and we have verified
> that our threads are not dying.
>
> All our consumer threads show this stack:
>
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000000065d104ee8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> at java.lang.Thread.run(Thread.java:662)
>
> Kafka consumer auto-commit thread stack:
>
> "Kafka-consumer-autocommit-0" prio=10 tid=0x00007f99ad3a2000 nid=0x330b in Object.wait() [0x00007f9989084000]
>    java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:485)
> at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1309)
> - locked <0x0000000786bdd6c0> (a org.apache.zookeeper.ClientCnxn$Packet)
> at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1264)
> at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:103)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:251)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$4.apply(ZookeeperConsumerConnector.scala:248)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:248)
> at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:246)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.Pool$$anon$1.foreach(Pool.scala:53)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.utils.Pool.foreach(Pool.scala:24)
> at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:246)
> at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:232)
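
The thread states shown in the dumps above can also be sampled from inside the consumer process, which is one way to keep checking that the threads are alive without taking a full thread dump each time. The following is a minimal sketch using the standard `Thread.getAllStackTraces()` call; the class `ThreadStateReport`, the helper `statesByPrefix`, and the thread name `demo-consumer-0` are hypothetical and do not come from Kafka or from the original application.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadStateReport {

    /** Returns name -> state for every live thread whose name starts with the prefix. */
    static Map<String, Thread.State> statesByPrefix(String prefix) {
        Map<String, Thread.State> states = new TreeMap<>();
        // getAllStackTraces() only contains threads that are alive at the time of the call.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(prefix)) {
                states.put(t.getName(), t.getState());
            }
        }
        return states;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a consumer worker: blocks on an empty queue, like the parked threads above.
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-consumer-0");
        worker.setDaemon(true);
        worker.start();

        Thread.sleep(200); // give the worker time to block on the queue
        System.out.println(statesByPrefix("demo-consumer-"));
        worker.interrupt();
    }
}
```

A thread that has died simply does not appear in the returned map, so an expected name going missing is the signal to look for. A thread reported as WAITING is alive but idle, which by itself does not say whether it is waiting for messages or for work that never arrives.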

 