We've recently come across a scenario where we see consumers resetting their offsets to earliest, which as far as I can tell may also lead to data loss (we're running with ack = -1 to avoid loss). This seems to happen when we time out on a regular shutdown and instead kill -9 the Kafka broker, but it obviously applies to any scenario that involves an unclean exit. As far as I can tell, what happens is:
1. On restart, the broker truncates the data for the affected partitions, i.e. not all data was written to disk.
2. The restarted broker then becomes the leader for the affected partitions, and consumers get confused because they've already consumed beyond the now-available offset.
This should not happen. We have a notion of a "committed" message, which is a message present on all "in sync" nodes. We never hand out a message to any consumer until it is committed, and we guarantee that only "in sync" nodes are electable as leaders. Setting acks=-1 means wait until the message is committed before returning to the producer.
If you kill all nodes, however, then all bets are off. In this case we will elect whichever node shows up first as leader and use its log as the source of truth. Is it possible this is happening?
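The commit rule, and why the all-nodes-down case escapes it, can be seen in a toy model. This is a minimal sketch, not Kafka's code: it assumes each replica's log is an in-memory list, that the committed point is the smallest log end offset across the in-sync replicas, and that an unclean election simply takes the first broker to come back. `Replica`, `high_watermark`, `unclean_election` and `next_fetch_offset` are hypothetical names; the `reset` argument only loosely mirrors the old consumer's auto.offset.reset setting ("smallest"/"largest").

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """Toy replica: a broker id and an in-memory list of messages."""
    broker_id: int
    log: list = field(default_factory=list)

    @property
    def log_end_offset(self) -> int:
        return len(self.log)


def high_watermark(isr) -> int:
    """A message is committed once every in-sync replica has it, so the
    committed point is the smallest log end offset across the ISR."""
    return min(r.log_end_offset for r in isr)


def unclean_election(first_up: Replica, others) -> Replica:
    """All nodes were down: the first broker back becomes leader and its log
    is the source of truth, so the other replicas truncate to match it."""
    for r in others:
        del r.log[first_up.log_end_offset:]
    return first_up


def next_fetch_offset(offset: int, log_start: int, log_end: int,
                      reset: str = "smallest") -> int:
    """An offset outside [log_start, log_end] is out of range; the consumer
    then falls back to its reset policy ("smallest" means earliest)."""
    if log_start <= offset <= log_end:
        return offset
    return log_start if reset == "smallest" else log_end


# Before the crash every replica holds m0..m3: all committed and consumed.
replicas = [Replica(b, ["m0", "m1", "m2", "m3"]) for b in (1, 2, 3)]
consumer_offset = high_watermark(replicas)  # 4

# kill -9 everywhere; broker 1 restarts with a shorter log and registers first.
replicas[0].log = ["m0", "m1"]
leader = unclean_election(replicas[0], replicas[1:])
print(high_watermark(replicas))  # 2: m2 and m3 are gone
print(next_fetch_offset(consumer_offset, 0, leader.log_end_offset))  # 0
```

The consumer's saved offset 4 is now past the end of the new leader's log, so it falls back to offset 0. That matches the "reset to earliest" symptom in the original report, while m2 and m3 are lost.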
-Jay

On Thu, Aug 29, 2013 at 8:32 AM, Sam Meder <[EMAIL PROTECTED]> wrote:
Do you know why you timed out on a regular shutdown? If the replica had fallen out of the ISR and shutdown was forced on the leader, this could happen. With ack = -1, we guarantee that all the replicas in the in-sync set have received the message before exposing the message to the consumer.
On 8/29/13 8:32 AM, "Sam Meder" <[EMAIL PROTECTED]> wrote:
replicated not necessarily synced to disk. That's not the case?
in sync and becomes the leader again? You're saying there should be some code that prevents it from becoming the leader until it has caught up?
When a broker starts up, it cannot become a leader until it has caught up with the current leader; in other words, until it has joined the in-sync replica (ISR) list.
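The start-up rule can be sketched as two small functions. This is a toy model, not Kafka's implementation: it assumes a follower counts as caught up once its log end offset reaches a leader offset supplied by the caller (the exact threshold Kafka uses is not stated in this thread), and that the leader is the first live ISR member in a preference list. `rejoin_isr` and `elect_leader` are hypothetical names. It covers only the clean path; the all-nodes-down case described earlier is an unclean election outside this rule.

```python
def rejoin_isr(isr: frozenset, follower: int, follower_leo: int,
               leader_offset: int) -> frozenset:
    """Add a restarted follower back to the ISR only once its log end
    offset (LEO) has caught up with the leader's offset."""
    if follower_leo >= leader_offset:
        return isr | {follower}
    return isr


def elect_leader(isr, live, preference):
    """Pick the first broker in `preference` that is both alive and in the
    ISR; return None when no in-sync replica is available."""
    for broker in preference:
        if broker in isr and broker in live:
            return broker
    return None


isr = frozenset({3, 4})
isr = rejoin_isr(isr, follower=1, follower_leo=90, leader_offset=100)
print(sorted(isr))  # [3, 4]: broker 1 is still behind
print(elect_leader(isr, live={1}, preference=[1, 3, 4]))  # None
isr = rejoin_isr(isr, follower=1, follower_leo=100, leader_offset=100)
print(elect_leader(isr, live={1}, preference=[1, 3, 4]))  # 1
```

A broker that is alive but not yet in the ISR is simply never a candidate, which is why a freshly restarted broker cannot take over leadership with a truncated log.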
don't shut down)
2. start brokers 1 & 2
3. controlled shutdown of brokers 3 & 4 in parallel (kill -9 if they don't shut down)
4. start brokers 3 & 4
If the replication factor is 3, when you shut down brokers 1 & 2 the leader should shift to broker 3 or 4. Unless you shut down 3 or 4 before 1 or 2 rejoin the ISR, you shouldn't lose any data.
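The rule above can be checked mechanically before each step of the sequence. A minimal sketch, assuming the operator can already obtain each partition's current ISR (how is left out) and using a made-up topic `t` with partitions t-0 to t-2 on brokers 1-4: stopping a set of brokers is safe only if every partition keeps at least one live in-sync replica outside that set. `partitions_at_risk` is a hypothetical helper.

```python
def partitions_at_risk(to_stop, isr_by_partition, live):
    """Return the partitions that would have no live in-sync replica left
    if the brokers in `to_stop` were shut down. An empty list means safe."""
    return [p for p, isr in isr_by_partition.items()
            if not ((isr & live) - to_stop)]


live = {1, 2, 3, 4}
# Brokers 1 & 2 were just restarted and have not rejoined the ISR yet.
isr_now = {"t-0": {3}, "t-1": {3, 4}, "t-2": {3, 4}}
print(partitions_at_risk({3, 4}, isr_now, live))  # ['t-0', 't-1', 't-2']

# Once 1 & 2 have caught up, every partition keeps a survivor.
isr_later = {"t-0": {1, 2, 3}, "t-1": {2, 3, 4}, "t-2": {1, 3, 4}}
print(partitions_at_risk({3, 4}, isr_later, live))  # []
```

In step 3 of the sequence above, running this check first would have flagged every partition, because 1 & 2 were alive but not yet in sync.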
I'm curious to know why controlled shutdown wouldn't succeed. If you configure the timeout and retries properly, and you are not hitting some sort of bug, controlled shutdown should succeed. Are you saying that you want to test the kill -9 scenario on purpose?
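What "configuring the retries properly" controls can be sketched as a bounded retry loop. A minimal sketch, assuming a caller-supplied `try_move_leadership` callable that asks for this broker's leaderships to be moved elsewhere and reports success; `max_attempts` and `backoff_ms` play the role of controlled.shutdown.max.retries and controlled.shutdown.retry.backoff.ms, but `controlled_shutdown` is a hypothetical helper, not Kafka's own code, and what happens after the last failed attempt is left to the caller.

```python
import time
from typing import Callable


def controlled_shutdown(try_move_leadership: Callable[[], bool],
                        max_attempts: int = 3,
                        backoff_ms: int = 5000) -> bool:
    """Try to move leadership off this broker, sleeping backoff_ms between
    attempts. Returns True on success, False once all attempts fail."""
    for attempt in range(max_attempts):
        if try_move_leadership():
            return True
        if attempt < max_attempts - 1:
            time.sleep(backoff_ms / 1000.0)
    return False


outcomes = iter([False, False, True])  # leadership move succeeds on attempt 3
print(controlled_shutdown(lambda: next(outcomes), max_attempts=3, backoff_ms=10))  # True
```

With a long enough total window (attempts times backoff), a broker being bounced has time for its peers to rejoin the ISR, so leadership has somewhere to go.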
On Thu, Aug 29, 2013 at 9:28 AM, Sam Meder <[EMAIL PROTECTED]> wrote:
monitoring the process and keep trying to connect to the port?
Every leader in a Kafka cluster exposes the UnderReplicatedPartitionCount metric. The safest way to issue a controlled shutdown is to wait until that metric reports 0 on the brokers. If you try to shut down the last broker in the ISR, the controlled shutdown cannot succeed, since there is no other broker to move the leadership to. Waiting until the under-replicated partition count hits 0 prevents you from hitting this issue.
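A deployment script can enforce this with a polling loop. A minimal sketch, assuming a caller-supplied `read_urp` function that returns the current UnderReplicatedPartitionCount summed over the brokers (how it is read, for example over JMX, is outside the sketch); `wait_for_zero_urp` and its timeout parameters are hypothetical.

```python
import time
from typing import Callable


def wait_for_zero_urp(read_urp: Callable[[], int],
                      timeout_s: float = 300.0,
                      poll_interval_s: float = 5.0) -> bool:
    """Poll until the under-replicated partition count reaches 0.
    Returns True if it did, False if the timeout expired first."""
    deadline = time.monotonic() + timeout_s
    while True:
        if read_urp() == 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)


readings = iter([3, 1, 0])  # fake metric: partitions catch up over time
print(wait_for_zero_urp(lambda: next(readings), timeout_s=10, poll_interval_s=0.01))  # True
```

Only when this returns True would the script issue the controlled shutdown for the next broker; on False it should stop rather than escalate to kill -9.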
This also solves the problem of waiting until the broker comes up, since you will automatically wait until the broker comes up and joins the ISR.

Thanks,
Neha

On Thu, Aug 29, 2013 at 12:59 PM, Sam Meder <[EMAIL PROTECTED]> wrote:
On Aug 29, 2013, at 11:12 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:

Maybe I am missing something, but won't the topics for which I have partitions on the broker I am shutting down always report as under-replicated (unless I manually reassign the partition to another broker)? I thought that the shutdown logic really only dealt with transferring the leader status for a partition.
As a side note, it would be great to have a minimum replication factor in addition to the regular replication factor, so one can enforce durability guarantees (fail the producer when the message can't be sufficiently replicated).

Not sure I follow, but one start-up situation I am concerned about is what happens on abnormal termination (whether through kill -9, OOM, HW failure, or whatever floats your boat). For this scenario it would be great if there were a way to wait for the recovery process to finish. For now we can just wait for the server port to become available, but something more explicit would be great.
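Until something more explicit exists, the port-based workaround can be written as a small TCP probe. A minimal sketch using only the Python standard library; `wait_for_port` is a hypothetical helper, and the host, port and timeouts are placeholders (9092 is Kafka's default broker port). An open port shows only that the broker accepts connections; it does not show that the broker has rejoined the ISR, which is what the UnderReplicatedPartitionCount check covers.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 120.0,
                  poll_interval_s: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds,
    or False if timeout_s elapses first."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Connect and immediately close; we only care that it succeeds.
            with socket.create_connection((host, port), timeout=poll_interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval_s)
```

A restart script would call something like `wait_for_port("localhost", 9092, timeout_s=600)` after starting the broker, and only then move on to waiting for the under-replicated count to reach zero.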
IIUC, it is a pseudo-automation: you set the retry interval for controlled shutdown (controlled.shutdown.retry.backoff.ms) and the number of retries (controlled.shutdown.max.retries) high enough that, during a rolling bounce, a controlled shutdown is unlikely to fail, since by then you would have brought the previous broker back up and UnderReplicatedPartitionCount would soon return to zero. What would complete this procedure is an external hook into whatever deployment system is being used that waits for UnderReplicatedPartitionCount to return to zero before issuing a controlled shutdown to the next broker in the bounce sequence.

On Thu, Aug 29, 2013 at 5:11 PM, Jay Kreps <[EMAIL PROTECTED]> wrote: