Kafka, mail # dev - Review Request 14865: Patch for KAFKA-1097


Re: Review Request 14865: Patch for KAFKA-1097
Neha Narkhede 2013-10-24, 18:14


> On Oct. 24, 2013, 4:30 p.m., Jun Rao wrote:
> > Thanks for the patch. There are a couple of issues.
> >
> > 1. The main one is during the phase of partition reassignment when we bootstrap new replicas. At this point, the assigned replica list doesn't include the new replicas. If we only allow replicas in assigned replica set to be added to ISR, those new replicas won't be added to ISR, which will prevent partition reassignment from completing. We could include those new replicas in the all replica set in the LeaderAndIsr request. We probably have to think a bit more to see if there is any other impact.
> >
> > 2. Once the assigned replica set in the broker is updated, we need to prevent an old replica from being added back to this set again. Currently, in Partition.updateLeaderHWAndMaybeExpandIsr() (triggered by a fetch request), it will call getOrCreateReplica(), which can cause a replica to be added back to the assigned replica set. What we can do is to only call getOrCreateReplica during makeLeader() and makeFollower(). In the former, we force all replicas to be created. In the latter, we just need to make sure the local replica is created. In Partition.updateLeaderHWAndMaybeExpandIsr(), we can then use getReplica() instead of getOrCreateReplica().
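A minimal sketch of the second suggestion, written in Scala because the patch touches Scala sources. `PartitionSketch`, `Replica` and all method bodies are hypothetical simplifications, not Kafka's actual `Partition` class. Only the method names `getOrCreateReplica`, `getReplica`, `makeLeader`, `makeFollower` and `updateLeaderHWAndMaybeExpandIsr` come from the review. In this sketch, replicas are created only during the leader and follower transitions. The fetch path uses the lookup-only `getReplica`, so a late fetch from a replica that was dropped from the assignment cannot re-add it.

```scala
import scala.collection.mutable

// Hypothetical, simplified replica: only the fields this sketch needs.
final class Replica(val brokerId: Int) {
  var logEndOffset: Long = 0L
}

// Hypothetical stand-in for the broker-side Partition; not Kafka's real class.
class PartitionSketch(val localBrokerId: Int) {
  // Replicas this broker currently believes are assigned to the partition.
  private val assignedReplicaMap = mutable.Map.empty[Int, Replica]
  var inSyncReplicas: Set[Int] = Set.empty

  def assignedReplicaIds: Set[Int] = assignedReplicaMap.keySet.toSet

  // Creates the replica if missing; only called from makeLeader/makeFollower.
  private def getOrCreateReplica(id: Int): Replica =
    assignedReplicaMap.getOrElseUpdate(id, new Replica(id))

  // Lookup only: never adds a replica to the assigned set.
  def getReplica(id: Int): Option[Replica] = assignedReplicaMap.get(id)

  // Leader transition: refresh the assigned set from the request and force
  // creation of every assigned replica.
  def makeLeader(assigned: Set[Int], isr: Set[Int]): Unit = {
    assignedReplicaMap.keys.toList
      .filter(id => !assigned.contains(id))
      .foreach(id => assignedReplicaMap.remove(id))
    assigned.foreach(id => getOrCreateReplica(id))
    inSyncReplicas = isr
  }

  // Follower transition: only the local replica has to exist.
  def makeFollower(): Unit = {
    getOrCreateReplica(localBrokerId)
  }

  // Fetch path: uses getReplica, so a fetch from a replica that is no longer
  // assigned is ignored instead of re-creating it.
  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, fetchOffset: Long, leaderHighWatermark: Long): Unit =
    getReplica(replicaId) match {
      case Some(replica) =>
        replica.logEndOffset = fetchOffset
        if (!inSyncReplicas.contains(replicaId) && fetchOffset >= leaderHighWatermark)
          inSyncReplicas += replicaId
      case None =>
        () // unknown replica: do nothing
    }
}
```

For example, after `makeLeader(Set(1, 2, 3), ...)` followed by `makeLeader(Set(1, 2), ...)`, a fetch from replica 3 leaves both the assigned set and the ISR untouched.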

Those are great points.

1. We can't really include the new replicas in the assigned replicas and send them to the leader. The reason is that the request would have the same leaderEpoch, so the broker would ignore the LeaderAndIsrRequest. Hacking around this would require the controller to write the partition state path with a higher leader epoch (even if there is no real change to the partition's state) and then send the LeaderAndIsrRequest with that higher leaderEpoch.
2. This is a good suggestion. Partition.updateLeaderHWAndMaybeExpandIsr() should really be using getReplica().
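The first point in the reply depends on the broker's leader-epoch check. The sketch below assumes, as the reply describes, that a broker ignores a LeaderAndIsrRequest unless it carries a higher leaderEpoch than the one it has cached. Modelling this as a strictly greater comparison is an assumption. `PartitionStateInfo` and `BrokerPartitionState` are hypothetical names chosen for illustration, not Kafka's request classes.

```scala
// Hypothetical per-partition payload of a LeaderAndIsrRequest (illustrative names only).
final case class PartitionStateInfo(leader: Int, leaderEpoch: Int, isr: Set[Int], allReplicas: Set[Int])

// Hypothetical broker-side cache of the last accepted partition state.
final class BrokerPartitionState {
  private var current: Option[PartitionStateInfo] = None

  def state: Option[PartitionStateInfo] = current

  // Assumption: a request is applied only if its leaderEpoch is strictly higher
  // than the cached one; otherwise it is treated as stale and ignored.
  def handleLeaderAndIsr(request: PartitionStateInfo): Boolean = {
    val currentEpoch = current.map(_.leaderEpoch).getOrElse(-1)
    if (request.leaderEpoch > currentEpoch) {
      current = Some(request)
      true
    } else {
      false
    }
  }
}
```

Consider resending epoch 5 with a new replica 3 added to `allReplicas`. That mirrors including the new replicas without a new epoch, and the request is dropped. A request at epoch 6 is accepted. That models the workaround, in which the controller first writes the state path with a bumped epoch.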
- Neha
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/14865/#review27459
-----------------------------------------------------------
On Oct. 23, 2013, 4:34 a.m., Neha Narkhede wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/14865/
> -----------------------------------------------------------
>
> (Updated Oct. 23, 2013, 4:34 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1097
>     https://issues.apache.org/jira/browse/KAFKA-1097
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1097 Race condition while reassigning low throughput partition leads to incorrect ISR information in zookeeper
>
> The changes include:
> 1) Adding the ISR shrink logic as part of the OfflineReplica -> NonExistentReplica state change
> 2) Adding a safety check on the broker so that it only expands the ISR if the replica is in the assigned replica list
> 3) Updating the assigned replica list on the broker on every makeLeader request, and also on makeFollower requests for safety, though that's not strictly required
>
> These changes ensure that the ISR is shrunk by the controller and that the leader has an up-to-date assigned replica list. So even if a replica sends a fetch request after the controller has shrunk the ISR, the broker will not be able to update the ISR until it receives the next LeaderAndIsrRequest. That request notifies it of the latest zkVersion of the partition state path and also contains the shrunk ISR and assigned replica list. Using that, the broker will avoid expanding the ISR if the replica is not present in the new assigned replica list.
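The race and the two broker-side defenses described above can be modelled in a few lines. This is a hypothetical sketch, not Kafka's code. `FakeZkPartitionState` stands in for the partition state path and mimics a versioned (compare-and-set) ZooKeeper write keyed on zkVersion. `LeaderSketch` is an illustrative leader-side view of one partition. The controller's ISR write bumps the zkVersion, so the leader's conditional update with its stale version fails. Once the next LeaderAndIsrRequest delivers the new zkVersion and the shrunk assigned replica list, the assigned-replica check (change 2) rejects the removed replica.

```scala
// Hypothetical in-memory stand-in for the partition state path in ZooKeeper.
final class FakeZkPartitionState(var isr: Set[Int], var zkVersion: Int) {
  // Versioned (compare-and-set) write: succeeds only if the caller's expected
  // version matches the stored one, then bumps the version.
  def conditionalUpdate(newIsr: Set[Int], expectedVersion: Int): Option[Int] =
    if (expectedVersion == zkVersion) {
      isr = newIsr
      zkVersion += 1
      Some(zkVersion)
    } else None
}

// Hypothetical leader-side view of one partition; not Kafka's Partition class.
final class LeaderSketch(zk: FakeZkPartitionState) {
  var isr: Set[Int] = Set.empty
  var assignedReplicas: Set[Int] = Set.empty
  var cachedZkVersion: Int = -1

  // Models handling a LeaderAndIsrRequest: adopt the ISR, the assigned replica
  // list and the latest zkVersion of the partition state path.
  def onLeaderAndIsr(newIsr: Set[Int], assigned: Set[Int], zkVersion: Int): Unit = {
    isr = newIsr
    assignedReplicas = assigned
    cachedZkVersion = zkVersion
  }

  // Called when a caught-up follower fetches; returns true if the ISR grew.
  def maybeExpandIsr(replicaId: Int): Boolean =
    if (isr.contains(replicaId) || !assignedReplicas.contains(replicaId)) {
      false // already in sync, or not assigned: the safety check from change 2
    } else {
      zk.conditionalUpdate(isr + replicaId, cachedZkVersion) match {
        case Some(newVersion) =>
          isr += replicaId
          cachedZkVersion = newVersion
          true
        case None =>
          false // stale zkVersion: wait for the next LeaderAndIsrRequest
      }
    }
}
```

Both checks matter. The zkVersion check covers the window before the leader learns of the controller's shrink. The assigned-list check covers every fetch after that.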
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/cluster/Partition.scala 60f3ed4e88b08d0dcf2ea84259ae6dbe9d7c3a2d
>   core/src/main/scala/kafka/controller/KafkaController.scala 88d130f55997b72a8590e4cfe92857a7320e70d5
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 212c05d65dcdc147e55f90875bacc940e30342bf
>   core/src/main/scala/kafka/server/ReplicaManager.scala 03ba60e82cdb3dce100603d615894ede47e4b077