Accumulo user mailing list: joining accumulo tables with mapreduce


Re: joining accumulo tables with mapreduce
Keith Turner 2013-04-17, 23:39
On Wed, Apr 17, 2013 at 4:43 PM, Aji Janis <[EMAIL PROTECTED]> wrote:
> Keith,
>
> You hit on the problem that I purposely didn't ask about:
> - Accumulo's input format doesn't support multiple tables at this point, and
> - I can't run three mappers in parallel on different tables and combine/send
>   their output to a reducer (that I know of).
>
> If all three tables had the same rowid (e.g., rowA exists in tables 1, 2, and
> 3), then we can write the row from each table with a different
> family/qualifier/value to a new table. So it would be three mappers run
> sequentially, and the end result is a join... this is the best I have come up
> with so far. If rowids are different across the three tables, then I would
> have to reformat my rowid from all three tables (normalize) prior to writing
> the fourth/final table.

This is a good option.  Also, you could run the three MapReduce jobs
in parallel rather than sequentially.
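
Roughly, each of those jobs could use a mapper like the sketch below
(hypothetical names throughout; assumes AccumuloInputFormat and
AccumuloOutputFormat are configured on the job, and "join.source.table"
is a config key you set differently for each job):

    import java.io.IOException;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Run once per source table. Writes every cell to the join table,
    // using the source table name as the column family so that rows with
    // the same rowid from different tables merge into a single row.
    public static class CopyToJoinTable extends Mapper<Key,Value,Text,Mutation> {
      private static final Text OUT_TABLE = new Text("final_table");
      private Text sourceTable;

      @Override
      protected void setup(Context ctx) {
        sourceTable = new Text(ctx.getConfiguration().get("join.source.table"));
      }

      @Override
      protected void map(Key k, Value v, Context ctx)
          throws IOException, InterruptedException {
        Mutation m = new Mutation(k.getRow());
        m.put(sourceTable, k.getColumnQualifier(), v);
        ctx.write(OUT_TABLE, m);  // AccumuloOutputFormat: key is the table name
      }
    }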

>
> Is calling a scanner on the other two tables from within a mapper (which
> takes the first table as its input) bad? Any clues on how that could be
> done in MapReduce?

It depends; it may not be bad.  The best strategy depends on the
relative sizes of the tables and the relative size of the
intersection.  For example, if you are mapping over a small table with
1000 rows, and for each row you do a lookup into a much bigger table
with 10^9 rows, then this is a good strategy.
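
In that case the per-row lookup inside the mapper could look something
like this sketch (table and family names are made up; assumes a
Connector was obtained in the mapper's setup()):

    // one exact-row lookup in the big table for each input row, using
    // org.apache.accumulo.core.client.{Connector,Scanner} and
    // org.apache.accumulo.core.data.{Key,Value,Range}
    Scanner s = connector.createScanner("bigTable", new Authorizations());
    s.setRange(new Range(row));            // restrict the scan to this rowid
    s.fetchColumnFamily(new Text("def"));  // only pull back the family we need
    for (Map.Entry<Key,Value> e : s) {
      // join logic / output here
    }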

If the tables have a lot in common, you may be able to create a
scanner in the mapper.  On the first row, create the scanner and
position it at the row passed to the mapper.  Do not reset the scanner
for each row; just consume its input while it is less than the row
passed to the mapper.  This may be faster than resetting the scanner
for each row, but it depends on the intersection size.
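
A sketch of that streaming approach (field names are mine; the scanner
is created once over the whole second table and shared across map()
calls, like one side of a merge join):

    private Iterator<Map.Entry<Key,Value>> other;  // opened once, in setup()
    private Map.Entry<Key,Value> cur;              // current entry, null at end

    // Advance the shared scanner until cur's row sorts >= the mapper's
    // row, rather than re-seeking with a new Range on every call.
    private void advanceTo(Text row) {
      while (cur != null && cur.getKey().getRow().compareTo(row) < 0) {
        cur = other.hasNext() ? other.next() : null;
      }
    }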

If you had two large tables with very few rows in common, then maybe
you could use bloom filters.  Make a pass over each table, generating
a bloom filter of all of its rows.  Make another pass, using the other
table's bloom filter from the first pass to filter out rows, and write
the rows that pass to a third table.  This way you avoid sorting a
large amount of data when there is only a small intersection between
the tables.
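
Hadoop ships bloom filter classes that would work for this; a minimal
sketch (the sizing numbers are illustrative, tune them for your row
count and acceptable false-positive rate):

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;  // Hadoop's bloom Key, not Accumulo's
    import org.apache.hadoop.util.hash.Hash;

    // first pass: add every rowid from one table
    BloomFilter filter = new BloomFilter(100000000, 5, Hash.MURMUR_HASH);
    filter.add(new Key(rowid.toString().getBytes()));

    // second pass, over the other table: skip rows the filter rules out
    if (!filter.membershipTest(new Key(rowid.toString().getBytes()))) {
      return;  // definitely not in the other table; no false negatives
    }

BloomFilter is Writable, so the first job can serialize it to HDFS and
the second job can load it in the mapper's setup().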
>
>
> On Wed, Apr 17, 2013 at 10:59 AM, Keith Turner <[EMAIL PROTECTED]> wrote:
>>
>> If I am understanding you correctly, you are proposing that for each
>> row a mapper gets, it looks that row up in two other tables?  This
>> would result in a lot of little round-trip RPC calls and random disk
>> accesses.
>>
>> I think a better solution would be to read all three tables into your
>> mappers, and do the join in the reduce.  This solution will avoid all
>> of the little RPC calls and do lots of sequential I/O instead of
>> random accesses.  Between the map and reduce, you could track which
>> table each row came from.  Any filtering could be done in the mapper
>> or by iterators.  Unfortunately Accumulo does not have the needed
>> input format for this out of the box.  There is a ticket,
>> ACCUMULO-391.
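>>
>> If you had such an input format (or had each of the three jobs write
>> tagged intermediate records), the reduce-side join could look roughly
>> like this sketch (the "tableName\tqualifier\tvalue" value format is
>> made up for illustration):
>>
>>   import java.io.IOException;
>>   import org.apache.accumulo.core.data.Mutation;
>>   import org.apache.accumulo.core.data.Value;
>>   import org.apache.hadoop.io.Text;
>>   import org.apache.hadoop.mapreduce.Reducer;
>>
>>   // Receives all tagged cells for one rowid, from all three tables.
>>   public static class JoinReducer extends Reducer<Text,Text,Text,Mutation> {
>>     private static final Text OUT_TABLE = new Text("final_table");
>>
>>     @Override
>>     protected void reduce(Text rowid, Iterable<Text> tagged, Context ctx)
>>         throws IOException, InterruptedException {
>>       Mutation m = new Mutation(rowid);
>>       for (Text t : tagged) {
>>         String[] parts = t.toString().split("\t", 3);
>>         // source table becomes the family; original qualifier is kept
>>         m.put(new Text(parts[0]), new Text(parts[1]),
>>             new Value(parts[2].getBytes()));
>>       }
>>       ctx.write(OUT_TABLE, m);  // AccumuloOutputFormat: key is the table name
>>     }
>>   }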
>>
>>
>>
>> On Tue, Apr 16, 2013 at 5:28 PM, Aji Janis <[EMAIL PROTECTED]> wrote:
>> > Hello,
>> >
>> > I am interested in learning what the best solutions/practices might
>> > be for joining 3 Accumulo tables by running a MapReduce job. Here's
>> > pseudocode for what I want to accomplish:
>> >
>> >
>> > AccumuloInputFormat accepts tableA
>> > Global variable <table_list> has table names: tableB, tableC
>> >
>> > In a mapper, for example, you would do something like this:
>> >
>> > for each row in tableA {
>> >   if (row.family == "abc" && row.qualifier == "xyz") value = getValue()
>> >   if (foundvalue) {
>> >     for each table in table_list
>> >       scan table with (this rowid && family == "def")
>> >       for each entry found in scan
>> >         write to final_table (rowid, value_as_family,
>> >             tablename_as_qualifier, entry_as_value_string)
>> >   } // end if foundvalue
>> > } // end for loop
>> >
>> >
>> > This is a simple version of what I want to do. In my non mapreduce java