Kafka dev mailing list: having problem with 0.8 gzip compression


Scott Wang 2013-07-08, 21:54
Re: having problem with 0.8 gzip compression
Did you start the consumer before the producer? By default, the consumer
only gets new data.

Thanks,

Jun
On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
[EMAIL PROTECTED]> wrote:

> I am testing with Kafka 0.8 beta and having a problem receiving messages in
> the consumer.  There is no error, so does anyone have any insights?  When I
> comment out the "compression.codec" setting, everything works fine.
>
> My producer:
> import java.util.Properties;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class TestKafka08Prod {
>
>     public static void main(String[] args) {
>
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class", "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks", "1");
>             // The setting under test: gzip-compress the produced message sets.
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             for (int i = 0; i < 10; i++) {
>                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
>                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
>                 producer.send(data);
>             }
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             if (producer != null) {
>                 producer.close();
>             }
>             System.out.println("End of Sending");
>         }
>
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> import java.net.SocketException;
> import java.net.UnknownHostException;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
>
> public class TestKafka08Consumer {
>     public static void main(String[] args) throws UnknownHostException, SocketException {
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zookeeper.session.timeout.ms", "4000");
>         props.put("zookeeper.sync.time.ms", "2000");
>         props.put("auto.commit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
>         ConsumerConnector consumerConnector =
>                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         // One stream for the topic; the iterator below blocks waiting for messages.
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                 consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         while (it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: " + fromPlatform);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>
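
Jun's question above points at the 0.8 high-level consumer's default offset behavior: a consumer group with no previously committed offsets starts from the latest offset ("auto.offset.reset" defaults to "largest"), so messages produced before the consumer connected are never delivered. As a minimal sketch, not taken from the thread, the consumer above could be pointed at the earliest available offset instead:

    // Added to the consumer's Properties (sketch only, not in the original post):
    // with no committed offset for the group, start from the earliest available
    // offset rather than only new messages (the 0.8 default, "largest").
    props.put("auto.offset.reset", "smallest");

Keeping the default and simply starting the consumer before running the producer has the same effect for a quick test.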

 
Other messages in this thread:
Scott Wang 2013-07-09, 04:38
Jun Rao 2013-07-09, 15:07
Scott Wang 2013-07-09, 18:08
Scott Wang 2013-07-09, 22:23
Jun Rao 2013-07-10, 03:40
Scott Wang 2013-07-10, 17:24
Joel Koshy 2013-07-11, 00:33
Scott Wang 2013-07-11, 18:42
Scott Wang 2013-07-11, 23:02