Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # dev >> having problem with 0.8 gzip compression


Copy link to this message
-
Re: having problem with 0.8 gzip compression
Did you start the consumer before the producer? By default, the consumer
gets only the new data.

Thanks,

Jun
On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
[EMAIL PROTECTED]> wrote:

> I am testing with Kafka 0.8 beta and having a problem receiving messages in
> the consumer.  There is no error, so does anyone have any insights?  When I
> commented out the "compression.codec" setting, everything works fine.
>
> My producer:
> public class TestKafka08Prod {
>
>     public static void main(String [] args) {
>
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class",
> "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks","1");
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             int j=0;
>             for(int i=0; i<10; i++) {
>                 KeyedMessage<Integer, String> data = new
> KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> "+System.currentTimeMillis());
>                 producer.send(data);
>
>             }
>
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             if(null != null) {
>                 producer.close();
>             }
>
>             System.out.println("Ened of Sending");
>         }
>
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> public class TestKafka08Consumer {
>     public static void main(String [] args) throws UnknownHostException,
> SocketException {
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zk.sessiontimeout.ms", "4000");
>         props.put("zk.synctime.ms", "2000");
>         props.put("autocommit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
>         ConsumerConnector consumerConnector =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         int counter=0;
>         while(it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: "+fromPlatform);
>             } catch(Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>