Flume user mailing list: HDFS sink - Property: hdfs.callTimeout


HDFS sink - Property: hdfs.callTimeout
Hi,

What are the implications of the "hdfs.callTimeout" property? What adverse effects might it have if I change it?
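For context, the Flume user guide describes hdfs.callTimeout as the number of milliseconds allowed for HDFS operations (open, write, flush, close), with a default of 10000 ms. As far as I understand, raising it mainly trades faster failure detection for more tolerance of slow HDFS calls. A sketch of overriding it on my sink (the 60000 value is purely illustrative):

```properties
# Illustrative only: allow each HDFS call up to 60 s (default is 10 s)
adServerAgent.sinks.hdfsSink.hdfs.callTimeout = 60000
```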

I am getting a timeout exception:

Noted checkpoint for file: /home/hadoop/flume_channel/dataDir15/log-21, id: 21, checkpoint position: 1576210481
12/10/03 23:19:45 INFO file.LogFile: Closing /home/hadoop/flume_channel/dataDir15/log-21
12/10/03 23:19:55 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: Callable timed out
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:714)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:736)
Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:336)
        ... 5 more
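Judging by the TimeoutException bubbling out of FutureTask.get, the sink appears to run each HDFS operation through a bounded Future.get(). A minimal Java sketch of that pattern (the method name callWithTimeout mirrors the trace, but the body here is my own illustration, not Flume's actual code):

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CallTimeoutSketch {
    // Submit the operation to an executor and wait at most timeoutMs.
    // If the call does not finish in time, cancel it and surface an
    // IOException, which is roughly the shape of the error in the log.
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> op, long timeoutMs)
            throws Exception {
        Future<T> f = pool.submit(op);
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true); // interrupt the stuck call
            throw new IOException("Callable timed out", e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // A fast call returns normally.
        System.out.println(callWithTimeout(pool, () -> "ok", 1000));
        // A slow call (simulating a stuck HDFS append) times out.
        try {
            callWithTimeout(pool, () -> { Thread.sleep(5000); return "late"; }, 100);
        } catch (IOException e) {
            System.out.println("timed out");
        }
        pool.shutdownNow();
    }
}
```

So raising hdfs.callTimeout would, presumably, just give each such wrapped HDFS call more time before the sink gives up on it.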
My configuration is:

Agent A: Source
=========
adServerAgent.sources = execSource
adServerAgent.channels = fileChannel
adServerAgent.sinks = avro-forward-sink1
#adServerAgent.sinkgroups = failover_group

# For each one of the sources, the type is defined
adServerAgent.sources.execSource.type = exec
adServerAgent.sources.execSource.command = /usr/bin/perl /home/http/flume/scripts/logtailDir_trial.pl 2>/tmp/logtail_failure.log
adServerAgent.sources.execSource.restart=false
adServerAgent.sources.execSource.batchSize = 1000

# The channel can be defined as follows.
adServerAgent.sources.execSource.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.avro-forward-sink1.type = avro
adServerAgent.sinks.avro-forward-sink1.hostname=10.0.17.3
adServerAgent.sinks.avro-forward-sink1.port=10012
adServerAgent.sinks.avro-forward-sink1.connect-timeout = 300000

#Specify the channel the sink should use
adServerAgent.sinks.avro-forward-sink1.channel = fileChannel
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/http/flume/channel/dataDir_trial
adServerAgent.channels.fileChannel.checkpointDir=/home/http/flume/channel/checkpointDir_trial
adServerAgent.channels.fileChannel.write-timeout=30

The script used by the exec source just cats the files in the given directory.

Exec script:
=========
my $DIR = "/TRACKING_FILES/backuped";
#my $MOVED_DIR = "";
my $OFFSET_DIR = "$ENV{'HOME'}/flume/offset_dir_trial";
my $SLEEP_TIME = 145;
my $LOGATAIL_CMD = "$ENV{'HOME'}/flume/logtail_install/usr/sbin/logtail2";
################

while(1)
{
         opendir(DIR,$DIR) or die "Couldn't open dir $DIR. $!";
#       chomp(my @files = `ls $DIR`);
#       foreach $file (@files)
         while(my $file = readdir(DIR))
         {
                 #print $file."\n";
                 #if($file =~ m/\d+impressionthread\d+\.tsv/)
                 #{
                         if(-f "$DIR/$file")
                         {
                        #       print "logtail2 -f $DIR/$file -o $OFFSET_DIR/$file.offset";
                                print `$LOGATAIL_CMD -f $DIR/$file -o $OFFSET_DIR/$file.offset`;
                         }
                 #}
         }
         closedir(DIR);
         sleep($SLEEP_TIME);
#       print "\n @files :".@files;
}
Agent B is: (Destination)
=============
adServerAgent.sources = avro-collection-source
adServerAgent.channels = fileChannel
adServerAgent.sinks = hdfsSink fileSink

# For each one of the sources, the type is defined
adServerAgent.sources.avro-collection-source.type=avro
adServerAgent.sources.avro-collection-source.bind=10.0.17.3
adServerAgent.sources.avro-collection-source.port=10012
adServerAgent.sources.avro-collection-source.interceptors = ts
adServerAgent.sources.avro-collection-source.interceptors.ts.type = timestamp
# The channel can be defined as follows.
adServerAgent.sources.avro-collection-source.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.hdfsSink.type = hdfs
adServerAgent.sinks.hdfsSink.hdfs.path = hdfs://mltest2001.pubmatic.com/flume/experiments_1_machine
#adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/trackers

#adServerAgent.sinks.hdfsSink.hdfs.fileType =DataStream
adServerAgent.sinks.hdfsSink.hdfs.fileType =CompressedStream
adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_%Y%m%d_%H%M%S_
#adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_
adServerAgent.sinks.hdfsSink.hdfs.rollSize=100000000
adServerAgent.sinks.hdfsSink.hdfs.codeC=bzip2
adServerAgent.sinks.hdfsSink.hdfs.rollCount=20000
adServerAgent.sinks.hdfsSink.hdfs.batchSize=1
#adServerAgent.sinks.hdfsSink.hdfs.writeFormat=Text
adServerAgent.sinks.hdfsSink.hdfs.rollInterval=600
adServerAgent.sinks.hdfsSink.hdfs.txnEventMax=1
#adServerAgent.sinks.hdfsSink.hdfs.maxOpenFiles=20000
#Define file sink
adServerAgent.sinks.fileSink.type = file_roll
adServerAgent.sinks.fileSink.sink.directory = /home/hadoop/flume_sink
adServerAgent.sinks.hdfsSink.channel= fileChannel
#adServerAgent.sinks.fileSink.channel = fileChannel

# Each channel's type is defined.
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/hadoop/flume_channel/dataDir15
adServerAgent.channels.fileChannel.checkpointDir=/home/hadoop/flume_channel/checkpointDir15
adServerAgent.channels.fileChannel.write-timeout=30
Regards,
Jagadish
Reply: Kathleen Ting, 2012-10-04 07:07