Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka >> mail # user >> Different Result in Consumer


+
HUI CHEN 2013-01-24, 15:58
Copy link to this message
-
Re: Different Result in Consumer
Which version of Kafka are you using ?

Your problem is not very clear to me. Do you have a producer sending data
to the topic you are consuming from continuously and still your consumer
doesn't receive data incrementally ?

Thanks,
Neha
On Thu, Jan 24, 2013 at 5:53 AM, HUI CHEN <[EMAIL PROTECTED]> wrote:

> I write the code just same as the code given on kafka website like this:
> package com.a2.kafka.consumer;
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
> import kafka.message.MessageAndMetadata;
>
> public class CommonConsumer {
> public static void main(String[] args) {
> // specify some consumer properties
> Properties props = new Properties();
> props.put("zk.connect", "192.168.181.128:2181");
> props.put("zk.connectiontimeout.ms", "1000000");
> props.put("groupid", "test_group");
>
> // Create the connection to the cluster
> ConsumerConfig consumerConfig = new ConsumerConfig(props);
> ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(consumerConfig);
>
> Map<String, Integer> map=new HashMap<String,Integer>();
> map.put("test", 4);
> // create 4 partitions of the stream for topic “test”, to allow 4 threads
> to consume
> Map<String, List<KafkaStream<Message>>> topicMessageStreams =
>     consumerConnector.createMessageStreams(map);
> List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
>
> // create list of 4 threads to consume from each of the partitions
> ExecutorService executor = Executors.newFixedThreadPool(4);
>
> // consume the messages in the threads
> for(final KafkaStream<Message> stream: streams) {
>   executor.submit(new Runnable() {
>     public void run() {
>       for(MessageAndMetadata<Message> msgAndMetadata: stream) {
>         // process message (msgAndMetadata.message())
>       System.out.println(msgAndMetadata.message());
>       }
>     }
>   });
> }
> }
> }
>
>
> but sometimes the program can get All info from topic ,but sometimes it can
> get nothing from the client .
> I want to count the total of the log in this topic, what should i write the
> program.
> thanks very much.
>