Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # dev >> [jira] [Updated] (KAFKA-554) Move all per-topic configuration into ZK and add to the CreateTopicCommand

Copy link to this message
[jira] [Updated] (KAFKA-554) Move all per-topic configuration into ZK and add to the CreateTopicCommand

     [ https://issues.apache.org/jira/browse/KAFKA-554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-554:

    Attachment: KAFKA-554-v1.patch

This patch does two things:
1. Implement a dynamic configuration mechanism for topics
2. Remove the scripts bin/kafka-list-topic.sh, bin/kafka-delete-topic.sh, bin/kafka-create-topic.sh and create a new more powerful tool:
jay@ahab:kafka> bin/kafka-topics.sh
Command must include exactly one action: --list, --describe, --create, --delete, or --alter
Option                                  Description                            
------                                  -----------                            
--alter                                 Alter the configuration for the topic.
--config <name=value>                   A topic configuration for this topic.  
--create                                Create a new topic.                    
--delete                                Delete the topic.                      
--describe                              List details for the given topics.    
--help                                  Print usage information.              
--list                                  List all available topics.            
--partitions <Integer: # of partitions> The number of partitions for the topic.
--replica-assignment                    A list of manual partition-to-broker  
  <broker_id_for_part1_replica1 :         assignments.                        
  broker_id_for_part1_replica2 ,                                              
  broker_id_for_part2_replica1 :                                              
  broker_id_for_part2_replica2 , ...>                                          
--replication-factor <Integer:          The replication factor for each        
  replication factor>                     partition in the topic.              
--topic <topic>                         The topic to be created.              
--zookeeper <urls>                      REQUIRED: The connection string for    
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be      
                                          given to allow fail-over.  

This command line tool can either list topics, describe topics, create topics, delete topics, or change the configuration for topics.

Here is an example of creating two topics with overrides:
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic first_topic --topic second_topic --replication-factor 1 --partitions 4 --config segment.bytes=1073741824 --config retention.ms=1000000
Created topic "first_topic".
Created topic "second_topic".

(Any command that takes a topic option can run on a list of topics by giving more than one topic flag.)

./bin/kafka-topics.sh  --zookeeper localhost:2181 --list

./bin/kafka-topics.sh  --zookeeper localhost:2181 --describe --topic second_topic
configs: segment.bytes = 1073741824, retention.ms = 1000000
partitions: 4
partition 0
leader: 0 (ahab.linkedin.biz:9092)
replicas: 0 (ahab.linkedin.biz:9092)
isr: 0 (ahab.linkedin.biz:9092)
partition 1
leader: 0 (ahab.linkedin.biz:9092)
replicas: 0 (ahab.linkedin.biz:9092)
isr: 0 (ahab.linkedin.biz:9092)
partition 2
leader: 0 (ahab.linkedin.biz:9092)
replicas: 0 (ahab.linkedin.biz:9092)
isr: 0 (ahab.linkedin.biz:9092)
partition 3
leader: 0 (ahab.linkedin.biz:9092)
replicas: 0 (ahab.linkedin.biz:9092)
isr: 0 (ahab.linkedin.biz:9092)

This configuration could be changed later for a topic by running
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first_topic --config segment.bytes=673741824 --config retention.ms=500000
Updated config for topic "first_topic".

The implementation of the dynamic config is to add a new zookeeper path
This path has two subdirectories
The per-topic path contains any override properties specified for the topic stored in java.util.Properties format. If no overrides are given then that znode will not exist. The defaults are still taken from the server.properties file.

The /config/changes path is used to reduce the number of watches required. Instead of keeping a watch on each config override znode, whenever we update a config entry we add a sequential entry under the changes directory containing the name of the topic whose config changed. Each broker keeps a watch on this directory and caches the last change it has executed. When the watch fires it executes any new config changes. Old change entries are garbage collected after 10 minutes. The config changes are managed by a new class TopicConfigManager which executes these changes.

This patch also has two refactorings:
1. Renamed KafkaZookeeper to KafkaHealthcheck
2. Moved logic for creating topics out of CreateTopicCommand and replaced it with two utilities in AdminUtils:
       def createTopic(zkClient: ZkClient,
                  topic: String,
                  partitions: Int,
                  replicationFactor: Int,
                  topicConfig: Properties = new Properties)
       def createTopicWithAssignment(zkClient: ZkClient,
                                topic: String,
                                partitionReplicaAssignment: Map[Int, Seq[Int]],
                                config: Properties = new Properties)
The first method will choose a partition assignment, and the second just sanity checks the assignment it is given.

I had originally planned to implement an RPC api to create and delete and alter topics, but I backed away from this since we don't seem to have a sane way to organize admin functionality yet.

I think the first step in cleaning up is probably to refactor AdminUtils into a sane Admin client with methods that match the high-level administrative operations. This