Hello, Would there be any interest in developing a SQL-layer on top of Accumulo? I'm part of the Apache Phoenix project and we've built a similar system on top of HBase. I wanted to see if there'd be interest on your end at working with us to generalizing our client and provide in a server that would do Accumulo-specific push down in support of a SQL layer. I suspect there's enough similarity between HBase and Accumulo that this would be feasible. Thanks, James
However, I'm not sure that all of us understand the level of work that would be required for this task (at least, I certainly don't). Can you provide a slightly more detailed plan for what you envision?
Thanks, Mike On Tue, Apr 29, 2014 at 1:32 AM, James Taylor <[EMAIL PROTECTED]>wrote:
I have taken a quick look at phoenix. It's baked into HBase-specific features pretty hard.
It uses coprocessors to do things like create index entries. This is a common enough idiom in the HBase community, but not something we've supported in Accumulo. In general, you do not want an accumulo Iterator or Constraint generating data for other tables.
Like Eric said, I'm a little scared because I know that Phoenix is rather baked into HBase's API. But, that's half the fun in writing some new code :)
I'd be happy to help evaluate what this would look like - what is different (both good and bad) in Accumulo. Like was previously mentioned, targeting the Accismus (Percolator) prototype to generate the secondary indices would, IMO, be the best target here. I know it's in the very early stages right now, but I still believe that it would be the long-term solution.
@Mike - thanks for pointing out that JIRA. I'll comment there with more detail. My high-level thinking would be to work with your community to do a feasibility study and perhaps POC. I'd be, of course, relying on your expertise of Accumulo, as my knowledge is pretty limited.
@Josh - it's less baked in than you'd think on the client where the query parsing, compilation, optimization, and orchestration occurs. The client/server interaction is hidden behind the ConnectionQueryServices interface, the scanning behind ResultIterator (in particular ScanningResultIterator), the DML behind MutationState, and KeyValue interaction behind KeyValueBuilder. Yes, though, it would require some more abstraction, but probably not too bad, though. On the server-side, the entry points would all be different and that's where I'd need your insights for what's possible.
@Eric - I agree about having txn support (probably through snapshot isolation) by controlling the timestamp, and then layering indexing on top of that. That's where we're headed. But I wouldn't let that stop the effort - it would just be layered on top of what's already there. FWIW, there's another interesting indexing model that has been termed "local indexing"( https://github.com/Huawei-Hadoop/hindex) which is being worked on right now (should be available in either our 4.1 or 4.2 release). In this model, the table data and index data are co-located on the same region server through a kind of "buddy" region mechanism. The advantage is that you take no hit at write time, as you're writing both the index and table data together. Not sure how/if this would transfer over to the Accumulo world.
TLDR? Let's continue in the JIRA?
On Tue, Apr 29, 2014 at 7:45 AM, Josh Elser <[EMAIL PROTECTED]> wrote:
Definitely. I'm a little concerned about what's expected to be provided by the "database" (HBase, Accumulo) as I believe HBase is a little more flexible in allowing writes internally where Accumulo has thus far said "you're gonna have a bad time". Interesting. Given that Accumulo doesn't have a fixed column family schema, this might make index generation even easier (maybe "cleaner" is the proper word). You could easily co-locate the indices with the data, given them a proper name.
Problem still exists that we don't have a solid way to do this solely inside of Accumulo ATM. I'd imagine that if someone stepped up to implement coprocessors, we'd be taking the route of a separate, standalone process (as opposed to in-RegionServer). Hypothetically, we could do the same for Phoenix in the short-term.
Can you quantify what would be expected by Accumulo to integrate with Phoenix (maybe list what exactly is done inside of HBase at a high level?) so that we could give some more targeted ideas/feelings as to what the level of work would be inside Accumulo? Mailing list is fine by me for while we get this hashed out :). We can move to Jira when we start getting into specifics.
On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <[EMAIL PROTECTED]> wrote:
Tell me more about what you mean by "allowing writes internally".
With HBase, you can do something similar (though, you're right, you'd need to create the column family upfront or take the hit of creating it dynamically - that's a nice feature that Accumulo has). The reason this doesn't work is that you need a different row key so that the index rows are ordered according to their indexed column values. If you put it in a column family of the data table, they're ordered in the same way as the data table. This makes range scans over index tables very expensive, as the rows would need to be re-ordered.
There's not a lot of hard/fast requirements. Most of what Phoenix does is to optimize performance by leveraging the capabilities of the server. In terms of hard/fast requirements, these come to mind: - data is returned in row key order from range scans - a scan may set a start key/stop key to do a range scan - a row key may be composed of arbitrary bytes - a client may "pre-split" a table by providing the region boundaries at table create time (we rely on this for salting to prevent hotspotting: http://phoenix.incubator.apache.org/salted.html). - the client has access to the region boundaries of a table (this allows for better parallelization) - the client may issue chunk up a scan into smaller, multiple scans and run them in parallel Some of these may be a bit squishy, as there may be existing machinery already in your client programming model that could be leverage. The client API of HBase, for example, does not provide the ability out of the box to parallelize a scan, so this is something Phoenix had to add on top (through chunking up scans at or within region boundaries).
Phoenix manages the metadata of your tables (tables, columns, indexes, views, etc) in an HBase table. DDL statements such as CREATE TABLE, DROP TABLE, ALTER TABLE are atomic, transactional operations b/c we don't want our metadata table to get in a corrupt state. To accomplish this, we rely on: - setting a "split policy" that ensures that the table data for a given "tenant" (we support multi-tenancy: http://phoenix.incubator.apache.org/multi-tenancy.html) stay together in the same region. - putting the data using an API that guarantees that either the entire batch of mutations succeed or fail completely. Again, these are details of our implementation on HBase which do not necessarily need to be implemented in the same way on a different system.
Phoenix supports sequences which are atomically incremented values. This is done through a coprocessor currently, due to some limitations with the HBase Increment API, but the idea is the same as an atomic increment.
Phoenix does the following push down: - the WHERE clause gets transformed into three things: a start/stop key of a scan, a skip scan filter to efficiently navigate the key space (see http://phoenix-hbase.blogspot.com/2013/05/demystifying-skip-scan-in-phoenix.html), and a custom filter to rule out a row based on some java code that does expression evaluation. - the GROUP BY clause gets pushed to the server and a coprocessor runs the scan on each region so that the client doesn't have to get back all the raw data. Instead, the client gets back the aggregated data (to conserve network bandwidth and to run the scan where the data lives). The client then does a final merge sort. - the ORDER BY clause used in combination with the LIMIT clause is a TopN query. We optimize this by each region holding on to the top N values with the client then doing a merge sort with the limit applied. - the ORDER BY clause on it's own gets executed on each region (spooled using memory mapped files) and then the client does a merge sort. This spooling could potentially be done on the client side. - joins are executed as a broadcast hash join. We run one side of the query (with the filters applied), compact the results, and send them to each region server where they are cached while we run the other side of the query. A coprocessor then does a map lookup (equi-joins only are supported currently) to join based on the join key and returns the joined results (i.e. the concatenated values in a single, condensed key value as access from the client is positional post-join).
For our global secondary indexes (local secondary indexes are different as we discussed already), we trap updates to the data table through a coprocessor. For index maintenance you need to know when a change occurs to a data row what the prior value of the row was. The reason is because you need to delete the index row corresponding to the old data row and then insert the index row corresponding to the new value (remember, the index value makes up the row key). By doing this operation through a coprocessor, we know that we can get the prior data row state locally. We still need to issue a Put from one region server to another, but this isn't really an extra hop, as if it was done on the client, the same hop would need to be done (but the old row state would need to be pulled over to the client which is not necessary with the coprocessor based approach). For more on global secondary indexing, see http://phoenix.incubator.apache.org/secondary_indexing.html (there are some good presentations at the end of the page that provide more technical detail).
Phoenix also allows "point-in-time" queries where a client may establish a connection at an earlier timestamp. If your table is setup to keep multiple versions of the same row, then you can query "back-in-time" and will see the data as it was at that point. We more or less get this for free with the MVCC model of HBase by specifying a max timestamp on a scan. One slightly tricky bit is we correlate the current DDL of your table based on the same timestamp as with your data. So when you go back-in-time like this, you'll also see the structure of your table as it was at time also.
Haha, sorry, that was a sufficiently ominous statement with insufficient context.
For discussion sake, let's just say HBase coprocessors and Accumulo iterators are equivalent, purely in the scope of "running server-side code" (in the RegionServer/TabletServer). However, there is a notable difference in the pipeline where each of those are implemented.
Coprocessors have built-in hooks that let you get updates on PUT/GET/DELETE/etc as well as pre and post each of those operations. In other words, they provide hooks at a "high database level".
Iterators tend to be much closer to the data itself, only dealing with streams of data (other iterators stacked on one another). Iterators implement versioning, visibilities, and can even implement complex searches. The downside of this approach is that iterators lack any means to safely write data _outside of the sorted Key-Value pairs in the tablet currently being processed_. It's possible to make in tablet updates, but sorted order within a large tablet might make this difficult as well.
This is why I was thinking percolator would be a better solution, as it's meant for handling updates like this server-side. However, I imagine it would be possible, in the short-term, to make some separate process between Phoenix and Accumulo which handles writes. Ah, of course. You need the term up front to make it sort properly. All of these look fine. The Accumulo BatchScanner does that parallelization for you which is really nice (handling tablet migration and all that fun stuff transparently).
I'd have to look again at how our mutation failures are handled (or someone else can chime in). This might be something to keep an eye on depending on the distribution of mutations in regards to tables. Conditional Mutations in the about-to-be-released version 1.6.0 will provide this. I've written an iterator to do a group by previously. Depending on the schema this is fine. This is an interesting one. If you remove the possibility of tablets splitting out from underneath you and you had a view of the splits, you could probably pull it off. Unless we can do some trickery with the schema, yeah, client side. The join approach would need to be implemented some other way for the earlier stated comparison of iterators and coprocessors. Right, you want to remove the old index value and update a new index value (actually being two unique keys) in the same transaction to ensure a valid index. Or, at least ensure that you never remove the old value, and die before inserting the new value.
Again, not going to work well in an iterator. I don't see this as a problem. As long as we remove the versioning iterator from a table (which keeps the most recent version of a key by default), it should be pretty easy to implement an iterator which adheres to the "max timestamp" semantics.
Thanks for the explanations, Josh. This sounds very doable. Few more comments inline below.
James On Wed, Apr 30, 2014 at 8:37 AM, Josh Elser <[EMAIL PROTECTED]> wrote:
Another fallback might be to do global index maintenance on the client. It'd just be more expensive, especially if you want to handle out-of-order updates (which are particularly tricky, as you have to get multiple versions of the rows to work out all the different scenarios here).
A second fallback might be to support only local indexing. Does Accumulo have the concept of a "custom load balancer" that would allow you to co-locate two regions from different tables? The local-index features has kind of driven some feature requests on that front for HBase - mainly callbacks when a region is split or re-located. The rows of the local index are prefixed with the region start key to keep them together and identify them.
That's nice that Accumulo has this built-in. Does it allow the client to specify the split points for the scan in some way?
Client-side could be another fallback. The coprocessor approach is really only a big win in two cases: if you have a join which doesn't have many matches (as those rows get filtered on the server-side), or for correlated sub queries or exists queries where you can filter or collapse many rows to one or none on the server-side rather than return them all to the client.
The wikisearch example provides something similar to a local index. Rather than stuff things into two tablets, a single row in accumulo contains both the index and data stored in separate column families. Iterator trees are used to execute queries and retrieve data with that row. On Thu, May 1, 2014 at 2:24 AM, James Taylor <[EMAIL PROTECTED]> wrote:
Agreed with what Bill said. Co-locating indices within the same row simplifies this a bit, IMO. <snip/> Assuming I understand properly, you don't need to be cognizant of the splits. You just specify the Ranges (where each Range is a start key and end key) and the Accumulo client API does the rest. You can be efficient by structuring your data so that you don't touch every tabletserver for every query -- this seems to be what's being suggested.
What do you think is next, James?
I know I won't have a lot of time to devote into heavy development with what I've already signed up for in the next few months, but I'd still like to try to help out where possible. Is anyone else on the Accumulo side interested in getting involved?
Sorry for the delay in getting back to you - things got a bit crazy with our graduation and HBaseCon happening at the same time.
@Josh & Bill - r.e. Co-locating indices within the same row simplifies this a bit. The secondary indexes need to be in row key order by the indexed columns, so co-locating them in the data table wouldn't allow the lookup and range scan abilities we'd need. The advantage of the index is that you don't need to look at all the data, but can do a point lookup or range scan based on the usage of the indexed columns in a query.
@Josh - r.e. Assuming I understand properly, you don't need to be cognizant of the splits. You just specify the Ranges (where each Range is a start key and end key) and the Accumulo client API does the rest.
Typically the Ranges are merge sorted on the client, so this might require an extension to the Accumulo client.
r.e. Next steps.
We'd definitely need an expert on the Accumulo side to proceed. I'm happy to help on the Phoenix side - I'll post a note on our dev list too to see if there are other folks interested as well. Given the similarities between Accumulo and HBase and the abstraction Phoenix already has in place, I don't think the effort would be large to get something up and running. Maybe a phased approach, would make sense: first with query support and next with secondary index support?
Not sure where this stacks up in terms of priority for you all. At Salesforce, we saw a specific need for this with HBase, the "big data store" on top of which we'd choose to standardize. We realized early on that we'd never get the adoption we wanted without providing a different, more familiar programming model: namely SQL. Since we were targeting supporting interactive web-based applications, anything map/reduce based wasn't a fit which led us to create Phoenix. Perhaps there are members in your community in the same boat?
On Fri, May 2, 2014 at 1:44 PM, Josh Elser <[EMAIL PROTECTED]> wrote:
So there may be a bit of confusion with storing index and data in the same row. By "row" I just mean the logical Accumulo unit, not a "row" as in "thing in my relational table." Synonyms for "row" in this scheme are "shard" and "document partition".
You can store multiple documents and indices for those documents in different column families within the same row. You then have separate readers for the indices and document data ("sources" in Iterator terms). Point and range queries are still possible in this fashion, and are made even easier if there's another level that maps terms to rows/shards/partition. The wikisearch example is an (admittedly rough) implementation of this.
I think looking at how "buddy" regions work may help clarify things, since I imagine it works similarly. If the coprocessor is just reading from a region "I", that that contains index data for only region "D", then that maps pretty well to an iterator scanning index data from a column family "I" and fetching documents from a column family "D".
On Thu, May 8, 2014 at 1:09 AM, James Taylor <[EMAIL PROTECTED]> wrote:
@William - it's entirely possible that my HBase terminology is not mapping well to Accumulo terminology. If Accumulo has a capability not present in HBase that'll handle this, that'd be great.
In HBase terminology, by row I mean all of the key values across all column families with the same row key (Row ID in Accumulo?). So in HBase, it doesn't work to store the index data in a separate column family for the same row, because the rows are ordered according to the data table row key. We need the rows of an index to be ordered by the row key formed by the indexed columns instead. Otherwise we have to re-sort the rows which is more expensive than just doing a scan over the data table.
With buddy regions, the two regions are from different tables with different row key orders. All of the data from "D" for a given region is contained in the buddy region for "I", but in a different order. We equally rely on the buddy region for "I" being in row key order according to the indexed columns (as opposed to the row key order of the data table).
Thanks, James On Sat, May 10, 2014 at 7:21 PM, William Slacum < [EMAIL PROTECTED]> wrote:
(sorry for the delay, still trying to stay on top of mail from the outage)
I think I know what Bill is trying to get at here and it hinges on the fact that Accumulo doesn't require you to define the column families for a table up front (it has a default locality group which all colfams which don't have a locality group defined go into -- differs from HBase where locality group == colfam).
Because of this, you can use the column family and qualifier to get the properly sorting index records instead of using the row key (assuming the row is just some bucket/partitioning element). Thus, you can co-locate index and data key-values within the same row if you're tricky enough with how you create the table. :)
That sounds promising, Josh & William. Is there a performance penalty with this approach (versus traversing the rows in row key order)? Thanks, James On Fri, May 16, 2014 at 8:27 AM, Josh Elser <[EMAIL PROTECTED]> wrote:
NEW: Monitor These Apps!
Apache Lucene, Apache Solr and all other Apache Software Foundation projects and their respective logos are trademarks of the Apache Software Foundation.
Elasticsearch, Kibana, Logstash, and Beats are trademarks of Elasticsearch BV, registered in the U.S. and in other countries. This site and Sematext Group is in no way affiliated with Elasticsearch BV.
Service operated by Sematext