Kafka, mail # user - Java junit test for a Kafka producer returns "failed to collate messages by topic"


Java junit test for a Kafka producer returns "failed to collate messages by topic"
"Maier, Dr. Andreas" 2013-09-06, 08:42
Hello,

I wrote a simple junit test to test a Kafka producer.

public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString());

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port,
                "kafka.producer.DefaultPartitioner");

        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer producer = new Producer(pConfig);

        // send message
        KeyedMessage<Integer, String> data = new KeyedMessage(topic, "test-message");

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asBuffer(messages));

        // cleanup
        producer.close();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }

}

However, when I run the test, I get the following error messages:

[2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:74)
at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

[2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)

I tried to write my unit test following the Scala unit tests in the Kafka
core, but it seems I'm still missing something basic to make it work.
Can someone help me with that? I'm developing on Mac OS X 10.8.3 and
compiled the latest Kafka (plus the TestUtils) from the git repository
using Scala 2.9.2.

Regards,

Andreas Maier
 