[jira] [Updated] (KAFKA-732) MirrorMaker with shallow.iterator.enable=true produces unreadable messages
Neha Narkhede 2013-01-24, 16:49
[ https://issues.apache.org/jira/browse/KAFKA-732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-732:
--------------------------------
    Labels: p2  (was: )

> MirrorMaker with shallow.iterator.enable=true produces unreadable messages
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-732
>                 URL: https://issues.apache.org/jira/browse/KAFKA-732
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.8
>            Reporter: Maxime Brugidou
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: p2
>
> Trying to use MirrorMaker between two 0.8 clusters.
> When using shallow.iterator.enable=true on the consumer side, the performance gain is large (when incoming messages are compressed), and the producer does not complain, but it writes the messages as uncompressed, without the compression flag.
> If you try to:
> - enable compression on the producer, it obviously makes things worse, since the data gets double-compressed (the wiki warns about this);
> - disable compression, the compressed messages are written in bulk inside an uncompressed message, which makes them unreadable.
> If I follow the current state of the code correctly, from MirrorMaker to the produce request, there is no way for the producer to know whether the message is deep or not. So I wonder how it worked on 0.7?
> Here is the code as I read it (correct me if I'm wrong):
> 1. MirrorMakerThread.run(): create KeyedMessage[Array[Byte],Array[Byte]](topic, message)
> 2. Producer.send() -> DefaultEventHandler.handle()
> 3. DefaultEventHandler.serialize(): use DefaultEncoder for the message (does nothing)
> 4. DefaultEventHandler.dispatchSerializedData():
> 4.1 DefaultEventHandler.partitionAndCollate(): group messages by broker/partition/topic
> 4.2 DefaultEventHandler.dispatchSerializedData(): cycle through each broker
> 4.3 DefaultEventHandler.groupMessagesToSet(): create a ByteBufferMessageSet for each partition/topic, grouping all the messages together and compressing them if needed
> 4.4 DefaultEventHandler.send(): send the ByteBufferMessageSets for this broker in one ProduceRequest
> The gist is that in DEH.groupMessagesToSet(), you don't know whether the raw message in KeyedMessage.message is shallow or not. So I think I missed something... Also, it doesn't seem possible to send a batch of deep messages in one ProduceRequest.
> I would love to provide a patch (or if you tell me that I'm doing it wrong, even better), since I can easily test it on my test clusters, but I will need guidance here.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
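
The failure mode described in the issue can be illustrated with a toy model. The sketch below is not Kafka code: the record layout, the codec ids, and the functions produce, consume and mirror are hypothetical stand-ins for the producer, the consumer iterator and MirrorMaker. It assumes, as the description states, that a shallow iterator hands out the compressed wrapper payload untouched and that the producer treats whatever bytes it receives as a plain message.

```python
import gzip
import struct
from dataclasses import dataclass
from typing import Iterator, List

NONE, GZIP = 0, 1  # toy codec ids (assumption), not Kafka's wire format


@dataclass(frozen=True)
class Message:
    codec: int      # compression flag stored next to the payload
    payload: bytes


def pack(messages: List[Message]) -> bytes:
    """Serialize messages as [codec:1][length:4][payload] records."""
    return b"".join(
        struct.pack(">BI", m.codec, len(m.payload)) + m.payload for m in messages
    )


def unpack(data: bytes) -> Iterator[Message]:
    """Inverse of pack()."""
    offset = 0
    while offset < len(data):
        codec, size = struct.unpack_from(">BI", data, offset)
        offset += 5
        yield Message(codec, data[offset:offset + size])
        offset += size


def produce(payloads: List[bytes], compress: bool) -> bytes:
    """Toy producer: every payload is assumed to be a plain message."""
    plain = [Message(NONE, p) for p in payloads]
    if not compress:
        return pack(plain)
    # Compressed batch: one wrapper message carrying the GZIP flag.
    return pack([Message(GZIP, gzip.compress(pack(plain)))])


def consume(log: bytes, shallow: bool) -> Iterator[bytes]:
    """Toy consumer: shallow iteration skips decompression of wrappers."""
    for message in unpack(log):
        if message.codec == GZIP and not shallow:
            for inner in unpack(gzip.decompress(message.payload)):
                yield inner.payload
        else:
            yield message.payload  # the flag is dropped here


def mirror(source_log: bytes, shallow: bool, compress: bool) -> bytes:
    """Toy MirrorMaker: consume from the source, re-produce to the target."""
    return produce(list(consume(source_log, shallow)), compress)


original = [b"event-1", b"event-2", b"event-3"]
source_log = produce(original, compress=True)

# What a normal (deep) reader sees on the target cluster in each setup.
deep_copy = list(consume(mirror(source_log, shallow=False, compress=True), shallow=False))
lost_flag = list(consume(mirror(source_log, shallow=True, compress=False), shallow=False))
double = list(consume(mirror(source_log, shallow=True, compress=True), shallow=False))

print("deep mirror intact:", deep_copy == original)
print("shallow, no compression:", len(lost_flag), "message, gzip bytes:", lost_flag[0][:2] == b"\x1f\x8b")
print("shallow, compression on:", len(double), "message, still gzip bytes:", double[0][:2] == b"\x1f\x8b")
```

In this model both shallow setups deliver a single opaque gzip blob to the reader instead of the three original events, because only the payload travels through the mirror and the codec flag does not. The records are still inside the blob, so a fix along these lines would need some way to carry the flag (or the whole wrapper message) from the consumer through to the produce request.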