Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Plain View
Flume >> mail # user >> compression over-the-wire with 1.3.1 ?

Langston, Jim 2013-02-23, 15:17
Brock Noland 2013-02-23, 19:31
Copy link to this message
Re: compression over-the-wire with 1.3.1 ?
Thanks Brock,

It seems that the compression was dropped. Was that intended ? There
was a lot of work done in the .94 version. I move a tremendous amount
of data between clusters and use compression and batching.


looks like I will need to spend time working with both versions.


From: Brock Noland <[EMAIL PROTECTED]<mailto:[EMAIL PROTECTED]>>
Date: Sat, 23 Feb 2013 13:31:40 -0600
Subject: Re: compression over-the-wire with 1.3.1 ?


The spool dir source does not have built in functionality to read compressed files. One could be built, but I think it would either require a subclass of https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializer.java or changes to https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java so that it recognized compressed files.

In regards to over the wire compression, there is an open review on that now:


On Sat, Feb 23, 2013 at 9:17 AM, Langston, Jim <[EMAIL PROTECTED]<mailto:[EMAIL PROTECTED]>> wrote:
Hi all,

A question on sending compressed files from a remote source
to HDFS.

I have been working with .94 of flume and gzip a file before sending
it from a remote location to a HDFS cluster. Works great.

Now, I'm looking to move to 1.2 or 1.3.1 (CDH4 installs 1.2 by
default through the management tool), but I don't see the
equivalent in 1.2 or 1.3.1. I found the reference to utilize the
new source in 1.3.1, spoolDir, but when I try to pick up a compressed
file in the spool directory I'm getting an error:

13/02/22 19:32:41 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:143)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 9 more
I have tried to increase the buffer size but it did not change the error. My current configuration file which
generated the error:

# Compressed Source
agent_compressed.sources = r1
agent_compressed.channels = c1
agent_compressed.channels.c1.type = memory
agent_compressed.sources.r1.type = spooldir
agent_compressed.sources.r1.bufferMaxLineLength = 50000
agent_compressed.sources.r1.spoolDir = /tmp/COMPRESS
agent_compressed.sources.r1.fileHeader = true
agent_compressed.sources.r1.channels = c1

# Sink for Avro
agent_compressed.sinks = avroSink-2
agent_compressed.sinks.avroSink-2.type = avro
agent_compressed.sinks.avroSink-2.channel = c1
agent_compressed.sinks.avroSink-2.hostname = xxx.xxx.xxx.xxx
agent_compressed.sinks.avroSink-2.port = xxxxx
Thoughts? Hints ?

Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/
Brock Noland 2013-02-25, 15:12
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB