Convert cell computation to a Promise with cell flowQueue

  ~~~js
  import {flowQueue} from '@tomlarkworthy/flow-queue'
  ~~~

A flow queue releases values one-at-a-time onto a Dataflow graph, and collects a response before releasing the next. A flowQueue wraps Dataflow with a promise. It allows you to unroll a function body across dataflow cells, which is sometimes better for code layout and explanation.

The following video demonstrates its use during development of a webhook. Note a number of cells update as data passes through the system.

In other words, flowQueue provides dataflow programming with a functional interface. Consider the following function:

  ~~~js
  async function doWork(arg) {
    const r1 = await step1(arg)
    const r2 = await step2(r1);
    return r2;
  }
  ~~~

Using a flowQueue you can spread the asynchronous steps into different cells. To spread doWork across cells we first create a flowQueue, whose messages are doWork's args.

~~~js
viewof head = flowQueue()
~~~

The refactored version of doWork forwards its arg to the flowQueue and returns the resulting promise. (Note the viewof: send is a method of the view, not of its value.)

  ~~~js
  doWork = (arg) => viewof head.send(arg)
  ~~~

Now we unroll the body of doWork across several cells. Cell r1 calls function step1 and depends on head, the value of the flowQueue, so whenever head updates, r1 recomputes too.

~~~js
r1 = step1(head)
~~~

The next step r2 depends on the previous step.

~~~js
r2 = step2(r1)
~~~

To return a result, we call resolve on the flowQueue. This resolves the promise that send returned earlier and allows the next queued value to run. (note: viewof)

~~~js
{
  viewof head.resolve(r2)
}
~~~
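
The whole round trip can also be tried outside a notebook. The following is a minimal sketch, not the real implementation: `miniFlowQueue` is a hypothetical, DOM-free stand-in built on `EventTarget` with no timeout handling, the `step1` and `step2` bodies are made up for illustration, and a plain event listener plays the role of the cells r1, r2 and the resolve cell.

```js
// Hypothetical, simplified stand-in for flowQueue (no DOM, no timeout).
function miniFlowQueue() {
  const target = new EventTarget();
  const pending = []; // queued [task, resolve] pairs
  let running = null; // resolver of the task currently in flight

  const run = () => {
    const [task, resolve] = pending.shift();
    running = resolve;
    target.value = task; // what the `head` cell would see
    target.dispatchEvent(new Event("input"));
  };

  target.send = (task) =>
    new Promise((resolve) => {
      pending.push([task, resolve]);
      if (running === null) run();
    });

  target.resolve = (value) => {
    if (running === null) throw new Error("No task executing!");
    const resolve = running;
    running = null;
    resolve(value);
    if (pending.length > 0) run(); // release the next queued task
  };

  return target;
}

// Made-up asynchronous steps, standing in for step1 and step2.
const step1 = async (x) => x + 1;
const step2 = async (x) => x * 2;

const head = miniFlowQueue();
const released = []; // order in which args reach the "cells"

// Plays the role of cells r1, r2 and the final resolve cell.
head.addEventListener("input", async () => {
  released.push(head.value);
  const r1 = await step1(head.value);
  const r2 = await step2(r1);
  head.resolve(r2);
});

const doWork = (arg) => head.send(arg);

const results = Promise.all([doWork(1), doWork(5)]);
results.then((values) => console.log(values, released)); // [ 4, 12 ] [ 1, 5 ]
```

The second argument is only released after the first has been resolved, which is the one-at-a-time behaviour described at the top of this page.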

Optimizations

The flowQueue will unblock immediately when resolve is passed a promise.
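
As a rough illustration of this, the sketch below uses a hypothetical stand-in (`miniFlowQueue`, with no DOM and no timeout) whose resolve releases the next task before awaiting the value it was given. The `slowUpper` helper and the 20 ms delay are assumptions made up for the example.

```js
// Hypothetical, simplified stand-in for flowQueue (no DOM, no timeout).
function miniFlowQueue() {
  const target = new EventTarget();
  const pending = []; // queued [task, resolve, reject] triples
  let running = null; // settle functions of the task in flight

  const run = () => {
    const [task, resolve, reject] = pending.shift();
    running = { resolve, reject };
    target.value = task;
    target.dispatchEvent(new Event("input"));
  };

  target.send = (task) =>
    new Promise((resolve, reject) => {
      pending.push([task, resolve, reject]);
      if (running === null) run();
    });

  target.resolve = async (value) => {
    if (running === null) throw new Error("No task executing!");
    const { resolve, reject } = running;
    running = null;
    if (pending.length > 0) run(); // release the next task first...
    try {
      resolve(await value); // ...then wait for this task's result
    } catch (err) {
      reject(err);
    }
  };

  return target;
}

// Made-up slow step: upper-cases its input after 20 ms.
const slowUpper = (text) =>
  new Promise((done) => setTimeout(done, 20, text.toUpperCase()));

const log = [];
const queue = miniFlowQueue();

queue.addEventListener("input", () => {
  const task = queue.value;
  log.push(`released ${task}`);
  // Like a downstream cell, respond slightly later, handing over the
  // promise itself instead of awaiting it first.
  queueMicrotask(() => queue.resolve(slowUpper(task)));
});

const done = Promise.all(
  ["a", "b"].map((task) =>
    queue.send(task).then((result) => log.push(`resolved ${result}`))
  )
);
done.then(() => console.log(log));
```

In this sketch the log reads released a, released b, resolved A, resolved B: the second task enters the pipeline while the first result is still pending.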

Errors

Every send should lead to exactly one call to resolve. Calling resolve when no task is executing (for example, an extra time) throws an Error. If resolve is not called within timeout_ms (default 1000 ms), the send promise rejects.
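
Both failure modes can be reproduced with a small sketch. It assumes a hypothetical stand-in (`miniFlowQueue` with an internal `settle` helper, neither of which is part of the real API) and shortens the timeout to 50 ms so the example finishes quickly. As in the source below, resolve is an async function, so the "thrown" error surfaces as a rejected promise.

```js
// Hypothetical, simplified stand-in for flowQueue (no DOM, no events).
function miniFlowQueue({ timeout_ms = 1000 } = {}) {
  const queue = {};
  const pending = []; // queued [task, resolve, reject] triples
  let running = null; // settle functions and timer of the task in flight

  const run = () => {
    const [task, resolve, reject] = pending.shift();
    const timer = setTimeout(
      () => settle("reject", new Error("Timeout")),
      timeout_ms
    );
    running = { resolve, reject, timer };
    queue.value = task;
  };

  // Settles the running task, then releases the next one if any.
  const settle = (kind, payload) => {
    if (running === null) throw new Error("No task executing!");
    const current = running;
    running = null;
    clearTimeout(current.timer);
    current[kind](payload);
    if (pending.length > 0) run();
  };

  queue.send = (task) =>
    new Promise((resolve, reject) => {
      pending.push([task, resolve, reject]);
      if (running === null) run();
    });
  queue.resolve = async (value) => settle("resolve", value);
  queue.reject = async (err) => settle("reject", err);
  return queue;
}

const queue = miniFlowQueue({ timeout_ms: 50 });
const outcomes = [];

// 1. A send that is answered in time resolves normally.
const answered = queue
  .send("ping")
  .then((value) => outcomes.push(`resolved: ${value}`));
queue.resolve("pong");

// 2. An extra resolve has no matching send, so it fails.
const extra = queue
  .resolve("pong again")
  .catch((err) => outcomes.push(`extra resolve failed: ${err.message}`));

// 3. A send that is never answered rejects once the timeout elapses.
const ignored = queue
  .send("anyone there?")
  .catch((err) => outcomes.push(`timed out: ${err.message}`));

const finished = Promise.all([answered, extra, ignored]).then(() => outcomes);
finished.then((all) => console.log(all));
```

In practice a timeout usually means a cell in the pipeline never reached resolve, for instance because an upstream cell threw, so the timeout message is a good place to start debugging.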

Changelog

2022-05-16 API: respond renamed to resolve, as it ends up looking like a promise anyway.

2022-04-13 Bugfix: the queue was not recovering properly after a timeout.

const flowQueue = ({ name, timeout_ms = 1000 } = {}) => {
  let runningResolve = undefined;
  let runningReject = undefined;
  const q = [];

  const ui = htl.html`<code>flowQueue()</code>`;

  const run = () => {
    const [head, resolve, reject] = q.shift();
    let timer;
    runningResolve = (val) => {
      clearTimeout(timer);
      return resolve(val);
    };
    runningReject = (err) => {
      clearTimeout(timer);
      return reject(err);
    };

    timer = setTimeout(
      () =>
        ui.reject(
          new Error(`Timeout (maybe increase timeout_ms?) ${name || ""}`)
        ),
      timeout_ms
    );

    ui.value = head;
    ui.dispatchEvent(new Event("input", { bubbles: true }));
  };

  ui.send = (task) =>
    new Promise((resolve, reject) => {
      q.push([task, resolve, reject]);
      if (runningResolve === undefined) run();
    });

  ui.reject = async (err) => {
    if (!runningReject) throw new Error(`No task executing! ${name || ""}`);
    const resolve = runningResolve;
    const reject = runningReject;
    runningResolve = undefined;
    runningReject = undefined;
    reject(err);
    if (q.length > 0) run();
  };

  ui.resolve = async (value) => {
    if (!runningResolve) throw new Error(`No task executing! ${name || ""}`);
    const resolve = runningResolve;
    const reject = runningReject;
    runningResolve = undefined;
    runningReject = undefined;
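    // Release the next queued task before awaiting `value`, so that a
    // promise passed to resolve does not block the queue.
    if (q.length > 0) run();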
    try {
      value = await value;
      resolve(value);
    } catch (err) {
      reject(err);
    }
  };

  ui.respond = ui.resolve; // old name

  return ui;
}

Uses

const sqrt = view(flowQueue())
sqrt.resolve(Math.sqrt(sqrt))
const testing = (async () => {
  flowQueue; // load after implementation
  const [{ Runtime }, { default: define }] = await Promise.all([
    import(
      "https://cdn.jsdelivr.net/npm/@observablehq/runtime@4/dist/runtime.js"
    ),
    import(`https://api.observablehq.com/@tomlarkworthy/testing.js?v=3`)
  ]);
  const module = new Runtime().module(define);
  return Object.fromEntries(
    await Promise.all(
      ["expect", "createSuite"].map((n) => module.value(n).then((v) => [n, v]))
    )
  );
})();
display(testing)
const suite = view(testing.createSuite({
  name: "Unit Tests"
}))
suite.test("resolve after send resolves", async () => {
  const q = flowQueue();
  const prom = q.send("send val");
  testing.expect(q.value).toBe("send val");
  q.resolve("resolve val");
  const response = await prom;
  testing.expect(response).toBe("resolve val");
});
const maybeReply = view(flowQueue({ timeout_ms: 100 }))
const maybeReplyReplier = () => {
  if (maybeReply === "reply") maybeReply.resolve("reply");
}