Kafka, mail # user - 'simple' partitioning in 0.8


'simple' partitioning in 0.8
Chris Curtin 2013-02-25, 19:51
Hi,

In an earlier thread about partitioning in 0.8, I read that you can pass
a key to the KeyedMessage constructor and all messages with the same key
will end up in the same partition, even if you don't provide a partitioning
function (as opposed to each message being assigned to a random partition).
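The behavior described above amounts to picking a partition from a hash of the key, so identical keys always map to the same partition. Below is a minimal plain-Java sketch that assumes a hash-modulo scheme and a hypothetical 3-partition topic. `partitionFor` is an illustrative helper, not Kafka's own partitioner code.

```java
public class KeyHashPartitioningSketch {

    // Hypothetical helper: picks a partition from the key's hash code.
    // Masking off the sign bit keeps the result non-negative even when
    // hashCode() is negative.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for topic "test1"
        for (int nBlocks = 0; nBlocks < 5; nBlocks++) {
            // Every event in a block uses the same key, so all of them
            // map to the same partition.
            System.out.println("key " + nBlocks + " -> partition "
                    + partitionFor(nBlocks, numPartitions));
        }
    }
}
```

`Integer.hashCode()` returns the value itself, so under this scheme keys 0 through 4 land on partitions 0, 1, 2, 0, 1. No separate partitioning function is needed for grouping, as long as the key can be hashed.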

When I do this, I get a runtime error:

Code:

        long events = Long.parseLong(args[1]);
        int blocks = Integer.parseInt(args[2]);

        Random rnd = new Random();

        Properties props = new Properties();
        props.put("broker.list",
                "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);

        Producer<Integer, String> producer = new Producer<Integer, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + ","
                        + rnd.nextInt(1000);
                KeyedMessage<Integer, String> data =
                        new KeyedMessage<Integer, String>("test1", nBlocks, msg);
                producer.send(data);
            }
        }
        producer.close();

Runtime error:

0    [main] INFO  kafka.utils.VerifiableProperties  - Verifying properties
33   [main] INFO  kafka.utils.VerifiableProperties  - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
33   [main] INFO  kafka.utils.VerifiableProperties  - Property serializer.class is overridden to kafka.serializer.StringEncoder
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)
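The trace shows `StringEncoder.toBytes` receiving a `java.lang.Integer`, and the only `Integer` in each message is the key. This suggests that the configured `serializer.class` was applied to the key as well as the value. The plain-Java sketch below reproduces that failure mode under this assumption. `Encoder`, `StringEncoderSketch` and `encoderFromConfig` are hypothetical stand-ins, not Kafka's classes.

```java
import java.nio.charset.StandardCharsets;

public class EncoderMismatchSketch {

    // Hypothetical serializer interface (not Kafka's Encoder trait).
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Hypothetical String-only encoder, analogous to a string serializer.
    static class StringEncoderSketch implements Encoder<String> {
        @Override
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics config-driven construction: the encoder is chosen without any
    // compile-time knowledge of the key type, so the same String encoder
    // ends up being used for keys too.
    @SuppressWarnings("unchecked")
    static <K> Encoder<K> encoderFromConfig() {
        return (Encoder<K>) (Encoder<?>) new StringEncoderSketch();
    }

    public static void main(String[] args) {
        Encoder<String> valueEncoder = encoderFromConfig();
        Encoder<Integer> keyEncoder = encoderFromConfig(); // compiles: unchecked cast
        System.out.println(valueEncoder.toBytes("hello").length);
        try {
            // The erased bridge method casts the Integer to String at runtime.
            keyEncoder.toBytes(42);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Because the encoder is instantiated from a configured class name, generics cannot flag the key/encoder mismatch at compile time. It only surfaces when the key is serialized, which matches where the trace above fails.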

Changing the code to use my own partitioner that accepts a String key,
instead of the Integer key above, works correctly. So do I always need to
define a partitioning function?

Thanks,

Chris

Neha Narkhede 2013-02-25, 20:11