Avro >> mail # user >> Record extensions?

Re: Record extensions?
On Tue, Jun 12, 2012 at 6:09 PM, Christophe Taton <[EMAIL PROTECTED]> wrote:
> In practice, I have a bunch of independent records, each of them carrying at
> most one "extension field".
> I was especially hoping there would be a way to avoid serializing an
> "extension" record twice (once from the record object into a bytes field,
> and then a second time as a bytes field into the destination output
> stream). Ideally, such an extension field should not require its content to
> be bytes, but should accept any record object, so that it is encoded only
> once.
> As I understand it, Avro does not allow me to do this right now. Is this
> correct?

I think that can be done too if the schema for the extension field is
known when the client opens a connection.  This is a bit like
org.apache.avro.mapred.Pair<K,V>, where in different files K and V can
have different schemas.  You'd construct a GenericRequestor passing a
protocol that incorporates the particular extensions in use for that
session.  The server would then subclass GenericResponder overriding
getLocal() to return the value of getRemote(), so that the remote
protocol that contains the extensions is used to both read and write
data.  (You could also make this work with specific or reflect.)  This
way a different protocol would be used for each client session.  The
server's implementation of Responder#respond() would then have to
handle these variations.
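
As a rough illustration of "a protocol that incorporates the particular extensions in use for that session", here is a minimal sketch that assembles such a protocol declaration as JSON. The protocol name "Sketch", the record "Envelope", the message "send" and the helper protocolFor() are all hypothetical and do not come from this thread. In real use the resulting string would presumably be handed to Protocol.parse() and the parsed protocol passed to the GenericRequestor constructor.

```java
// Hypothetical helper, not part of Avro: builds a per-session protocol
// declaration whose "Envelope" record carries at most one extension field.
class SessionProtocolSketch {

    // extensionSchemaJson: full JSON schema of the extension record.
    // extensionTypeName: the name declared inside that schema.
    static String protocolFor(String extensionSchemaJson, String extensionTypeName) {
        return "{\"protocol\": \"Sketch\", \"namespace\": \"example\",\n"
             + " \"types\": [\n"
             // The extension type is declared first so Envelope can refer to it by name.
             + "  " + extensionSchemaJson + ",\n"
             + "  {\"type\": \"record\", \"name\": \"Envelope\", \"fields\": [\n"
             + "   {\"name\": \"id\", \"type\": \"string\"},\n"
             // A union with null makes the extension optional.
             + "   {\"name\": \"extension\", \"type\": [\"null\", \""
             + extensionTypeName + "\"], \"default\": null}\n"
             + "  ]}\n"
             + " ],\n"
             + " \"messages\": {\n"
             + "  \"send\": {\"request\": [{\"name\": \"envelope\", \"type\": \"Envelope\"}],"
             + " \"response\": \"null\"}\n"
             + " }\n"
             + "}";
    }

    public static void main(String[] args) {
        // Assumed example extension: a small "Geo" record.
        String geo = "{\"type\": \"record\", \"name\": \"Geo\", \"fields\": ["
                   + "{\"name\": \"lat\", \"type\": \"double\"},"
                   + "{\"name\": \"lon\", \"type\": \"double\"}]}";
        System.out.println(protocolFor(geo, "Geo"));
    }
}
```

Because the extension is an ordinary named record inside the protocol, it would be encoded once as part of the enclosing record rather than first into a bytes field.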

The patch below would be required to make sure that Responder always
uses the value of getLocal() so that you can meaningfully override it.
If this sounds useful, we can file a Jira issue.
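
To show why the base class has to go through getLocal() before an override can matter, here is a minimal plain-Java sketch. SketchResponder and SessionResponder are hypothetical stand-ins, not Avro's Responder or GenericResponder, and protocols are reduced to strings.

```java
// Hypothetical stand-in for a responder base class; not Avro's real Responder.
class SketchResponder {
    private final String local;

    SketchResponder(String local) {
        this.local = local;
    }

    // Overridable accessor, analogous to getLocal().
    String getLocal() {
        return local;
    }

    // Reads the private field directly, so a subclass override is ignored.
    String handshakeUsingField() {
        return "server protocol: " + local;
    }

    // Goes through the accessor, so a subclass override takes effect.
    String handshakeUsingGetter() {
        return "server protocol: " + getLocal();
    }
}

// Hypothetical per-session responder that reports the remote protocol as its own.
class SessionResponder extends SketchResponder {
    private final String remote;

    SessionResponder(String local, String remote) {
        super(local);
        this.remote = remote;
    }

    @Override
    String getLocal() {
        return remote;
    }
}

class GetLocalOverrideSketch {
    public static void main(String[] args) {
        SketchResponder r =
            new SessionResponder("base-protocol", "base-protocol+extensions");
        System.out.println(r.handshakeUsingField());   // server protocol: base-protocol
        System.out.println(r.handshakeUsingGetter());  // server protocol: base-protocol+extensions
    }
}
```

Only the getter-based path picks up the session-specific protocol, which is the behavior the patch is meant to guarantee throughout the handshake.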


Index: lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
==================================================================
--- lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (revision
+++ lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (working
@@ -65,14 +65,11 @@
     = new ConcurrentHashMap<MD5,Protocol>();

   private final Protocol local;
-  private final MD5 localHash;
   protected final List<RPCPlugin> rpcMetaPlugins;

   protected Responder(Protocol local) {
     this.local = local;
-    this.localHash = new MD5();
-    localHash.bytes(local.getMD5());
-    protocols.put(localHash, local);
+    protocols.put(new MD5(local.getMD5()), local);
     this.rpcMetaPlugins =
       new CopyOnWriteArrayList<RPCPlugin>();
@@ -211,6 +208,11 @@
       remote = Protocol.parse(request.clientProtocol.toString());
       protocols.put(request.clientHash, remote);
+    if (connection != null && response.match != HandshakeMatch.NONE)
+      connection.setRemote(remote);
+    MD5 localHash = new MD5(getLocal().getMD5());
     HandshakeResponse response = new HandshakeResponse();
     if (localHash.equals(request.serverHash)) {
       response.match =
@@ -220,7 +222,7 @@
         remote == null ? HandshakeMatch.NONE : HandshakeMatch.CLIENT;
     if (response.match != HandshakeMatch.BOTH) {
-      response.serverProtocol = local.toString();
+      response.serverProtocol = getLocal().toString();
       response.serverHash = localHash;

@@ -232,9 +234,6 @@
     handshakeWriter.write(response, out);

-    if (connection != null && response.match != HandshakeMatch.NONE)
-      connection.setRemote(remote);
     return remote;