Flume, mail # dev - Review Request: Ability to specify the capacity of MemoryChannel in bytes


Re: Review Request: Ability to specify the capacity of MemoryChannel in bytes
Ted Malaska 2012-10-03, 10:37


> On Oct. 2, 2012, 12:48 a.m., Hari Shreedharan wrote:
> > flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java, lines 60-64
> > <https://reviews.apache.org/r/6982/diff/3/?file=155699#file155699line60>
> >
> >     Semaphores and synchronized/monitors are essentially different ways of achieving the same goal.
> >    
> >     The only difference is that when you have permits > 1, semaphores are much better since each semaphore operation is atomic and you don't need to worry about handling thread context switches happening in between.
> >    
> >     Also, in Java, the java.util.concurrent.* classes have traditionally been implemented more efficiently than the language constructs themselves - though this gap seems to be narrowing over time, as synchronized/wait/notify have been getting better.
>
> Hari Shreedharan wrote:
>     I intended to post this as reply to your query in the previous review. Accidentally ended up posting it as a new review. Sorry about that.
>
> Ted Malaska wrote:
>     So all you want is a semaphore permit = 1 blocking the adding of new events.  That's easy to do, I just remember a review saying they didn't like the sync because the blocking would be too slow.  
>    
>     So as I see it, I can use the semaphores and have blocking, or use the AtomicLong and err on the safe side of not blowing up.
>
>     Or in other words: speed versus the rare chance of not getting the memory channel to its highest level of fullness.
>
> Hari Shreedharan wrote:
>     Semaphore.tryAcquire does not block, or it can be configured to block only for a fixed time. By using the AtomicLong you need additional thread-safety mechanics to prevent threads from accessing the same code in the way I mentioned above - and even then you are simply re-implementing a semaphore.
>
> Hari Shreedharan wrote:
>     To be clear, I am not saying we should be replacing synchronized with semaphore.acquire() and release(). I am saying we should replace the atomic long operations :
>      if ( currentByteUsage.addAndGet(eventByteSize) > bufferedByteCapacity) {
>             currentByteUsage.addAndGet(-eventByteSize);
>    
>     with something like:
>     semaphore.tryAcquire(ceiling(eventByteSize/slotSize)) - this essentially makes sure we have enough space before we put the event. OTOH, you could keep doing what you are doing now, as long as all of the atomic long operations acquire a channel-wide reentrant lock. So your current code becomes:
>     Lock lock = new ReentrantLock();  // channel-wide: one shared instance
>
>     lock.lock();
>     try {
>       if (currentByteUsage.addAndGet(eventByteSize) > bufferedByteCapacity) {
>         currentByteUsage.addAndGet(-eventByteSize);
>         throw new ChannelException("Put queue for MemoryTransaction of byteCapacity ..");
>       }
>     } finally {
>       lock.unlock();
>     }
>    
>    
>     This ensures that multiple transactions don't end up adjusting the byte value at the same time.
>    
>
>
> Ted Malaska wrote:
>     ohhhhhh I get it.  Sorry I'm slow.  
>    
>     I'll see what I can do tonight.

OK the code is done with Semaphores.  Once I update the unit test I will submit the patch.

Thanks again for all the help.
- Ted
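
For readers following the thread, the slot-based idea Hari describes, semaphore.tryAcquire(ceiling(eventByteSize/slotSize)), might look roughly like the sketch below. This is not the code from the patch: the class name ByteSlotLimiter, its method names, and the slot size are hypothetical and chosen only for illustration. The only library calls used are java.util.concurrent.Semaphore's tryAcquire(int), release(int) and availablePermits().

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper for illustration; not Flume's actual MemoryChannel code. */
class ByteSlotLimiter {
    private final int slotSize;     // bytes represented by one permit (assumed tuning value)
    private final Semaphore slots;  // one permit per free slot

    ByteSlotLimiter(long byteCapacity, int slotSize) {
        this.slotSize = slotSize;
        this.slots = new Semaphore((int) (byteCapacity / slotSize));
    }

    /** Permits needed for an event: ceiling(eventByteSize / slotSize). */
    int slotsFor(long eventByteSize) {
        return (int) ((eventByteSize + slotSize - 1) / slotSize);
    }

    /** Reserves space atomically without blocking; false means not enough room. */
    boolean tryReserve(long eventByteSize) {
        return slots.tryAcquire(slotsFor(eventByteSize));
    }

    /** Gives the space back once the event has left the channel. */
    void release(long eventByteSize) {
        slots.release(slotsFor(eventByteSize));
    }

    int freeSlots() {
        return slots.availablePermits();
    }
}
```

With a 1000-byte capacity and 100-byte slots, a 250-byte event takes 3 permits, and a following 800-byte event is refused until the first one is released. Because the check and the reservation happen in a single tryAcquire call, there is no window in which the counter overshoots the capacity and has to be rolled back. If a short wait is acceptable, Semaphore.tryAcquire(int, long, TimeUnit) offers the timed variant mentioned above.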
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6982/#review12089
-----------------------------------------------------------
On Sept. 16, 2012, 3:54 a.m., Ted Malaska wrote:
>
> (Updated Sept. 16, 2012, 3:54 a.m.)
>
>
> Review request for Flume.
>
>
> Description
> -------
>
> 1. The user will be able to define a byteCapacity and a byteCapacityBufferPercentage.
> 2. Each event's byte size will be estimated from its body contents
> 3. On put, bytes are added to the current total