Flume user mailing list: At flume shutdown only one HDFSSink is closed properly


Re: At flume shutdown only one HDFSSink is closed properly
Thanks for the additional info, Bhaskar. So is this a known issue in vanilla
Hadoop 1.0.3? If so, do you have a JIRA number?

Regards,
Mike

On Fri, Aug 24, 2012 at 12:13 PM, Bhaskar V. Karambelkar <
[EMAIL PROTECTED]> wrote:

> Oops, this is just the same Hadoop FileSystem.close() shutdown-hook
> issue. I was getting the exception no matter whether I had one HDFS sink
> or more.
> I was using vanilla Hadoop 1.0.3, and it looks like that version doesn't
> respect the fs.automatic.close option.
> Switched to CDH3u5, and no more problems: all the HDFS sinks correctly
> rename their files on shutdown.
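>
> For reference, here is a minimal sketch of what that option is supposed
> to control (untested, and the URI is made up, but fs.automatic.close and
> FileSystem.get() are standard Hadoop):
>
>     import java.net.URI;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>
>     public class AutoCloseDemo {
>         public static void main(String[] args) throws Exception {
>             Configuration conf = new Configuration();
>             // With fs.automatic.close set to false, Hadoop should not
>             // auto-close this FileSystem from its shutdown hook on JVM
>             // exit; the caller then owns closing it explicitly.
>             conf.setBoolean("fs.automatic.close", false);
>             FileSystem fs = FileSystem.get(URI.create("hdfs://namenode"), conf);
>             // ... use fs, then close it ourselves when we are done.
>         }
>     }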
>
> In conclusion, the vanilla Hadoop 1.x series is not an option for Flume.
>
> Go with Hadoop 2.x, CDH3u5, or CDH4.
>
> thanks
> Bhaskar
>
>
> On Thu, Aug 23, 2012 at 9:35 PM, Mike Percy <[EMAIL PROTECTED]> wrote:
>
>> Hmm... this likely happens because Hadoop statically caches FileSystem
>> objects, so, as it turns out, the multiple sinks are actually sharing
>> the same FileSystem instance.
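>>
>> For illustration, a rough sketch of that caching behavior (hypothetical
>> URIs; FileSystem.get() and its instance cache are standard Hadoop):
>>
>>     import java.net.URI;
>>     import org.apache.hadoop.conf.Configuration;
>>     import org.apache.hadoop.fs.FileSystem;
>>
>>     public class FsCacheDemo {
>>         public static void main(String[] args) throws Exception {
>>             Configuration conf = new Configuration();
>>             // The cache key is scheme + authority (+ user), not the path,
>>             // so sinks pointing at the same namenode get the same object.
>>             FileSystem fs1 = FileSystem.get(URI.create("hdfs://namenode/path1"), conf);
>>             FileSystem fs2 = FileSystem.get(URI.create("hdfs://namenode/path2"), conf);
>>             System.out.println(fs1 == fs2); // true: one shared instance,
>>             // so closing "one" closes the FileSystem for every sink.
>>         }
>>     }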
>>
>> I think the only reason we need to explicitly close the FileSystem
>> objects is to support the deleteOnExit feature. We are explicitly closing
>> them because we removed the automatic shutdown hook typically installed by
>> Hadoop to invoke FileSystem.close(), since it was interfering with the .tmp
>> rolling. I wonder if we can get away with never closing them in our
>> case... I'm not sure whether we implicitly need the deleteOnExit()
>> functionality for any reason, or whether there are other, more important
>> reasons why the FileSystem objects should be closed.
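>>
>> For context, a hedged sketch of the deleteOnExit() coupling (the URI and
>> path are made up; FileSystem.close() is what processes queued deletes):
>>
>>     import java.net.URI;
>>     import org.apache.hadoop.conf.Configuration;
>>     import org.apache.hadoop.fs.FileSystem;
>>     import org.apache.hadoop.fs.Path;
>>
>>     public class DeleteOnExitDemo {
>>         public static void main(String[] args) throws Exception {
>>             FileSystem fs =
>>                     FileSystem.get(URI.create("hdfs://namenode"), new Configuration());
>>             // deleteOnExit() only queues the path; nothing is removed yet.
>>             fs.deleteOnExit(new Path("/flume/scratch"));
>>             // close() runs processDeleteOnExit(), so never calling close()
>>             // would also mean never performing this cleanup.
>>             fs.close();
>>         }
>>     }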
>>
>> Regards,
>> Mike
>>
>>
>> On Thu, Aug 23, 2012 at 3:24 PM, Bhaskar V. Karambelkar <
>> [EMAIL PROTECTED]> wrote:
>>
>>> I have 3 HDFS sinks all writing to the same namenode, but different
>>> paths.
>>>
>>> e.g. sink1 = hdfs://namenode/path1
>>> sink2 = hdfs://namenode/path2
>>> etc.
>>>
>>> When Flume is shut down (kill <flume-pid>), the file for the first sink
>>> is closed correctly and renamed to remove the .tmp extension, but
>>> closing the second file throws the exception below, and that file's
>>> .tmp extension is not removed.
>>> I see this very consistently: with more than one HDFS sink, only the
>>> first one is closed properly and renamed; the rest all throw an
>>> exception on close and keep the .tmp extension.
>>>
>>> 2012-08-23 19:51:39,837 WARN hdfs.BucketWriter: failed to close() HDFSWriter for file (hdfs://hadoop-namnode:9000/flume/avro/2012/08/23/l/avro_.1345751470750.tmp). Exception follows.
>>> java.io.IOException: Filesystem closed
>>>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>     at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:74)
>>>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3667)
>>>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>>>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:250)
>>>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:48)
>>>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:236)
>>>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:233)
>>>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>>>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:233)
>>>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:744)
>>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
(Thread continues: Bhaskar V. Karambelkar 2012-08-27, 14:53; Mike Percy
2012-08-29, 08:30.)