Suffering. Implemented an initial version of a proper signal runtime.

This commit is contained in:
Yura Dupyn 2026-02-17 22:51:26 +01:00
parent b9332ad565
commit 6cca0d17a1
4 changed files with 507 additions and 2 deletions

View file

@ -0,0 +1,79 @@
import { SignalRuntime } from "./signalValue";
type T = SignalRuntime.DAGNode;
export class PriorityQueue {
private heap: T[] = [];
private inQueue = new Set<T>;
isEmpty(): boolean {
return this.heap.length === 0;
}
get size(): number {
return this.heap.length;
}
// Pushes a node into the queue if it's not already present.
push(node: T): void {
if (this.inQueue.has(node)) return;
this.inQueue.add(node);
this.heap.push(node);
this.bubbleUp(this.heap.length - 1);
}
// Removes and returns the node with the lowest rank.
pop(): T | undefined {
if (this.heap.length === 0) return undefined;
const top = this.heap[0];
const last = this.heap.pop()!;
if (this.heap.length > 0) {
this.heap[0] = last;
this.bubbleDown(0);
}
this.inQueue.delete(top);
return top;
}
private swap(i: number, j: number): void {
const temp = this.heap[i];
this.heap[i] = this.heap[j];
this.heap[j] = temp;
}
private bubbleUp(index: number): void {
while (index > 0) {
const parentIndex = (index - 1) >> 1; // Faster Math.floor((i-1)/2)
if (this.heap[index].rank >= this.heap[parentIndex].rank) break;
this.swap(index, parentIndex);
index = parentIndex;
}
}
private bubbleDown(index: number): void {
const length = this.heap.length;
while (true) {
let smallest = index;
const left = (index << 1) + 1; // 2 * i + 1
const right = (index << 1) + 2; // 2 * i + 2
if (left < length && this.heap[left].rank < this.heap[smallest].rank) {
smallest = left;
}
if (right < length && this.heap[right].rank < this.heap[smallest].rank) {
smallest = right;
}
if (smallest === index) break;
this.swap(index, smallest);
index = smallest;
}
}
}

View file

@ -44,7 +44,7 @@ export function eval_signal_expression(program: Program, env: Env, e: SignalExpr
case "read":
return Program.get_or_create_signal(program, e.name);
case "const":
const val = eval_expr(program, Env.nil(), e.arg);
const val = eval_expr(program, env, e.arg);
return signal(val);
case "let":
// TODO: To change this, first look at how `tupleThen` works. It's a simpler more isolated case. Easier to get right.

View file

@ -0,0 +1,421 @@
import { Expr, ProductPattern, SignalExpr, SignalName } from "../expr";
import { Program } from "../program";
import { Result, RuntimeError, ThrownRuntimeError } from "./error";
import { eval_expr } from "./evaluator";
import { match_product_pattern } from "./pattern_match";
import { Env, Value } from "./value";
import { PriorityQueue } from "./priority_queue"
// === Reactive DAG ===
export type SignalRuntime = {
// Named signals (there are also nameless ones - and the distinction matters for garbage collection)
store: Map<SignalName, SignalRuntime.DAGNode>,
};
export type ExternalObserver = (state: Value) => void;
export type UnsubscribeCapability = () => void;
export namespace SignalRuntime {
// TODO: This is a terrible name. Looking for a new name... Just don't call it `Node`, which clashes with the builtin ts `Node` type all the time.
export type DAGNode = {
signalName?: SignalName,
signal: SignalValue,
rank: number,
// ===inputs===
internalInputs: DAGNode[],
// ===outputs===
internalOutputs: DAGNode[],
externalOutputs: ExternalObserver[],
currentValue: Value,
}
export function make(): SignalRuntime {
return {
store: new Map(),
};
}
}
function internalSubscribe(node: SignalRuntime.DAGNode, observer: SignalRuntime.DAGNode) {
node.internalOutputs.push(observer);
observer.internalInputs.push(node);
}
export function externalSubscribe(node: SignalRuntime.DAGNode, observer: ExternalObserver): UnsubscribeCapability {
node.externalOutputs.push(observer);
return () => {
node.externalOutputs = node.externalOutputs.filter(sub => sub !== observer);
};
}
export function getNode(program: Program, signalName: SignalName): SignalRuntime.DAGNode {
const maybeNode = program.signal_runtimeNew.store.get(signalName);
if (maybeNode === undefined) {
// TODO: Make this into a proper error
throw Error(`Signal '${signalName}' not found!`);
} else {
return maybeNode;
}
}
// TODO: This feels wrong. We shouldn't allow setting of arbitrary signals... only special signals that are "cells",
// and are meant to be modified by the user...
export function setSignal(program: Program, name: SignalName, value: Value) {
const node = getNode(program, name);
node.currentValue = value;
propagate(program, node);
}
export function spawnSignal(program: Program, signalName: SignalName, expr: SignalExpr): SignalRuntime.DAGNode {
const maybeNode = program.signal_runtimeNew.store.get(signalName);
if (maybeNode !== undefined) {
// TODO: Make this into a proper error
throw Error(`Attempt to spawn a signal '${signalName}' that already exists`);
}
const node = eval_signal_expression(program, Env.nil(), expr);
node.signalName = signalName;
program.signal_runtimeNew.store.set(signalName, node);
return node;
}
// TODO: Should take in a `SignalName`, and find `oldNode` first
export function hotSwapSignal(program: Program, oldNode: SignalRuntime.DAGNode, expr: SignalExpr) {
reeval_signal_expression(program, Env.nil(), oldNode, expr)
}
function disconnectInputs(program: Program, node: SignalRuntime.DAGNode) {
for (const parent of node.internalInputs) {
parent.internalOutputs = parent.internalOutputs.filter(sub => sub !== node);
if (parent.signalName === undefined && parent.internalOutputs.length === 0) {
// Note that we're not checking for `parent.externalOutputs.length === 0`, since nameless modes can't be subscribed to externally.
disconnectInputs(program, parent);
}
}
node.internalInputs = [];
}
// This is used during rollback when a cycle is detected.
function disconnectTrialNodes(program: Program, nodes: SignalRuntime.DAGNode[]) {
for (const node of nodes) {
if (node.signalName === undefined && node.internalOutputs.length === 0) {
// Note that we're not checking for `parent.externalOutputs.length === 0`, since nameless modes can't be subscribed to externally.
disconnectInputs(program, node);
}
}
}
// Does old-rank based priority-queue traversal to update the ranks of all the descendants of a node.
// IMPORTANT: Assumes that the `node` already has the new rank - otherwise will corrupt the DAG.
function refreshRank(node: SignalRuntime.DAGNode) {
const heap = new PriorityQueue();
for (const child of node.internalOutputs) {
heap.push(child);
}
while (!heap.isEmpty()) {
const child = heap.pop()!;
let maxParentRank = 0;
for (const childParent of child.internalInputs) {
maxParentRank = Math.max(maxParentRank, childParent.rank);
}
const expectedRank = maxParentRank + 1;
if (child.rank < expectedRank) {
child.rank = expectedRank;
for (const grandchild of child.internalOutputs) {
heap.push(grandchild);
}
}
}
}
function wouldCreateCycle(node: SignalRuntime.DAGNode, newParents: SignalRuntime.DAGNode[]): boolean {
const descendants = new Set<SignalRuntime.DAGNode>();
// ===Compute Descendants===
function addDescendants(node: SignalRuntime.DAGNode) {
if (descendants.has(node)) {
return
}
descendants.add(node);
for (const child of node.internalOutputs) {
addDescendants(child)
}
}
addDescendants(node);
// ===Check if new parents are reachable (thus creating a cycle)
for (const parent of newParents) {
if (descendants.has(parent)) {
return true;
}
}
return false;
}
// === Value ===
type SignalValue =
| { tag: "closure", closure: SignalClosure }
| { tag: "const", value: Value } // Is this really necessary? Yes... its not a special case of SignalClosure - it has a value as a body, not an expression...
| { tag: "proxy", parent: SignalRuntime.DAGNode }
type SignalClosure = {
bindings: SignalBinding[],
env: Env,
body: Expr
}
type SignalBinding = {
pattern: ProductPattern,
// TODO: remove?
// signal: SignalValue, // TODO: Maybe this needs to be a Node in a DAG... I bet it will then infect the `eval` to also return a Node
node: SignalRuntime.DAGNode,
}
// === Evaluation ===
// You may think that you don't need `env, because branches in a `let-signal` expression are independent.
// But in the future we could allow "usual" let-expressions on top, so `env` is useful! e.g.
// ```
// let {
// x = e0,
// y = e1,
// . let-signal { // note that this is a signal-expression, nor a regular expression!
// u = s0,
// v = s1,
// . body-expr
// }
// }
// ```
export function eval_signal_expression(program: Program, env: Env, e: SignalExpr): SignalRuntime.DAGNode {
switch (e.tag) {
case "read": {
// Note that we're not returning a proxy here - it would be pointless. And would create lot's of inefficient indirections.
return getNode(program, e.name);
}
case "const":
// TODO: This may throw an exception!
const val = eval_expr(program, env, e.arg);
const node: SignalRuntime.DAGNode = {
signal: SignalValue.const_(val),
rank: 0,
internalInputs: [],
internalOutputs: [],
externalOutputs: [],
currentValue: val,
};
return node;
case "let":
let bindings: SignalBinding[] = [];
const closure: SignalClosure = { bindings, body: e.body, env };
const letNode: SignalRuntime.DAGNode = {
signal: SignalValue.closure(closure),
rank: 0, // (1) Rank set later
internalInputs: [], // (2) internal inputs set later
internalOutputs: [],
externalOutputs: [],
currentValue: null as any, // (3) Current value set later
};
let maxParentRank = 0;
for (const { pattern, expr } of e.bindings) {
// This will either get a reference to an existing signal or create a bunch of new signals
// Note that we're using `env` here and not `cur_env`. That's intentional.
// TODO: This may throw an exception!
const parentNode = eval_signal_expression(program, env, expr);
internalSubscribe(parentNode, letNode); // (2) internal inputs set
maxParentRank = Math.max(maxParentRank, parentNode.rank);
bindings.push({ pattern, node: parentNode });
}
letNode.rank = 1 + maxParentRank; // (1) Rank set
// === Initialization ===
// TODO: This may throw an exception!
const initValue = eval_signal_closure(program, closure);
letNode.currentValue = initValue; // (3) Current value set
return letNode;
}
}
export function reeval_signal_expression(program: Program, env: Env, node: SignalRuntime.DAGNode, e: SignalExpr) {
switch (e.tag) {
case "read": {
const parentNode = getNode(program, e.name);
const newNodeRank = 1 + parentNode.rank;
if (wouldCreateCycle(node, [parentNode])) {
// TODO: You need to throw a proper error value here
throw new Error("Cycle detected while hot-swapping signal");
} else {
// ===We commit to re-eval===
disconnectInputs(program, node);
node.signal = SignalValue.proxy(parentNode);
node.rank = newNodeRank; // (1)
internalSubscribe(parentNode, node);
refreshRank(node); // `node` has to have the new rank for this to work - which is ensured by (1).
// === re-initialization ===
node.currentValue = parentNode.currentValue;
propagate(program, node);
break;
}
}
case "const":
// TODO: This may throw an exception!
const val = eval_expr(program, env, e.arg);
const signal = SignalValue.const_(val);
disconnectInputs(program, node);
node.signal = signal;
node.rank = 0;
node.currentValue = val;
// We don't have to rerank, since this has no inputs and its rank is 0.
propagate(program, node);
break;
case "let":
// ===Can we re-eval succesfully?===
let bindings: SignalBinding[] = [];
let maxParentRank = 0;
const parents: SignalRuntime.DAGNode[] = [];
for (const { pattern, expr } of e.bindings) {
// This will either get a reference to an existing signal or create a bunch of new signals
// Note that we're using `env` here and not `cur_env`. That's intentional.
// TODO: This may throw an exception!
const parentNode = eval_signal_expression(program, env, expr);
parents.push(parentNode);
maxParentRank = Math.max(maxParentRank, parentNode.rank);
bindings.push({ pattern, node: parentNode });
}
const newLetNodeRank = 1 + maxParentRank;
const letNode = node;
// Obligations after finding out we can succesfully re-eval:
// (0) to detach from old parents
// (1) to set rank
// (2) to set internal inputs
// (3) to set current value
if (wouldCreateCycle(letNode, parents)) {
// disconnect would be parents from the graph (if they are newly created)
disconnectTrialNodes(program, parents);
// TODO: You need to throw a proper error value here
throw new Error("Cycle detected while hot-swapping signal");
} else {
// ===We commit to re-eval===
disconnectInputs(program, letNode); // (0) detached from old parents
const closure: SignalClosure = { bindings, body: e.body, env };
letNode.signal = SignalValue.closure(closure);
letNode.rank = newLetNodeRank; // (1) Rank set
for (const parentNode of parents) {
internalSubscribe(parentNode, letNode); // (2) internal inputs set
}
refreshRank(letNode); // `letNode` has to have the new rank for this to work - which is ensured by (1).
// === re-initialization ===
// TODO: This may throw an exception!
const newValue = eval_signal_closure(program, closure);
letNode.currentValue = newValue; // (3) Current value set
propagate(program, letNode);
}
break;
}
}
function eval_signal_closure(program: Program, closure: SignalClosure): Value {
let cur_env = closure.env;
for (const { pattern, node } of closure.bindings) {
const value = node.currentValue;
const res = match_product_pattern(pattern, value);
if (res.tag === "failure") {
// TODO: idk what to do here...
// TODO: Do you actually need to cleanup the old signals? What should be done here?
// TODO: We shouldn't throw here... the individual let-signal-branches should ve independent of each other.
// not sure...
throw ThrownRuntimeError.error({
tag: "UnableToFindMatchingPattern",
value,
});
} else {
cur_env = Env.push_frame(cur_env, res.frame);
}
}
// TODO: This may throw an exception!
const value = eval_expr(program, cur_env, closure.body);
return value;
}
export namespace SignalValue {
export const const_ = (value: Value): SignalValue => ({ tag: "const", value });
export const closure = (closure: SignalClosure): SignalValue => ({ tag: "closure", closure });
export const proxy = (parent: SignalRuntime.DAGNode): SignalValue => ({ tag: "proxy", parent });
}
// Calling convention: This assumes that the root-node's current-value has already been set - but its external-outputs haven't yet been notified.
function propagate(program: Program, rootNode: SignalRuntime.DAGNode) {
// Note that `rootNode` is not included in the `heap`.
const heap = new PriorityQueue();
// But `rootNode` is queued into `externalEffectNodes`, since we're doing effects after internal propagation.
// We push the descendants of `rootNode` in a priority-queue order.
const descendantsWithExternalEffects: SignalRuntime.DAGNode[] = [rootNode];
// Add children
for (const childNode of rootNode.internalOutputs) {
heap.push(childNode);
}
// === Internal Outputs ===
while (!heap.isEmpty()) {
const node = heap.pop()!;
switch (node.signal.tag) {
case "const":
break;
case "proxy": {
// TODO: new PROXY code: Is this correct?
const value = node.internalInputs[0].currentValue;
node.currentValue = value;
descendantsWithExternalEffects.push(node);
for (const childNode of node.internalOutputs) {
heap.push(childNode);
}
break;
}
case "closure": {
// TODO: This may throw an exception!
const value = eval_signal_closure(program, node.signal.closure);
node.currentValue = value;
descendantsWithExternalEffects.push(node);
for (const childNode of node.internalOutputs) {
heap.push(childNode);
}
break;
}
}
}
// === External Outputs ===
for (const node of descendantsWithExternalEffects) {
for (const observer of node.externalOutputs) {
observer(node.currentValue);
}
}
}

View file

@ -4,6 +4,7 @@ import { Env, Value } from "./eval/value";
import { Expr, FunctionName, SignalName, ProductPattern, SignalExpr } from "./expr";
import { installPrimitives } from "./primitive";
import { eval_expr } from "./eval/evaluator";
import { SignalRuntime as SignalRuntimeNew} from "./eval/signalValue"
export type Timestamp = number;
@ -14,6 +15,9 @@ export type Program = {
signal_definitions: Map<SignalName, SignalDefinition>,
signal_definition_order: SignalName[],
signal_runtimeNew: SignalRuntimeNew,
// TODO: Get rid of the old Runtime
signal_runtime: SignalRuntime,
};
@ -125,6 +129,7 @@ export namespace Program {
function_definitions: new Map(),
function_definition_order: [],
signal_runtimeNew: SignalRuntimeNew.make(),
signal_runtime: SignalRuntime.make(),
signal_definitions: new Map(),
signal_definition_order: [],
@ -377,7 +382,7 @@ export namespace Program {
}
// === Signals ===
export function getSignal(program: Program, name: FunctionName): Result<SignalDefinition> {
export function getSignal(program: Program, name: SignalName): Result<SignalDefinition> {
const sigDef = program.signal_definitions.get(name);
if (sigDef === undefined) {
return Result.error({ tag: "SignalNotFound", name });