Re: Streaming data locality
Keith Wiley 2011-02-04, 07:03
On Feb 3, 2011, at 6:25 PM, Allen Wittenauer wrote:
> I think you need to give a more concrete example of what you are doing. -cache is used for sending files with your job and should have no bearing on what your input is to your job. Something tells me that you've cooked something up that is overly complex. :D
Fair enough. Here's my setup.
Our data consists of a few million binary (image) files, only about 6MB each uncompressed, 2.5MB gzipped. A typical job does not need to touch every image; rather, only a tiny subset of the images is required. In this sense our job is unlike a canonical Hadoop job, which combs an entire database. As such, our current implementation (not streaming, but "normal" Hadoop) first runs a SQL query against a non-Hadoop database to find the names of the relevant images for a given job, then sets up input splits for those individual files and assigns them as input to the Hadoop job.
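Roughly, the preprocessing boils down to something like this (sketched in Python with invented names and a toy index; our real code is Java against the Hadoop API):

```python
# Sketch only: given the image names returned by the SQL query, look up
# where each image lives inside the packed files and emit split
# descriptors. The index structure and all names here are illustrative.

def build_splits(wanted_names, index):
    """index maps image name -> (container_path, offset, length)."""
    splits = []
    for name in wanted_names:
        path, offset, length = index[name]
        splits.append({"path": path, "start": offset, "length": length})
    return splits

# A toy index standing in for the real name -> location lookup.
index = {
    "img_001": ("/data/pack_00.seq", 0, 2500000),
    "img_002": ("/data/pack_00.seq", 2500000, 2400000),
    "img_003": ("/data/pack_01.seq", 0, 2600000),
}
splits = build_splits(["img_001", "img_003"], index)
```

The real version hands these descriptors to Hadoop as FileSplits, but the selection logic is the same idea.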
Side note: since the images are so small and we use so many at a time, they are packed into sequence files. The code that builds the input splits is quite delicate, since it must splice the necessary image files out of their locations in the sequence files to serve as the eventual Hadoop input splits.
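To illustrate the packing idea only (this is a toy container, nothing like the real SequenceFile format with its sync markers, codecs, and key/value framing):

```python
import io

def pack(images):
    """Pack {name: bytes} into one blob; return (blob, index of
    name -> (offset, length)). A toy stand-in for a sequence file."""
    buf = io.BytesIO()
    index = {}
    for name, data in images.items():
        index[name] = (buf.tell(), len(data))
        buf.write(data)
    return buf.getvalue(), index

def unpack(blob, index, name):
    """Splice one packed image back out, the way a split reader would."""
    off, length = index[name]
    return blob[off:off + length]

blob, idx = pack({"a.img": b"\x00\x01\x02", "b.img": b"\xff" * 5})
```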
The mappers receive byte arrays as input from the record reader (in the form of a WritableComparable class that contains the byte array).
Our computational heavy lifting resides in a very large and complex set of native libraries (C++) which our Java mappers call using JNI, passing the byte arrays (the image files slurped into memory) through JNI to a small native .so file which then coordinates the processing with the underlying processing library. The native routines return byte arrays through JNI that represent the converted images. The Java mappers pass the byte arrays on to the reducer stage for final processing. In our current design, the reducer's computational complexity is simple enough to be implemented and performed entirely in Java, sans JNI or native code.
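To give a flavor of the byte-buffer handoff, here is a rough Python/ctypes analogue, using libc's strlen as a stand-in for a native entry point (not our actual interface, which is Java/JNI against our own .so):

```python
import ctypes

# On Linux, CDLL(None) resolves symbols from the current process,
# which includes libc. strlen stands in here for a native
# image-processing routine that receives a raw byte buffer.
libc = ctypes.CDLL(None)
libc.strlen.argtypes = [ctypes.c_char_p]
libc.strlen.restype = ctypes.c_size_t

payload = b"pretend this is an image"
n = libc.strlen(payload)  # native code reads the buffer directly
```

The JNI version is the same shape: the Java side hands a byte[] across the boundary and the native side operates on the raw memory, which is exactly the region where our crashes occur.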
This system works fine at an organizational level, but unfortunately we have been plagued by extremely insidious segfaults and memory-allocation exceptions deep in the native library, bugs which absolutely never occur in a non-Hadoop, non-Java environment when the library is accessed solo from a native C++ program (or from Python scripts).
I have determined that the problem is not an incompatibility between the native library and Hadoop, since a non-Hadoop Java program using the same JNI interface to access our native routines exhibits the same errors. This is some fundamental incompatibility between Java/the JVM/JNI and our native library. Like I said, the library is very complicated; it involves extremely fancy template programming, and I suspect JNI is simply not properly coordinating memory between the two environments. I've valgrinded it, I've looked at the core dumps, and I haven't found an underlying cause yet, much less fixed the problem.
Which brings me to streaming. Serendipitously, our system seems to work considerably better if I don't use Java mappers and JNI, but instead use streaming to run a small native program that coordinates the calls into the native library (further evidence that the problem lies somewhere in JNI's functionality). I am not sure how to arrange our input data for streaming, however. It is binary, of course, which has been a long-standing issue (I believe) with Hadoop streaming, since streaming assumes textual, newline-terminated records rather than 8-bit-clean binary data. Furthermore, I'm not really sure how I would indicate which files are the input for a given job, since I don't know how to write a driver program for streaming; I only know how to use the streaming jar directly from the command line. So there is no driver of my choosing: I can't control the driver at all, it's the "streaming" driver, and all I can do is give it my mapper and reducer native programs to run. You see my problem? With streaming I have no driver, just the mappers and reducers, but with a Java Hadoop program I have all three, and I can do useful preprocessing in the driver, like setting up the input paths to the subset of images comprising the input dataset for a given job. This is quite perplexing for me. How do I use streaming effectively if I can't write my own driver?
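One workaround I've seen suggested for the binary-records problem (not something I've deployed myself) is to base64-encode the payload so that a record can never contain a raw tab or newline:

```python
import base64

def encode_record(key, data):
    """Turn (key, binary value) into a newline-safe streaming record.
    Base64 output contains no tabs or newlines, so the streaming
    framework can split records on them without corrupting the data."""
    return key + "\t" + base64.b64encode(data).decode("ascii")

def decode_record(line):
    """Inverse of encode_record; the mapper/reducer calls this per line."""
    key, b64 = line.rstrip("\n").split("\t", 1)
    return key, base64.b64decode(b64)
```

The cost is a ~33% size inflation on the wire, which may or may not matter at our data volumes.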
Anyway, what I've come up with is to create a text file that lists the input files relevant to a job. That one text file is the streaming input, and each of its lines is an input record. Each line is a properly formatted streaming key/value pair (meaning a TAB separates the key and the value). Interestingly, I don't need the key/value pairs per se, I only need the input filenames, so I basically ignore the key and store the input file names in the value (i.e., right after the TAB on each line of the input file). My native program receives a set of these lines from Hadoop and opens and operates on each named file. For that to work, the file must be accessible to the native code, and the only way I know to do that is for the file to be in the distributed cache (which is aliased to the current working directory of the native program); hence my previously described method of putting symbolic links to all the input files in the distributed cache using the -cacheFile flag. That way the native program can open them when it finds their names in the streaming input records.
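In Python rather than native code, just to show the shape, the mapper side of this scheme looks something like the following (the filenames and helper are illustrative, not our real program):

```python
import os

def map_stream(lines, workdir="."):
    """Each input line is "key<TAB>filename"; the named file is expected
    to be reachable from the working directory via the distributed-cache
    symlink. Returns (filename, size) pairs as a stand-in for the real
    image processing."""
    results = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        _key, fname = line.split("\t", 1)
        path = os.path.join(workdir, fname)
        with open(path, "rb") as f:
            data = f.read()
        results.append((fname, len(data)))
    return results
```

The real mapper would feed the bytes to the native library and emit results on stdout, but the manifest-line parsing and cache-relative file opening are the essence of the scheme.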
So that's what I've got going on. If you have a better design in mind, I would love to hear about it. I admit this all feels rather contrived, but I simply don't know how else to do it. The documentation is rather thin on these topics, so I'm kind of making it up as I go along. For example, I have not found a single code example anywhere demonstrating how to use the streaming API with a user-defined driver. There are examples of this for Java-only Hadoop programs, which made it easy for me to learn how to do that, but for streaming, no such luck.