Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Java junit test for a Kafka producer returns "failed to collate messages by topic"


Copy link to this message
-
Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"
The topic maybe is not created at the broker yet ... take a look at
ProducerTest.scala as example

You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
500) to assert that the topic is in fact at the broker before sending after
creating it

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas <[EMAIL PROTECTED]
> wrote:

> Hello,
>
> I wrote a simple junit test to test a Kafka producer.
>
> public class KafkaProducerTest {
>
>     private int brokerId = 0;
>     private String topic = "test";
>
>     @Test
>     public void producerTest() throws InterruptedException {
>
>         // setup Zookeeper
>         String zkConnect = TestZKUtils.zookeeperConnect();
>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>
>         // setup Broker
>         int port = TestUtils.choosePort();
>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>
>         KafkaConfig config = new KafkaConfig(props);
>         Time mock = new MockTime();
>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>
>         // create topic
>        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>
>         // setup producer
>         Properties properties =  TestUtils.getProducerConfig("localhost:"
> + port, "kafka.producer.DefaultPartitioner");
>
>         ProducerConfig pConfig = new ProducerConfig(properties);
>         Producer producer = new Producer(pConfig);
>
>         // send message
>         KeyedMessage<Integer, String> data = new KeyedMessage(topic,
> "test-message");
>
>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>         messages.add(data);
>
>         producer.send(scala.collection.JavaConversions.asBuffer(messages));
>
>         // cleanup
>         producer.close();
>         kafkaServer.shutdown();
>         zkClient.close();
>         zkServer.shutdown();
>     }
>
> }
>
>
> However when I run the test I get the following error messages
>
> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:9
> 0)
>         at kafka.producer.Producer.send(Producer.scala:74)
>         at
> de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.
> java:57)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:5
> 7)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImp
> l.java:43)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod
> .java:47)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.
> java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.j
> ava:44)
>         at

 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB