Re: Review Request 14730: Patch for KAFKA-1001

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/14730/#review27309
-----------------------------------------------------------

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53135>

    This method can be written more clearly. The existing code does the equivalent of collection.groupBy by hand. If we change the input to a set, we can first make a pass to compute the fetcherId and then use collection.groupBy to group the set by BrokerFetcherId. Finally, we can add the set per BrokerFetcherId to the fetcherThreadMap and call addPartitions.
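
    A minimal sketch of the groupBy approach described above. The case classes, the fetcherId hashing scheme, and the name groupByFetcher are hypothetical stand-ins for illustration, not the actual Kafka definitions.

```scala
object FetcherGroupingSketch {
  // Hypothetical stand-ins for the real Kafka types; names and fields are assumptions.
  case class TopicAndPartition(topic: String, partition: Int)
  case class Broker(id: Int, host: String, port: Int)
  case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
  case class PartitionToFetch(topicAndPartition: TopicAndPartition,
                              leader: Broker,
                              initialOffset: Long)

  // Assumed scheme: hash topic and partition onto a fixed number of fetchers.
  // floorMod keeps the result non-negative even if the hash overflows.
  def fetcherId(tp: TopicAndPartition, numFetchers: Int): Int =
    Math.floorMod(31 * tp.topic.hashCode + tp.partition, numFetchers)

  // One pass computes the key for each partition; groupBy builds the per-fetcher sets.
  def groupByFetcher(partitions: Set[PartitionToFetch],
                     numFetchers: Int): Map[BrokerAndFetcherId, Set[PartitionToFetch]] =
    partitions.groupBy { p =>
      BrokerAndFetcherId(p.leader, fetcherId(p.topicAndPartition, numFetchers))
    }
}
```

    Each entry of the resulting map could then be handed to the matching fetcher thread in a single addPartitions call.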

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53136>

    This needs to be protected by the mapLock. For simplicity, we can probably just hold the lock around the whole logic. Adding all partitions should be cheap and there is no strong reason to optimize that.
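
    A rough sketch of holding one lock around the whole update. FetcherRegistry, its String keys, and its Int partition ids are simplified assumptions; only the names mapLock, fetcherThreadMap, and addPartitions come from the code under review.

```scala
import scala.collection.mutable

object MapLockSketch {
  // Simplified, hypothetical stand-in for the fetcher manager.
  final class FetcherRegistry {
    private val mapLock = new Object
    private val fetcherThreadMap = mutable.HashMap.empty[String, mutable.Set[Int]]

    // The lock is held for the entire method, so the lookup, the creation of a
    // missing entry, and the addition of partitions cannot interleave with other callers.
    def addPartitions(byFetcher: Map[String, Set[Int]]): Unit = mapLock.synchronized {
      for ((fetcherKey, partitions) <- byFetcher) {
        val assigned = fetcherThreadMap.getOrElseUpdate(fetcherKey, mutable.Set.empty[Int])
        assigned ++= partitions
      }
    }

    // Reads take the same lock and return an immutable snapshot.
    def partitionsFor(fetcherKey: String): Set[Int] = mapLock.synchronized {
      fetcherThreadMap.get(fetcherKey).map(_.toSet).getOrElse(Set.empty[Int])
    }
  }
}
```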

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53138>

    This can be written as
    fetcherThreadMap(brokerAndFetcherId).

core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment53141>

    I recommend that we hold the lock for the whole method. This may not be necessary for the logic we have now. However, this may change if the logic evolves in the future. Also, iterating an in-memory data structure should be cheap. So, optimizing the locking period is not necessary.

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53142>

    Hmm, I don't see the code for marking them as stopped?

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53145>

    I think we need to move replicaStateChangeLock here. Otherwise, the check and the update of controllerEpoch may not be atomic.
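
    To illustrate the atomicity concern, here is a sketch in which the epoch check and the epoch update happen under the same lock. EpochGuard and acceptIfNotStale are hypothetical names, and the stale-epoch rule (reject anything older than the current epoch) is an assumption; only replicaStateChangeLock and controllerEpoch come from the code under review.

```scala
object ControllerEpochSketch {
  // Hypothetical helper isolating the check-then-update step.
  final class EpochGuard {
    private val replicaStateChangeLock = new Object
    private var controllerEpoch: Int = 0

    // Check and update are inside one synchronized block, so no other request
    // can change controllerEpoch between the comparison and the assignment.
    def acceptIfNotStale(requestEpoch: Int): Boolean = replicaStateChangeLock.synchronized {
      if (requestEpoch < controllerEpoch) {
        false // request comes from an older controller; reject it
      } else {
        controllerEpoch = requestEpoch
        true
      }
    }

    def currentEpoch: Int = replicaStateChangeLock.synchronized { controllerEpoch }
  }
}
```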

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53144>

    Could we use a better val name? Something like leaderPartitionInfos?

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53143>

    Could we use named fields instead of _2?
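
    For example, a pattern match can bind tuple elements to names. The map shape below (partition name to a pair of leader id and leader epoch) is a made-up illustration, not the actual type used in ReplicaManager.

```scala
object NamedFieldsSketch {
  // Positional access: the reader has to remember what _2 and _1 stand for.
  def leaderIdsPositional(infos: Map[String, (Int, Int)]): Set[Int] =
    infos.map(_._2._1).toSet

  // Pattern match: each tuple element gets a descriptive name at the point of use.
  def leaderIdsNamed(infos: Map[String, (Int, Int)]): Set[Int] =
    infos.map { case (_, (leaderId, _)) => leaderId }.toSet
}
```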

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53148>

    Can this logic be moved to becomeLeaderOrFollower since it's duplicated in becomeFollower()?

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53146>

    Since we serialize the processing of all requests from the controller, we should never hit the code here. So, we should change the logging to WARN and change the logging message accordingly.

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53149>

    Are we expecting any exceptions here? If so, what about the error code in the response?

- Jun Rao

On Oct. 21, 2013, 8:35 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/14730/
> -----------------------------------------------------------
>
> (Updated Oct. 21, 2013, 8:35 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1001
>     https://issues.apache.org/jira/browse/KAFKA-1001
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Incorporated review comments
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/cluster/Partition.scala 5ccecd179d33abfc14dcefc35dd68de7474c6978
>   core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4
>   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 566ca46d113ee7da4b38ee57302ba183b59ab5d6
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala dda0a8f041f242bf8a501a8cbd2b9c0258323f96