Convert cell computation to a Promise with cell flowQueue
~~~js
import {flowQueue} from '@tomlarkworthy/flow-queue'
~~~
A flow queue releases values one-at-a-time onto a Dataflow graph, and collects a response before releasing the next. A flowQueue wraps Dataflow with a promise. It allows you to unroll a function body across dataflow cells, which is sometimes better for code layout and explanation.
The following video demonstrates its use during development of a webhook. Note a number of cells update as data passes through the system.
In other words, flowQueue provides dataflow programming with a functional interface. Consider the following
~~~js
aysnc doWork(arg) {
const r1 = await step1(arg)
const r2 = await step2(r1);
return r2;
}
~~~
Using a flowQueue you can spread the asynchronous steps into different cells. To spread doWork across cells we first create a flowQueue, whose messages are doWork's args.
~~~js
viewof head = flowQueue()
~~~
The refactored version of doWork will forward its arg to the flowQueue and returns the promise. (note: viewof)
~~~js
doWork = (arg) => viewof head.send(arg)
~~~
Now we unroll the body of doWork across several cells. Cell r1 calls function step1 and makes a dataflow dependency to head of the flowQueue. So when head updates, r1 will too.
~~~js
r1 = step1(head)
~~~
The next step r2 depends on the previous step.
~~~js
r2 = step2(r1)
~~~
To return a result, we call resolve to the flowQueue. This will resolve the send promise earlier, and allow the next to run. (note: viewof)
~~~js
{
viewof head.resolve(r2)
}
~~~
Optimizations
The flowQueue will unblock immediately when resolve is passed a promise.
Errors
Every send should lead to a call to resolve. If you call resolve an extra time it will throw an Error. If resolve is not called within timeout_ms (1000ms) the promise will reject.
Changelog
2022-05-16 API: resolve changed to resolve, as it ends up looking like a promise anyway 2022-04-13 Bugfix: queue was not recovering after timeout properly.
const flowQueue = ({ name, timeout_ms = 1000 } = {}) => {
let runningResolve = undefined;
let runningReject = undefined;
const q = [];
const ui = htl.html`<code>flowQueue()</code>`;
const run = () => {
const [head, resolve, reject] = q.shift();
let timer;
runningResolve = (val) => {
clearTimeout(timer);
return resolve(val);
};
runningReject = (err) => {
clearTimeout(timer);
return reject(err);
};
timer = setTimeout(
() =>
ui.reject(
new Error(`Timeout (maybe increase timeout_ms?) ${name || ""}`)
),
timeout_ms
);
ui.value = head;
ui.dispatchEvent(new Event("input", { bubbles: true }));
};
ui.send = (task) =>
new Promise((resolve, reject) => {
q.push([task, resolve, reject]);
if (runningResolve === undefined) run();
});
ui.reject = async (err) => {
if (!runningReject) throw new Error(`No task executing! ${name || ""}`);
const resolve = runningResolve;
const reject = runningReject;
runningResolve = undefined;
runningReject = undefined;
reject(err);
if (q.length > 0) run();
};
ui.resolve = async (value) => {
if (!runningResolve) throw new Error(`No task executing! ${name || ""}`);
const resolve = runningResolve;
const reject = runningReject;
runningResolve = undefined;
runningReject = undefined;
if (q.length > 0) run();
try {
value = await value;
resolve(value);
} catch (err) {
reject(err);
}
};
ui.respond = ui.resolve; // old name
return ui;
}
Uses
- Functional adapter, for interfacing with functional interfaces.
- Testing, as you can write clear expected starting and ending criteria on a dataflow subgraph.
const sqrt = view(flowQueue())
sqrt.resolve(Math.sqrt(sqrt))
const testing = (async () => {
flowQueue; // load after implementation
const [{ Runtime }, { default: define }] = await Promise.all([
import(
"https://cdn.jsdelivr.net/npm/@observablehq/runtime@4/dist/runtime.js"
),
import(`https://api.observablehq.com/@tomlarkworthy/testing.js?v=3`)
]);
const module = new Runtime().module(define);
return Object.fromEntries(
await Promise.all(
["expect", "createSuite"].map((n) => module.value(n).then((v) => [n, v]))
)
);
})();
display(testing)
const suite = view(testing.createSuite({
name: "Unit Tests"
}))
suite.test("resolve after send resolves", async () => {
const q = flowQueue();
const prom = q.send("send val");
testing.expect(q.value).toBe("send val");
q.resolve("resolve val");
const response = await prom;
testing.expect(response).toBe("resolve val");
});
const maybeReply = view(flowQueue({ timeout_ms: 100 }))
const maybeReplyReplier = () => {
if (maybeReply === "reply") maybeReply.resolve("reply");
}