Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Different Result in Consumer


Copy link to this message
-
Different Result in Consumer
I write the code just same as the code given on kafka website like this:
package com.a2.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class CommonConsumer {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "192.168.181.128:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);

Map<String, Integer> map=new HashMap<String,Integer>();
map.put("test", 4);
// create 4 partitions of the stream for topic ¡°test¡±, to allow 4 threads
to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
    consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
  executor.submit(new Runnable() {
    public void run() {
      for(MessageAndMetadata<Message> msgAndMetadata: stream) {
        // process message (msgAndMetadata.message())
      System.out.println(msgAndMetadata.message());
      }
    }
  });
}
}
}
but sometimes the program can get All info from topic £¬but sometimes it can
get nothing from the client .
I want to count the total of the log in this topic, what should i write the
program.
thanks very much.

 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB