Flume, mail # user - HDFS sink - Property: hdfs.callTimeout


HDFS sink - Property: hdfs.callTimeout
Jagadish Bihani 2012-10-04, 06:39
Hi,

What are the implications of the property "hdfs.callTimeout"? What adverse
effects might it have if I change it?

I am getting a timeout exception:
Noted checkpoint for file: /home/hadoop/flume_channel/dataDir15/log-21, id: 21, checkpoint position: 1576210481
12/10/03 23:19:45 INFO file.LogFile: Closing /home/hadoop/flume_channel/dataDir15/log-21
12/10/03 23:19:55 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: Callable timed out
         at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:343)
         at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:714)
         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
         at java.lang.Thread.run(Thread.java:736)
Caused by: java.util.concurrent.TimeoutException
         at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
         at java.util.concurrent.FutureTask.get(FutureTask.java:91)
         at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:336)
         ... 5 more
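
Reading the trace, the sink appears to hand each HDFS call to a future and stop waiting once a deadline passes, which is where the TimeoutException wrapped in "IOException: Callable timed out" comes from. Below is a minimal Java sketch of that general pattern, not Flume's actual code: the class name CallTimeoutSketch, the single-thread pool, and the timeout values are assumptions made for illustration only.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a "call with timeout" wrapper; not Flume source.
final class CallTimeoutSketch {

    // Submits the task to the pool and waits at most timeoutMillis for its result.
    static <T> T callWithTimeout(ExecutorService pool, long timeoutMillis, Callable<T> task)
            throws IOException, InterruptedException {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The deadline passed: interrupt the stalled call and report an I/O error.
            future.cancel(true);
            throw new IOException("Callable timed out", e);
        } catch (ExecutionException e) {
            // The task itself failed; surface its cause.
            throw new IOException("Call failed", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A call that finishes well inside the deadline returns normally.
            System.out.println(callWithTimeout(pool, 1000, () -> "appended"));

            // A call that stalls longer than the deadline is reported as a timeout.
            try {
                callWithTimeout(pool, 100, () -> {
                    Thread.sleep(5000); // stands in for a slow HDFS operation
                    return "too late";
                });
            } catch (IOException e) {
                System.out.println(e.getMessage() + " caused by " + e.getCause().getClass().getName());
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this sketch a larger timeout only means the caller waits longer before giving up on a slow call; it does not make the call itself faster. The Flume User Guide describes hdfs.callTimeout as the number of milliseconds allowed for HDFS operations such as open, write, flush and close, with a default of 10000.
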
My configuration is:

Agent A: Source
=========
adServerAgent.sources = execSource
adServerAgent.channels = fileChannel
adServerAgent.sinks = avro-forward-sink1
#adServerAgent.sinkgroups = failover_group

# For each one of the sources, the type is defined
adServerAgent.sources.execSource.type = exec
adServerAgent.sources.execSource.command = /usr/bin/perl /home/http/flume/scripts/logtailDir_trial.pl 2>/tmp/logtail_failure.log
adServerAgent.sources.execSource.restart=false
adServerAgent.sources.execSource.batchSize = 1000

# The channel can be defined as follows.
adServerAgent.sources.execSource.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.avro-forward-sink1.type = avro
adServerAgent.sinks.avro-forward-sink1.hostname=10.0.17.3
adServerAgent.sinks.avro-forward-sink1.port=10012
adServerAgent.sinks.avro-forward-sink1.connect-timeout = 300000

#Specify the channel the sink should use
adServerAgent.sinks.avro-forward-sink1.channel = fileChannel
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/http/flume/channel/dataDir_trial
adServerAgent.channels.fileChannel.checkpointDir=/home/http/flume/channel/checkpointDir_trial
adServerAgent.channels.fileChannel.write-timeout=30

where the script in the exec source just cats the files in the given
directory. The exec script is:
=======
my $DIR = "/TRACKING_FILES/backuped";
#my $MOVED_DIR = "";
my $OFFSET_DIR = "$ENV{'HOME'}/flume/offset_dir_trial";
my $SLEEP_TIME = 145;
my $LOGATAIL_CMD = "$ENV{'HOME'}/flume/logtail_install/usr/sbin/logtail2";
################

while(1)
{
         opendir(DIR,$DIR) or die "Couldn't open dir $DIR. $!";
#       chomp(my @files = `ls $DIR`);
#       foreach $file (@files)
         while(my $file = readdir(DIR))
         {
                 #print $file."\n";
                 #if($file =~ m/\d+impressionthread\d+\.tsv/)
                 #{
                         if(-f "$DIR/$file")
                         {
                         #       print "logtail2 -f $DIR/$file -o $OFFSET_DIR/$file.offset";
                                 print `$LOGATAIL_CMD -f $DIR/$file -o $OFFSET_DIR/$file.offset`;
                         }
                 #}
         }
         closedir(DIR);
         sleep($SLEEP_TIME);
#       print "\n @files :".@files;
}
Agent B is: (Destination)
=============
adServerAgent.sources = avro-collection-source
adServerAgent.channels = fileChannel
adServerAgent.sinks = hdfsSink fileSink

# For each one of the sources, the type is defined
adServerAgent.sources.avro-collection-source.type=avro
adServerAgent.sources.avro-collection-source.bind=10.0.17.3
adServerAgent.sources.avro-collection-source.port=10012
adServerAgent.sources.avro-collection-source.interceptors = ts
adServerAgent.sources.avro-collection-source.interceptors.ts.type = timestamp
# The channel can be defined as follows.
adServerAgent.sources.avro-collection-source.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.hdfsSink.type = hdfs
adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/experiments_1_machine
#adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/trackers

#adServerAgent.sinks.hdfsSink.hdfs.fileType =DataStream
adServerAgent.sinks.hdfsSink.hdfs.fileType =CompressedStream
adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_%Y%m%d_%H%M%S_
#adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_
adServerAgent.sinks.hdfsSink.hdfs.rollSize=100000000
adServerAgent.sinks.hdfsSink.hdfs.codeC=bzip2
adServerAgent.sinks.hdfsSink.hdfs.rollCount=20000
adServerAgent.sinks.hdfsSink.hdfs.batchSize=1
#adServerAgent.sinks.hdfsSink.hdfs.writeFormat=Text
adServerAgent.sinks.hdfsSink.hdfs.rollInterval=600
adServerAgent.sinks.hdfsSink.hdfs.txnEventMax=1
#adServerAgent.sinks.hdfsSink.hdfs.maxOpenFiles=20000
#Define file sink
adServerAgent.sinks.fileSink.type = file_roll
adServerAgent.sinks.fileSink.sink.directory = /home/hadoop/flume_sink
adServerAgent.sinks.hdfsSink.channel= fileChannel
#adServerAgent.sinks.fileSink.channel = fileChannel

# Each channel's type is defined.
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/hadoop/flume_channel/dataDir15
adServerAgent.channels.fileChannel.checkpointDir=/home/hadoop/flume_channel/checkpointDir15
adServerAgent.channels.fileChannel.write-timeout=30
Regards,
Jagadish
Kathleen Ting 2012-10-04, 07:07