-
Help with encoding issue.
Patricio Echagüe 2012-04-03, 22:22
Hi, I noticed that the String serializer doesn't correctly encode special characters such as "ü".
I tried to create a ByteBufferEncoder this way:
import java.nio.ByteBuffer;
import kafka.message.Message;
import kafka.serializer.Encoder;

public class ByteBufferEncoder implements Encoder<ByteBuffer> {
    public Message toMessage(ByteBuffer buffer) {
        return new Message(buffer);
    }
}

but I get this exception [1]. Could you guys please advise on how to fix my encoding issue?
Thanks

[1]
Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
at kafka.message.Message.compressionCodec(Message.scala:144)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
at kafka.producer.Producer.zkSend(Producer.scala:143)
at kafka.producer.Producer.send(Producer.scala:105)
at kafka.javaapi.producer.Producer.send(Producer.scala:104)
at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
-
Re: Help with encoding issue.
Jay Kreps 2012-04-03, 23:32
This is our bug: we were taking the system default encoding (d'oh). I have a patch for it that I was adding to 0.8; we can probably backport it to older releases pretty easily.
-Jay
-
Re: Help with encoding issue.
Eric Tschetter 2012-04-03, 23:37
If it's just an encoding issue like Jay says, you might also consider setting the JVM parameter
-Dfile.encoding=UTF-8
That will set the default encoding for the whole JVM process to be UTF-8 which is probably what you want. It is, of course, better to have the Charset specified explicitly, but this might be a good short-term work-around.
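To illustrate the difference between relying on the JVM default and naming the Charset explicitly, here is a minimal sketch using only the Java standard library. The class and method names (CharsetDemo, encodeUtf8, decodeUtf8) are made up for this example and are not part of Kafka.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper class for illustration only; not part of Kafka.
final class CharsetDemo {
    // Encode with an explicit charset so the result does not depend on file.encoding.
    static byte[] encodeUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same explicit charset.
    static String decodeUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String text = "Echag\u00fce"; // contains "ü"
        byte[] explicit = encodeUtf8(text);
        byte[] platform = text.getBytes(); // uses Charset.defaultCharset()
        System.out.println("default charset: " + Charset.defaultCharset());
        // True only when the JVM default happens to encode this text like UTF-8.
        System.out.println("default matches UTF-8: " + Arrays.equals(explicit, platform));
        System.out.println("round trip ok: " + text.equals(decodeUtf8(explicit)));
    }
}
```

Running it with and without -Dfile.encoding=UTF-8 on a machine whose default is not UTF-8 should show the second line change, while the explicit round trip stays the same either way.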
--Eric
-
Re: Help with encoding issue.
Patricio Echagüe 2012-04-03, 23:40
Interesting. What I can't explain though is why it works just fine when printing the string this way:
for (Message message : stream) {
    ByteBuffer bb = message.payload().duplicate();
    ByteBuffer bb2 = message.payload().duplicate();
    byte[] bytes = new byte[bb2.remaining()];
    bb2.get(bytes);
    System.out.println("Message received string: " + new String(bytes));
    consumerConnector.commitOffsets();
}

Do you have a link to your patch, Jay?
-
Re: Help with encoding issue.
Jun Rao 2012-04-04, 01:18
I think the issue is that you used the wrong constructor of Message. A client should only use this(bytes: Array[Byte]), not this(buffer: ByteBuffer). The latter is for internal use only and expects the buffer to already include the Kafka metadata. We should probably restrict the visibility of the latter API.
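A rough sketch of what this advice could look like on the client side, using only the Java standard library: copy the readable bytes out of the ByteBuffer and hand the resulting array to the byte-array constructor. The class and method names (PayloadBytes, toBytes) are invented for this example. The sample payload starting with a double quote is only an assumption, chosen because 34 is the ASCII code of '"', which would match the "Invalid magic byte 34" message if the first payload byte was being read as Kafka metadata.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration only; not part of Kafka.
final class PayloadBytes {
    // Copy the readable bytes out of a buffer without moving its position.
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // independent position and limit
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Assumed sample payload: the text "ü" wrapped in double quotes, as UTF-8.
        ByteBuffer payload = ByteBuffer.wrap("\"\u00fc\"".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = toBytes(payload);
        System.out.println("first byte: " + bytes[0]); // 34 is the ASCII code of '"'
        System.out.println("length: " + bytes.length);
        // In the encoder from the question, the method body would then become
        // (assuming the byte-array constructor described above):
        //   return new Message(PayloadBytes.toBytes(buffer));
    }
}
```

Working on a duplicate() leaves the caller's buffer position untouched, so the same buffer can still be read elsewhere.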
Thanks,
Jun
-
Re: Help with encoding issue.
Jay Kreps 2012-04-04, 21:44
Yeah, I think I jumped to conclusions. The issue I was referring to was just that we assume the default encoding, which would not cause the exception you described.
-Jay