|
|
+
HUI CHEN 2013-01-24, 15:58
-
Re: Different Result in ConsumerNeha Narkhede 2013-01-24, 16:47
Which version of Kafka are you using ?
Your problem is not very clear to me. Do you have a producer sending data to the topic you are consuming from continuously and still your consumer doesn't receive data incrementally ? Thanks, Neha On Thu, Jan 24, 2013 at 5:53 AM, HUI CHEN <[EMAIL PROTECTED]> wrote: > I write the code just same as the code given on kafka website like this: > package com.a2.kafka.consumer; > > import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.Properties; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > > import kafka.consumer.Consumer; > import kafka.consumer.ConsumerConfig; > import kafka.consumer.KafkaStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.message.Message; > import kafka.message.MessageAndMetadata; > > public class CommonConsumer { > public static void main(String[] args) { > // specify some consumer properties > Properties props = new Properties(); > props.put("zk.connect", "192.168.181.128:2181"); > props.put("zk.connectiontimeout.ms", "1000000"); > props.put("groupid", "test_group"); > > // Create the connection to the cluster > ConsumerConfig consumerConfig = new ConsumerConfig(props); > ConsumerConnector consumerConnector = > Consumer.createJavaConsumerConnector(consumerConfig); > > Map<String, Integer> map=new HashMap<String,Integer>(); > map.put("test", 4); > // create 4 partitions of the stream for topic “test”, to allow 4 threads > to consume > Map<String, List<KafkaStream<Message>>> topicMessageStreams = > consumerConnector.createMessageStreams(map); > List<KafkaStream<Message>> streams = topicMessageStreams.get("test"); > > // create list of 4 threads to consume from each of the partitions > ExecutorService executor = Executors.newFixedThreadPool(4); > > // consume the messages in the threads > for(final KafkaStream<Message> stream: streams) { > executor.submit(new Runnable() { > public void run() { > for(MessageAndMetadata<Message> msgAndMetadata: stream) { > // process message (msgAndMetadata.message()) > System.out.println(msgAndMetadata.message()); > } > } > }); > } > } > } > > > but sometimes the program can get All info from topic ,but sometimes it can > get nothing from the client . > I want to count the total of the log in this topic, what should i write the > program. > thanks very much. > |