HBase, mail # user - problem in testing coprocessor function


Re: problem in testing coprocessor endpoint
ch huang 2013-07-12, 04:57
What you describe is how to load an endpoint coprocessor for every region in
HBase. What I want is to load it into my test table only, that is, only for
the regions of that table.
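
For reference, the per-table 'coprocessor' attribute value is a pipe-separated string of the form jar path|class name|priority, optionally followed by |arguments. Below is a minimal sketch in plain Java (no HBase dependency) that splits such a value and checks which host java.net.URI sees in the jar path. The class CoprocessorSpec and its methods are hypothetical helpers written for illustration, not HBase's own parser, and the sketch assumes the priority is always present and numeric.

```java
import java.net.URI;

// Hypothetical helper for illustration only; this is not part of HBase.
public class CoprocessorSpec {
    final String jarPath;
    final String className;
    final int priority;
    final String arguments; // empty string when the optional 4th field is omitted

    CoprocessorSpec(String jarPath, String className, int priority, String arguments) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.arguments = arguments;
    }

    // Split "jarPath|className|priority[|arguments]" into its fields.
    // Assumption: the priority field is present and is a plain integer.
    static CoprocessorSpec parse(String value) {
        String[] parts = value.split("\\|", -1);
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException("expected 3 or 4 '|'-separated fields: " + value);
        }
        int priority = Integer.parseInt(parts[2].trim());
        String arguments = parts.length == 4 ? parts[3].trim() : "";
        return new CoprocessorSpec(parts[0].trim(), parts[1].trim(), priority, arguments);
    }

    // Host component that java.net.URI extracts from the jar path,
    // or null when the URI has an empty authority (as in "hdfs:///...").
    static String jarHost(String jarPath) {
        return URI.create(jarPath).getHost();
    }

    public static void main(String[] args) {
        CoprocessorSpec spec = parse(
            "hdfs://192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001");
        System.out.println(spec.className + " priority=" + spec.priority);
        System.out.println("host with two slashes:   " + jarHost(spec.jarPath));
        System.out.println("host with three slashes: "
            + jarHost("hdfs:///192.168.10.22:9000/alex/test.jar"));
    }
}
```

With two slashes, java.net.URI reports 192.168.10.22 as the host. With three slashes the authority is empty, so the host is null and the address becomes part of the path. Whether that is related to the error in the quoted message is not established here.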

On Fri, Jul 12, 2013 at 12:07 PM, Asaf Mesika <[EMAIL PROTECTED]> wrote:

> The only way to register endpoint coprocessor jars is by placing them in
> the lib dir of HBase and modifying hbase-site.xml to point to it under a
> property name I forgot at the moment.
> What you described is a way to register an Observer type coprocessor.
>
>
> On Friday, July 12, 2013, ch huang wrote:
>
> > I am testing the coprocessor endpoint function. Here is my testing process
> > and the error I get. I hope any expert on coprocessors can help me out.
> >
> >
> > # vi ColumnAggregationProtocol.java
> >
> > import java.io.IOException;
> > import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
> > // A sample protocol for performing aggregation at regions.
> > public interface ColumnAggregationProtocol
> > extends CoprocessorProtocol {
> > // Perform aggregation for a given column at the region. The aggregation
> > // will include all the rows inside the region. It can be extended to
> > // allow passing start and end rows for a fine-grained aggregation.
> >    public long sum(byte[] family, byte[] qualifier) throws IOException;
> > }
> >
> >
> > # vi ColumnAggregationEndpoint.java
> >
> >
> > import java.io.FileWriter;
> > import java.io.IOException;
> > import java.util.ArrayList;
> > import java.util.List;
> > import org.apache.hadoop.hbase.CoprocessorEnvironment;
> > import org.apache.hadoop.hbase.KeyValue;
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
> > import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
> > import org.apache.hadoop.hbase.ipc.ProtocolSignature;
> > import org.apache.hadoop.hbase.regionserver.HRegion;
> > import org.apache.hadoop.hbase.regionserver.InternalScanner;
> > import org.apache.hadoop.hbase.util.Bytes;
> >
> > //Aggregation implementation at a region.
> >
> > public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
> >   implements ColumnAggregationProtocol {
> >      @Override
> >      public long sum(byte[] family, byte[] qualifier)
> >      throws IOException {
> >        // aggregate at each region
> >          Scan scan = new Scan();
> >          scan.addColumn(family, qualifier);
> >          long sumResult = 0;
> >
> >          CoprocessorEnvironment ce = getEnvironment();
> >          HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
> >          InternalScanner scanner = hr.getScanner(scan);
> >
> >          try {
> >            List<KeyValue> curVals = new ArrayList<KeyValue>();
> >            boolean hasMore = false;
> >            do {
> >              curVals.clear();
> >              hasMore = scanner.next(curVals);
> >              KeyValue kv = curVals.get(0);
> >              sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
> >
> >            } while (hasMore);
> >          } finally {
> >              scanner.close();
> >          }
> >          return sumResult;
> >       }
> >
> >       @Override
> >       public long getProtocolVersion(String protocol, long clientVersion)
> >              throws IOException {
> >          // TODO Auto-generated method stub
> >          return 0;
> >       }
> >
> >       @Override
> >       public ProtocolSignature getProtocolSignature(String protocol,
> >              long clientVersion, int clientMethodsHash) throws IOException {
> >           // TODO Auto-generated method stub
> >           return null;
> >       }
> > }
> >
> > I compiled and packed the two classes into test.jar, put it into my HDFS
> > filesystem, and loaded it into my test table:
> >
> > hbase(main):006:0> alter 'mytest', METHOD =>
> > 'table_att','coprocessor'=>'hdfs:///192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001