HBase, mail # user - Endpoint and Observer work together?


Re: Endpoint and Observer work together?
Gary Helmling 2013-10-02, 21:41
Your DemoObserver is not being invoked because DemoEndpoint is opening a
scanner directly on the region:

    RegionCoprocessorEnvironment env =
        (RegionCoprocessorEnvironment) getEnvironment();
    InternalScanner scanner = env.getRegion().getScanner(scan);

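The RegionObserver.postScannerNext() hook is invoked higher up in the
call stack, where the region server handles scanner calls coming from a
client, so a scanner opened directly on the region never reaches it.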

If the processing of these two coprocessors is so tightly related, then I'd
recommend just combining them into a single class (a RegionObserver can also
be an endpoint):

public class DemoObserver extends BaseRegionObserver
    implements DemoProtocol {

Or, if for some reason this is difficult to do, then separate out your
KeyValue handling into a shared class that can be used by both
DemoObserver.postScannerNext() and the InternalScanner result handling in
DemoEndpoint.scanRows().
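
As a rough illustration of that second option, here is a minimal sketch of
such a shared class. The class name DemoValueTransformer and its methods are
hypothetical (they are not part of HBase or of the original demo code), and
it works on plain byte arrays so that it has no HBase dependencies. It
assumes the cell values are UTF-8 text and uses Locale.ROOT for the case
conversion, which may differ from the default-locale toUpperCase() in the
original observer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * Hypothetical shared helper holding the value transformation in one place,
 * so that both coprocessors can apply the same logic.
 */
public class DemoValueTransformer {

    /** Upper-cases one cell value, treating the bytes as UTF-8 text. */
    public static byte[] transform(byte[] value) {
        String text = new String(value, StandardCharsets.UTF_8);
        return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    /** Applies transform() to every value, returning a new list. */
    public static List<byte[]> transformAll(List<byte[]> values) {
        List<byte[]> out = new ArrayList<byte[]>(values.size());
        for (byte[] value : values) {
            out.add(transform(value));
        }
        return out;
    }
}
```

DemoObserver.postScannerNext() would then call
DemoValueTransformer.transform(kv.getValue()) when it rebuilds each KeyValue,
and DemoEndpoint.scanRows() would do the same for each KeyValue returned by
scanner.next(res) before adding it to retValues.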
On Wed, Oct 2, 2013 at 7:21 AM, rgaimari <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I've created some demo code to show the problem.  Here's the Observer:
>
> ...
> public class DemoObserver extends BaseRegionObserver {
>   byte[] personFamily = Bytes.toBytes("Person");
>
>   @Override
>   public boolean postScannerNext(
>         ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
>         List<Result> results, int limit, boolean hasMore)
>         throws IOException {
>     List<Result> newResults = new ArrayList<Result>();
>     for (Result result : results) {
>         List<KeyValue> newKVList = new ArrayList<KeyValue>();
>         for (KeyValue kv : result.list()) {
>           String newVal = Bytes.toString(kv.getValue()).toUpperCase();
>           newKVList.add(new KeyValue(kv.getRow(), kv.getFamily(),
>                                 kv.getQualifier(), kv.getTimestamp(),
>                                 Bytes.toBytes(newVal)));
>         }
>         newResults.add(new Result(newKVList));
>     }
>     results.clear();
>     results.addAll(newResults);
>     return super.postScannerNext(e, s, results, limit, hasMore);
>   }
> }
>
>
> And here's the Endpoint:
>
> ...
> public class DemoEndpoint extends BaseEndpointCoprocessor implements
>         DemoProtocol {
>
>   @Override
>   public List<KeyValue> scanRows(Filter filter) throws IOException {
>     Scan scan = new Scan();
>     scan.setFilter(filter);
>     RegionCoprocessorEnvironment env =
>         (RegionCoprocessorEnvironment)getEnvironment();
>     InternalScanner scanner = env.getRegion().getScanner(scan);
>
>     List<KeyValue> retValues = new ArrayList<KeyValue>();
>     boolean more = false;
>     List<KeyValue> res = new ArrayList<KeyValue>();
>     do {
>         res.clear();
>         more = scanner.next(res);
>         if (res != null)
>           retValues.addAll(res);
>     } while (more);
>
>     scanner.close();
>     return retValues;
>   }
> }
>
> They are loaded in separate jar files, and they are both attached to the
> table:
>
> 1.9.3p448 :009 >   describe 'Demo'
> DESCRIPTION                                                        ENABLED
>  {NAME => 'Demo', coprocessor$2 => 'hdfs:///user/hduser/DemoEndpoi true
>  nt.jar|demo.DemoEndpoint|1001', coprocessor$1 => 'hdfs:///user/hd
>  user/DemoObserver.jar|demo.DemoObserver|1', FAMILIES => [{NAME =>
>   'Person', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE',
>  REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE',
>   MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS =>
>  'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCODE_ON_DI
>  SK => 'true', BLOCKCACHE => 'true'}]}
> 1 row(s) in 0.0880 seconds
>
> If I run a test where I do a scan directly on the client (with no filter),
> I get the following results:
>
> <row01>, <Person>, <Address>, <123 MAIN STREET>
> <row01>, <Person>, <Name>, <JOHN SMITH>
> <row02>, <Person>, <Address>, <234 ELM STREET>
> <row02>, <Person>, <Name>, <LARRY DAVID>
> <row03>, <Person>, <Address>, <345 SCOTT STREET>
> <row03>, <Person>, <Name>, <JOHN DOE>
>
> The values are all capitalized, as the Observer was supposed to do.
> However, if I then run a scan through the Endpoint coprocessor (with a