HBase >> mail # user >> Endpoint and Observer work together?


Re: Endpoint and Observer work together?
Your DemoObserver is not being invoked because DemoEndpoint is opening a
scanner directly on the region:

    RegionCoprocessorEnvironment env =
        (RegionCoprocessorEnvironment) getEnvironment();
    InternalScanner scanner = env.getRegion().getScanner(scan);

The RegionObserver.postScannerNext() hook is invoked higher up in the
client call stack, when the region server handles a client scanner RPC, so
a scanner opened internally like this bypasses the observer entirely.
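
For contrast, a plain client-side scan does go through that path, so the
observer fires. A minimal sketch (assuming conf is an existing
HBaseConfiguration, using your 'Demo' table):

    HTable table = new HTable(conf, "Demo");
    ResultScanner rs = table.getScanner(new Scan());
    for (Result r : rs) {
      // each batch returned by a scanner next() RPC has already been
      // rewritten by DemoObserver.postScannerNext()
    }
    rs.close();
    table.close();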

If the processing of these two coprocessors is so tightly related, then I'd
recommend just combining them into a single class (a RegionObserver can also
be an endpoint):

public class DemoObserver extends BaseRegionObserver implements DemoProtocol {
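
Here's a minimal sketch of what that combined class could look like,
assuming the 0.94-style API used in your code and that DemoProtocol extends
CoprocessorProtocol; the class name DemoCombined and the upperCased()
helper are made up for illustration:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class DemoCombined extends BaseRegionObserver implements DemoProtocol {

  // Cached at load time so the endpoint method can reach the region.
  private RegionCoprocessorEnvironment env;

  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    this.env = (RegionCoprocessorEnvironment) e;
  }

  // The one shared transform, used by both the observer hook and the endpoint.
  private KeyValue upperCased(KeyValue kv) {
    String newVal = Bytes.toString(kv.getValue()).toUpperCase();
    return new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(),
        kv.getTimestamp(), Bytes.toBytes(newVal));
  }

  @Override
  public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
      InternalScanner s, List<Result> results, int limit, boolean hasMore)
      throws IOException {
    // Same rewrite as the original DemoObserver, via the shared transform.
    List<Result> newResults = new ArrayList<Result>();
    for (Result result : results) {
      List<KeyValue> newKVList = new ArrayList<KeyValue>();
      for (KeyValue kv : result.list()) {
        newKVList.add(upperCased(kv));
      }
      newResults.add(new Result(newKVList));
    }
    results.clear();
    results.addAll(newResults);
    return hasMore;
  }

  @Override
  public List<KeyValue> scanRows(Filter filter) throws IOException {
    Scan scan = new Scan();
    scan.setFilter(filter);
    InternalScanner scanner = env.getRegion().getScanner(scan);
    List<KeyValue> retValues = new ArrayList<KeyValue>();
    List<KeyValue> res = new ArrayList<KeyValue>();
    boolean more;
    do {
      res.clear();
      more = scanner.next(res);
      // Apply the transform here too, since the observer hook won't fire
      // for a scanner opened inside the endpoint.
      for (KeyValue kv : res) {
        retValues.add(upperCased(kv));
      }
    } while (more);
    scanner.close();
    return retValues;
  }
}

With this in place the table should only need a single coprocessor entry
pointing at DemoCombined, and the client scan path and the endpoint path
apply the same transform.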

Or if for some reason this is difficult to do, then separate out your
KeyValue handling into a shared class that can be used by both
DemoObserver.postScannerNext() and the InternalScanner result handling in
DemoEndpoint.scanRows(), along the lines of the sketch below.
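
A rough sketch of that shared class (the name ValueUpperCaser is made up
for illustration; the transform itself follows your quoted code):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public final class ValueUpperCaser {

  private ValueUpperCaser() {
  }

  // Rebuild each KeyValue with its value upper-cased, exactly as
  // DemoObserver.postScannerNext() does today.
  public static List<KeyValue> upperCase(List<KeyValue> kvs) {
    List<KeyValue> out = new ArrayList<KeyValue>(kvs.size());
    for (KeyValue kv : kvs) {
      String newVal = Bytes.toString(kv.getValue()).toUpperCase();
      out.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(),
          kv.getTimestamp(), Bytes.toBytes(newVal)));
    }
    return out;
  }
}

DemoObserver would then build each new Result from
ValueUpperCaser.upperCase(result.list()), and DemoEndpoint.scanRows() would
pass the KeyValues from each scanner.next() batch through the same call
before adding them to retValues.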
On Wed, Oct 2, 2013 at 7:21 AM, rgaimari <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I've created some demo code to show the problem.  Here's the Observer:
>
> ...
> public class DemoObserver extends BaseRegionObserver {
>   byte[] personFamily = Bytes.toBytes("Person");
>
>   @Override
>   public boolean postScannerNext(
>         ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
>         List<Result> results, int limit, boolean hasMore)
>         throws IOException {
>     List<Result> newResults = new ArrayList<Result>();
>     for (Result result : results) {
>         List<KeyValue> newKVList = new ArrayList<KeyValue>();
>         for (KeyValue kv : result.list()) {
>           String newVal = Bytes.toString(kv.getValue()).toUpperCase();
>           newKVList.add(new KeyValue(kv.getRow(), kv.getFamily(),
>               kv.getQualifier(), kv.getTimestamp(), Bytes.toBytes(newVal)));
>         }
>         newResults.add(new Result(newKVList));
>     }
>     results.clear();
>     results.addAll(newResults);
>     return super.postScannerNext(e, s, results, limit, hasMore);
>   }
> }
>
>
> And here's the Endpoint:
>
> ...
> public class DemoEndpoint extends BaseEndpointCoprocessor implements
>         DemoProtocol {
>
>   @Override
>   public List<KeyValue> scanRows(Filter filter) throws IOException {
>     Scan scan = new Scan();
>     scan.setFilter(filter);
>     RegionCoprocessorEnvironment env =
>         (RegionCoprocessorEnvironment) getEnvironment();
>     InternalScanner scanner = env.getRegion().getScanner(scan);
>
>     List<KeyValue> retValues = new ArrayList<KeyValue>();
>     boolean more = false;
>     List<KeyValue> res = new ArrayList<KeyValue>();
>     do {
>         res.clear();
>         more = scanner.next(res);
>         if (res != null)
>           retValues.addAll(res);
>     } while (more);
>
>     scanner.close();
>     return retValues;
>   }
> }
>
> They are loaded in separate jar files, and they are both attached to the
> table:
>
> 1.9.3p448 :009 > describe 'Demo'
> DESCRIPTION                                                        ENABLED
>  {NAME => 'Demo',                                                  true
>   coprocessor$2 => 'hdfs:///user/hduser/DemoEndpoint.jar|demo.DemoEndpoint|1001',
>   coprocessor$1 => 'hdfs:///user/hduser/DemoObserver.jar|demo.DemoObserver|1',
>   FAMILIES => [{NAME => 'Person', DATA_BLOCK_ENCODING => 'NONE',
>   BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3',
>   COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647',
>   KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false',
>   ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
> 1 row(s) in 0.0880 seconds
>
> If I run a test where I do a scan directly on the client (with no filter),
> I get the following results:
>
> <row01>, <Person>, <Address>, <123 MAIN STREET>
> <row01>, <Person>, <Name>, <JOHN SMITH>
> <row02>, <Person>, <Address>, <234 ELM STREET>
> <row02>, <Person>, <Name>, <LARRY DAVID>
> <row03>, <Person>, <Address>, <345 SCOTT STREET>
> <row03>, <Person>, <Name>, <JOHN DOE>
>
> The values are all capitalized, as the Observer was supposed to do.
> However, if I then run a scan through the Endpoint coprocessor (with a