Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> problems with one of my tests and the latest 0.8 branch


Copy link to this message
-
problems with one of my tests and the latest 0.8 branch
I've started having problems with the latest version of the 0.8 branch.
 The test below has started failing.  It was working fine with a prior
version of 0.8, going back to Apr 30
(sha 988d4d8e65a14390abd748318a64e281e4a37c19).

I haven't figured out when exactly it started failing, but I saw it with a
version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).

The test code is essentially this (assume a zk server is running with
'zkConnect', and a kafka broker running with a metadata port at 'port':

    Properties pProps = new Properties();
    pProps.put("metadata.broker.list", "localhost:" + port);
    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig pConfig = new ProducerConfig(pProps);
    Producer<Integer, String> producer = new Producer<Integer,
String>(pConfig);
    KeyedMessage<Integer, String> data =
        new KeyedMessage<Integer, String>("test-topic", "test-message");
    producer.send(data);
    producer.close();

    Properties cProps = new Properties();
    cProps.put("zookeeper.connect", zkConnect);
    cProps.put("group.id", "group1");
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
    ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =

consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
    List<KafkaStream<byte[], byte[]>> streams =
topicMessageStreams.get("test-topic");
    final KafkaStream<byte[], byte[]> stream = streams.get(0);
    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

    // run in a separate thread
    final AtomicBoolean success = new AtomicBoolean(false);
    Thread consumeThread = new Thread(new Runnable() {
      public void run() {
        while (iter.hasNext()) {
          byte[] msg = iter.next().message();
          String msgStr = new String(msg);
          success.set(msgStr.equals("test-message"));
          break;
        }
      }
    });

    consumeThread.start();
    // this now hangs with the latest code
    consumeThread.join();

    consumerConnector.shutdown();
    assertTrue(success.get());

The output looks like this:

912 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
metadata [{TopicMetadata for topic test-topic ->
No partition metadata for topic test-topic due to
kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
kafka.common.LeaderNotAvailableException
922 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
metadata [{TopicMetadata for topic test-topic ->
No partition metadata for topic test-topic due to
kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
kafka.common.LeaderNotAvailableException
923 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
collate messages by topic, partition due to: Failed to fetch topic metadata
for topic: test-topic
980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint  -
No highwatermark file is found. Returning 0 as the highwatermark for
partition [test-topic,0]

The consumer never receives a message, and so the test hangs....

This test worked fine as I said with an older version of the branch, but it
would output exceptions about LeaderNotAvailable, etc...

Thoughts?

Jason