Skip to content

Commit

Permalink
feat(rstream-query): add query spec types, addQueryFromSpec(), dedupe…
Browse files Browse the repository at this point in the history
… xforms
  • Loading branch information
postspectacular committed Apr 26, 2018
1 parent 2ac8bff commit d093a5c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 18 deletions.
31 changes: 29 additions & 2 deletions packages/rstream-query/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { IObjectOf } from "@thi.ng/api/api";
import { ISubscribable } from "@thi.ng/rstream/api";

export let DEBUG = false;

export type Pattern = [any, any, any];

export type PathPattern = [any, any[], any];
Expand All @@ -11,13 +13,38 @@ export type Triples = Set<Pattern>;

export type TripleIds = Set<number>;

export type Solutions = Set<IObjectOf<any>>;
export type Solution = IObjectOf<any>;

export type Solutions = Set<Solution>;

export type QuerySolution = ISubscribable<Solutions>;

export type BindFn = (s: Solution) => any;

export interface Edit {
index: Set<number>;
key: any;
}

export let DEBUG = false;
export interface QuerySpec {
q: SubQuerySpec[];
select?: string[];
order?: string;
bind?: IObjectOf<BindFn>;
limit?: number;
}

export type SubQuerySpec = WhereQuerySpec | PathQuerySpec;

export interface WhereQuerySpec {
where: Pattern[];
}

export interface PathQuerySpec {
path: PathPattern;
}

export interface JoinOpts {
limit: number;
select: string[];
}
99 changes: 83 additions & 16 deletions packages/rstream-query/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import { keySelector } from "@thi.ng/transducers/func/key-selector";
import { comp } from "@thi.ng/transducers/func/comp";
import { compR } from "@thi.ng/transducers/func/compr";
import { assocObj } from "@thi.ng/transducers/rfn/assoc-obj";
import { dedupe } from "@thi.ng/transducers/xform/dedupe";
import { map } from "@thi.ng/transducers/xform/map";
import { mapIndexed } from "@thi.ng/transducers/xform/map-indexed";

import { DEBUG, Edit, Triple, TripleIds, Pattern, Solutions, Triples, PathPattern, QuerySolution } from "./api";
import { DEBUG, Edit, Triple, TripleIds, Pattern, Solutions, Triples, PathPattern, QuerySolution, QuerySpec, SubQuerySpec, WhereQuerySpec, PathQuerySpec, BindFn } from "./api";
import { resolvePathPattern, patternVars } from "./pattern";
import { isQVar, qvarResolver } from "./qvar";

Expand Down Expand Up @@ -180,7 +181,7 @@ export class TripleStore implements
results = sync<TripleIds, TripleIds>({
id,
src: { s: qs, p: qp, o: qo },
xform: map(({ s, p, o }) => intersection(intersection(s, p), o)),
xform: comp(map(({ s, p, o }) => intersection(intersection(s, p), o)), dedupe(equiv)),
reset: true,
});
this.queries.set(key, <ISubscribable<TripleIds>>results);
Expand Down Expand Up @@ -236,10 +237,18 @@ export class TripleStore implements
}
return res;
}),
dedupe(equiv),
id
);
}

addParamQueries(patterns: Iterable<Pattern>) {
return iterator(
map<Pattern, QuerySolution>((q) => this.addParamQuery(q)),
patterns
);
}

/**
* Converts the given path pattern into a number of sub-queries and
* return a rstream subscription of re-joined result solutions. If
Expand All @@ -252,10 +261,7 @@ export class TripleStore implements
*/
addPathQuery(path: PathPattern, len = path[1].length, id?: string): QuerySolution {
return this.addMultiJoin(
iterator(
map<Pattern, QuerySolution>((q) => this.addParamQuery(q)),
resolvePathPattern(path, len)[0]
),
this.addParamQueries(resolvePathPattern(path, len)[0]),
patternVars(path),
id
);
Expand All @@ -274,25 +280,60 @@ export class TripleStore implements
return sync<Solutions, Solutions>({
id,
src: { a, b },
xform: map(({ a, b }) => join(a, b))
xform: comp(map(({ a, b }) => join(a, b)), dedupe(equiv)),
reset: false,
});
}

addMultiJoin(queries: Iterable<QuerySolution>, keepVars?: Iterable<string>, id?: string): QuerySolution {
const src = transduce(
mapIndexed<QuerySolution, [string, QuerySolution]>(
(i, q) => [String(i), q]
),
mapIndexed<QuerySolution, [string, QuerySolution]>((i, q) => [String(i), q]),
assocObj(),
queries
);
let xform = joinSolutions(Object.keys(src).length);
keepVars && (xform = comp(xform, filterSolutions(keepVars)));
return sync({ id, src, xform });
let xforms: Transducer<any, any>[] = [joinSolutions(Object.keys(src).length), dedupe(equiv)];
keepVars && (xforms.push(filterSolutions(keepVars)));
return sync({ id, src, xform: <Transducer<any, any>>comp.apply(null, xforms), reset: false });
}

/**
* Compiles given query spec into a number of sub-queries and result
* transformations. Returns rstream subscription of final result
* sets. See `QuerySpec` docs for further details.
*
* @param spec
*/
addQueryFromSpec(spec: QuerySpec): QuerySolution {
let query: QuerySolution;
let curr: QuerySolution;
for (let q of spec.q) {
if (isWhereQuery(q)) {
curr = this.addMultiJoin(this.addParamQueries(q.where));
query && (curr = this.addJoin(query, curr));
} else if (isPathQuery(q)) {
curr = this.addPathQuery(q.path);
query && (curr = this.addJoin(query, curr));
}
query = curr;
}
let xforms: Transducer<any, any>[] = [];
if (spec.limit) {
xforms.push(limitSolutions(spec.limit));
}
if (spec.bind) {
xforms.push(bindVars(spec.bind));
}
if (spec.select) {
xforms.push(filterSolutions(spec.select));
}
if (xforms.length) {
query = <ISubscribable<any>>query.subscribe(comp.apply(null, xforms));
}
return query;
}

toDot(opts?: Partial<DotOpts>) {
return toDot(walk([this.streamS, this.streamP, this.streamO, this.streamAll]), opts);
return toDot(walk([this.streamS, this.streamP, this.streamO, this.streamAll], opts), opts);
}

protected nextID() {
Expand Down Expand Up @@ -336,7 +377,6 @@ export class TripleStore implements
}
}


const submit = (index: Map<any, TripleIds>, stream: Subscription<Edit, TripleIds>, key: any) => {
if (key != null) {
const ids = index.get(key);
Expand Down Expand Up @@ -384,4 +424,31 @@ const filterSolutions = (qvars: Iterable<string>) => {
}
return res;
});
};
};

const limitSolutions = (n: number) =>
map((sol: Solutions) => {
if (sol.size <= n) {
return sol;
}
const res: Solutions = new Set();
let m = n;
for (let s of sol) {
res.add(s);
if (--m <= 0) break;
}
return res;
});

const bindVars = (bindings: IObjectOf<BindFn>) =>
map((sol: Solutions) => {
for (let s of sol) {
for (let b in bindings) {
s[b] = bindings[b](s);
}
}
return sol;
});

const isWhereQuery = (q: SubQuerySpec): q is WhereQuerySpec => !!(<any>q).where;
const isPathQuery = (q: SubQuerySpec): q is PathQuerySpec => !!(<any>q).path;

0 comments on commit d093a5c

Please sign in to comment.