Flume user mailing list


Exec source doesn't flush the last data
Hi,
    I ran a simple test of the exec source and found that it doesn't flush the
last batch of data. Here are the steps:
*a. Create the source file 1.test, containing the numbers 1 to 15, one
per line:*
    ----------
    1
    2
    ...
    15
    ----------
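Step a can also be reproduced with a small Java program. This is a sketch that assumes the file is written to the current directory unless a path is passed as the first argument; the class name MakeTestFile is hypothetical and not part of Flume.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class MakeTestFile {
    // Builds the lines "1" .. "n", one number per line.
    static List<String> sequence(int n) {
        List<String> lines = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            lines.add(Integer.toString(i));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Default path is an assumption; pass the path used in the tail command instead.
        Path out = Paths.get(args.length > 0 ? args[0] : "1.test");
        // Files.write appends a line separator after each line.
        Files.write(out, sequence(15), StandardCharsets.UTF_8);
    }
}
```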
*b. Create the configuration file flume_simple.conf:*
-------------------------
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -n +0 -F /opt/scripts/tvhadoop/flume/flume-1.3.0/source/1.test
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 10

a1.channels.c1.type = memory

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /opt/scripts/tvhadoop/flume/flume-1.3.0/sink
---------------------
*c. Run Flume with this command:*
              bin/flume-ng agent --conf conf -f conf/flume_simple.conf -Dflume.root.logger=DEBUG,console -n a1

      After more than 1 minute (the file roll interval), I checked the output
directory: there were 2 files, one containing the numbers 1 to 10 and the
other empty.
*I think this is because batchSize was set to 10, so the last 5
numbers were never flushed and were lost.* Even after I applied the patch from
'https://issues.apache.org/jira/browse/FLUME-1819', nothing changed. When I
debugged into the code, *I found that the code after the while loop (shown in
red in the original mail) never gets executed*.
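        ----------------
        // Excerpt from the exec source's read loop.
        while ((line = reader.readLine()) != null) {
          counterGroup.incrementAndGet("exec.lines.read");
          eventList.add(EventBuilder.withBody(line.getBytes()));
          // Send a full batch once bufferCount (the configured batchSize) events are buffered.
          if (eventList.size() >= bufferCount) {
            channelProcessor.processEventBatch(eventList);
            eventList.clear();
          }
        }
        // The code shown in red: flushes the final partial batch. It only runs after
        // readLine() returns null, i.e. when the command's output reaches EOF.
        // `tail -F` never exits, so with this command the block is never reached.
        if (!eventList.isEmpty()) {
          channelProcessor.processEventBatch(eventList);
        }
        --------------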
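The behavior can be reproduced without Flume. The sketch below is a simulation, not Flume code: the hypothetical BatchSim class mimics the read loop, and its streamEnds flag models whether readLine() eventually returns null (a command that exits) or blocks forever after the last line (as with `tail -F`).

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSim {
    // Returns the lines that reach the "channel" for a stream of numbered lines.
    static List<String> delivered(int totalLines, int batchSize, boolean streamEnds) {
        List<String> channel = new ArrayList<>();
        List<String> eventList = new ArrayList<>();
        for (int i = 1; i <= totalLines; i++) {
            eventList.add(Integer.toString(i));
            // Same rule as the excerpt: flush only when a batch is full.
            if (eventList.size() >= batchSize) {
                channel.addAll(eventList);
                eventList.clear();
            }
        }
        // In the real loop this point is reached only after readLine() returns null;
        // streamEnds == false models a command like `tail -F` that never ends.
        if (streamEnds && !eventList.isEmpty()) {
            channel.addAll(eventList);
        }
        return channel;
    }

    public static void main(String[] args) {
        System.out.println(delivered(15, 10, false).size()); // 10
        System.out.println(delivered(15, 10, true).size());  // 15
    }
}
```

With 15 lines and a batch size of 10, only the first full batch reaches the channel while the stream stays open, which matches the output file containing 1 to 10.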
     In my scenario, the source log files are split by hour, so I need
to change the file name in the Flume configuration file. Because of the
above bug, I can only set the exec source's batchSize to 1, which
significantly slows down throughput. How can I solve this problem? Any
suggestions are most welcome.
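One possible approach, sketched here under stated assumptions, is to flush a partial batch once it has waited a fixed time, so a large batchSize no longer strands the tail of the data. TimedBatcher, its timeoutMillis parameter and the sink callback (standing in for processEventBatch) are hypothetical; this is not the Flume 1.3.0 API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: batches by count, but also flushes a partial batch
// once its oldest line has waited timeoutMillis.
public class TimedBatcher {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<List<String>> sink; // stands in for processEventBatch
    private final List<String> buffer = new ArrayList<>();
    private long firstEventAt = -1; // arrival time of the oldest buffered line

    public TimedBatcher(int batchSize, long timeoutMillis, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    // Called by the reader thread for every line read.
    public synchronized void add(String line, long nowMillis) {
        if (buffer.isEmpty()) {
            firstEventAt = nowMillis;
        }
        buffer.add(line);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called periodically by a timer thread; flushes a stale partial batch.
    public synchronized void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstEventAt >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
        firstEventAt = -1;
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        TimedBatcher b = new TimedBatcher(10, 3000, batches::add);
        for (int i = 1; i <= 15; i++) {
            b.add(Integer.toString(i), 0);
        }
        b.tick(1000); // too early: lines 11..15 stay buffered
        b.tick(3000); // timeout reached: lines 11..15 are flushed
        System.out.println(batches.size()); // 2
    }
}
```

The current time is passed in rather than read inside the class so the logic can be tested with fixed values; in a real source, tick() could be driven by a ScheduledExecutorService, and the synchronized methods keep the reader and timer threads from interleaving on the buffer.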
Best Regards,
larry

larryzhang 2013-03-13, 03:34
Hari Shreedharan 2013-03-13, 05:02