Kafka >> mail # user >> 'simple' partitioning in 0.8


Re: 'simple' partitioning in 0.8
Chris,

Partitioning is independent of serialization. The key data is serialized
using the serializer specified through the "key.serializer.class" property
in the Producer. If you don't specify one, it falls back to the encoder set
in "serializer.class" (which itself defaults to a no-op encoder), so in your
case StringEncoder was applied to the Integer key. Since you want to use an
integer key, you'd have to plug in an Encoder that can serialize integer
data to a byte array.
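A minimal sketch of such an integer key encoder, assuming that the 0.8 `kafka.serializer.Encoder` interface consists of a single `toBytes` method. To keep the sketch compilable without the Kafka jar, it declares a local stand-in interface with that shape; the names `IntegerKeyEncoderSketch` and `IntegerEncoder` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntegerKeyEncoderSketch {

    // Stand-in for kafka.serializer.Encoder<T> (assumed shape: one toBytes method),
    // declared locally so this sketch compiles without the Kafka jar.
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Hypothetical key encoder: writes an int as 4 big-endian bytes.
    static class IntegerEncoder implements Encoder<Integer> {
        @Override
        public byte[] toBytes(Integer value) {
            return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
        }
    }

    public static void main(String[] args) {
        Encoder<Integer> encoder = new IntegerEncoder();
        System.out.println(Arrays.toString(encoder.toBytes(7))); // prints [0, 0, 0, 7]
    }
}
```

To use something like this with a real 0.8 producer, the class would implement `kafka.serializer.Encoder<Integer>` instead of the stand-in and, as far as I know, also needs a public constructor taking a `kafka.utils.VerifiableProperties`, since encoders are created reflectively. You would then point "key.serializer.class" at its fully qualified name and leave "serializer.class" as `kafka.serializer.StringEncoder` for the String messages.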

Thanks,
Neha
On Mon, Feb 25, 2013 at 11:50 AM, Chris Curtin <[EMAIL PROTECTED]> wrote:

> Hi,
>
> In an earlier thread about partitioning in 0.8, I read that you can provide
> a key to the KeyedMessage constructor and all messages with the same key
> will end up in the same partition, even if you don't provide a partitioning
> function (as opposed to messages being assigned to partitions randomly).
>
> When I do this, I get a runtime error:
>
> Code:
>
>         long events = Long.parseLong(args[1]);
>         int blocks = Integer.parseInt(args[2]);
>
>         Random rnd = new Random();
>
>         Properties props = new Properties();
>         props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         ProducerConfig config = new ProducerConfig(props);
>
>         Producer<Integer, String> producer = new Producer<Integer, String>(config);
>         for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
>             for (long nEvents = 0; nEvents < events; nEvents++) {
>                 long runtime = new Date().getTime();
>                 String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
>                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test1", nBlocks, msg);
>                 producer.send(data);
>             }
>         }
>         producer.close();
>
> Runtime error:
>
> 0    [main] INFO  kafka.utils.VerifiableProperties  - Verifying properties
> 33   [main] INFO  kafka.utils.VerifiableProperties  - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
> 33   [main] INFO  kafka.utils.VerifiableProperties  - Property serializer.class is overridden to kafka.serializer.StringEncoder
> Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
>     at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
>     at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
>     at kafka.producer.Producer.send(Producer.scala:76)
>     at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>     at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)
>
> Changing the code to use my own partitioner, with a String key instead of
> the Integer key above, works correctly. So do I always need to define a
> partitioning function?
>
> Thanks,
>
> Chris
>
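On the closing question: once the key can be serialized, a custom partitioner should not be needed just to keep equal keys together. The following sketch assumes the 0.8 default partitioner takes the key object's `hashCode()` with the sign bit cleared, modulo the partition count. It partitions on the key object itself rather than its serialized bytes, which fits the point that partitioning is independent of serialization. `KeyHashPartitionSketch`, `partitionFor`, and the partition count of 3 are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyHashPartitionSketch {

    // Assumed hash-based default partitioning: clear the sign bit of the key's
    // hashCode() so the result is non-negative, then take it modulo the
    // partition count. Equal keys therefore always land in the same partition.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // hypothetical partition count for topic "test1"

        // Mirror the nBlocks keys from the producer loop in the question.
        Map<Integer, Integer> assignment = new TreeMap<>();
        for (int nBlocks = 0; nBlocks < 5; nBlocks++) {
            assignment.put(nBlocks, partitionFor(nBlocks, numPartitions));
        }
        System.out.println(assignment); // prints {0=0, 1=1, 2=2, 3=0, 4=1}
    }
}
```

Because `Integer.hashCode()` returns the value itself, under this assumption the non-negative integer keys in the loop would map to `nBlocks % numPartitions`.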