

'simple' partitioning in 0.8
Hi,

In an earlier thread about partitioning on 0.8 I read that you can provide
a key to the KeyedMessage constructor and all the messages with the key
would end up in the same partition, even if you don't provide a partition
function (vs the random assignment of a message to a partition).
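
For illustration, the behaviour described above (same key, same partition) can be sketched as a hash of the key taken modulo the partition count. This is only a minimal sketch under that assumption, not Kafka's actual partitioner code; the class name KeyPartitionSketch, the helper partitionFor, and the partition count of 3 are hypothetical.

```java
public class KeyPartitionSketch {
    // Hypothetical helper: map a key to a partition by hashing it.
    // The sign bit is masked so the result is never negative.
    public static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only
        for (int nBlocks = 0; nBlocks < 5; nBlocks++) {
            // Every message sent with the same key maps to the same partition.
            System.out.println("key " + nBlocks + " -> partition "
                    + partitionFor(nBlocks, numPartitions));
        }
    }
}
```

Under this scheme a given key always lands on the same partition, while different keys may share one.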

When I do this I get a runtime error:

Code:

        long events = Long.parseLong(args[1]);
        int blocks = Integer.parseInt(args[2]);

        Random rnd = new Random();

        Properties props = new Properties();
        props.put("broker.list", "vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);

        Producer<Integer, String> producer = new Producer<Integer, String>(config);
        for (int nBlocks = 0; nBlocks < blocks; nBlocks++) {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                String msg = runtime + "," + (50 + nBlocks) + "," + nEvents + "," + rnd.nextInt(1000);
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>("test1", nBlocks, msg);
                producer.send(data);
            }
        }
        producer.close();

Runtime error:

0    [main] INFO  kafka.utils.VerifiableProperties  - Verifying properties
33   [main] INFO  kafka.utils.VerifiableProperties  - Property broker.list is overridden to vrd01.atlnp1:9092,vrd02.atlnp1:9092,vrd03.atlnp1:9092
33   [main] INFO  kafka.utils.VerifiableProperties  - Property serializer.class is overridden to kafka.serializer.StringEncoder
Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at kafka.serializer.StringEncoder.toBytes(Encoder.scala:46)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:126)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:123)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:123)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.silverpop.kafka.playproducer.TestProducer.main(TestProducer.java:31)

If I change the logic to use my own partitioner that accepts a String instead
of the Integer used above, it works correctly. So do I always need to define a
partitioning function?
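
One plausible reading of the trace is that the configured StringEncoder is applied to the key as well as to the message, so an Integer key fails the cast inside toBytes. The sketch below reproduces that effect without Kafka; the Encoder interface, the StringEncoder class and the trySerialize helper here are local, hypothetical stand-ins and not Kafka's own classes.

```java
import java.nio.charset.StandardCharsets;

public class KeyEncoderSketch {
    // Hypothetical stand-in for a serializer; its type parameter is erased at runtime.
    interface Encoder<T> {
        byte[] toBytes(T value);
    }

    // Local stand-in that only knows how to encode Strings.
    static class StringEncoder implements Encoder<String> {
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Applies one encoder to both the message and the key, as would happen
    // if a single serializer setting covered both (an assumption here).
    @SuppressWarnings("unchecked")
    public static String trySerialize(Object key, Object message) {
        Encoder<Object> encoder = (Encoder<Object>) (Encoder<?>) new StringEncoder();
        try {
            encoder.toBytes(message);
            encoder.toBytes(key); // fails here when the key is not a String
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(0, "msg"));                 // Integer key
        System.out.println(trySerialize(String.valueOf(0), "msg")); // String key
    }
}
```

If that reading is right, passing the key as a String (for example String.valueOf(nBlocks) with a Producer<String, String>) would avoid the cast without a custom partitioner, which is consistent with the String-based variant working.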

Thanks,

Chris

 
Neha Narkhede 2013-02-25, 20:11