Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # user - Java junit test for a Kafka producer returns "failed to collate messages by topic"


Copy link to this message
-
Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"
Joe Stein 2013-09-06, 11:52
The topic maybe is not created at the broker yet ... take a look at
ProducerTest.scala as example

You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
500) to assert that the topic is in fact at the broker before sending after
creating it

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
On Fri, Sep 6, 2013 at 4:41 AM, Maier, Dr. Andreas <[EMAIL PROTECTED]
> wrote:

> Hello,
>
> I wrote a simple junit test to test a Kafka producer.
>
> public class KafkaProducerTest {
>
>     private int brokerId = 0;
>     private String topic = "test";
>
>     @Test
>     public void producerTest() throws InterruptedException {
>
>         // setup Zookeeper
>         String zkConnect = TestZKUtils.zookeeperConnect();
>         EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
>         ZkClient zkClient = new ZkClient(zkServer.connectString());
>
>         // setup Broker
>         int port = TestUtils.choosePort();
>         Properties props = TestUtils.createBrokerConfig(brokerId, port);
>
>         KafkaConfig config = new KafkaConfig(props);
>         Time mock = new MockTime();
>         KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>
>         // create topic
>        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
>
>         // setup producer
>         Properties properties =  TestUtils.getProducerConfig("localhost:"
> + port, "kafka.producer.DefaultPartitioner");
>
>         ProducerConfig pConfig = new ProducerConfig(properties);
>         Producer producer = new Producer(pConfig);
>
>         // send message
>         KeyedMessage<Integer, String> data = new KeyedMessage(topic,
> "test-message");
>
>         List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
>         messages.add(data);
>
>         producer.send(scala.collection.JavaConversions.asBuffer(messages));
>
>         // cleanup
>         producer.close();
>         kafkaServer.shutdown();
>         zkClient.close();
>         zkServer.shutdown();
>     }
>
> }
>
>
> However when I run the test I get the following error messages
>
> [2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
> [2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler:97)
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:9
> 0)
>         at kafka.producer.Producer.send(Producer.scala:74)
>         at
> de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.
> java:57)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:5
> 7)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImp
> l.java:43)
>         at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod
> .java:47)
>         at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.
> java:12)
>         at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.j
> ava:44)
>         at