I sent around a wiki a few weeks back proposing a set of client
improvements that essentially amount to a rewrite of the producer and
consumer java clients.
The discussion below assumes you have read this wiki.
I started to do a little prototyping for the producer and wanted to share
some of the ideas that came up to get early feedback.
First, a few simple but perhaps controversial things to discuss.
Phase 1: We add the new clients. No change on the server. Old clients still
exist. The new clients will be entirely in a new package so there will be
no possibility of name collision.
Phase 2: We swap out all shared code on the server to use the new client
stuff. At this point the old clients still exist but are essentially
Phase 3: We remove the old client code.
I think we should do the clients in java. Making our users deal with
scala's compatibility issues and crazy stack traces causes people a lot
of pain. Furthermore we end up having to wrap everything now to get a
usable java api anyway for non-scala people. This does mean maintaining a
substantial chunk of java code, which is maybe less fun than scala. But
basically I think we should optimize for the end user and produce a
standalone pure-java jar with no dependencies.
We definitely want to separate out the client jar. There is also a fair
amount of code shared between both (exceptions, protocol definition, utils,
and the message set implementation). Two approaches.
Two jar approach: split kafka.jar into kafka-clients.jar and
kafka-server.jar with the server depending on the clients. The advantage of
this is that it is simple. The disadvantage is that things like utils and
protocol definition will be in the client jar though technically they belong
equally to the server.
Many jar approach: split kafka.jar into kafka-common.jar,
kafka-producer.jar, kafka-consumer.jar, kafka-admin.jar, and
kafka-server.jar. The disadvantage of this is that the user needs two jars
(common + something) which is for sure going to confuse people. I also
think this will tend to spawn more jars over time.
I am thinking of moving both serialization and compression out of the
background send thread. I will explain a little about this idea below.
I am not sure if we should handle serialization in the client at all.
Basically I wonder if our own API wouldn't just be a lot simpler if we took
a byte key and byte value and let people serialize stuff themselves.
Injecting a class name for us to create the serializer is more roundabout
and has a lot of problems if the serializer itself requires a lot of
configuration or other objects to be instantiated.
The real question with serialization is whether the partitioning should
happen on the java object or on the byte array key. The argument for doing
it on the java object is that it is easier to do something like a range
partition on the object. The problem with doing it on the object is that
the consumer may not be in java and so may not be able to reproduce the
partitioning. For example we currently use Object.hashCode which is a
little sketchy. We would be better off using a standardized hash function
on the key bytes. If we want to give the partitioner access to the original
java object then obviously we need to handle serialization behind our api.
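
To make the byte-array option concrete, here is a minimal sketch of
partitioning on the serialized key. CRC32 is only an assumption standing in
for whatever standardized hash we end up picking, and KeyPartitioner and
partition() are hypothetical names, not part of any existing API. The point
is just that a non-java consumer can reproduce the result from the bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper: picks a partition from the serialized key bytes only.
final class KeyPartitioner {
    // Returns a partition in [0, numPartitions) for the given key bytes.
    static int partition(byte[] keyBytes, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(keyBytes, 0, keyBytes.length);
        // getValue() holds an unsigned 32-bit checksum in a long,
        // so the remainder is never negative.
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partition(key, 8));
    }
}
```

Any client that can compute the same checksum over the same bytes gets the
same partition, which is not true of Object.hashCode.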
I think good names are important. I would like to rename the following
classes in the new client:
Message=>Record: Now that the message has both a message and a key it is
more of a KeyedMessage. Another name for a KeyedMessage is a Record.
MessageSet=>Records: This isn't too important, but nitpickers complain
that it is not technically a Set but rather a List or Sequence, and
MessageList sounds funny to me.
The actual clients will not interact with these classes. They will interact
with a ProducerRecord and ConsumerRecord. The reason for having different
fields is because the different clients
Proposed producer API:
SendResponse r = producer.send(new ProducerRecord(topic, key, value));
Here is what I am thinking about protocol definition. I see a couple of
problems with what we are doing currently. First the protocol definition is
spread throughout a bunch of custom java objects. The error reporting in
these objects is really terrible because they don't record the field names.
Furthermore people keep adding business logic into the protocol objects
which is pretty nasty.
I would like to move to having a single Protocol.java file that defines the
protocol in a readable DSL. Here is what I am thinking:
public static Schema REQUEST_HEADER =
    new Schema(new Field("api_key", INT16, "The id of the request type."),
               new Field("api_version", INT16, "The version of the API."),
               new Field("correlation_id", INT32,
                         "A user-supplied integer value that will be passed back with the response"),
               new Field("client_id", STRING,
                         "A user specified identifier for the client making the request."));
To parse one of these requests you would do
Struct struct = REQUEST_HEADER.parse(bytebuffer);
short apiKey = struct.get("api_key");
Internally Struct is just an Object[] with one entry per field which is
populated from the schema. The mapping of name to array index is a hash
table lookup. We can optimize access for performance critical areas by
// do this once to look up the index of the field
static Field apiKeyField = REQUEST_HEADER.getField("api_key");
Struct struct = REQUEST_HEADER.parse(bytebuffer);
short apiKey = struct.get(apiKeyField); // now this is just an array
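
For anyone who wants to see how the pieces could fit together, here is a
rough sketch of Schema, Field and Struct. Everything in it is an assumption
made for illustration: the Type enum, the mutable index on Field, the
int16-length-prefixed UTF-8 string encoding, and get() returning Object
(so callers cast). It is not the prototype code, just one way the name
lookup and the array-index lookup described above might work.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wire types; only the three used by the request header.
enum Type { INT16, INT32, STRING }

final class Field {
    final String name;
    final Type type;
    final String doc;
    int index = -1; // position in the Struct's value array, assigned by the owning Schema

    Field(String name, Type type, String doc) {
        this.name = name;
        this.type = type;
        this.doc = doc;
    }
}

final class Schema {
    private final Field[] fields;
    private final Map<String, Field> byName = new HashMap<>();

    Schema(Field... fields) {
        this.fields = fields;
        for (int i = 0; i < fields.length; i++) {
            fields[i].index = i;
            byName.put(fields[i].name, fields[i]);
        }
    }

    // Hash lookup by name; the error names the field so failures are easy to trace.
    Field getField(String name) {
        Field f = byName.get(name);
        if (f == null)
            throw new IllegalArgumentException("No such field: " + name);
        return f;
    }

    // Reads the fields in declaration order from the buffer's current position.
    Struct parse(ByteBuffer buffer) {
        Object[] values = new Object[fields.length];
        for (Field f : fields) {
            switch (f.type) {
                case INT16: values[f.index] = buffer.getShort(); break;
                case INT32: values[f.index] = buffer.getInt(); break;
                case STRING:
                    // Assumed encoding: int16 length followed by UTF-8 bytes.
                    byte[] bytes = new byte[buffer.getShort()];
                    buffer.get(bytes);
                    values[f.index] = new String(bytes, StandardCharsets.UTF_8);
                    break;
            }
        }
        return new Struct(this, values);
    }
}

final class Struct {
    private final Schema schema;
    private final Object[] values;

    Struct(Schema schema, Object[] values) {
        this.schema = schema;
        this.values = values;
    }

    Object get(Field field) { return values[field.index]; }        // plain array access
    Object get(String name) { return get(schema.getField(name)); } // hash lookup, then array access
}

final class ProtocolSketch {
    public static void main(String[] args) {
        Schema header = new Schema(
            new Field("api_key", Type.INT16, "The id of the request type."),
            new Field("correlation_id", Type.INT32, "Passed back with the response."));
        ByteBuffer buffer = ByteBuffer.allocate(6);
        buffer.putShort((short) 3).putInt(42).flip();
        Struct struct = header.parse(buffer);
        short apiKey = (Short) struct.get("api_key");
        System.out.println(apiKey + " " + struct.get("correlation_id"));
    }
}
```

Because the field names live in the schema, a bad lookup or a parse error
can report which field was involved.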
One advantage of this is this level of indirection will make it really easy
for us to handle backwards compatibility in a more principled way. The
protocol file will actually contain ALL versions of the schema and we will
always use the appropriate vers