Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Drill, mail # dev - [27/53] [abbrv] DRILL-75 and DRILL-76


Copy link to this message
-
[27/53] [abbrv] DRILL-75 and DRILL-76
jacques@... 2013-07-20, 01:57
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01278ae5/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
new file mode 100644
index 0000000..42d1be5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -0,0 +1,423 @@
+package org.apache.drill.exec.store;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.*;
+import org.apache.drill.exec.schema.*;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static com.fasterxml.jackson.core.JsonToken.*;
+
+public class JSONRecordReader implements RecordReader {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+    private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+    public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private final String inputPath;
+
+    private final IntObjectOpenHashMap<VectorHolder> valueVectorMap;
+
+    private JsonParser parser;
+    private SchemaIdGenerator generator;
+    private DiffSchema diffSchema;
+    private RecordSchema currentSchema;
+    private List<Field> removedFields;
+    private OutputMutator outputMutator;
+    private BufferAllocator allocator;
+    private int batchSize;
+
+    public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
+        this.inputPath = inputPath;
+        this.allocator = fragmentContext.getAllocator();
+        this.batchSize = batchSize;
+        valueVectorMap = new IntObjectOpenHashMap<>();
+    }
+
    /**
     * Creates a reader with the default batch size, {@link #DEFAULT_LENGTH}.
     * NOTE(review): DEFAULT_LENGTH is commented as "256kb" but is passed as
     * {@code batchSize} — confirm whether batch size is bytes or record count.
     */
    public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
        this(fragmentContext, inputPath, DEFAULT_LENGTH);
    }
+
    /** Returns the Jackson parser created in {@link #setup}. */
    private JsonParser getParser() {
        return parser;
    }
+
+    @Override
+    public void setup(OutputMutator output) throws ExecutionSetupException {
+        outputMutator = output;
+        currentSchema = new ObjectSchema();
+        diffSchema = new DiffSchema();
+        removedFields = Lists.newArrayList();
+
+        try {
+            InputSupplier<InputStreamReader> input;
+            if (inputPath.startsWith("resource:")) {
+                input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
+            } else {
+                input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
+            }
+
+            JsonFactory factory = new JsonFactory();
+            parser = factory.createJsonParser(input.getInput());
+            parser.nextToken(); // Read to the first START_OBJECT token
+            generator = new SchemaIdGenerator();
+        } catch (IOException e) {
+            throw new ExecutionSetupException(e);
+        }
+    }
+
    /**
     * Reads the next batch of records from the JSON input.
     *
     * @return number of row slots consumed this batch; 0 when the input is exhausted.
     *         NOTE(review): {@code nextRowIndex++} also counts the terminating
     *         {@code readRecord} call — confirm against ReadType.readRecord whether
     *         the returned count can exceed the rows actually written.
     */
    @Override
    public int next() {
        // Exhausted: parser was closed by a previous batch, or no token remains.
        if (parser.isClosed() || !parser.hasCurrentToken()) {
            return 0;
        }

        resetBatch();

        int nextRowIndex = 0;

        try {
            // Loop until readRecord signals the batch is complete.
            while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
                parser.nextToken(); // Read to START_OBJECT token

                if (!parser.hasCurrentToken()) {
                    parser.close();
                    break;
                }
            }

            parser.nextToken();

            if (!parser.hasCurrentToken()) {
                parser.close();
            }

            // Garbage collect fields never referenced in this batch
            for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
                diffSchema.addRemovedField(field);
                outputMutator.removeField(field.getFieldId());
            }

        } catch (IOException | SchemaChangeException e) {
            // NOTE(review): errors are logged and swallowed; the partial row count is
            // still returned — confirm callers tolerate a truncated batch.
            logger.error("Error reading next in Json reader", e);
        }
        return nextRowIndex;
    }
+
+    private void resetBatch() {
+        for (ObjectCursor<VectorHolder> holder : valueVectorMap.values()) {
+            holder.value.reset();
+        }
+
+        currentSchema.resetMarkedFields();
+        diffSchema.reset();
+        removedFields.clear();
+    }
+
+    @Override
+    public void cleanup() {
+        try {
+            parser.close();
+        } catch (IOException e) {
+            logger.warn("Error closing Json parser", e);
+        }
+    }
+
    /** Returns the schema id generator created in {@link #setup}. */
    private SchemaIdGenerator getGenerator() {
        return generator;
    }
+
+    private RecordS