Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Accumulo, mail # user - Using iterators to add columns to a row


Copy link to this message
-
Using iterators to add columns to a row
Peter Tillotson 2013-04-18, 16:28


Apologies in advance - this is some of my first Accumulo code, and I suspect there is a much better way to do this.

Basically I'm trying to add an edge count column to each row of my table, so that I get rows along the following lines:
 - node1 {  to:count:3, to:node2:, to:node3:, to:node3:  }

But on the client side I only need to write:
 - node1 {  to:node2:, to:node3:, to:node3:  }

I'd like to use the same approach to add indexes to separate column families, and combiners to aggregate.  

Aside from the inefficiency of creating a BatchWriter for each mutation:
 - is this the correct approach? or
 - is there a simpler way to achieve this?

Many thanks in advance

Peter T
 
--- code compiles but not tested ---

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.hadoop.io.Text;

public class EdgeCountIterator extends SortedKeyIterator
{
    private boolean isDisabled = false;
    private Connector connector;
    private Key currentRowStart = null;
    private String tableId;
    private int count = 0;
    
    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException
    {
        super.init(source, options, env);
        if(env.getIteratorScope() == IteratorScope.scan)
        {
            isDisabled = true;
            return;
        }
        
        String user = options.get("username");
        String password = options.get("password");
        String instanceId = options.get("instance");
        tableId = options.get("tableId");
        
        AuthInfo authInfo = new AuthInfo();
        authInfo.setUser(user);
        authInfo.setPassword(password.getBytes());
        authInfo.setInstanceId(instanceId);
        
        authInfo.setInstanceId(instanceId);
        
        
        try
        {
            connector  = HdfsZooInstance.getInstance().getConnector(authInfo);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void next() throws IOException
    {
        if( isDisabled )
        {
            super.next();
            return;
        }
        
        SortedKeyValueIterator<Key, Value> source = getSource();
        while(source.hasTop())
        {
            Key key = source.getTopKey();
            Value val = source.getTopValue();
            
            source.next();
        }
        doMutations(currentRowStart);
        super.next();
    }
    
    public void process( Key key, Value val )
    {
        if(currentRowStart == null)
        {
            currentRowStart = key;
        }
        else
        {
            if( !currentRowStart.getRow().equals(key.getRow()) )
            {
                doMutations(currentRowStart);
                currentRowStart = key;
                count = 0;
            }
        }
        count++;
    }

    public void doMutations( Key rowStartKey )
    {
        BatchWriter writer = null;
        try
        {
            writer = connector.createBatchWriter(tableId, 0, 0, 2);
            Mutation mutation = new Mutation( rowStartKey.getRow() );
            Text colQ = new Text("count");
            ByteBuffer b = ByteBuffer.allocate(4);
            b.putInt(count).array();
            mutation.put(rowStartKey.getColumnFamily(),
                    colQ,
                    new Value(b));
            writer.addMutation(mutation);
        }
        catch(Exception e)
        {
            throw new RuntimeException(e);
            
        }
        finally
        {
            if(writer != null)
            {
                try
                {
                    writer.close();
                }
                catch (MutationsRejectedException e)
                {
                    throw new RuntimeException(e);              
                }
            }
        }
        
    }
}
+
Billie Rinaldi 2013-04-18, 17:43
+
Peter Tillotson 2013-04-19, 08:09
+
Bell, Philip S CIV SPAWAR... 2013-04-18, 17:04