Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Different Result in Consumer


Copy link to this message
-
Re: Different Result in Consumer
Which version of Kafka are you using ?

Your problem is not very clear to me. Do you have a producer sending data
to the topic you are consuming from continuously and still your consumer
doesn't receive data incrementally ?

Thanks,
Neha
On Thu, Jan 24, 2013 at 5:53 AM, HUI CHEN <[EMAIL PROTECTED]> wrote:

> I write the code just same as the code given on kafka website like this:
> package com.a2.kafka.consumer;
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
> import kafka.message.MessageAndMetadata;
>
> public class CommonConsumer {
> public static void main(String[] args) {
> // specify some consumer properties
> Properties props = new Properties();
> props.put("zk.connect", "192.168.181.128:2181");
> props.put("zk.connectiontimeout.ms", "1000000");
> props.put("groupid", "test_group");
>
> // Create the connection to the cluster
> ConsumerConfig consumerConfig = new ConsumerConfig(props);
> ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(consumerConfig);
>
> Map<String, Integer> map=new HashMap<String,Integer>();
> map.put("test", 4);
> // create 4 partitions of the stream for topic “test”, to allow 4 threads
> to consume
> Map<String, List<KafkaStream<Message>>> topicMessageStreams =
>     consumerConnector.createMessageStreams(map);
> List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
>
> // create list of 4 threads to consume from each of the partitions
> ExecutorService executor = Executors.newFixedThreadPool(4);
>
> // consume the messages in the threads
> for(final KafkaStream<Message> stream: streams) {
>   executor.submit(new Runnable() {
>     public void run() {
>       for(MessageAndMetadata<Message> msgAndMetadata: stream) {
>         // process message (msgAndMetadata.message())
>       System.out.println(msgAndMetadata.message());
>       }
>     }
>   });
> }
> }
> }
>
>
> but sometimes the program can get All info from topic ,but sometimes it can
> get nothing from the client .
> I want to count the total of the log in this topic, what should i write the
> program.
> thanks very much.
>

 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB