

Peyman Mohajerian 2013-07-12, 23:36
Re: HDFS Sink Additional Bytes added for File Events
Peyman,

I would like to understand the original format of the gzip files your
custom source is consuming. Are these binary files or text files before
they are compressed?

Is the entire file a single event, or does it contain delimiters that mark
where one event ends and another one starts?

You may be able to avoid consuming the gzip files directly by
decompressing them first before reading them.

This way, the uncompressed bytes are not corrupted if additional data gets
appended to the event body or headers.

Here are some tools that could help:

http://docs.oracle.com/javase/6/docs/api/java/util/zip/GZIPInputStream.html
http://docs.oracle.com/javase/6/docs/api/java/util/zip/ZipInputStream.html
http://commons.apache.org/proper/commons-compress/apidocs/org/apache/commons/compress/compressors/gzip/package-summary.html
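
For example, here is a minimal, untested sketch of that approach using
GZIPInputStream; the gzPath parameter and the class name are just
placeholders for however your source locates the file:

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class GunzipToBytes {

    // Reads a .gz file and returns its uncompressed contents as a byte
    // array, which could then be used directly as the Flume event body.
    public static byte[] readDecompressed(String gzPath) throws IOException {
        InputStream in = new GZIPInputStream(new FileInputStream(gzPath));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            byte[] buf = new byte[8192];
            int num;
            while ((num = in.read(buf)) != -1) {
                out.write(buf, 0, num);
            }
        } finally {
            in.close();
        }
        return out.toByteArray();
    }
}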

What do you think about this direction?
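
One more thought, and this is just a guess based on the config you pasted
below: with hdfs.fileType = DataStream and no serializer configured, the
HDFS sink falls back to the default text serializer, which appends a
newline after each event body. As far as I know, that serializer writes
only the event body, not the headers, so the timestamp header itself
should not end up in the file, but the extra newline per event could be
what is breaking gunzip. If that turns out to be the case, something along
these lines may help (untested against your setup):

XXX.sinks.HDFS.serializer = TEXT
XXX.sinks.HDFS.serializer.appendNewline = false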

Author and Instructor for the Upcoming Book and Lecture Series
Massive Log Data Aggregation, Processing, Searching and Visualization with
Open Source Software
http://massivelogdata.com
On 12 July 2013 19:36, Peyman Mohajerian <[EMAIL PROTECTED]> wrote:

>  Hi Guys,
>
> I have a custom source that consumes whole 'gz' files as byte arrays, and
> each file is a single event. I'd like to write the file to HDFS. During the
> write some additional bytes are added, so the file is corrupted and I am not
> able to unzip it any more. I know this is not a good use case for Flume, but
> I'd like to keep a consistent data collection design and was hoping I could
> pass full gz files to HDFS without the file being corrupted. Either the
> 'timestamp' header is causing the issue or the 'text' file format, but I'm
> not sure. Any solution?
>
> Thanks,
>
> Peyman
>
> XXX.sources = xxx
> XXX.channels = MemChannel
> XXX.sinks = HDFS
>
> XXX.sources.xxx.type = com.xxx.xxx.xxx.Source
> XXX.sources.xxx.channels = MemChannel
>
> XXX.sinks.HDFS.channel = MemChannel
> XXX.sinks.HDFS.type = hdfs
> XXX.sinks.HDFS.hdfs.path = hdfs://xxxx/user/xxx/xxx/gzfiles/%Y/%m/%d/
> XXX.sinks.HDFS.hdfs.fileType = DataStream
> XXX.sinks.HDFS.hdfs.filePrefix = xxxx
> XXX.sinks.HDFS.hdfs.batchSize = 1
> XXX.sinks.HDFS.hdfs.rollSize = 0
> XXX.sinks.HDFS.hdfs.idleTimeout = 3
> XXX.sinks.HDFS.hdfs.rollInterval = 0
> XXX.sinks.HDFS.hdfs.rollCount = 1
>
> XXX.channels.MemChannel.type = memory
> XXX.channels.MemChannel.capacity = 1
> XXX.channels.MemChannel.transactionCapacity = 1
> XXX.channels.MemChannel.byteCapacityBufferPercentage = 100
>
> InputStream in = Toolbox.inputStreamUrlConnection(url, account.getAuth1(),
>         account.getAuth2());
> outputStream = new ByteArrayOutputStream();
> byte[] buf = new byte[1024]; // optimize the size of the buffer to your needs
> int num;
> while ((num = in.read(buf)) != -1) {
>     outputStream.write(buf, 0, num);
> }
> headers.put("timestamp", String.valueOf(new Date().getTime()));
> Event e = EventBuilder.withBody(outputStream.toByteArray(), headers);
> getChannelProcessor().processEvent(e);
>
>
Peyman Mohajerian 2013-07-19, 16:20