compression over-the-wire with 1.3.1?
Langston, Jim 2013-02-23, 15:17
Hi all,
A question on sending compressed files from a remote source to HDFS. I have been working with Flume 0.9.4, gzipping a file before sending it from a remote location to an HDFS cluster. That works great. Now I'm looking to move to 1.2 or 1.3.1 (CDH4 installs 1.2 by default through the management tool), but I don't see the equivalent feature in 1.2 or 1.3.1. I found a reference to using the new source in 1.3.1, spoolDir, but when I try to pick up a compressed file in the spool directory I get this error:

13/02/22 19:32:41 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:143)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 9 more

I have tried increasing the buffer size, but the error did not change. This is the configuration file that generated the error:

# Compressed Source
agent_compressed.sources = r1
agent_compressed.channels = c1
agent_compressed.channels.c1.type = memory
agent_compressed.sources.r1.type = spooldir
agent_compressed.sources.r1.bufferMaxLineLength = 50000
agent_compressed.sources.r1.spoolDir = /tmp/COMPRESS
agent_compressed.sources.r1.fileHeader = true
agent_compressed.sources.r1.channels = c1

# Sink for Avro
agent_compressed.sinks = avroSink-2
agent_compressed.sinks.avroSink-2.type = avro
agent_compressed.sinks.avroSink-2.channel = c1
agent_compressed.sinks.avroSink-2.hostname = xxx.xxx.xxx.xxx
agent_compressed.sinks.avroSink-2.port = xxxxx

Thoughts? Hints?

Thanks,
Jim