Using iterators to add columns to a row


Apologies in advance - this is some of my first Accumulo code, and I suspect there is a much better way to do this.

Basically I'm trying to add an edge count column to each row of my table, so I get rows along the following lines:
 - node1 {  to:count:3, to:node2:, to:node3:, to:node3:  }

But on the client side I only need to write:
 - node1 {  to:node2:, to:node3:, to:node3:  }
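
For reference, the client-side write is just the edge columns - a minimal sketch (the table name "graph" and the writer settings are made up for illustration):

    BatchWriter bw = connector.createBatchWriter("graph", 1000000L, 1000L, 2);
    Mutation m = new Mutation(new Text("node1"));
    // Only edge columns are written; the count column should appear server-side.
    m.put(new Text("to"), new Text("node2"), new Value(new byte[0]));
    m.put(new Text("to"), new Text("node3"), new Value(new byte[0]));
    bw.addMutation(m);
    bw.close();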

I'd like to use the same approach to add indexes to separate column families, and to use combiners to aggregate.
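
For the counting itself, I gather a combiner could be attached at table level instead, so the aggregation happens without any custom writing - a minimal sketch, assuming the built-in SummingCombiner and a hypothetical table name "graph" (the client would then write a to:count increment of 1 per edge):

    import java.util.Collections;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.LongCombiner;
    import org.apache.accumulo.core.iterators.user.SummingCombiner;

    // Sum the values written to the to:count column (priority 10 and the
    // iterator name "edgeCount" are arbitrary choices).
    IteratorSetting setting = new IteratorSetting(10, "edgeCount", SummingCombiner.class);
    SummingCombiner.setEncodingType(setting, LongCombiner.Type.STRING);
    SummingCombiner.setColumns(setting,
            Collections.singletonList(new IteratorSetting.Column("to", "count")));
    connector.tableOperations().attachIterator("graph", setting);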

Aside from the inefficiency of creating a BatchWriter for each mutation (a possible workaround is sketched after these questions):
 - is this the correct approach? or
 - is there a simpler way to achieve this?
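
One workaround I'm considering for the writer-per-mutation issue (untested sketch; a "writer" field would replace the local variable in doMutations()):

    // In init(), after obtaining the connector: create the writer once.
    writer = connector.createBatchWriter(tableId, 1000000L, 1000L, 2);

    // In doMutations(): reuse it, flushing per row instead of closing.
    writer.addMutation(mutation);
    writer.flush();

Though I'm not sure an iterator gets a reliable hook to close the writer when it is torn down.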

Many thanks in advance

Peter T
 
--- code compiles but not tested ---

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.hadoop.io.Text;

// Iterator meant to run at compaction time: counts the columns in each row as
// they stream past, then writes the total back as a "count" column using a
// BatchWriter against the same table.
public class EdgeCountIterator extends SortedKeyIterator
{
    private boolean isDisabled = false;
    private Connector connector;
    private Key currentRowStart = null;
    private String tableId;
    private int count = 0;
    
    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException
    {
        super.init(source, options, env);
        // Only do the counting work during compactions; at scan time just
        // pass everything through untouched.
        if(env.getIteratorScope() == IteratorScope.scan)
        {
            isDisabled = true;
            return;
        }
        
        String user = options.get("username");
        String password = options.get("password");
        String instanceId = options.get("instance");
        tableId = options.get("tableId");
        
        // Credentials for connecting back to this instance; passed in as
        // iterator options when the iterator is configured on the table.
        AuthInfo authInfo = new AuthInfo();
        authInfo.setUser(user);
        authInfo.setPassword(password.getBytes());
        authInfo.setInstanceId(instanceId);
        
        try
        {
            connector  = HdfsZooInstance.getInstance().getConnector(authInfo);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void next() throws IOException
    {
        if( isDisabled )
        {
            super.next();
            return;
        }
        
        // Drain the source, counting columns per row; process() writes out
        // the count for a row each time a row boundary is crossed.
        SortedKeyValueIterator<Key, Value> source = getSource();
        while(source.hasTop())
        {
            Key key = source.getTopKey();
            Value val = source.getTopValue();
            process(key, val);
            source.next();
        }
        // Write the count for the final row, if the source was non-empty.
        if(currentRowStart != null)
        {
            doMutations(currentRowStart);
        }
        super.next();
    }
    
    // Tracks the current row, counting its columns; when the row changes,
    // writes out the count for the previous row and resets the counter.
    public void process( Key key, Value val )
    {
        if(currentRowStart == null)
        {
            // Deep-copy the key: the source iterator may reuse the object
            // it returns from getTopKey().
            currentRowStart = new Key(key);
        }
        else if( !currentRowStart.getRow().equals(key.getRow()) )
        {
            doMutations(currentRowStart);
            currentRowStart = new Key(key);
            count = 0;
        }
        count++;
    }

    public void doMutations( Key rowStartKey )
    {
        BatchWriter writer = null;
        try
        {
            // maxMemory=0 flushes every mutation immediately; combined with a
            // new writer per row, this is the inefficiency mentioned above.
            writer = connector.createBatchWriter(tableId, 0, 0, 2);
            Mutation mutation = new Mutation( rowStartKey.getRow() );
            Text colQ = new Text("count");
            // Encode the count as a 4-byte big-endian int.
            byte[] encoded = ByteBuffer.allocate(4).putInt(count).array();
            mutation.put(rowStartKey.getColumnFamily(),
                    colQ,
                    new Value(encoded));
            writer.addMutation(mutation);
        }
        catch(Exception e)
        {
            throw new RuntimeException(e);
        }
        finally
        {
            if(writer != null)
            {
                try
                {
                    writer.close();
                }
                catch (MutationsRejectedException e)
                {
                    throw new RuntimeException(e);              
                }
            }
        }
        
    }
}