Avro, mail # user - Async Callbacks using Netty


Re: Async Callbacks using Netty
James Baldassari 2012-02-01, 06:24
Hi William,

Great test.  I ran your code, and it worked as expected for me, but I made
some slight changes to the client side to demonstrate what's happening:

    // Test sync call:
    System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
    CharSequence syncResult = client.hello(); // This should block for 5 seconds
    System.out.println("2. " + new Date() + ": Chat.hello() returned \"" + syncResult + "\"");

    // Test async call:
    final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
    System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
    client.hello(future1); // This should not block.
    System.out.println("4. " + new Date() + ": Chat.hello(Callback<CharSequence>) returned");
    CharSequence asyncResult = future1.get(); // This should block for 5 seconds
    System.out.println("5. " + new Date() + ": Callback<CharSequence>.get() returned \"" + asyncResult + "\"");

When I ran that I got the following output:

    1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
    2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"

    3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
    4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>) returned
    5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned "Hello"

As you can see, the synchronous call (lines 1-2) blocked for about 5
seconds as expected.  When the asynchronous call was invoked it returned
immediately (note timestamps on lines 3-4).  The part that blocked was the
CallFuture.get() on line 5 of the output.  The result of the callback can't
be obtained until the server returns it (after waiting 5 seconds).

I think I may know why this behavior seems confusing.  In practice I don't
think many people will use CallFuture.  It's basically an adapter to make
an asynchronous call synchronous by blocking until the result returns.
This is useful in unit tests and in situations where the client can't
proceed until the result is available.  However, to really take advantage
of the asynchronous API you never want to wait for the result of an RPC.
The client should just invoke async RPCs with some Callback instance and
then move on to other things, such as invoking more RPCs!
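
To make the "adapter" idea concrete, here is a minimal, self-contained sketch of how a blocking wrapper around a callback can behave. It does not use Avro and is not how CallFuture is actually implemented: the Callback interface below is a local stand-in that only mirrors the two method names used in this thread, and BlockingCallback, helloAsync and timeAsyncCall are hypothetical names made up for the illustration. The "server" is simulated with a scheduled executor that answers after a delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CallbackAdapterSketch {
  /** Local stand-in (assumption): same two method names as the Callback used in this thread. */
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
  }

  /** Hypothetical adapter: a Callback whose get() blocks until a result or error arrives. */
  static class BlockingCallback<T> implements Callback<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile T result;
    private volatile Throwable error;

    @Override public void handleResult(T result) { this.result = result; done.countDown(); }
    @Override public void handleError(Throwable error) { this.error = error; done.countDown(); }

    T get() throws Exception {
      done.await();                          // this is the part that blocks
      if (error != null) throw new Exception(error);
      return result;
    }
  }

  /** Simulated async RPC: returns at once, delivers "Hello" after delayMillis. */
  static void helloAsync(ScheduledExecutorService server, long delayMillis, Callback<String> cb) {
    server.schedule(() -> cb.handleResult("Hello"), delayMillis, TimeUnit.MILLISECONDS);
  }

  /** Returns {millis until the async invocation returned, millis until get() returned}. */
  static long[] timeAsyncCall(long delayMillis) throws Exception {
    ScheduledExecutorService server = Executors.newSingleThreadScheduledExecutor();
    try {
      BlockingCallback<String> future = new BlockingCallback<>();
      long t0 = System.nanoTime();
      helloAsync(server, delayMillis, future);   // does not wait for the reply
      long t1 = System.nanoTime();
      future.get();                              // waits for the reply
      long t2 = System.nanoTime();
      return new long[] { (t1 - t0) / 1_000_000, (t2 - t0) / 1_000_000 };
    } finally {
      server.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    long[] t = timeAsyncCall(500);
    System.out.println("invocation returned after ~" + t[0] + " ms");
    System.out.println("get() returned after ~" + t[1] + " ms");
  }
}
```

The invocation returns almost immediately, while get() takes roughly as long as the simulated server delay, which is the same pattern as lines 3-5 of the output above.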

Here's an example.  Let's say we have an e-mail server with an Avro
protocol that allows us to access the users' mailboxes.  We might have a
method to allow us to search for all messages with a subject line that
matches some regular expression.  In IDL it might look something like this:

    protocol Mail {
      record Message {
        string from;
        array<string> to;
        union { string, null } subject;
        union { string, null } body;
      }
      array<Message> findBySubject(string regexp);
    }

It doesn't really matter what the implementation of this protocol looks
like on the server side.  Searching through all messages is likely to take
some time, so what we would want to do is to fire off an async RPC as soon
as the user clicks the search button, then return control to the UI
immediately so that the user can continue doing other things while the
search is running.  Whenever the results come back we would then notify the
user or populate the search results in the UI, e.g. via ajax/comet if it's
a web app.  So we would have a Callback implementation that would look
something like this:

    public class FindBySubjectCallback implements Callback<List<Message>> {
      // RequestContext is some class that allows us to send events back to the user
      private final RequestContext context;
      public FindBySubjectCallback(RequestContext context) {
        this.context = context;
      }
      @Override
      public void handleResult(List<Message> result) {
        // Notify user with results:
        context.fireSearchResultReadyEvent(result);
      }
      @Override
      public void handleError(Throwable error) {
        // Notify user that an error occurred:
        context.fireErrorEvent(error);
      }
    }

The client, which might be running in a servlet container, would then just
invoke the RPC like this:

    private Mail.Callback mailClient; // Client is initialized/injected somewhere
    ...
    public void findBySubject(String regexp, RequestContext context) {
      mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
      // return immediately without waiting for the search to complete!
    }
    ...
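
For anyone who wants to try the fire-and-forget pattern without setting up Avro, below is a rough, runnable sketch of the same flow. Everything in it is a stand-in invented for illustration: the Callback interface is declared locally, FakeMailClient runs the "search" on a background thread over a hard-coded list of subject lines, and this RequestContext simply records events in a list instead of pushing them to a UI. Messages are reduced to plain subject strings to keep it short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FireAndForgetSketch {
  /** Local stand-in (assumption) for the Callback interface used in this thread. */
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
  }

  /** Hypothetical RequestContext stub: records events instead of notifying a user. */
  static class RequestContext {
    final List<String> events = new CopyOnWriteArrayList<>();
    void fireSearchResultReadyEvent(List<String> subjects) { events.add("results:" + subjects); }
    void fireErrorEvent(Throwable error) { events.add("error:" + error.getMessage()); }
  }

  /** Fake async client: findBySubject returns immediately, the search runs in the background. */
  static class FakeMailClient {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final List<String> subjects = Arrays.asList("Avro RPC", "Netty transport", "Lunch?");

    void findBySubject(String regexp, Callback<List<String>> callback) {
      pool.submit(() -> {
        try {
          List<String> hits = new ArrayList<>();
          for (String s : subjects) {
            if (s.matches(regexp)) hits.add(s);
          }
          callback.handleResult(hits);
        } catch (RuntimeException e) {
          callback.handleError(e);   // e.g. an invalid regular expression
        }
      });
    }

    void shutdownAndWait() throws InterruptedException {
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  /** Fires one async search per regexp, then waits only so the demo can report the events. */
  static List<String> runSearches(String... regexps) throws InterruptedException {
    final RequestContext context = new RequestContext();
    FakeMailClient client = new FakeMailClient();
    for (String regexp : regexps) {
      client.findBySubject(regexp, new Callback<List<String>>() {
        @Override public void handleResult(List<String> result) {
          context.fireSearchResultReadyEvent(result);
        }
        @Override public void handleError(Throwable error) {
          context.fireErrorEvent(error);
        }
      });
      // Control returns here right away; nothing waits for the search.
    }
    client.shutdownAndWait();
    return context.events;
  }

  public static void main(String[] args) throws InterruptedException {
    for (String event : runSearches(".*RPC.*", ".*transport")) {
      System.out.println(event);
    }
  }
}
```

The wait in shutdownAndWait() exists only so the demo can collect the events before exiting; a real client would keep serving requests and let the callbacks fire whenever the results arrive.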

Anyway, hope that makes some sense.  Let me know if you have any questions.

-James
On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[EMAIL PROTECTED]> wrote: