RE: Question related to Decompressor interface
David Parks 2013-02-10, 02:36
I can't answer your question about the Decompressor interface, but I have a
query for you.

 

Why not just create an EncryptedWritable object? Encrypt/decrypt the bytes
in the read/write methods; that should be darn near trivial. Then stick with
good ol' SequenceFile, which, as you note, is splittable. Otherwise you'd
have to deal with making the output splittable, and given encrypted data,
the only solution I see is basically rolling your own SequenceFile with
encrypted innards.

 

Come to think of it, a simple, standardized EncryptedWritable object out of
the box with Hadoop would be great. Or perhaps better yet, an
EncryptedWritableWrapper<T extends Writable> so we can convert any existing
Writable into an encrypted form.
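
Something along these lines, as a rough sketch only: none of these classes
exist in Hadoop today, and key distribution plus the no-arg constructor that
Hadoop's serialization framework expects are left out for brevity.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.io.Writable;

public class EncryptedWritableWrapper<T extends Writable> implements Writable {

    private final T wrapped;
    private final SecretKeySpec key;

    public EncryptedWritableWrapper(T wrapped, byte[] rawKey) {
        this.wrapped = wrapped;
        this.key = new SecretKeySpec(rawKey, "AES");
    }

    @Override
    public void write(DataOutput out) throws IOException {
        try {
            // Serialize the wrapped Writable to a byte array, encrypt it,
            // then write length + ciphertext.
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            wrapped.write(new DataOutputStream(buf));
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            byte[] encrypted = cipher.doFinal(buf.toByteArray());
            out.writeInt(encrypted.length);
            out.write(encrypted);
        } catch (Exception e) {
            throw new IOException("Encryption failed", e);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        try {
            // Read length + ciphertext, decrypt, then let the wrapped
            // Writable deserialize itself from the plaintext.
            byte[] encrypted = new byte[in.readInt()];
            in.readFully(encrypted);
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.DECRYPT_MODE, key);
            byte[] plain = cipher.doFinal(encrypted);
            wrapped.readFields(new DataInputStream(new ByteArrayInputStream(plain)));
        } catch (Exception e) {
            throw new IOException("Decryption failed", e);
        }
    }

    public T get() {
        return wrapped;
    }
}

The point being that the encryption stays entirely inside the Writable, so
SequenceFile's own splitting keeps working untouched.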

 

Dave

 

 

From: java8964 java8964 [mailto:[EMAIL PROTECTED]]
Sent: Sunday, February 10, 2013 3:50 AM
To: [EMAIL PROTECTED]
Subject: Question related to Decompressor interface

 

Hi,

 

Currently I am researching options for encrypting data in MapReduce, as we
plan to use the Amazon EMR or EC2 services for our data.

 

I am thinking that the compression codec is a good place to integrate the
encryption logic, and I found that some people have had the same idea.

 

I googled around and found this code:

 

https://github.com/geisbruch/HadoopCryptoCompressor/

 

It doesn't seem to be maintained any more, but it gave me a starting point. I
downloaded the source code and tried to run some tests with it.

 

It doesn't work out of the box. There are some bugs I had to fix to make it
work. I believe it contains 'AES' as an example algorithm.

 

But right now I am facing a problem when I try to use it in my test
MapReduce program. Here is the stack trace I got:

 

2013-02-08 23:16:47,038 INFO org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor: buf length 512, and offset = 0, length = -132967308
java.lang.IndexOutOfBoundsException
    at java.nio.ByteBuffer.wrap(ByteBuffer.java:352)
    at org.apache.hadoop.io.compress.crypto.CryptoBasicDecompressor.setInput(CryptoBasicDecompressor.java:100)
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:97)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83)
    at java.io.InputStream.read(InputStream.java:82)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:114)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:458)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

 

I know the error is thrown from this custom CryptoBasicDecompressor class,
but I really have questions about the interface it implements:
Decompressor.

 

There is limited documentation about this interface; for example, when and
how will the setInput() method be invoked? If I want to write my own
Decompressor, what do the methods in the interface mean?

In the case above, I enabled some debug output. You can see that the byte[]
array passed to setInput() is only 512 bytes long, but the third parameter
(length) passed in is a negative number: -132967308. That caused the
IndexOutOfBoundsException. If I check the same method in Hadoop's
GzipDecompressor class, that code would also throw an
IndexOutOfBoundsException in this case, so this is a RuntimeException
situation. Why did it happen in my test case?
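
To illustrate where the exception itself comes from: it is raised by the
argument check inside ByteBuffer.wrap(), which CryptoBasicDecompressor.setInput()
calls (see the top of the stack trace). A tiny standalone program with the
same values (512-byte buffer, offset 0, negative length) reproduces it:

import java.nio.ByteBuffer;

public class WrapNegativeLength {
    public static void main(String[] args) {
        byte[] buf = new byte[512];
        try {
            // Same arguments that setInput() apparently forwarded to wrap().
            ByteBuffer.wrap(buf, 0, -132967308);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("wrap() rejected the negative length: " + e);
        }
    }
}

So the exception is just the symptom; the real question is why the stream
layer hands setInput() a negative length in the first place.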

 

Here is my test case:

 

I have a simple log text file of about 700 KB. I encrypted it with the above
code using 'AES'. I can encrypt and decrypt it and get my original content
back. The file name is foo.log.crypto, and this file extension is registered
to invoke this CryptoBasicDecompressor in my test Hadoop setup, which uses
the CDH4.1.2 release (Hadoop 2.0). Everything works as I expected: the
CryptoBasicDecompressor is invoked when the input file is foo.log.crypto, as
you can see in the stack trace above. But I don't know why the third
parameter (length) of setInput() is a negative number at runtime.
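
For context on how that extension registration works (as far as I understand
it): CompressionCodecFactory matches the file suffix against the codecs
listed in the io.compression.codecs property. The crypto codec class name
below is only my guess at what the HadoopCryptoCompressor project registers,
and the project jar would need to be on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Append the crypto codec (assumed class name) to the codec list.
        conf.set("io.compression.codecs",
                 "org.apache.hadoop.io.compress.GzipCodec,"
               + "org.apache.hadoop.io.compress.DefaultCodec,"
               + "org.apache.hadoop.io.compress.crypto.CryptoBasicCodec");

        // The factory picks a codec by matching the file name's suffix.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(new Path("foo.log.crypto"));
        System.out.println(codec == null ? "no codec matched"
                                         : codec.getClass().getName());
    }
}

If getCodec() returns the crypto codec for foo.log.crypto, the extension
registration itself is working, which matches what the stack trace shows.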

 

In addition, I have further questions about using a Compressor/Decompressor
to handle encrypting/decrypting files. Ideally, I wonder whether the
encryption/decryption can support file splits. That probably depends on the
algorithm we use, is that right? If so, what kind of algorithm can do that?
I am not sure whether it is like the compression case, where most codecs do
not support file splits. If so, it may not be good for my requirements.

 

If we have a 1 GB file encrypted in Amazon S3, then after it is copied to
the HDFS of the Amazon EMR cluster, can each block of the data be decrypted
independently by each mapper and then passed to the underlying RecordReader,
so everything is processed fully concurrently? Has anyone done this before?
If so, what encryption algorithm supports it? Any ideas?
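
From what I can tell, in Hadoop 2.x TextInputFormat only splits a compressed
(or, here, encrypted) input when its codec implements
SplittableCompressionCodec; roughly the check below, shown as a sketch
rather than the actual Hadoop source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplitCheck {
    // Mirrors the decision TextInputFormat.isSplitable() makes: no codec
    // means plain text (splittable); otherwise the codec must be splittable.
    static boolean isSplitable(Configuration conf, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
    }
}

So the 1 GB question seems to come down to whether an encryption codec can
implement that interface, i.e. whether the cipher mode allows decryption to
start at an arbitrary offset in the file.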

 

Thanks

 

Yong