Flume >> mail # user >> Exec source doesn't flush the last data
Exec source doesn't flush the last data
Hi,
    I ran a simple test of the exec source and found that it doesn't flush the
last batch of data. Here are the steps:
*a. create the source file 1.test, which contains the sequence numbers 1 to
15, like this:*
     ----------
          1
          2
          ...
          15
     ----------
*b. create the configuration file flume_simple.conf like this:*
-------------------------
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -n +0 -F /opt/scripts/tvhadoop/flume/flume-1.3.0/source/1.test
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 10

a1.channels.c1.type = memory

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /opt/scripts/tvhadoop/flume/flume-1.3.0/sink
-------------------------
*c. run flume with this command:*
     bin/flume-ng agent --conf conf -f conf/flume_simple.conf
-Dflume.root.logger=DEBUG,console -n a1

      After more than 1 minute (the file-roll interval), I checked the output
directory. There were 2 files: one had the numbers 1 to 10, and the other
was empty.
*I think this is because batchSize was set to 10, so the last 5 numbers
never got flushed and were lost.* Even after applying the patch from
'https://issues.apache.org/jira/browse/FLUME-1819', nothing changed. When
I debug into the code, *I find that the lines after the while loop never
get executed*:
        ----------------
        while ((line = reader.readLine()) != null) {
          counterGroup.incrementAndGet("exec.lines.read");
          eventList.add(EventBuilder.withBody(line.getBytes()));
          if (eventList.size() >= bufferCount) {
            channelProcessor.processEventBatch(eventList);
            eventList.clear();
          }
        }
        if (!eventList.isEmpty()) {
          channelProcessor.processEventBatch(eventList);
        }
        ----------------
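The flush after the loop is only reached when readLine() returns null, i.e. when the command's stdout closes; since `tail -F` never exits, the call just blocks on the next line and the partial batch sits in eventList forever. One way around this (roughly the idea behind a batch timeout, if I understand FLUME-1819 correctly) is to also flush a partial batch once it has been waiting too long. A minimal standalone sketch of that idea — this is not the actual Flume code, and the names BatchFlusher, add, and maybeFlush are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: batch lines up to batchSize, but also flush a partial batch
// once timeoutMillis has elapsed since the last flush, so the tail of
// the input is not held back indefinitely.
public class BatchFlusher {
    private final List<String> batch = new ArrayList<>();
    private final List<List<String>> delivered = new ArrayList<>();
    private final int batchSize;
    private final long timeoutMillis;
    private long lastFlush = System.currentTimeMillis();

    BatchFlusher(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    void add(String line) {
        batch.add(line);
        if (batch.size() >= batchSize) flush();
    }

    // Called periodically (e.g. from a scheduled task), even when no new
    // line has arrived, so a stalled partial batch still gets delivered.
    void maybeFlush() {
        if (!batch.isEmpty()
                && System.currentTimeMillis() - lastFlush >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        delivered.add(new ArrayList<>(batch));
        batch.clear();
        lastFlush = System.currentTimeMillis();
    }

    public static void main(String[] args) throws InterruptedException {
        BatchFlusher f = new BatchFlusher(10, 50);
        for (int i = 1; i <= 15; i++) f.add(Integer.toString(i));
        // The first 10 lines were flushed as a full batch; 5 are buffered.
        System.out.println("batches=" + f.delivered.size());
        Thread.sleep(60);
        f.maybeFlush();  // timeout flush delivers the remaining 5 lines
        System.out.println("batches=" + f.delivered.size());
        System.out.println("lastBatch=" + f.delivered.get(1).size());
    }
}
```

With batchSize 10 and 15 input lines, the timeout flush is what gets the last 5 lines out; without it they would stay buffered, which matches the behavior I'm seeing.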
     In my scenario, the source log files are rotated hourly, so I need
to change the file name in the Flume configuration file. Because of the
above bug, I can only set the batchSize of ExecSource to 1, which
significantly slows down throughput. I wonder how to solve this
problem. Any suggestions are most welcome.
Best Regards,
larry

larryzhang 2013-03-13, 03:34
Hari Shreedharan 2013-03-13, 05:02