|
|
-
Different Result in Consumer
HUI CHEN 2013-01-24, 15:58
I wrote the code just the same as the code given on the Kafka website, like this: package com.a2.kafka.consumer;
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; import kafka.message.MessageAndMetadata;
public class CommonConsumer { public static void main(String[] args) { // specify some consumer properties Properties props = new Properties(); props.put("zk.connect", "192.168.181.128:2181"); props.put("zk.connectiontimeout.ms", "1000000"); props.put("groupid", "test_group");
// Create the connection to the cluster ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map=new HashMap<String,Integer>(); map.put("test", 4); // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map); List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads for(final KafkaStream<Message> stream: streams) { executor.submit(new Runnable() { public void run() { for(MessageAndMetadata<Message> msgAndMetadata: stream) { // process message (msgAndMetadata.message()) System.out.println(msgAndMetadata.message()); } } }); } } } But sometimes the program can get all the info from the topic, and sometimes it can get nothing from the client. I want to count the total number of log messages in this topic; how should I write the program? Thanks very much.
-
Re: Different Result in Consumer
Neha Narkhede 2013-01-24, 16:47
Which version of Kafka are you using ?
Your problem is not very clear to me. Do you have a producer sending data to the topic you are consuming from continuously and still your consumer doesn't receive data incrementally ?
Thanks, Neha On Thu, Jan 24, 2013 at 5:53 AM, HUI CHEN <[EMAIL PROTECTED]> wrote:
> I write the code just same as the code given on kafka website like this: > package com.a2.kafka.consumer; > > import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.Properties; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > > import kafka.consumer.Consumer; > import kafka.consumer.ConsumerConfig; > import kafka.consumer.KafkaStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.message.Message; > import kafka.message.MessageAndMetadata; > > public class CommonConsumer { > public static void main(String[] args) { > // specify some consumer properties > Properties props = new Properties(); > props.put("zk.connect", "192.168.181.128:2181"); > props.put("zk.connectiontimeout.ms", "1000000"); > props.put("groupid", "test_group"); > > // Create the connection to the cluster > ConsumerConfig consumerConfig = new ConsumerConfig(props); > ConsumerConnector consumerConnector = > Consumer.createJavaConsumerConnector(consumerConfig); > > Map<String, Integer> map=new HashMap<String,Integer>(); > map.put("test", 4); > // create 4 partitions of the stream for topic “test”, to allow 4 threads > to consume > Map<String, List<KafkaStream<Message>>> topicMessageStreams = > consumerConnector.createMessageStreams(map); > List<KafkaStream<Message>> streams = topicMessageStreams.get("test"); > > // create list of 4 threads to consume from each of the partitions > ExecutorService executor = Executors.newFixedThreadPool(4); > > // consume the messages in the threads > for(final KafkaStream<Message> stream: streams) { > executor.submit(new Runnable() { > public void run() { > for(MessageAndMetadata<Message> msgAndMetadata: stream) { > // process message (msgAndMetadata.message()) > System.out.println(msgAndMetadata.message()); > } > } > }); > } > } > } > > > but sometimes the program can get All info from topic ,but sometimes it can > get nothing from the client . 
> I want to count the total of the log in this topic, what should i write the > program. > thanks very much. >
|
|
All projects made searchable here are trademarks of the Apache Software Foundation.
Service operated by
Sematext