Kafka >> mail # user >> 'simple' partitioning in 0.8


Re: 'simple' partitioning in 0.8
Chris,

Partitioning is independent of serialization. The key data is serialized
using the serializer specified through the "key.serializer.class" property
in the Producer. It defaults to a no-op encoder if you don't specify one.
Here, since you want to use an integer key, you'd have to plug in an
Encoder that can serialize integer data to a byte array.
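For reference, the integer-to-bytes part of such an Encoder can be sketched as below. This is a minimal sketch: `IntKeyEncoder` is a hypothetical name, and the Kafka-specific pieces (the `kafka.serializer.Encoder<Integer>` interface and the `VerifiableProperties` constructor Kafka uses when it reflectively instantiates the class named in "key.serializer.class") are only described in comments so the snippet stays self-contained:

```java
import java.nio.ByteBuffer;

// Hypothetical Integer key encoder for the 0.8 producer. Against the real
// Kafka 0.8 client this class would also "implements
// kafka.serializer.Encoder<Integer>" and declare a constructor taking a
// kafka.utils.VerifiableProperties argument, since Kafka instantiates the
// class named in "key.serializer.class" reflectively.
public class IntKeyEncoder {

    // Serialize the 32-bit integer key as a 4-byte big-endian array.
    public byte[] toBytes(Integer key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }
}
```

You would then point the producer at it with `props.put("key.serializer.class", "IntKeyEncoder")`, using whatever fully qualified name you give the class.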

Thanks,
Neha
On Mon, Feb 25, 2013 at 11:50 AM, Chris Curtin <[EMAIL PROTECTED]>wrote:

> Hi,
>
> In an earlier thread about partitioning on 0.8 I read that you can provide
> a key to the KeyedMessage constructor and all the messages with the key
> would end up in the same partition, even if you don't provide a partition
> function (vs the random assignment of a message to a partition).
>
> When I do this I get a runtime error:
>
> Code:
>
>         long events = Long.parseLong(args[1]);
>         int blocks = Integer.parseInt(args[2]);
>
>         Random rnd = new Random();
>
>         Properties props = new Properties();
>         props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         ProducerConfig config = new ProducerConfig(props);
>
>         Producer<Integer, String> producer = new Producer<Integer, String>(config);
>         for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
>             for (long nEvents = 0; nEvents < events; nEvents++) {
>                 long runtime = new Date().getTime();
>                 String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
>                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test1", nBlocks, msg);
>                 producer.send(data);
>             }
>         }
>         producer.close();
>
> Runtime error:
>
> 0    [main] INFO  kafka.utils.VerifiableProperties  - Verifying properties
> 33   [main] INFO  kafka.utils.VerifiableProperties  - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
> 33   [main] INFO  kafka.utils.VerifiableProperties  - Property serializer.class is overridden to kafka.serializer.StringEncoder
> Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
> at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
> at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
> at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
> at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)
>
> Changing the logic to use my own partitioner that accepts a String instead
> of an Integer works correctly. So do I always need to define a partitioning
> function?
>
> Thanks,
>
> Chris
>

 