I'm working on getting seamless rolling restarts for my Kafka servers, running 0.8. I have it set up so that each server is restarted sequentially. Each server takes itself out of the load balancer (e.g. sets a status that the LB will recognize, and then waits more than long enough for the LB to stop sending metadata requests to that server). Then I initiate the shutdown (with controlled.shutdown.enable=true). This seems to work well; however, I occasionally see warnings like this in the log from the server, after restart:
2013-06-23 08:28:46,770 WARN [kafka-request-handler-2] server.KafkaApis - [KafkaApi-508818741] Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
This WARN seems to repeat persistently until the producer client initiates a new metadata request (e.g. every 10 minutes, by default). However, the producer doesn't log any errors/exceptions while the server is logging this WARN.
What's happening here? Is the message silently being forwarded on to the correct leader for the partition? Is the message dropped? Are these WARNs particularly useful?
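For reference, the broker-side setup described above looks roughly like this (a minimal server.properties sketch; only controlled.shutdown.enable is from my actual setup — the other lines are generic placeholders):

    # server.properties (per broker); all values except
    # controlled.shutdown.enable are placeholders
    broker.id=1
    port=12345
    zookeeper.connect=zkhost:2181
    # move leadership off this broker before shutting down
    controlled.shutdown.enable=true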
On failure, the producer initiates a metadata request to refresh its state, and should issue subsequent requests to the new leader. The errors you see should happen only once per topic partition per producer. Let me know if this is not what you see. On the producer end you should see the following INFO logging -
"Back off for x ms before retrying send. Remaining retries = y"
If all of the producer's retries fail, you should see the error message below -
"Failed to send requests for topics"
On 6/23/13 1:45 AM, "Jason Rosenberg" <[EMAIL PROTECTED]> wrote:
I don't see any indication at all on the producer that there's a problem. Only the above logging on the server (and it repeats continually). I think what may be happening is that the producer for that topic did not actually try to send a message between the start of the controlled shutdown (which changed the leader for the topic) and the time the server was restarted. So the client never sees that the leader changed, but also never gets an exception back, so it just keeps on sending messages to the former leader.
I do see the sequence you describe for errors relating to a broken connection (e.g. when the server gets restarted as part of the rolling restart and the producer actually tries to send a message while the server is down). In that case I do see this on the client (I've renamed identifying topic/host names here):
2013-06-23 08:25:28,420 WARN [ProducerSendThread-] async.DefaultEventHandler - Failed to send producer request with correlation id 474527 to broker 508818741 with data for partitions [mytopic,0]
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:33)
        at sun.nio.ch.IOUtil.write(IOUtil.java:125)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:367)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:360)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
2013-06-23 08:25:28,421 INFO [ProducerSendThread-] async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3
2013-06-23 08:25:28,521 INFO [ProducerSendThread-] client.ClientUtils$ - Fetching metadata from broker id:0,host:mykafkavip:12345 with correlation id 474528 for 1 topic(s) Set(mytopic)
2013-06-23 08:25:28,522 INFO [ProducerSendThread-] producer.SyncProducer - Connected to mykafkavip:12345 for producing
2013-06-23 08:25:28,524 INFO [ProducerSendThread-] producer.SyncProducer - Disconnecting from mykafkavip:12345
2013-06-23 08:25:28,525 INFO [ProducerSendThread-] producer.SyncProducer - Connected to kafkaserver1:12345 for producing

On Sun, Jun 23, 2013 at 1:55 AM, Sriram Subramanian <[EMAIL PROTECTED]> wrote:
Yeah I am using ack = 0, so that makes sense. I'll need to rethink that, it would seem. It would be nice, wouldn't it, in this case, for the broker to realize this and just forward the messages to the correct leader. Would that be possible?
Also, it would be nice to have a second option for the controlled shutdown (e.g. controlled.shutdown.quiescence.ms), to allow the broker to wait a prescribed amount of time after the controlled shutdown before actually shutting down the server. Then I could set this value to something a little greater than the producer's 'topic.metadata.refresh.interval.ms'. This would help with hitless rolling restarts too. Currently, every producer gets a very loud "Connection Reset" with a tall stack trace each time I restart a broker. It would be nicer to have the producers keep producing until the metadata refresh interval expires, then get the word that the leader has moved due to the controlled shutdown, and then start producing to the new leader, all before the shutting-down server actually shuts down. Does that seem feasible?
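Concretely, the idea would look something like this (controlled.shutdown.quiescence.ms is hypothetical — it does not exist in 0.8; topic.metadata.refresh.interval.ms is a real producer setting, defaulting to 10 minutes):

    # broker side -- HYPOTHETICAL knob, proposed above, not in 0.8
    controlled.shutdown.enable=true
    controlled.shutdown.quiescence.ms=660000   # linger 11 min after leaders move
    # producer side -- existing 0.8 setting (default 600000, i.e. 10 min)
    topic.metadata.refresh.interval.ms=600000
    # quiescence > refresh interval, so every producer should refresh its
    # metadata and switch to the new leader before the old broker exits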
Jason

On Sun, Jun 23, 2013 at 8:23 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
After we implement non-blocking IO for the producer, there may not be much incentive left to use ack = 0, but this is an interesting idea - not just for the controlled shutdown case, but also when leadership moves due to say, a broker's zk session expiring. Will have to think about it a bit more.
On Mon, Jun 24, 2013 at 12:22 AM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
The quiescence time that you proposed won't work. The reason is that with ack=0, the producer starts losing data silently from the moment the leader is moved (by controlled shutdown) until the broker is shut down. So, the sooner that you can shut down the broker, the better. What we realized is that if you can use a larger batch size, ack=1 can still deliver very good throughput.
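For example (a sketch of the relevant 0.8 producer settings; the values are illustrative, not tuned recommendations):

    request.required.acks=1      # wait for the leader's ack, so a stale
                                 # leader surfaces as an error + metadata refresh
    producer.type=async
    batch.num.messages=200       # larger batches amortize the per-request ack
    queue.buffering.max.ms=5000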
Jun

On Mon, Jun 24, 2013 at 12:22 AM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
I see that with ack=0, the producer will be in a bad state any time the leader for its partition has changed while the broker that it thinks is the leader is still up. So this is a problem in general, not only for controlled shutdown, but even for the case where you've restarted a server (without controlled shutdown), which in and of itself can force a leader change. If the producer doesn't attempt to send a message during the time the broker was down, it will never get a connection failure, and never get fresh metadata, and will subsequently start sending messages to the non-leader.
Thus, I'd say this is a problem with ack=0, regardless of controlled shutdown. Any time there's a leader change, the producer will send messages into the ether. I think this is actually a severe condition, that could be considered a bug. How hard would it be to have the receiving broker forward on to the leader, in this case?
Jason

On Mon, Jun 24, 2013 at 8:44 AM, Joel Koshy <[EMAIL PROTECTED]> wrote:
Other than controlled shutdown, the only other case that can cause the leader to change when the underlying broker is alive is when the broker expires its ZK session (likely due to GC), which should be rare. That being said, forwarding in the broker may not be a bad idea. Could you file a jira to track this?
Jun

On Mon, Jun 24, 2013 at 2:50 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
What about a non-controlled shutdown, and a restart, where the producer never attempts to send anything during the time the broker was down? That could have caused a leader change, but without the producer knowing to refresh its metadata, no?

On Mon, Jun 24, 2013 at 9:05 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
To be clear, this whole discussion started because I am clearly seeing "failed due to Leader not local" on the last broker restarted, after all the controlled shutting down has completed and all brokers have been restarted.
This leads me to believe that a client made a metadata request and found out that server A was the leader for its partition, then server A was restarted, and then the client made repeated producer requests to server A without encountering a broken socket. Thus, I'm not sure it's correct that the socket is invalidated in that case after a restart.
Alternatively, could it be that the client (which sends messages to multiple topics), gets metadata updates for multiple topics, but doesn't attempt to send a message to topicX until after the leader has changed and server A has been restarted. In this case, if it's the first time the producer sends to topicX, does it only then create a new socket?
Jason

On Mon, Jun 24, 2013 at 10:00 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
Also, looking back at my logs, I'm wondering if a producer will reuse the same socket to send data to the same broker for multiple topics (I'm guessing yes). If so, it looks like I'm seeing this scenario:
1. producer1 is happily sending messages for topicX and topicY to serverA (serverA is the leader for both topics; only 1 partition for each topic, for simplicity).
2. serverA is restarted, and in the process, serverB becomes the new leader for both topicX and topicY.
3. producer1 decides to send a new message for topicX to serverA.
3a. This results in an exception ("Connection reset by peer"). producer1's connection to serverA is invalidated.
3b. producer1 makes a new metadata request for topicX, and learns that serverB is now the leader for topicX.
3c. producer1 resends the message for topicX, to serverB.
4. producer1 decides to send a new message for topicY to serverA.
4a. producer1 notes that its socket to serverA is invalid, so it creates a new connection to serverA.
4b. producer1 successfully sends its message to serverA (without realizing that serverA is no longer the leader for topicY).
4c. serverA logs to its console:
2013-06-23 08:28:46,770 WARN [kafka-request-handler-2] server.KafkaApis - [KafkaApi-508818741] Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
5. producer1 continues to send messages for topicY to serverA, and serverA continues to log the same warnings.
6. 10 minutes later, producer1 decides to update its metadata for topicY, and learns that serverB is now the leader for topicY.
7. The warning messages finally stop in the console for serverA.
I am pretty sure this scenario, or one very close to it, is what I'm seeing in my logs, after doing a rolling restart, with controlled shutdown.
Does this scenario make sense?
One thing I notice is that in the steady state, every 10 minutes the producer refreshes its metadata for all topics. However, when sending a message to a specific topic fails, only the metadata for that topic is refreshed, even though the ramification is that all topics with the same leader might need to be refreshed, especially in response to a "connection reset by peer".
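In the meantime, the only producer-side lever for this seems to be shortening the periodic refresh (a sketch; the 0.8 default is 600000 ms, and the refresh only fires on the next send after the interval elapses):

    # refresh all topics' metadata every 60s instead of every 10 min,
    # shrinking the window of silent sends to a stale leader, at the cost
    # of more metadata requests
    topic.metadata.refresh.interval.ms=60000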
On Mon, Jun 24, 2013 at 10:14 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
I'm thinking that this scenario could be a problem for ack=0 in general (even without controlled shutdown). If we do an "uncontrolled" shutdown, it seems that producers for some topics won't ever learn there could have been a leader change. Would it make sense to force a metadata refresh for all topics on a broker any time an IOException happens on a socket (e.g. "connection reset")? Currently, it looks like only the topic that experienced the failure has a metadata refresh issued for it.
Maybe this should be a separate jira issue, now that I think about it.
Jason

On Mon, Jun 24, 2013 at 10:52 PM, Jason Rosenberg <[EMAIL PROTECTED]> wrote:
Jun

On Sat, Jun 29, 2013 at 6:21 AM, Jason Rosenberg <[EMAIL PROTECTED]> wrote: