Kafka, mail # user - problems with one of my tests and the latest 0.8 branch


Re: problems with one of my tests and the latest 0.8 branch
Jason Rosenberg 2013-06-17, 03:15
Yep,

The configs are good.  And my apps are working fine too.  It seems only to
be an issue with this test (and a few others like it).

Jason
On Sun, Jun 16, 2013 at 3:02 PM, Eric Sites <[EMAIL PROTECTED]> wrote:

> Jason,
>
>
> Did you update your config file with the new name of the ZooKeeper
> setting? It was renamed from zk.connect to zookeeper.connect.
>
> You should check all of the settings because other setting names have
> changed as well.
>
> Cheers,
> Eric Sites
>
> On 6/16/13 5:14 PM, "Jason Rosenberg" <[EMAIL PROTECTED]> wrote:
>
> >I've started having problems with the latest version of the 0.8 branch.
> > The test below has started failing.  It was working fine with a prior
> >version of 0.8, going back to Apr 30
> >(sha 988d4d8e65a14390abd748318a64e281e4a37c19).
> >
> >I haven't figured out when exactly it started failing, but I saw it with a
> >version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
> >also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> >
> >The test code is essentially this (assume a zk server is running at
> >'zkConnect', and a kafka broker is running with a metadata port at 'port'):
> >
> >    Properties pProps = new Properties();
> >    pProps.put("metadata.broker.list", "localhost:" + port);
> >    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
> >    ProducerConfig pConfig = new ProducerConfig(pProps);
> >    Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
> >    KeyedMessage<Integer, String> data =
> >        new KeyedMessage<Integer, String>("test-topic", "test-message");
> >    producer.send(data);
> >    producer.close();
> >
> >    Properties cProps = new Properties();
> >    cProps.put("zookeeper.connect", zkConnect);
> >    cProps.put("group.id", "group1");
> >    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
> >    ConsumerConnector consumerConnector =
> >        Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
> >        consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
> >    List<KafkaStream<byte[], byte[]>> streams =
> >        topicMessageStreams.get("test-topic");
> >    final KafkaStream<byte[], byte[]> stream = streams.get(0);
> >    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> >
> >    // run in a separate thread
> >    final AtomicBoolean success = new AtomicBoolean(false);
> >    Thread consumeThread = new Thread(new Runnable() {
> >      public void run() {
> >        while (iter.hasNext()) {
> >          byte[] msg = iter.next().message();
> >          String msgStr = new String(msg);
> >          success.set(msgStr.equals("test-message"));
> >          break;
> >        }
> >      }
> >    });
> >
> >    consumeThread.start();
> >    // this now hangs with the latest code
> >    consumeThread.join();
> >
> >    consumerConnector.shutdown();
> >    assertTrue(success.get());
> >
> >The output looks like this:
> >
> >912 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching metadata [{TopicMetadata for topic test-topic ->
> >No partition metadata for topic test-topic due to kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class kafka.common.LeaderNotAvailableException
> >922 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching metadata [{TopicMetadata for topic test-topic ->
> >No partition metadata for topic test-topic due to kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class kafka.common.LeaderNotAvailableException
> >923 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-topic
> >980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint  - No highwatermark file is found. Returning 0 as the highwatermark for partition [test-topic,0]
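
Since the quoted test blocks forever in consumeThread.join() when no message arrives, one option while debugging is to bound the wait so the test fails quickly instead of hanging. The following is only a minimal sketch of that idea in plain Java, with no Kafka dependency; the class name BoundedJoin and the helper runWithTimeout are hypothetical names chosen for illustration and are not part of the original test or of Kafka.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedJoin {

    /**
     * Hypothetical helper: runs the task on its own thread and waits at most
     * timeoutMs (must be > 0, since join(0) waits forever) for it to finish.
     * Returns true if the thread finished in time, false if it was still running.
     */
    public static boolean runWithTimeout(Runnable task, long timeoutMs)
            throws InterruptedException {
        Thread worker = new Thread(task);
        worker.setDaemon(true);   // a stuck worker should not keep the JVM alive
        worker.start();
        worker.join(timeoutMs);   // bounded wait instead of an unbounded join()
        if (worker.isAlive()) {
            worker.interrupt();   // best effort; the task may ignore interrupts
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean success = new AtomicBoolean(false);
        // Stand-in for the consumer loop in the quoted test.
        boolean finished = runWithTimeout(new Runnable() {
            public void run() {
                success.set(true);
            }
        }, 5000L);
        System.out.println("finished=" + finished + " success=" + success.get());
    }
}
```

In the quoted test this would correspond to replacing consumeThread.join() with a timed join and asserting that the thread is no longer alive before checking success.get(). The timeout value is an arbitrary assumption and would need tuning for the environment.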