Flume dev mailing list: Duplicate on failover


Re: Duplicate on failover
Ralph Goers 2012-08-31, 16:38
Which file? The files from the FileChannel, the source, or …? If you want the FileChannel stuff, unfortunately the test only fails on the machine where Gump runs, and I don't have a clue how to get access to it or whether it is even left around after the run. As I said, I've never had this fail on the Mac(s), my Linux system, or in Jenkins. I have no idea what is peculiar about that system, but I do know the tests take about twice as long there as they do on my Mac.

If you want to look at the actual unit test source, it is at https://svn.apache.org/repos/asf/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java

Ralph

On Aug 31, 2012, at 12:15 AM, Hari Shreedharan wrote:

> It looks like the channel had already received the events before the source stopped. Can you send me a link to the actual file so I can take a look?
>
>
> Hari
>
> --
> Hari Shreedharan
>
>
> On Thursday, August 30, 2012 at 9:44 AM, Ralph Goers wrote:
>
>> Thanks Hari,
>>
>> First, remember that the Flume agent is embedded in the Appender, so the Log4j EventSource passes the event to the FileChannel. The Avro Sink then reads from the channel and sends the event on. The unit test has two Avro Sources listening, each associated with its own MemoryChannel. The test logs 10 events, then reads those 10 events from the primary MemoryChannel, each within its own transaction. It then stops the primary source, logs 10 more events, and tries to read them from the alternate MemoryChannel.
>>
>> The test code that logs the events and reads them back from the channels looks like this:
>>
>> for (int i = 0; i < 10; ++i) {
>>     StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i, "Test");
>>     EventLogger.logEvent(msg);
>> }
>> for (int i = 0; i < 10; ++i) {
>>     Transaction transaction = primaryChannel.getTransaction();
>>     transaction.begin();
>>
>>     Event event = primaryChannel.take();
>>     Assert.assertNotNull(event);
>>     String body = getBody(event);
>>     String expected = "Test Primary " + i;
>>     Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
>>         body.endsWith(expected));
>>     transaction.commit();
>>     transaction.close();
>> }
>>
>> primarySource.stop();
>>
>> for (int i = 0; i < 10; ++i) {
>>     StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test");
>>     EventLogger.logEvent(msg);
>> }
>> for (int i = 0; i < 10; ++i) {
>>     Transaction transaction = alternateChannel.getTransaction();
>>     transaction.begin();
>>
>>     Event event = alternateChannel.take();
>>     Assert.assertNotNull(event);
>>     String body = getBody(event);
>>     String expected = "Test Alternate " + i;
>>     /* When running in Gump, Flume consistently returns the last event from the primary channel
>>        after the failover, which fails this test */
>>     Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
>>         " Received: " + body, body.endsWith(expected));
>>     transaction.commit();
>>     transaction.close();
>> }
>>
>> When I run this on my Mac, it never fails. But Gump fails almost every time, returning "Channel contained event, but not expected message. Expected: Test Alternate 0 Received: <128>1 2012-08-30T05:50:04.143Z vmgump MyApp - Test [Test@18060][mdc@18060] Test Primary 9"
>>
>> Do we have any tests similar to this? I didn't see anything that tests failover this way, but I might have missed it.
>>
>> Ralph
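In the quoted test loops, `transaction.commit()` and `transaction.close()` run only after the assertion passes, so a failing check skips both and leaves that transaction open. The `org.apache.flume.Transaction` Javadoc shows the usual pattern instead: `begin()` and the work inside `try`, `commit()` at the end of the `try`, `rollback()` in `catch`, and `close()` in `finally`. The sketch below illustrates that pattern under stated assumptions: `Transaction` and `StubChannel` are hypothetical stand-ins that only mirror the method names of Flume's `Transaction` and `Channel` so the example runs without Flume, and `takeChecked` is an illustrative helper, not part of the test above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

public class TakeWithRollbackSketch {

    /** Hypothetical stand-in mirroring the method names of org.apache.flume.Transaction. */
    interface Transaction {
        void begin();
        void commit();
        void rollback();
        void close();
    }

    /** Hypothetical stand-in channel: a take() is undone by rollback(). */
    static final class StubChannel {
        final Deque<String> events = new ArrayDeque<>();
        private String taken; // event taken by the open transaction, if any

        Transaction getTransaction() {
            return new Transaction() {
                @Override public void begin() { taken = null; }
                @Override public void commit() { taken = null; }
                @Override public void rollback() {
                    if (taken != null) {
                        events.addFirst(taken); // return the event to the head of the queue
                        taken = null;
                    }
                }
                @Override public void close() { }
            };
        }

        String take() {
            taken = events.pollFirst();
            return taken;
        }
    }

    /** Hypothetical helper: take one event, commit if it passes the check, else roll back. */
    static String takeChecked(StubChannel channel, Predicate<String> check) {
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            String body = channel.take();
            if (body == null || !check.test(body)) {
                throw new AssertionError("Unexpected event: " + body);
            }
            tx.commit();
            return body;
        } catch (RuntimeException | Error e) {
            tx.rollback(); // the event stays available to the next reader
            throw e;
        } finally {
            tx.close(); // always reached, pass or fail
        }
    }

    public static void main(String[] args) {
        StubChannel channel = new StubChannel();
        channel.events.add("Test Alternate 0");
        System.out.println(takeChecked(channel, body -> body.endsWith("Test Alternate 0")));
    }
}
```

With this shape, an unexpected event such as "Test Primary 9" fails the check but is rolled back into the channel rather than being left in an open transaction.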
>>
>> On Aug 30, 2012, at 9:22 AM, Hari Shreedharan wrote:
>>
>>> Hi Ralph,
>>>
>>> Sorry, I missed this message earlier. How are you simulating failover in your test? I did not look at your code. If the message was written to the channel by the Avro Source, but the Avro Sink on the other side did not get a success response, the failover sink processor would retry the same message: the sink rolls back its transaction, so the channel makes the event available to another sink. Generally, if the Avro Source does not acknowledge that a message was successfully written to its channel, the sink rolls back the transaction and throws an EventDeliveryException; with the Failover SinkProcessor, the next sink then picks the event up.
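The rollback-and-retry behavior described above means delivery is at least once: an event the remote source has already stored, but whose acknowledgement never reaches the sink, is rolled back into the sending channel and delivered again through the next sink. The toy model below is a sketch of that sequence, not Flume code; `ToyChannel`, `ToyDestination`, `drain`, and the `loseAckFor` switch are hypothetical names. It assumes the primary source stores "Test Primary 9" and then stops before its ack is returned, which is one possible explanation for the Gump failure, not a confirmed diagnosis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FailoverRedeliverySketch {

    /** Toy transactional channel (not the Flume Channel API): take() is undone by rollback(). */
    static final class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private String inFlight; // event taken by the open transaction, if any

        void put(String event) { queue.addLast(event); }

        String take() {
            inFlight = queue.pollFirst();
            return inFlight;
        }

        void commit() { inFlight = null; }

        /** Put the in-flight event back at the head so the next take() sees it again. */
        void rollback() {
            if (inFlight != null) {
                queue.addFirst(inFlight);
                inFlight = null;
            }
        }

        boolean isEmpty() { return queue.isEmpty(); }
    }

    /** Toy remote Avro Source plus MemoryChannel; records every event it accepts. */
    static final class ToyDestination {
        final List<String> received = new ArrayList<>();
        boolean up = true;
        String loseAckFor; // hypothetical: event whose ack is lost because the source stops

        /** Returns true only if the sending sink sees a success ack. */
        boolean send(String event) {
            if (!up) {
                return false;
            }
            received.add(event); // already stored on the receiving side
            if (event.equals(loseAckFor)) {
                up = false; // source stops before replying
                return false;
            }
            return true;
        }
    }

    /** Drain the channel, trying the primary first and failing over to the alternate. */
    static void drain(ToyChannel channel, ToyDestination primary, ToyDestination alternate) {
        while (!channel.isEmpty()) {
            String event = channel.take();
            if (primary.send(event)) {
                channel.commit();
                continue;
            }
            // No ack: roll back (in Flume the sink would throw EventDeliveryException here).
            channel.rollback();
            // Failover: the next sink takes the same event again.
            String retry = channel.take();
            if (alternate.send(retry)) {
                channel.commit();
            } else {
                channel.rollback();
                return; // both destinations down; events stay in the channel
            }
        }
    }

    static List<String> runScenario() {
        ToyChannel channel = new ToyChannel();
        ToyDestination primary = new ToyDestination();
        ToyDestination alternate = new ToyDestination();
        primary.loseAckFor = "Test Primary 9";

        for (int i = 0; i < 10; i++) channel.put("Test Primary " + i);
        drain(channel, primary, alternate);
        for (int i = 0; i < 10; i++) channel.put("Test Alternate " + i);
        drain(channel, primary, alternate);
        return alternate.received;
    }

    public static void main(String[] args) {
        // First event seen by the alternate destination.
        System.out.println(runScenario().get(0));
    }
}
```

Under these assumptions the alternate receives "Test Primary 9" ahead of "Test Alternate 0", the same ordering as the Gump failure, while the primary also keeps its own copy of that event.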