Re: compression over-the-wire with 1.3.1 ?
Hi,

The spool dir source does not have built-in functionality to read
compressed files. One could be built, but I think it would require either
a subclass of
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializer.java
or changes to
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
so that it recognizes compressed files.
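
To make the idea concrete, here is a rough, untested sketch of what a
gzip-aware line deserializer could look like. The class name
GzipLineDeserializer is made up; EventDeserializer and EventBuilder are
real Flume classes. Note that the spooling source actually builds its
deserializer from a ResettableInputStream via a Builder, and the
mark()/reset() contract is exactly where a gzip stream gets difficult,
so treat this as a starting point only:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;

// Sketch only: GzipLineDeserializer is a hypothetical name, not an
// existing Flume class.
public class GzipLineDeserializer implements EventDeserializer {

  private static final Charset UTF8 = Charset.forName("UTF-8");
  private final BufferedReader reader;

  public GzipLineDeserializer(InputStream compressedIn) throws IOException {
    // Decompress on the fly and read the file line by line.
    this.reader = new BufferedReader(
        new InputStreamReader(new GZIPInputStream(compressedIn), UTF8));
  }

  @Override
  public Event readEvent() throws IOException {
    String line = reader.readLine();
    return line == null ? null : EventBuilder.withBody(line, UTF8);
  }

  @Override
  public List<Event> readEvents(int numEvents) throws IOException {
    List<Event> events = new ArrayList<Event>(numEvents);
    for (int i = 0; i < numEvents; i++) {
      Event event = readEvent();
      if (event == null) {
        break;
      }
      events.add(event);
    }
    return events;
  }

  // The spooling source uses mark()/reset() to record how far into a
  // file it has safely committed. A raw gzip stream cannot seek, so a
  // real implementation would have to track uncompressed offsets
  // itself -- this is the hard part, and why it is not a quick subclass.
  @Override
  public void mark() throws IOException {
    // no-op in this sketch; NOT safe for at-least-once delivery
  }

  @Override
  public void reset() throws IOException {
    throw new IOException("reset() is not supported over a gzip stream");
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}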

Regarding over-the-wire compression, there is an open review for that
now:

https://reviews.apache.org/r/9427/
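
For what it's worth, once that patch goes in, turning compression on
should just be a couple of properties on the Avro sink and the matching
Avro source. A sketch against Jim's config below, assuming the
deflate-style compression-type property the patch proposes (the names
are tentative until it is merged):

# Sketch only: compression-type / compression-level come from the patch
# under review and are not in a released Flume yet.
agent_compressed.sinks.avroSink-2.compression-type = deflate
agent_compressed.sinks.avroSink-2.compression-level = 6

# The receiving agent's Avro source has to be configured to match.
# "collector" and "r1" here are placeholders for whatever the remote
# agent and its source are actually named.
collector.sources.r1.compression-type = deflate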

Brock
On Sat, Feb 23, 2013 at 9:17 AM, Langston, Jim
<[EMAIL PROTECTED]> wrote:

>  Hi all,
>
>  A question on sending compressed files from a remote source
> to HDFS.
>
>  I have been working with 0.9.4 of Flume, gzipping a file before sending
> it from a remote location to an HDFS cluster. Works great.
>
>  Now, I'm looking to move to 1.2 or 1.3.1 (CDH4 installs 1.2 by
> default through the management tool), but I don't see equivalent
> functionality in 1.2 or 1.3.1. I found the reference to the new
> source in 1.3.1, spoolDir, but when I try to pick up a compressed
> file in the spool directory I get an error:
>
>  13/02/22 19:32:41 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:143)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
>     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>     at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>     ... 9 more
>
>
>  I have tried increasing the buffer size, but it did not change the
> error. My current configuration file, which
> generated the error:
>
>  # Compressed Source
> agent_compressed.sources = r1
> agent_compressed.channels = c1
> agent_compressed.channels.c1.type = memory
> agent_compressed.sources.r1.type = spooldir
> agent_compressed.sources.r1.bufferMaxLineLength = 50000
> agent_compressed.sources.r1.spoolDir = /tmp/COMPRESS
> agent_compressed.sources.r1.fileHeader = true
> agent_compressed.sources.r1.channels = c1
>
>  # Sink for Avro
> agent_compressed.sinks = avroSink-2
> agent_compressed.sinks.avroSink-2.type = avro
> agent_compressed.sinks.avroSink-2.channel = c1
> agent_compressed.sinks.avroSink-2.hostname = xxx.xxx.xxx.xxx
> agent_compressed.sinks.avroSink-2.port = xxxxx
>
>
>  Thoughts? Hints?
>
>
>  Thanks,
>
>  Jim
>
>
--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/