Re: having problem with 0.8 gzip compression
No, I did not start the consumer before the producer. I actually started
the producer first, and nothing showed up in the consumer unless I commented
out this line -- props.put("compression.codec", "gzip"). With the compression
codec commented out, everything just works.
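
A minimal consumer-side sketch related to Jun's question below: in the 0.8 high-level consumer, auto.offset.reset defaults to "largest", so a group with no committed offsets only sees data produced after it connects; "smallest" makes such a group start from the earliest available offset. The class name and the alternate group id are placeholders, not from this thread.

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class OffsetResetSketch {
    // Builds a consumer config that reads a topic from the beginning when the
    // group has no committed offsets yet (placeholder class and group names).
    public static ConsumerConfig fromBeginningConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId-fresh");
        // Default is "largest": only messages produced after the group first
        // connects are delivered. "smallest" starts from the earliest offset.
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}

With a config like this, a consumer started after the producer would still see the earlier messages, which separates the offset question from the compression one.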
On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Did you start the consumer before the producer? By default, the consumer
> only gets new data.
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <[EMAIL PROTECTED]> wrote:
>
> > I am testing with Kafka 0.8 beta and having a problem receiving messages
> > in the consumer. There is no error, so does anyone have any insights?
> > When I comment out the "compression.codec" setting, everything works fine.
> >
> > My producer:
> > import java.util.Properties;
> >
> > import kafka.javaapi.producer.Producer;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.ProducerConfig;
> >
> > public class TestKafka08Prod {
> >
> >     public static void main(String[] args) {
> >
> >         Producer<Integer, String> producer = null;
> >         try {
> >             Properties props = new Properties();
> >             props.put("metadata.broker.list", "localhost:9092");
> >             props.put("serializer.class", "kafka.serializer.StringEncoder");
> >             props.put("producer.type", "sync");
> >             props.put("request.required.acks", "1");
> >             props.put("compression.codec", "gzip");
> >             ProducerConfig config = new ProducerConfig(props);
> >             producer = new Producer<Integer, String>(config);
> >             for (int i = 0; i < 10; i++) {
> >                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
> >                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
> >                 producer.send(data);
> >             }
> >         } catch (Exception e) {
> >             System.out.println("Error happened: ");
> >             e.printStackTrace();
> >         } finally {
> >             if (producer != null) {
> >                 producer.close();
> >             }
> >             System.out.println("End of Sending");
> >         }
> >
> >         System.exit(0);
> >     }
> > }
> >
> >
> > My consumer:
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> >
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> >
> > public class TestKafka08Consumer {
> >     public static void main(String[] args) {
> >
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> >         props.put("group.id", "test08ConsumerId");
> >         props.put("zookeeper.session.timeout.ms", "4000");
> >         props.put("zookeeper.sync.time.ms", "2000");
> >         props.put("auto.commit.interval.ms", "1000");
> >
> >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >
> >         ConsumerConnector consumerConnector =
> >                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >         String topic = "test-topic";
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, 1);
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >                 consumerConnector.createMessageStreams(topicCountMap);
> >         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> >
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >
> >         while (it.hasNext()) {
> >             try {
> >                 String fromPlatform = new String(it.next().message());
> >                 System.out.println("The messages: " + fromPlatform);
> >             } catch (Exception e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >         System.out.println("SystemOut");
> >     }
> > }
> >
> >
> > Thanks
> >
>
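
Following up on the compression setting isolated in the quoted code: in 0.8, gzip only needs to be enabled on the producer; the broker stores the compressed message set and the high-level consumer decompresses it transparently, so nothing compression-related changes on the consumer side. A minimal producer sketch with just the compression-related properties; the class name and payload are placeholders, and compressed.topics is an optional setting that, as far as I recall from the 0.8 producer configs, limits compression to the listed topics.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class GzipProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Compression happens in the producer; consumers decompress transparently.
        props.put("compression.codec", "gzip");
        // Optional (assumed name from the 0.8 producer configs): compress only the
        // listed topics instead of everything this producer sends.
        props.put("compressed.topics", "test-topic");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            producer.send(new KeyedMessage<String, String>("test-topic",
                    "gzip test " + System.currentTimeMillis()));
        } finally {
            producer.close();
        }
    }
}

If uncompressed messages arrive but gzip-produced ones do not, that points at the produce or fetch path for compressed message sets rather than at consumer offsets.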

 