Kafka, mail # dev - Review Request 14730: Patch for KAFKA-1001


Re: Review Request 14730: Patch for KAFKA-1001
Jun Rao 2013-10-18, 17:35

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/14730/#review27195
-----------------------------------------------------------
Thanks for the patch. Using ._1 and ._2 tends to be confusing since it's not obvious what the referenced fields are. So, unless the context is very clear, I recommend that we use named fields. Some detailed comments below.
core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52893>

    For better clarity, we can do
    
    map{ case(topicAndPartition, broker) => }
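
    As a minimal sketch of the difference, assuming hypothetical stand-in types (TopicAndPartition, Broker, and the sample map below are illustrative and not taken from the patch), the two styles compare as follows:

```scala
object NamedTupleFieldsSketch {
  // Hypothetical stand-ins for illustration; not Kafka's actual classes.
  case class TopicAndPartition(topic: String, partition: Int)
  case class Broker(id: Int, host: String, port: Int)

  val leaderForPartitions: Map[TopicAndPartition, Broker] = Map(
    TopicAndPartition("events", 0) -> Broker(1, "host-a", 9092),
    TopicAndPartition("events", 1) -> Broker(2, "host-b", 9092)
  )

  // Positional accessors: the reader has to remember what _1 and _2 hold.
  val positional: Iterable[String] =
    leaderForPartitions.map(entry =>
      s"${entry._1.topic}-${entry._1.partition}@${entry._2.host}")

  // Pattern-matching form: each tuple element gets a descriptive name.
  val named: Iterable[String] =
    leaderForPartitions.map { case (topicAndPartition, broker) =>
      s"${topicAndPartition.topic}-${topicAndPartition.partition}@${broker.host}"
    }
}
```

    Both expressions produce the same strings; only the second one tells the reader which element is the partition and which is the broker.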

core/src/main/scala/kafka/log/LogManager.scala
<https://reviews.apache.org/r/14730/#comment52892>

    For better clarity, this can be written as
    
    for ( (topicAndPartition, truncateOffset) <- partitionAndOffsets)

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52890>

    tp._2._1 is going to be confusing since it's not obvious what field this refers to. It's probably worthwhile to create a new case class BrokerAndInitialOffset. Then we can do
    
    for ( (topicAndPartition, brokerAndInitialOffset) <- partitionAndOffsets)
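
    A rough sketch of what this could look like is below. Only the name BrokerAndInitialOffset comes from this comment; its field names, the stand-in TopicAndPartition and Broker types, and the describe helper are assumptions made for illustration.

```scala
object BrokerAndInitialOffsetSketch {
  // Hypothetical stand-ins for illustration; not Kafka's actual classes.
  case class TopicAndPartition(topic: String, partition: Int)
  case class Broker(id: Int, host: String, port: Int)

  // The case class suggested in the review; the field names are assumed.
  case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)

  // Replaces accesses such as tp._2._1 with named fields.
  def describe(
      partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]
  ): Seq[String] =
    (for ((topicAndPartition, brokerAndInitialOffset) <- partitionAndOffsets)
      yield s"${topicAndPartition.topic}-${topicAndPartition.partition}: " +
        s"broker ${brokerAndInitialOffset.broker.id}, " +
        s"offset ${brokerAndInitialOffset.initOffset}").toSeq
}
```

    With the case class in place, brokerAndInitialOffset.broker reads unambiguously where tp._2._1 did not.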

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment52891>

    We need to think through this change. What if a broker is restarted quickly with a new host/port, but same broker id? Can we guarantee that a new fetcher thread with the new host/port will be added if we just keep broker id in the fetcherThreadMap?
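
    To make the concern concrete, here is a small sketch under stated assumptions: Fetcher, byBrokerId, and byBroker are hypothetical and do not reflect the actual fetcherThreadMap code. It only shows how a map keyed by broker id alone can hide a host/port change.

```scala
import scala.collection.mutable

object FetcherKeySketch {
  // Hypothetical stand-ins for illustration; not Kafka's actual classes.
  case class Broker(id: Int, host: String, port: Int)
  // A fake "fetcher" that only records the endpoint it was created for.
  case class Fetcher(host: String, port: Int)

  // Keyed by broker id only: a broker that comes back with the same id
  // but a new host/port is served by the existing (stale) entry.
  val byBrokerId = mutable.Map.empty[Int, Fetcher]
  def fetcherById(b: Broker): Fetcher =
    byBrokerId.getOrElseUpdate(b.id, Fetcher(b.host, b.port))

  // Keyed by the full broker (id, host, port): a changed endpoint is a
  // different key, so a new fetcher is created for it.
  val byBroker = mutable.Map.empty[Broker, Fetcher]
  def fetcherByBroker(b: Broker): Fetcher =
    byBroker.getOrElseUpdate(b, Fetcher(b.host, b.port))
}
```

    In this sketch, looking up Broker(1, "new-host", 9093) after Broker(1, "old-host", 9092) returns the old fetcher from byBrokerId but a fresh one from byBroker. Whether the patch is exposed to this depends on how stale entries are removed, which the sketch does not model.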

core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment52888>

    It may not be obvious what ._1 and ._2 are. We can write the code like the following:
    
    for ( (topicAndPartition, offset) <- partitionAndOffsets)
    
    Then we can refer to the named fields.
    

core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment52889>

    Not sure if doing this outside of the lock is safe since immediately after the check, new partitions can be added.
    
    Removing all partitions inside a lock should still be cheap.
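
    A minimal sketch of keeping the whole operation inside the lock, assuming a hypothetical partitionMap guarded by a ReentrantLock (the names and types here are illustrative, not the actual AbstractFetcherThread fields):

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object LockedRemovalSketch {
  // Hypothetical state for illustration: partition name -> fetch offset.
  private val partitionMapLock = new ReentrantLock
  private val partitionMap = mutable.Map.empty[String, Long]

  def addPartition(name: String, offset: Long): Unit = {
    partitionMapLock.lock()
    try {
      partitionMap(name) = offset
    } finally {
      partitionMapLock.unlock()
    }
  }

  // The size check and the removal happen under the same lock, so no
  // partition can be added between the two steps. Returns how many
  // partitions were removed.
  def removeAllPartitions(): Int = {
    partitionMapLock.lock()
    try {
      val removed = partitionMap.size
      partitionMap.clear()
      removed
    } finally {
      partitionMapLock.unlock()
    }
  }
}
```

    Clearing a small in-memory map is cheap, so holding the lock for the whole operation should cost little compared with the race it avoids.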

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52894>

    We probably should synchronize on the processing of stop replica requests as well. So, we need a more general name for the lock.

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52895>

    This can be written as map{ case(topic, partition) => }

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52896>

    Ditto as above.

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52897>

    Ditto as above.

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52898>

    This can be written as foreach{ case(topicAndPartition, partitionStateInfo) => }
    

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment52899>

    Not sure that I understand the comment. Why would HW be overwritten to 0?
    
    Also, could we explain the importance of the ordering of the steps, i.e., removing fetchers, truncating the log (and checkpointing the flushing offset), and adding fetchers?
- Jun Rao
On Oct. 18, 2013, 2:15 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/14730/
> -----------------------------------------------------------
>
> (Updated Oct. 18, 2013, 2:15 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1001
>     https://issues.apache.org/jira/browse/KAFKA-1001
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1001.v1.6
>
>
> KAFKA-1001.v1.5