Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume, mail # dev - Duplicate on failover


Copy link to this message
-
Re: Duplicate on failover
Hari Shreedharan 2012-09-08, 17:36
Hi Ralph,  

The code looks good. I think this might be due to some timing issues, though I am not sure what. I'd suggest you add a Thead.sleep(2000) before this line: primarySource.stop();, so once your whole transaction is done, wait for the Avro sink/Rpc Client to get the Success message, so it will not send duplicates.  

Please let me know if that causes the failures to disappear.
Thanks
Hari

--  
Hari Shreedharan
On Saturday, September 8, 2012 at 10:01 AM, Ralph Goers wrote:

> Did you ever get a chance to look at this? I am still getting these failures almost every time it runs.
>  
> Ralph
>  
> On Aug 31, 2012, at 10:35 AM, Hari Shreedharan wrote:
>  
> > Thanks Ralph. Let me take a look at the code.  
> >  
> > --  
> > Hari Shreedharan
> >  
> >  
> > On Friday, August 31, 2012 at 9:38 AM, Ralph Goers wrote:
> >  
> > > Which file? The files from the FileChannel, the source or …? If you want the FileChannel stuff, unfortunately it only is failing on the machine where Gump runs and I don't have a clue how to get access to it or if it is even left around after the run. As I said, I've never had this fail on the Mac(s), my Linux system or in Jenkins. I have no idea what is peculiar about that system but I do know the tests take about twice as long as they do on my Mac.
> > >  
> > > If you want to look at the actual unit test source, it is at https://svn.apache.org/repos/asf/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
> > >  
> > > Ralph
> > >  
> > >  
> > >  
> > > On Aug 31, 2012, at 12:15 AM, Hari Shreedharan wrote:
> > >  
> > > > It looks like the channel has already got the events before the source stops. Can you send me a link to the actual file, so I can take a look?  
> > > >  
> > > >  
> > > > Hari  
> > > >  
> > > > --  
> > > > Hari Shreedharan
> > > >  
> > > >  
> > > > On Thursday, August 30, 2012 at 9:44 AM, Ralph Goers wrote:
> > > >  
> > > > > Thanks Hari,
> > > > >  
> > > > > First, remember that the Flume agent is embedded in the Appender. So the Log4j EventSource is passing the event to the FileChannel. The Avro Sink then reads from the channel and sends it on. The unit test has two Avro Sources listening with each associated with its own MemoryChannel. The test logs 10 events then reads the 10 events from the primary MemoryChannel, each within its own transaction. The test then stops the primary source. Then it logs 10 more events and tries to read them from the alternate MemoryChannel.  
> > > > >  
> > > > >  
> > > > > The code to read from the channel looks like:
> > > > >  
> > > > > for (int i = 0; i < 10; ++i) {
> > > > > StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
> > > > > EventLogger.logEvent(msg);
> > > > > }  
> > > > > for (int i = 0; i < 10; ++i) {
> > > > > Transaction transaction = primaryChannel.getTransaction();
> > > > > transaction.begin();
> > > > >  
> > > > > Event event = primaryChannel.take();
> > > > > Assert.assertNotNull(event);
> > > > > String body = getBody(event);
> > > > > String expected = "Test Primary " + i;
> > > > > Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
> > > > > body.endsWith(expected));
> > > > > transaction.commit();
> > > > > transaction.close();
> > > > > }
> > > > >  
> > > > > primarySource.stop();
> > > > >  
> > > > >  
> > > > > for (int i = 0; i < 10; ++i) {
> > > > > StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test");
> > > > > EventLogger.logEvent(msg);
> > > > > }
> > > > > for (int i = 0; i < 10; ++i) {
> > > > > Transaction transaction = alternateChannel.getTransaction();
> > > > > transaction.begin();
> > > > >  
> > > > > Event event = alternateChannel.take();
> > > > > Assert.assertNotNull(event);
> > > > > String body = getBody(event);
> > > > > String expected = "Test Alternate " + i;
> > > > > /* When running in Gump Flume consistently returns the last event from the primary channel after