From 6cca0d17a18f8ea1aa72b51e02b7cd9bd01aa82f Mon Sep 17 00:00:00 2001 From: Yura Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:51:26 +0100 Subject: [PATCH] Suffering. Implemented an initial version of a proper signal runtime. --- src/lang/eval/priority_queue.ts | 79 ++++++ src/lang/eval/signal.ts | 2 +- src/lang/eval/signalValue.ts | 421 ++++++++++++++++++++++++++++++++ src/lang/program.ts | 7 +- 4 files changed, 507 insertions(+), 2 deletions(-) create mode 100644 src/lang/eval/priority_queue.ts create mode 100644 src/lang/eval/signalValue.ts diff --git a/src/lang/eval/priority_queue.ts b/src/lang/eval/priority_queue.ts new file mode 100644 index 0000000..d6715a7 --- /dev/null +++ b/src/lang/eval/priority_queue.ts @@ -0,0 +1,79 @@ +import { SignalRuntime } from "./signalValue"; + +type T = SignalRuntime.DAGNode; + +export class PriorityQueue { + private heap: T[] = []; + private inQueue = new Set; + + isEmpty(): boolean { + return this.heap.length === 0; + } + + get size(): number { + return this.heap.length; + } + + // Pushes a node into the queue if it's not already present. + push(node: T): void { + if (this.inQueue.has(node)) return; + + this.inQueue.add(node); + this.heap.push(node); + this.bubbleUp(this.heap.length - 1); + } + + // Removes and returns the node with the lowest rank. + pop(): T | undefined { + if (this.heap.length === 0) return undefined; + + const top = this.heap[0]; + const last = this.heap.pop()!; + + if (this.heap.length > 0) { + this.heap[0] = last; + this.bubbleDown(0); + } + + this.inQueue.delete(top); + return top; + } + + private swap(i: number, j: number): void { + const temp = this.heap[i]; + this.heap[i] = this.heap[j]; + this.heap[j] = temp; + } + + private bubbleUp(index: number): void { + while (index > 0) { + const parentIndex = (index - 1) >> 1; // Faster Math.floor((i-1)/2) + if (this.heap[index].rank >= this.heap[parentIndex].rank) break; + + this.swap(index, parentIndex); + index = parentIndex; + } + } + + private bubbleDown(index: number): void { + const length = this.heap.length; + while (true) { + let smallest = index; + const left = (index << 1) + 1; // 2 * i + 1 + const right = (index << 1) + 2; // 2 * i + 2 + + if (left < length && this.heap[left].rank < this.heap[smallest].rank) { + smallest = left; + } + if (right < length && this.heap[right].rank < this.heap[smallest].rank) { + smallest = right; + } + + if (smallest === index) break; + + this.swap(index, smallest); + index = smallest; + } + } +} + diff --git a/src/lang/eval/signal.ts b/src/lang/eval/signal.ts index 578315c..22a6095 100644 --- a/src/lang/eval/signal.ts +++ b/src/lang/eval/signal.ts @@ -44,7 +44,7 @@ export function eval_signal_expression(program: Program, env: Env, e: SignalExpr case "read": return Program.get_or_create_signal(program, e.name); case "const": - const val = eval_expr(program, Env.nil(), e.arg); + const val = eval_expr(program, env, e.arg); return signal(val); case "let": // TODO: To change this, first look at how `tupleThen` works. It's a simpler more isolated case. Easier to get right. diff --git a/src/lang/eval/signalValue.ts b/src/lang/eval/signalValue.ts new file mode 100644 index 0000000..7588629 --- /dev/null +++ b/src/lang/eval/signalValue.ts @@ -0,0 +1,421 @@ +import { Expr, ProductPattern, SignalExpr, SignalName } from "../expr"; +import { Program } from "../program"; +import { Result, RuntimeError, ThrownRuntimeError } from "./error"; +import { eval_expr } from "./evaluator"; +import { match_product_pattern } from "./pattern_match"; +import { Env, Value } from "./value"; +import { PriorityQueue } from "./priority_queue" + +// === Reactive DAG === + +export type SignalRuntime = { + // Named signals (there are also nameless ones - and the distinction matters for garbage collection) + store: Map, +}; + +export type ExternalObserver = (state: Value) => void; +export type UnsubscribeCapability = () => void; + +export namespace SignalRuntime { + // TODO: This is a terrible name. Looking for a new name... Just don't call it `Node`, which clashes with the builtin ts `Node` type all the time. + export type DAGNode = { + signalName?: SignalName, + signal: SignalValue, + rank: number, + + // ===inputs=== + internalInputs: DAGNode[], + + // ===outputs=== + internalOutputs: DAGNode[], + externalOutputs: ExternalObserver[], + + currentValue: Value, + } + + export function make(): SignalRuntime { + return { + store: new Map(), + }; + } +} + +function internalSubscribe(node: SignalRuntime.DAGNode, observer: SignalRuntime.DAGNode) { + node.internalOutputs.push(observer); + observer.internalInputs.push(node); +} + +export function externalSubscribe(node: SignalRuntime.DAGNode, observer: ExternalObserver): UnsubscribeCapability { + node.externalOutputs.push(observer); + return () => { + node.externalOutputs = node.externalOutputs.filter(sub => sub !== observer); + }; +} + +export function getNode(program: Program, signalName: SignalName): SignalRuntime.DAGNode { + const maybeNode = program.signal_runtimeNew.store.get(signalName); + if (maybeNode === undefined) { + // TODO: Make this into a proper error + throw Error(`Signal '${signalName}' not found!`); + } else { + return maybeNode; + } +} + +// TODO: This feels wrong. We shouldn't allow setting of arbitrary signals... only special signals that are "cells", +// and are meant to be modified by the user... +export function setSignal(program: Program, name: SignalName, value: Value) { + const node = getNode(program, name); + node.currentValue = value; + propagate(program, node); +} + +export function spawnSignal(program: Program, signalName: SignalName, expr: SignalExpr): SignalRuntime.DAGNode { + const maybeNode = program.signal_runtimeNew.store.get(signalName); + if (maybeNode !== undefined) { + // TODO: Make this into a proper error + throw Error(`Attempt to spawn a signal '${signalName}' that already exists`); + } + const node = eval_signal_expression(program, Env.nil(), expr); + node.signalName = signalName; + program.signal_runtimeNew.store.set(signalName, node); + return node; +} + +// TODO: Should take in a `SignalName`, and find `oldNode` first +export function hotSwapSignal(program: Program, oldNode: SignalRuntime.DAGNode, expr: SignalExpr) { + reeval_signal_expression(program, Env.nil(), oldNode, expr) +} + +function disconnectInputs(program: Program, node: SignalRuntime.DAGNode) { + for (const parent of node.internalInputs) { + parent.internalOutputs = parent.internalOutputs.filter(sub => sub !== node); + if (parent.signalName === undefined && parent.internalOutputs.length === 0) { + // Note that we're not checking for `parent.externalOutputs.length === 0`, since nameless modes can't be subscribed to externally. + disconnectInputs(program, parent); + } + } + node.internalInputs = []; +} + +// This is used during rollback when a cycle is detected. +function disconnectTrialNodes(program: Program, nodes: SignalRuntime.DAGNode[]) { + for (const node of nodes) { + if (node.signalName === undefined && node.internalOutputs.length === 0) { + // Note that we're not checking for `parent.externalOutputs.length === 0`, since nameless modes can't be subscribed to externally. + disconnectInputs(program, node); + } + } +} + + +// Does old-rank based priority-queue traversal to update the ranks of all the descendants of a node. +// IMPORTANT: Assumes that the `node` already has the new rank - otherwise will corrupt the DAG. +function refreshRank(node: SignalRuntime.DAGNode) { + const heap = new PriorityQueue(); + + for (const child of node.internalOutputs) { + heap.push(child); + } + + while (!heap.isEmpty()) { + const child = heap.pop()!; + + let maxParentRank = 0; + for (const childParent of child.internalInputs) { + maxParentRank = Math.max(maxParentRank, childParent.rank); + } + + const expectedRank = maxParentRank + 1; + + if (child.rank < expectedRank) { + child.rank = expectedRank; + for (const grandchild of child.internalOutputs) { + heap.push(grandchild); + } + } + } +} + +function wouldCreateCycle(node: SignalRuntime.DAGNode, newParents: SignalRuntime.DAGNode[]): boolean { + const descendants = new Set(); + + // ===Compute Descendants=== + function addDescendants(node: SignalRuntime.DAGNode) { + if (descendants.has(node)) { + return + } + descendants.add(node); + for (const child of node.internalOutputs) { + addDescendants(child) + } + } + + addDescendants(node); + + // ===Check if new parents are reachable (thus creating a cycle) + for (const parent of newParents) { + if (descendants.has(parent)) { + return true; + } + } + return false; +} + +// === Value === + +type SignalValue = + | { tag: "closure", closure: SignalClosure } + | { tag: "const", value: Value } // Is this really necessary? Yes... its not a special case of SignalClosure - it has a value as a body, not an expression... + | { tag: "proxy", parent: SignalRuntime.DAGNode } + +type SignalClosure = { + bindings: SignalBinding[], + env: Env, + body: Expr +} + +type SignalBinding = { + pattern: ProductPattern, + // TODO: remove? + // signal: SignalValue, // TODO: Maybe this needs to be a Node in a DAG... I bet it will then infect the `eval` to also return a Node + node: SignalRuntime.DAGNode, +} + +// === Evaluation === + +// You may think that you don't need `env, because branches in a `let-signal` expression are independent. +// But in the future we could allow "usual" let-expressions on top, so `env` is useful! e.g. +// ``` +// let { +// x = e0, +// y = e1, +// . let-signal { // note that this is a signal-expression, nor a regular expression! +// u = s0, +// v = s1, +// . body-expr +// } +// } +// ``` +export function eval_signal_expression(program: Program, env: Env, e: SignalExpr): SignalRuntime.DAGNode { + switch (e.tag) { + case "read": { + // Note that we're not returning a proxy here - it would be pointless. And would create lot's of inefficient indirections. + return getNode(program, e.name); + } + case "const": + // TODO: This may throw an exception! + const val = eval_expr(program, env, e.arg); + const node: SignalRuntime.DAGNode = { + signal: SignalValue.const_(val), + rank: 0, + internalInputs: [], + internalOutputs: [], + externalOutputs: [], + currentValue: val, + }; + return node; + case "let": + let bindings: SignalBinding[] = []; + const closure: SignalClosure = { bindings, body: e.body, env }; + const letNode: SignalRuntime.DAGNode = { + signal: SignalValue.closure(closure), + rank: 0, // (1) Rank set later + + internalInputs: [], // (2) internal inputs set later + internalOutputs: [], + externalOutputs: [], + currentValue: null as any, // (3) Current value set later + }; + let maxParentRank = 0; + for (const { pattern, expr } of e.bindings) { + // This will either get a reference to an existing signal or create a bunch of new signals + // Note that we're using `env` here and not `cur_env`. That's intentional. + // TODO: This may throw an exception! + const parentNode = eval_signal_expression(program, env, expr); + + internalSubscribe(parentNode, letNode); // (2) internal inputs set + + maxParentRank = Math.max(maxParentRank, parentNode.rank); + bindings.push({ pattern, node: parentNode }); + } + letNode.rank = 1 + maxParentRank; // (1) Rank set + + // === Initialization === + // TODO: This may throw an exception! + const initValue = eval_signal_closure(program, closure); + letNode.currentValue = initValue; // (3) Current value set + + return letNode; + } +} + +export function reeval_signal_expression(program: Program, env: Env, node: SignalRuntime.DAGNode, e: SignalExpr) { + switch (e.tag) { + case "read": { + const parentNode = getNode(program, e.name); + const newNodeRank = 1 + parentNode.rank; + + if (wouldCreateCycle(node, [parentNode])) { + // TODO: You need to throw a proper error value here + throw new Error("Cycle detected while hot-swapping signal"); + } else { + // ===We commit to re-eval=== + disconnectInputs(program, node); + node.signal = SignalValue.proxy(parentNode); + node.rank = newNodeRank; // (1) + internalSubscribe(parentNode, node); + refreshRank(node); // `node` has to have the new rank for this to work - which is ensured by (1). + + // === re-initialization === + node.currentValue = parentNode.currentValue; + propagate(program, node); + break; + } + } + case "const": + // TODO: This may throw an exception! + const val = eval_expr(program, env, e.arg); + const signal = SignalValue.const_(val); + + disconnectInputs(program, node); + node.signal = signal; + node.rank = 0; + node.currentValue = val; + // We don't have to rerank, since this has no inputs and its rank is 0. + + propagate(program, node); + break; + case "let": + // ===Can we re-eval succesfully?=== + let bindings: SignalBinding[] = []; + + let maxParentRank = 0; + const parents: SignalRuntime.DAGNode[] = []; + for (const { pattern, expr } of e.bindings) { + // This will either get a reference to an existing signal or create a bunch of new signals + // Note that we're using `env` here and not `cur_env`. That's intentional. + // TODO: This may throw an exception! + const parentNode = eval_signal_expression(program, env, expr); + parents.push(parentNode); + + maxParentRank = Math.max(maxParentRank, parentNode.rank); + bindings.push({ pattern, node: parentNode }); + } + const newLetNodeRank = 1 + maxParentRank; + + const letNode = node; + // Obligations after finding out we can succesfully re-eval: + // (0) to detach from old parents + // (1) to set rank + // (2) to set internal inputs + // (3) to set current value + + if (wouldCreateCycle(letNode, parents)) { + // disconnect would be parents from the graph (if they are newly created) + disconnectTrialNodes(program, parents); + // TODO: You need to throw a proper error value here + throw new Error("Cycle detected while hot-swapping signal"); + } else { + // ===We commit to re-eval=== + disconnectInputs(program, letNode); // (0) detached from old parents + const closure: SignalClosure = { bindings, body: e.body, env }; + letNode.signal = SignalValue.closure(closure); + letNode.rank = newLetNodeRank; // (1) Rank set + for (const parentNode of parents) { + internalSubscribe(parentNode, letNode); // (2) internal inputs set + } + refreshRank(letNode); // `letNode` has to have the new rank for this to work - which is ensured by (1). + + // === re-initialization === + // TODO: This may throw an exception! + const newValue = eval_signal_closure(program, closure); + letNode.currentValue = newValue; // (3) Current value set + propagate(program, letNode); + } + + break; + } +} + +function eval_signal_closure(program: Program, closure: SignalClosure): Value { + let cur_env = closure.env; + for (const { pattern, node } of closure.bindings) { + const value = node.currentValue; + const res = match_product_pattern(pattern, value); + if (res.tag === "failure") { + // TODO: idk what to do here... + // TODO: Do you actually need to cleanup the old signals? What should be done here? + // TODO: We shouldn't throw here... the individual let-signal-branches should ve independent of each other. + // not sure... + throw ThrownRuntimeError.error({ + tag: "UnableToFindMatchingPattern", + value, + }); + } else { + cur_env = Env.push_frame(cur_env, res.frame); + } + } + + // TODO: This may throw an exception! + const value = eval_expr(program, cur_env, closure.body); + return value; +} + +export namespace SignalValue { + export const const_ = (value: Value): SignalValue => ({ tag: "const", value }); + export const closure = (closure: SignalClosure): SignalValue => ({ tag: "closure", closure }); + export const proxy = (parent: SignalRuntime.DAGNode): SignalValue => ({ tag: "proxy", parent }); +} + +// Calling convention: This assumes that the root-node's current-value has already been set - but its external-outputs haven't yet been notified. +function propagate(program: Program, rootNode: SignalRuntime.DAGNode) { + // Note that `rootNode` is not included in the `heap`. + const heap = new PriorityQueue(); + // But `rootNode` is queued into `externalEffectNodes`, since we're doing effects after internal propagation. + // We push the descendants of `rootNode` in a priority-queue order. + const descendantsWithExternalEffects: SignalRuntime.DAGNode[] = [rootNode]; + + // Add children + for (const childNode of rootNode.internalOutputs) { + heap.push(childNode); + } + + // === Internal Outputs === + while (!heap.isEmpty()) { + const node = heap.pop()!; + switch (node.signal.tag) { + case "const": + break; + case "proxy": { + // TODO: new PROXY code: Is this correct? + const value = node.internalInputs[0].currentValue; + node.currentValue = value; + descendantsWithExternalEffects.push(node); + for (const childNode of node.internalOutputs) { + heap.push(childNode); + } + break; + } + case "closure": { + // TODO: This may throw an exception! + const value = eval_signal_closure(program, node.signal.closure); + node.currentValue = value; + descendantsWithExternalEffects.push(node); + for (const childNode of node.internalOutputs) { + heap.push(childNode); + } + break; + } + } + } + + // === External Outputs === + for (const node of descendantsWithExternalEffects) { + for (const observer of node.externalOutputs) { + observer(node.currentValue); + } + } + +} + diff --git a/src/lang/program.ts b/src/lang/program.ts index 49ee449..d348a58 100644 --- a/src/lang/program.ts +++ b/src/lang/program.ts @@ -4,6 +4,7 @@ import { Env, Value } from "./eval/value"; import { Expr, FunctionName, SignalName, ProductPattern, SignalExpr } from "./expr"; import { installPrimitives } from "./primitive"; import { eval_expr } from "./eval/evaluator"; +import { SignalRuntime as SignalRuntimeNew} from "./eval/signalValue" export type Timestamp = number; @@ -14,6 +15,9 @@ export type Program = { signal_definitions: Map, signal_definition_order: SignalName[], + signal_runtimeNew: SignalRuntimeNew, + + // TODO: Get rid of the old Runtime signal_runtime: SignalRuntime, }; @@ -125,6 +129,7 @@ export namespace Program { function_definitions: new Map(), function_definition_order: [], + signal_runtimeNew: SignalRuntimeNew.make(), signal_runtime: SignalRuntime.make(), signal_definitions: new Map(), signal_definition_order: [], @@ -377,7 +382,7 @@ export namespace Program { } // === Signals === - export function getSignal(program: Program, name: FunctionName): Result { + export function getSignal(program: Program, name: SignalName): Result { const sigDef = program.signal_definitions.get(name); if (sigDef === undefined) { return Result.error({ tag: "SignalNotFound", name });