Skip to content

Commit

Permalink
feat: experimental operation batching (unjs#240)
Browse files Browse the repository at this point in the history
Co-authored-by: Pooya Parsa <[email protected]>
  • Loading branch information
Hebilicious and pi0 authored Jul 4, 2023
1 parent 8353c6d commit cb8577b
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 3 deletions.
16 changes: 16 additions & 0 deletions docs/content/2.usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ Gets the value of a key in storage. Resolves to either a javascript primitive va
await storage.getItem("foo:bar");
```

### `getItems(items, opts)`

(Experimental) Gets the value of a multiple keys in storage in parallel.

Each item in array can be either a string or an object with`{ key, options? }` format.

Returned value is a Promise resolving to an array of objects with `{ key, value }` format.

### `getItemRaw(key, opts?)`

**Note:** This is an experimental feature. Please check [unjs/unstorage#142](https://github.com/unjs/unstorage/issues/142) for more information.
Expand All @@ -77,6 +85,14 @@ If value is `undefined`, it is same as calling `removeItem(key)`.
await storage.setItem("foo:bar", "baz");
```

### `setItems(items, opts)`

(Experimental) Add/Update items in parallel to the storage.

Each item in `items` array should be in `{ key, value, options? }` format.

Returned value is a Promise resolving to an array of objects with `{ key, value }` format.

### `setItemRaw(key, value, opts?)`

**Note:** This is an experimental feature. Please check [unjs/unstorage#142](https://github.com/unjs/unstorage/issues/142) for more information.
Expand Down
2 changes: 1 addition & 1 deletion src/drivers/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export default defineDriver((opts: HTTPOptions) => {
await _fetch(r(key), {
method: "PUT",
body: value,
headers: { ...opts.headers, ...topts.headers },
headers: { ...opts.headers, ...topts?.headers },
});
},
async setItemRaw(key, value, topts) {
Expand Down
117 changes: 116 additions & 1 deletion src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import type {
Unwatch,
StorageValue,
WatchEvent,
TransactionOptions,
} from "./types";
import memory from "./drivers/memory";
import { asyncCall, deserializeRaw, serializeRaw, stringify } from "./_utils";
import { normalizeKey, normalizeBaseKey } from "./utils";
import { normalizeKey, normalizeBaseKey, joinKeys } from "./utils";

interface StorageCTX {
mounts: Record<string, Driver>;
Expand Down Expand Up @@ -103,6 +104,61 @@ export function createStorage<T extends StorageValue>(
context.watching = false;
};

type BatchItem = {
driver: Driver;
base: string;
items: {
key: string;
relativeKey: string;
value?: StorageValue;
options?: TransactionOptions;
}[];
};

const runBatch = (
items: (
| string
| { key: string; value?: StorageValue; options?: TransactionOptions }
)[],
commonOptions: undefined | TransactionOptions,
cb: (batch: BatchItem) => Promise<any>
) => {
const batches = new Map<string /* mount base */, BatchItem>();
const getBatch = (mount: ReturnType<typeof getMount>) => {
let batch = batches.get(mount.base);
if (!batch) {
batch = {
driver: mount.driver,
base: mount.base,
items: [],
};
batches.set(mount.base, batch);
}
return batch;
};

for (const item of items) {
const isStringItem = typeof item === "string";
const key = normalizeKey(isStringItem ? item : item.key);
const value = isStringItem ? undefined : item.value;
const options =
isStringItem || !item.options
? commonOptions
: { ...commonOptions, ...item.options };
const mount = getMount(key);
getBatch(mount).items.push({
key,
value,
relativeKey: mount.relativeKey,
options,
});
}

return Promise.all([...batches.values()].map((batch) => cb(batch))).then(
(r) => r.flat()
);
};

const storage: Storage = {
// Item
hasItem(key, opts = {}) {
Expand All @@ -117,6 +173,37 @@ export function createStorage<T extends StorageValue>(
destr(value)
);
},
getItems(items, commonOptions) {
return runBatch(items, commonOptions, (batch) => {
if (batch.driver.getItems) {
return asyncCall(
batch.driver.getItems,
batch.items.map((item) => ({
key: item.relativeKey,
options: item.options,
})),
commonOptions
).then((r) =>
r.map((item) => ({
key: joinKeys(batch.base, item.key),
value: destr(item.value),
}))
);
}
return Promise.all(
batch.items.map((item) => {
return asyncCall(
batch.driver.getItem,
item.relativeKey,
item.options
).then((value) => ({
key: item.key,
value: destr(value),
}));
})
);
});
},
getItemRaw(key, opts = {}) {
key = normalizeKey(key);
const { relativeKey, driver } = getMount(key);
Expand All @@ -141,6 +228,34 @@ export function createStorage<T extends StorageValue>(
onChange("update", key);
}
},
async setItems(items, commonOptions) {
await runBatch(items, commonOptions, async (batch) => {
if (batch.driver.setItems) {
await asyncCall(
batch.driver.setItems,
batch.items.map((item) => ({
key: item.relativeKey,
value: stringify(item.value),
options: item.options,
})),
commonOptions
);
}
if (!batch.driver.setItem) {
return;
}
await Promise.all(
batch.items.map((item) => {
return asyncCall(
batch.driver.setItem!,
item.relativeKey,
stringify(item.value),
item.options
);
})
);
});
},
async setItemRaw(key, value, opts = {}) {
if (value === undefined) {
return storage.removeItem(key, opts);
Expand Down
22 changes: 21 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface StorageMeta {
[key: string]: StorageValue | Date | undefined;
}

type TransactionOptions = Record<string, any>;
export type TransactionOptions = Record<string, any>;

export interface Driver {
name?: string;
Expand All @@ -25,13 +25,23 @@ export interface Driver {
opts?: TransactionOptions
) => MaybePromise<StorageValue>;
/** @experimental */
getItems?: (
items: { key: string; options?: TransactionOptions }[],
commonOptions?: TransactionOptions
) => MaybePromise<{ key: string; value: StorageValue }[]>;
/** @experimental */
getItemRaw?: (key: string, opts: TransactionOptions) => MaybePromise<unknown>;
setItem?: (
key: string,
value: string,
opts: TransactionOptions
) => MaybePromise<void>;
/** @experimental */
setItems?: (
items: { key: string; value: string; options?: TransactionOptions }[],
commonOptions?: TransactionOptions
) => MaybePromise<void>;
/** @experimental */
setItemRaw?: (
key: string,
value: any,
Expand All @@ -55,6 +65,11 @@ export interface Storage<T extends StorageValue = StorageValue> {
key: string,
opts?: TransactionOptions
) => Promise<U | null>;
/** @experimental */
getItems: (
items: (string | { key: string; options?: TransactionOptions })[],
commonOptions?: TransactionOptions
) => Promise<{ key: string; value: StorageValue }[]>;
/** @experimental See https://github.com/unjs/unstorage/issues/142 */
getItemRaw: <T = any>(
key: string,
Expand All @@ -65,6 +80,11 @@ export interface Storage<T extends StorageValue = StorageValue> {
value: U,
opts?: TransactionOptions
) => Promise<void>;
/** @experimental */
setItems: (
items: { key: string; value: string; options?: TransactionOptions }[],
commonOptions?: TransactionOptions
) => Promise<void>;
/** @experimental See https://github.com/unjs/unstorage/issues/142 */
setItemRaw: <T = any>(
key: string,
Expand Down
6 changes: 6 additions & 0 deletions test/drivers/redis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ describe("drivers: redis", () => {
"test:data:serialized1.json",
"test:data:serialized2.json",
"test:data:raw.bin",
"test:t:1",
"test:t:2",
"test:t:3",
"test:v1:a",
"test:v2:a",
"test:v3:a",
]
`);
await client.disconnect();
Expand Down
35 changes: 35 additions & 0 deletions test/drivers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,41 @@ export function testDriver(opts: TestOptions) {
);
});

// Bulk tests
it("setItems", async () => {
await ctx.storage.setItems([
{ key: "t:1", value: "test_data_t1" },
{ key: "t:2", value: "test_data_t2" },
{ key: "t:3", value: "test_data_t3" },
]);
expect(await ctx.storage.getItem("t:1")).toBe("test_data_t1");
expect(await ctx.storage.getItem("t:2")).toBe("test_data_t2");
expect(await ctx.storage.getItem("t:3")).toBe("test_data_t3");
});

it("getItems", async () => {
await ctx.storage.setItem("v1:a", "test_data_v1:a");
await ctx.storage.setItem("v2:a", "test_data_v2:a");
await ctx.storage.setItem("v3:a?q=1", "test_data_v3:a?q=1");

expect(
await ctx.storage.getItems([{ key: "v1:a" }, "v2:a", { key: "v3:a?q=1" }])
).toMatchObject([
{
key: "v1:a",
value: "test_data_v1:a",
},
{
key: "v2:a",
value: "test_data_v2:a",
},
{
key: "v3:a", // key should lose the querystring
value: "test_data_v3:a?q=1",
},
]);
});

// TODO: Refactor to move after cleanup
if (opts.additionalTests) {
opts.additionalTests(ctx);
Expand Down

0 comments on commit cb8577b

Please sign in to comment.