Follow-up of the Hadoop merge
Part of JanusGraph#1438

Signed-off-by: Jan Jansen <[email protected]>
farodin91 committed Oct 7, 2019 · commit 8c1b8fc (1 parent cda9051)
Showing 7 changed files with 25 additions and 85 deletions.
==== File 1 of 7 ====
@@ -20,7 +20,6 @@
 import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
 import org.janusgraph.hadoop.config.JanusGraphHadoopConfiguration;
 import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
-import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon;
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
 import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
 import org.apache.cassandra.hadoop.ConfigHelper;
@@ -31,6 +30,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.janusgraph.hadoop.formats.util.input.current.JanusGraphHadoopSetupImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,9 +97,9 @@ public void setConf(final Configuration config) {

     private SliceRange getSliceRange(final int limit) {
         final SliceRange sliceRange = new SliceRange();
-        sliceRange.setStart(JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getSliceStart().asByteBuffer());
-        sliceRange.setFinish(JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getSliceEnd().asByteBuffer());
-        sliceRange.setCount(Math.min(limit, JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getLimit()));
+        sliceRange.setStart(JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getSliceStart().asByteBuffer());
+        sliceRange.setFinish(JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getSliceEnd().asByteBuffer());
+        sliceRange.setCount(Math.min(limit, JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getLimit()));
         return sliceRange;
     }
 }
==== File 2 of 7 ====
@@ -31,7 +31,7 @@
 import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
 import org.janusgraph.hadoop.config.JanusGraphHadoopConfiguration;
 import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
-import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon;
+import org.janusgraph.hadoop.formats.util.input.current.JanusGraphHadoopSetupImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,9 +89,9 @@ public void setConf(final Configuration config) {

     private SliceRange getSliceRange(final int limit) {
         final SliceRange sliceRange = new SliceRange();
-        sliceRange.setStart(JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getSliceStart().asByteBuffer());
-        sliceRange.setFinish(JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getSliceEnd().asByteBuffer());
-        sliceRange.setCount(Math.min(limit, JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY.getLimit()));
+        sliceRange.setStart(JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getSliceStart().asByteBuffer());
+        sliceRange.setFinish(JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getSliceEnd().asByteBuffer());
+        sliceRange.setCount(Math.min(limit, JanusGraphHadoopSetupImpl.DEFAULT_SLICE_QUERY.getLimit()));
         return sliceRange;
     }
 }
==== File 3 of 7: HadoopInputFormat ====
@@ -17,43 +17,30 @@
 import com.google.common.base.Preconditions;
 import org.janusgraph.diskstorage.Entry;
 import org.janusgraph.diskstorage.StaticBuffer;
-import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetup;
-import org.janusgraph.util.system.ConfigurationUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
 import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
+import org.janusgraph.hadoop.formats.util.input.current.JanusGraphHadoopSetupImpl;

 import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;

-import static org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon.SETUP_CLASS_NAME;
-import static org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon.SETUP_PACKAGE_PREFIX;
-
 public abstract class HadoopInputFormat extends InputFormat<NullWritable, VertexWritable> implements Configurable, GraphFilterAware {

     private final InputFormat<StaticBuffer, Iterable<Entry>> inputFormat;
     private static final RefCountedCloseable<JanusGraphVertexDeserializer> refCounter;

     static {
-        refCounter = new RefCountedCloseable<>((conf) -> {
-            final String janusgraphVersion = "current";
-
-            String className = SETUP_PACKAGE_PREFIX + janusgraphVersion + SETUP_CLASS_NAME;
-
-            JanusGraphHadoopSetup ts = ConfigurationUtil.instantiate(
-                className, new Object[]{ conf }, new Class[]{ Configuration.class });
-
-            return new JanusGraphVertexDeserializer(ts);
-        });
+        refCounter = new RefCountedCloseable<>((conf) ->
+            new JanusGraphVertexDeserializer(new JanusGraphHadoopSetupImpl(conf)));
     }

     public HadoopInputFormat(InputFormat<StaticBuffer, Iterable<Entry>> inputFormat) {
         this.inputFormat = inputFormat;
         Preconditions.checkState(Configurable.class.isAssignableFrom(inputFormat.getClass()));
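Note on the hunk above: the static initializer previously assembled a class name from SETUP_PACKAGE_PREFIX and SETUP_CLASS_NAME and instantiated it reflectively through ConfigurationUtil; with only one setup implementation left after the Hadoop merge, it now calls new JanusGraphHadoopSetupImpl(conf) directly, which the compiler can verify. For readers unfamiliar with the surrounding wrapper, here is a minimal sketch of the ref-counting pattern; the class name and method signatures are assumptions for illustration, not JanusGraph's actual RefCountedCloseable API.

import java.util.function.Function;

// Hypothetical sketch (assumed names/signatures, not JanusGraph's actual
// RefCountedCloseable API): the value is built lazily on first acquire,
// shared between users, and closed when the last user releases it.
final class RefCountedHolder<C, T extends AutoCloseable> {
    private final Function<C, T> builder;
    private T value;
    private int refs = 0;

    RefCountedHolder(Function<C, T> builder) {
        this.builder = builder;
    }

    synchronized T acquire(C conf) {
        if (refs++ == 0) {
            value = builder.apply(conf);   // first acquirer triggers construction
        }
        return value;
    }

    synchronized void release() throws Exception {
        if (--refs == 0) {
            value.close();                 // last release closes the shared value
            value = null;
        }
    }
}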
==== File 4 of 7: JanusGraphVertexDeserializer ====
@@ -16,6 +16,7 @@

 import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
 import org.janusgraph.core.*;
 import org.janusgraph.diskstorage.Entry;
 import org.janusgraph.diskstorage.StaticBuffer;
@@ -56,6 +57,7 @@ public JanusGraphVertexDeserializer(final JanusGraphHadoopSetup setup) {
         this.idManager = setup.getIDManager();
     }

+
     private static Boolean isLoopAdded(Vertex vertex, String label) {
         Iterator<Vertex> adjacentVertices = vertex.vertices(Direction.BOTH, label);

@@ -93,7 +95,7 @@ public TinkerVertex readHadoopVertex(final StaticBuffer key, Iterable<Entry> ent

         // Iterate over edgestore columns to find the vertex's label relation
         for (final Entry data : entries) {
-            RelationReader relationReader = setup.getRelationReader(vertexId);
+            RelationReader relationReader = setup.getRelationReader();
             final RelationCache relation = relationReader.parseRelation(data, false, typeManager);
             if (systemTypes.isVertexLabelSystemType(relation.typeId)) {
                 // Found vertex Label
@@ -114,7 +116,7 @@ public TinkerVertex readHadoopVertex(final StaticBuffer key, Iterable<Entry> ent
         // Iterate over and decode edgestore columns (relations) on this vertex
         for (final Entry data : entries) {
             try {
-                RelationReader relationReader = setup.getRelationReader(vertexId);
+                RelationReader relationReader = setup.getRelationReader();
                 final RelationCache relation = relationReader.parseRelation(data, false, typeManager);

                 if (systemTypes.isSystemType(relation.typeId)) continue; //Ignore system types
==== File 5 of 7: JanusGraphHadoopSetup ====
@@ -14,7 +14,6 @@

 package org.janusgraph.hadoop.formats.util.input;

-import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
 import org.janusgraph.graphdb.database.RelationReader;
 import org.janusgraph.graphdb.idmanagement.IDManager;
 import org.janusgraph.graphdb.types.TypeInspector;
@@ -28,19 +27,10 @@ public interface JanusGraphHadoopSetup {

     SystemTypeInspector getSystemTypeInspector();

-    RelationReader getRelationReader(long vertexId);
+    RelationReader getRelationReader();

     IDManager getIDManager();

-    /**
-     * Return an input slice across the entire row.
-     *
-     * TODO This would ideally slice only columns inside the row needed by the query.
-     * The slice must include the hidden vertex state property (to filter removed vertices).
-     *
-     */
-    SliceQuery inputSlice();
-
     void close();

     boolean getFilterPartitionedVertices();
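Two things disappear from this interface: the vertexId parameter of getRelationReader, which the implementation never used (file 7 returns the graph's edge serializer regardless of vertex), and inputSlice(), whose full-row slice constant now lives on JanusGraphHadoopSetupImpl. A hedged call-site sketch of the signature change; the helper class is hypothetical, and the RelationCache package is an assumption.

import org.janusgraph.diskstorage.Entry;
import org.janusgraph.graphdb.database.RelationReader;
import org.janusgraph.graphdb.relations.RelationCache;
import org.janusgraph.graphdb.types.TypeInspector;
import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetup;

// Hypothetical helper, for illustration only. Before this commit the first
// line would have read setup.getRelationReader(vertexId); the id carried no
// information, so callers simply drop the argument.
final class RelationReadExample {
    static RelationCache readOne(JanusGraphHadoopSetup setup, Entry data, TypeInspector types) {
        RelationReader reader = setup.getRelationReader();  // was: getRelationReader(vertexId)
        return reader.parseRelation(data, false, types);    // decode one edgestore column
    }
}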

==== File 6 of 7 ====
This file was deleted. (Presumably JanusGraphHadoopSetupCommon, whose SETUP_* constants and DEFAULT_SLICE_QUERY the other files stop referencing; the slice constant reappears on JanusGraphHadoopSetupImpl below.)
==== File 7 of 7: JanusGraphHadoopSetupImpl ====
@@ -17,7 +17,10 @@
 import com.google.common.base.Preconditions;
 import org.janusgraph.core.JanusGraphFactory;
 import org.janusgraph.core.JanusGraphVertex;
+import org.janusgraph.diskstorage.StaticBuffer;
 import org.janusgraph.diskstorage.configuration.BasicConfiguration;
+import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
+import org.janusgraph.diskstorage.util.StaticArrayBuffer;
 import org.janusgraph.graphdb.database.RelationReader;
 import org.janusgraph.graphdb.database.StandardJanusGraph;
 import org.janusgraph.graphdb.idmanagement.IDManager;
@@ -32,15 +35,18 @@
 import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
 import org.janusgraph.hadoop.config.ModifiableHadoopConfiguration;
 import org.janusgraph.hadoop.config.JanusGraphHadoopConfiguration;
+import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetup;
 import org.janusgraph.hadoop.formats.util.input.SystemTypeInspector;
-import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.hadoop.conf.Configuration;

 /**
  * @author Matthias Broecheler ([email protected])
  */
-public class JanusGraphHadoopSetupImpl extends JanusGraphHadoopSetupCommon {
+public class JanusGraphHadoopSetupImpl implements JanusGraphHadoopSetup {
+
+    private static final StaticBuffer DEFAULT_COLUMN = StaticArrayBuffer.of(new byte[0]);
+    public static final SliceQuery DEFAULT_SLICE_QUERY = new SliceQuery(DEFAULT_COLUMN, DEFAULT_COLUMN);

     private final ModifiableHadoopConfiguration scanConf;
     private final StandardJanusGraph graph;
@@ -108,7 +114,7 @@ public IDManager getIDManager() {
     }

     @Override
-    public RelationReader getRelationReader(long vertexId) {
+    public RelationReader getRelationReader() {
         return graph.getEdgeSerializer();
     }
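The two constants added above replace what the deleted common base class provided: both slice bounds are the empty buffer, which JanusGraph treats as a slice across the entire row (the behavior the removed inputSlice() javadoc described), and files 1 and 2 cap their Cassandra slice counts against this query's limit via Math.min. A small sketch of the construction, assuming only the imports shown in this file; the wrapper class is hypothetical.

import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;

// Sketch of the constant defined above: an empty start and an empty end
// buffer, which JanusGraph interprets as covering every column in the row.
final class FullRowSliceExample {
    static SliceQuery fullRow() {
        StaticBuffer emptyColumn = StaticArrayBuffer.of(new byte[0]);
        return new SliceQuery(emptyColumn, emptyColumn);
    }
}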
