[jira] [Closed] (KAFKA-946) Kafka Hadoop Consumer fails when verifying message checksum

     [ https://issues.apache.org/jira/browse/KAFKA-946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao closed KAFKA-946.
-------------------------

    
> Kafka Hadoop Consumer fails when verifying message checksum
> -----------------------------------------------------------
>
>                 Key: KAFKA-946
>                 URL: https://issues.apache.org/jira/browse/KAFKA-946
>             Project: Kafka
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Critical
>             Fix For: 0.8
>
>         Attachments: hadoop_consumer_1.patch
>
>
> The code tries to verify the checksum, but fails because the bytes it verifies are not the bytes the checksum was computed over. In KafkaETLContext:
>     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
>         if (_messageIt != null && _messageIt.hasNext()) {
>             MessageAndOffset messageAndOffset = _messageIt.next();
>             ByteBuffer buf = messageAndOffset.message().payload();
>             int origSize = buf.remaining();
>             byte[] bytes = new byte[origSize];
>             buf.get(bytes, buf.position(), origSize);
>             value.set(bytes, 0, origSize);
>             key.set(_index, _offset, messageAndOffset.message().checksum());
>             _offset = messageAndOffset.nextOffset();  //increase offset
>             _count ++;  //increase count
>             return true;
>         }
>         else return false;
>     }
> Note that only the message payload is copied into the value, while the checksum of the original message is stored in the key. Then, in SimpleKafkaETLMapper:
>     @Override
>     public void map(KafkaETLKey key, BytesWritable val,
>             OutputCollector<LongWritable, Text> collector,
>             Reporter reporter) throws IOException {
>         byte[] bytes = KafkaETLUtils.getBytes(val);
>         //check the checksum of message
>         Message message = new Message(bytes);
>         long checksum = key.getChecksum();
>         if (checksum != message.checksum())
>             throw new IOException ("Invalid message checksum "
>                                             + message.checksum() + ". Expected " + key + ".");
> Here the Message object is initialized with only the payload bytes, and a new checksum is calculated from them. The problem is that the original message checksum also covers the message key, so checksum verification fails.
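
The mismatch described above can be illustrated with a minimal sketch. It is not Kafka code: it uses the JDK's java.util.zip.CRC32 directly and does not reproduce Kafka's message layout. The class name ChecksumMismatchSketch and the helper crcOf are hypothetical, introduced only for this illustration. The assumption is simply that one side checksums key plus payload while the other side checksums the payload alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration only; not part of Kafka or its contrib code.
public class ChecksumMismatchSketch {

    // CRC32 over the concatenation of the given byte arrays (null parts are skipped).
    static long crcOf(byte[]... parts) {
        CRC32 crc = new CRC32();
        for (byte[] part : parts) {
            if (part != null) {
                crc.update(part, 0, part.length);
            }
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Stand-in for the checksum stored in the key: it covers key and payload.
        long storedChecksum = crcOf(key, payload);

        // Stand-in for the checksum recomputed in the mapper: payload only.
        long recomputedChecksum = crcOf(payload);

        System.out.println("stored     = " + storedChecksum);
        System.out.println("recomputed = " + recomputedChecksum);
        System.out.println("match      = " + (storedChecksum == recomputedChecksum));
    }
}
```

Under these assumptions the two values differ whenever the key contributes bytes to the stored checksum, which is the kind of comparison the mapper's check performs.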

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

 