Re: problems with one of my tests and the latest 0.8 branch
Jason,
Did you update your config file with the new name of the ZooKeeper
setting?

It was renamed from zk.connect to zookeeper.connect.

You should check all of your settings, because other setting names have
changed as well.
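
To illustrate the rename, here is a minimal sketch of a helper that moves old property keys to their new names before a config object is built. The class and method names (ConfigKeyMigration, renameKeys) are hypothetical and not part of Kafka, and the only rename taken from this thread is zk.connect to zookeeper.connect; any other entries in the map would have to come from the 0.8 configuration docs.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ConfigKeyMigration {

    // Hypothetical helper: returns a copy of props in which every old key
    // listed in renames is moved to its new name. If the new key is already
    // set, its value is kept and the old key is simply dropped.
    static Properties renameKeys(Properties props, Map<String, String> renames) {
        Properties out = new Properties();
        out.putAll(props);
        for (Map.Entry<String, String> e : renames.entrySet()) {
            String value = out.getProperty(e.getKey());
            if (value == null) {
                continue; // old key not present, nothing to move
            }
            out.remove(e.getKey());
            if (out.getProperty(e.getValue()) == null) {
                out.setProperty(e.getValue(), value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("zk.connect", "localhost:2181"); // example address, an assumption
        // The only rename confirmed in this thread.
        Map<String, String> renames =
            Collections.singletonMap("zk.connect", "zookeeper.connect");
        System.out.println(renameKeys(old, renames));
    }
}
```

The input Properties object is left untouched, so the old and new configs can be compared side by side while checking the remaining setting names.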

Cheers,
Eric Sites

On 6/16/13 5:14 PM, "Jason Rosenberg" <[EMAIL PROTECTED]> wrote:

>I've started having problems with the latest version of the 0.8 branch.
> The test below has started failing.  It was working fine with a prior
>version of 0.8, going back to Apr 30
>(sha 988d4d8e65a14390abd748318a64e281e4a37c19).
>
>I haven't figured out when exactly it started failing, but I saw it with a
>version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
>also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
>
>The test code is essentially this (assume a zk server is running at
>'zkConnect', and a kafka broker is running with a metadata port at 'port'):
>
>    Properties pProps = new Properties();
>    pProps.put("metadata.broker.list", "localhost:" + port);
>    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>    ProducerConfig pConfig = new ProducerConfig(pProps);
>    Producer<Integer, String> producer =
>        new Producer<Integer, String>(pConfig);
>    KeyedMessage<Integer, String> data =
>        new KeyedMessage<Integer, String>("test-topic", "test-message");
>    producer.send(data);
>    producer.close();
>
>    Properties cProps = new Properties();
>    cProps.put("zookeeper.connect", zkConnect);
>    cProps.put("group.id", "group1");
>    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>    ConsumerConnector consumerConnector =
>        Consumer.createJavaConsumerConnector(consumerConfig);
>
>    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>        consumerConnector.createMessageStreams(
>            ImmutableMap.of("test-topic", 1));
>    List<KafkaStream<byte[], byte[]>> streams =
>        topicMessageStreams.get("test-topic");
>    final KafkaStream<byte[], byte[]> stream = streams.get(0);
>    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>
>    // run in a separate thread
>    final AtomicBoolean success = new AtomicBoolean(false);
>    Thread consumeThread = new Thread(new Runnable() {
>      public void run() {
>        while (iter.hasNext()) {
>          byte[] msg = iter.next().message();
>          String msgStr = new String(msg);
>          success.set(msgStr.equals("test-message"));
>          break;
>        }
>      }
>    });
>
>    consumeThread.start();
>    // this now hangs with the latest code
>    consumeThread.join();
>
>    consumerConnector.shutdown();
>    assertTrue(success.get());
>
>The output looks like this:
>
>912 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>922 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
>metadata [{TopicMetadata for topic test-topic ->
>No partition metadata for topic test-topic due to
>kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
>kafka.common.LeaderNotAvailableException
>923 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
>collate messages by topic, partition due to: Failed to fetch topic
>metadata
>for topic: test-topic
>980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint  -
>No highwatermark file is found. Returning 0 as the highwatermark for
>partition [test-topic,0]
>
>The consumer never receives a message, and so the test hangs....
>
>This test worked fine, as I said, with an older version of the branch, but
>it would output exceptions about LeaderNotAvailable, etc...
>
>Thoughts?
>
>Jason
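
While the cause is being tracked down, the hang at consumeThread.join() can be turned into an ordinary test failure by bounding the wait. The following is a minimal sketch using only the JDK; the class and method names (BoundedJoin, joinWithTimeout) are hypothetical, and it is an assumption, not something verified here, that interrupting the thread unblocks the consumer iterator.

```java
public class BoundedJoin {

    // Hypothetical helper: waits up to timeoutMs for the thread to finish.
    // Returns true if it finished in time; otherwise interrupts it and
    // returns false. timeoutMs must be positive, since join(0) waits forever.
    static boolean joinWithTimeout(Thread t, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        t.join(timeoutMs);
        if (t.isAlive()) {
            t.interrupt(); // best effort; the thread may ignore it
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the consume thread: blocks far longer than the timeout.
        Thread stuck = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(60000);
                } catch (InterruptedException e) {
                    // interrupted by joinWithTimeout, exit quietly
                }
            }
        });
        stuck.setDaemon(true); // do not keep the JVM alive if it never exits
        stuck.start();
        System.out.println("finished in time: " + joinWithTimeout(stuck, 200));
    }
}
```

In the test above, the call consumeThread.join() would become something like assertTrue(joinWithTimeout(consumeThread, 10000)), with the timeout value chosen to suit the test environment.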
 