Flume, mail # user - Re: Flume-NG : Spooling dir source : java.io.IOException: Stream closed


Mike Percy 2013-01-28, 05:06
bcc: [EMAIL PROTECTED]

No version of CDH currently ships with Flume 1.3.1, so redirecting this
question to the [EMAIL PROTECTED] user list.

Regards,
Mike
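
The exception in the quoted log ("File name has been re-used with different files") is consistent with the documented contract of the spooling directory source: a file must not be modified after it is placed in the directory, and a file name must not be reused. Appending to hbase_1.log with `>>` after the first copy was retired to hbase_1.log.COMPLETED appears to break both rules. A minimal sketch of one way to deliver files safely follows. The function name, the staging directory, and the naming scheme are illustrative assumptions, not part of Flume:

```python
import os
import tempfile
import time
import uuid


def deliver_to_spool(lines, spool_dir, staging_dir, prefix="hbase"):
    """Write lines to a staging file, then move it into spool_dir.

    Each call produces a new, unique file name, so no name is reused and
    no file changes after it becomes visible in the spooling directory.
    """
    os.makedirs(staging_dir, exist_ok=True)

    # Stage outside the spooling directory so the source never sees a
    # half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line.rstrip("\n") + "\n")

    # Hypothetical naming scheme: prefix, millisecond timestamp, random suffix.
    name = "%s_%d_%s.log" % (prefix, int(time.time() * 1000), uuid.uuid4().hex[:8])
    final_path = os.path.join(spool_dir, name)

    # os.rename is atomic on POSIX when both paths are on the same filesystem.
    os.rename(tmp_path, final_path)
    return final_path


if __name__ == "__main__":
    # Demonstration with throwaway directories instead of /var/log/testhbase.
    base = tempfile.mkdtemp()
    spool = os.path.join(base, "spool")
    os.makedirs(spool)
    print(deliver_to_spool(["tuyen ssssssssss"], spool, os.path.join(base, "staging")))
```

The staging directory has to live on the same filesystem as the spooling directory for the rename to be atomic. Each new batch of events goes into its own file instead of being appended to one that the source has already picked up.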

On Sun, Jan 27, 2013 at 8:56 PM, NGuyen thi Kim Tuyen <[EMAIL PROTECTED]> wrote:

> I'm using Flume-NG 1.3.1.
>
> On Monday, January 28, 2013 at 11:33:49 UTC+7, NGuyen thi Kim Tuyen
> wrote:
>
>> Hi,
>>
>> Please help me.
>>
>> I want to use Flume in the following setup:
>> Spooling directory source --> FileChannel --> HBase sink. But I have
>> some problems with the spooling directory source.
>>
>> Here is my test flume.conf:
>> t-game-db194.sources = test-hbase
>> t-game-db194.sinks = sink-hbase
>> t-game-db194.channels = hbase-channel
>>
>> #source spoolDir
>> t-game-db194.sources.test-hbase.type = spooldir
>> t-game-db194.sources.test-hbase.spoolDir = /var/log/testhbase
>> t-game-db194.sources.test-hbase.fileHeader = true
>> t-game-db194.sources.test-hbase.channels = hbase-channel
>>
>> #file Channel
>> t-game-db194.channels.hbase-channel.type = file
>> t-game-db194.channels.hbase-channel.checkpointDir = /var/log/flume-ng/checkpoint
>> t-game-db194.channels.hbase-channel.dataDir = /var/log/flume-ng/filedata
>>
>> #sink
>> t-game-db194.sinks.sink-hbase.type = logger
>> t-game-db194.sinks.sink-hbase.channel = hbase-channel
>>
>> Then I tested with: echo "tuyen ssssssssss " >> "/var/log/testhbase/hbase_1.log"
>> The first event is processed correctly, but the following events are not.
>> Here is flume.log:
>>
>> 28 Jan 2013 13:16:47,424 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.SpoolDirectorySource.start:64)  - SpoolDirectorySource source starting with directory:/var/log/testhbase
>> 28 Jan 2013 13:16:47,732 INFO  [pool-7-thread-1] (org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile:229)  - Preparing to move file /var/log/testhbase/hbase_1.log to /var/log/testhbase/hbase_1.log.COMPLETED
>> 28 Jan 2013 13:16:48,436 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:70)  - Event: { headers:{file=/var/log/testhbase/hbase_1.log} body: 74 75 79 65 6E 20 73 73 73 73 73 73 73 73 73 73 tuyen ssssssssss }
>>
>> 28 Jan 2013 13:17:08,836 INFO  [pool-7-thread-1] (org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile:229)  - Preparing to move file /var/log/testhbase/hbase_1.log to /var/log/testhbase/hbase_1.log.COMPLETED
>> 28 Jan 2013 13:17:08,837 ERROR [pool-7-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:148)  - Uncaught exception in Runnable
>> java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumption violated for /var/log/testhbase/hbase_1.log.COMPLETED
>>  at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272)
>>  at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>  at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>  at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(