Re: produce request failed: due to Leader not local for partition
Jason Rosenberg 2013-06-23, 09:04
Hi Sriram,

I don't see any indication at all on the producer that there's a problem,
only the above logging on the server (and it repeats continually).  I
think what may be happening is that the producer for that topic did not
actually try to send a message between the start of the controlled shutdown
(which changed the leader for the topic) and the time the server was
restarted.  So the client never saw that the leader changed, but also
never got an exception back, so it just keeps on sending messages to
the former leader.
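
If it helps, here's a minimal sketch of the producer setup I have in mind
(Scala, 0.8-era producer API).  The host/topic names match the renamed
placeholders in the log below, and the values are just illustrative.  As I
understand the 0.8 producer, topic metadata is only refreshed after a send
(on a failure, or when topic.metadata.refresh.interval.ms elapses), which
would explain why a producer that sends nothing during the leader move keeps
its stale metadata:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object StaleMetadataProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Bootstrap via the VIP (renamed placeholder, same as in the log below).
    props.put("metadata.broker.list", "mykafkavip:12345")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "async")
    // The async producer only refreshes topic metadata after a send (on a
    // failed send, or when this interval elapses).  Lowering it bounds how
    // long a quiet producer can keep pointing at a former leader.
    props.put("topic.metadata.refresh.interval.ms", "60000")
    // Retry/backoff settings corresponding to the "Back off for 100 ms before
    // retrying send. Remaining retries = 3" lines in the log below.
    props.put("message.send.max.retries", "3")
    props.put("retry.backoff.ms", "100")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("mytopic", "hello"))
    producer.close()
  }
}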

I do see the sequence you describe for errors relating to a broken
connection (e.g. when the server gets restarted as part of the rolling
restart and the producer actually tries to send a message while the server
is down).  In that case I see the following on the client (I've renamed
identifying topic/host names here):

2013-06-23 08:25:28,420  WARN [ProducerSendThread-] async.DefaultEventHandler - Failed to send producer request with correlation id 474527 to broker 508818741 with data for partitions [mytopic,0]
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:33)
        at sun.nio.ch.IOUtil.write(IOUtil.java:125)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:367)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:360)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
2013-06-23 08:25:28,421  INFO [ProducerSendThread-] async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3
2013-06-23 08:25:28,521  INFO [ProducerSendThread-] client.ClientUtils$ - Fetching metadata from broker id:0,host:mykafkavip:12345 with correlation id 474528 for 1 topic(s) Set(mytopic)
2013-06-23 08:25:28,522  INFO [ProducerSendThread-] producer.SyncProducer - Connected to mykafkavip:12345 for producing
2013-06-23 08:25:28,524  INFO [ProducerSendThread-] producer.SyncProducer - Disconnecting from mykafkavip:12345
2013-06-23 08:25:28,525  INFO [ProducerSendThread-] producer.SyncProducer - Connected to kafkaserver1:12345 for producing
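
To make that sequence concrete, here's a rough sketch (not the actual
DefaultEventHandler code, just the pattern the log shows): on a broken
connection the sender backs off, re-fetches topic metadata from the bootstrap
VIP, reconnects to whichever broker is now the leader, and retries until the
retry budget runs out.  sendToLeader and refreshMetadataFrom are hypothetical
placeholders for the real network/metadata calls:

import java.io.IOException

object RetryOnSendFailureSketch {
  def sendWithRetries(batch: Seq[String],
                      maxRetries: Int = 3,
                      backoffMs: Long = 100L): Boolean = {
    var remaining = maxRetries
    while (remaining > 0) {
      try {
        sendToLeader(batch)          // may throw if the connection is reset
        return true
      } catch {
        case _: IOException =>
          remaining -= 1
          println(s"Back off for $backoffMs ms before retrying send. " +
                  s"Remaining retries = $remaining")
          Thread.sleep(backoffMs)
          // Re-fetch metadata so the next attempt goes to the new leader.
          refreshMetadataFrom("mykafkavip:12345")
      }
    }
    false
  }

  // Hypothetical stand-ins for the real send and metadata-fetch calls.
  private def sendToLeader(batch: Seq[String]): Unit = ()
  private def refreshMetadataFrom(vip: String): Unit = ()
}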
On Sun, Jun 23, 2013 at 1:55 AM, Sriram Subramanian <
[EMAIL PROTECTED]> wrote: