
Flow Language

The flow language is how you build pipelines — a declarative DSL where everything starts with flow() and chains naturally.

Anatomy of a Flow

Every pipeline starts with flow() and ends with a terminal method. In between, there are two phases — configuration (infrastructure setup) and nodes (processing logic):

flow( 'pipeline-name' )
  // ── Configuration phase (optional, must come before nodes) ──
  .assetId( 'machineId' )            // each unique value gets its own pipeline instance
  .source( adapter, config )         // data source
  .emitter( adapter, config )        // output destination
  .storage( adapter, config )        // persistence
  .namingPolicy( '{param}_{stat}' )  // auto-naming for multi-field
  .assetClass( definition )          // semantics for storage schema
  // ── Node phase (processing, in order) ──
  .esMean( ... )
  .threshold( ... )
  .emitIf( ... )
  // ── Terminal ──
  .run()                             // wire and start immediately

Key rules:

  • Configuration methods must appear before any node — the Flow Language enforces this at build time
  • Nodes execute in the order you write them — each reads fields added by earlier nodes
  • .run() wires sources, emitters, storage, and starts the pipeline

The simplest possible flow has zero configuration and one node:

flow( 'minimal' )
  .esMean( 'smooth', 'temperature', { mean: 'avg' } )
  .run();

Semantics

As your pipeline grows, you add configuration. Semantics enter the flow through .assetClass() — they are the single source of truth for what your data means: column types, units, physical ranges, operational limits, and which columns belong to each storage table (insightType). The key design principle is that facts live in semantics, decisions live in flows — a column’s physical range is a fact; the threshold that triggers an alert is a decision. Nodes don’t consult semantics at runtime, but everything downstream does: the storage layer uses them to create schemas, the MCP server uses them to answer queries, and dashboards use them to render context-aware visualizations.
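To make the "facts, not decisions" split concrete, here is a hypothetical asset-class definition. The shape shown (columns, unit, range, insightType keys and the pump example) is an illustrative assumption, not the framework's actual schema:

```javascript
// Hypothetical asset-class definition — facts about the data, no decisions.
// The key names (unit, range, insightType) are illustrative assumptions.
const pumpAssetClass = {
  name: 'pump',
  columns: {
    temperature: { type: 'number', unit: '°C', range: [0, 120], insightType: 'telemetry' },
    vibration:   { type: 'number', unit: 'mm/s', range: [0, 50], insightType: 'telemetry' },
    anomaly:     { type: 'boolean', insightType: 'alerts' }
  }
};
// A flow would attach it via .assetClass(pumpAssetClass); the threshold
// that decides when 'anomaly' fires stays in the flow, not in the semantics.
```

Note that nothing here says when to alert; that decision belongs to a node in the flow.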


Anatomy of a Node Call

Most nodes follow this signature:

.nodeType( name, inputField, stats, options )
//           ↑        ↑        ↑       ↑
//        unique   what to  what to  tuning
//         name      read   compute   knobs

.esMean('smooth', 'temperature', { mean: 'tempAvg' }, { halfLife: 5 })

Variations exist — field-pair nodes take two input fields, condition nodes take a predicate instead of input/stats — but the pattern is consistent. The subsections below explain each part.


Dynamic Options

Any option that accepts a static value can also accept a function. The function receives the current message and returns the resolved value — evaluated fresh on every message:

// Static — same threshold for every message
.threshold('check', 'temp', { active: 'hot' }, { threshold: 80 })

// Tunable — threshold adapts per message
.threshold('check', 'temp', { active: 'hot' }, { threshold: ( msg ) => msg.baseline + 10 })

This means pipelines adapt to context — operating mode, shift, sensor type, learned baselines — without restart.

Built-in Helpers

The winkComposer framework provides helpers for common patterns. Each helper returns a function that works anywhere a static value is accepted:

| Helper | What It Does | Example |
| --- | --- | --- |
| lookupByField( field, map, default ) | Map a message field to a value | lookupByField( 'shift', { day: 35, night: 30 }, 32 ) |
| scaleBy( field, factor, offset, step ) | Scale a message field | scaleBy( 'stdev', 0.5 ) → stdev * 0.5 |
| fromField( field, default ) | Read a field directly | fromField( 'learnedBaseline', 50 ) |
| chooseWhen( predicate, ifTrue, ifFalse ) | Conditional value | chooseWhen( msg => msg.isWarmup, 100, 78 ) |
| clampTo( field, min, max ) | Clamp a field to a range | clampTo( 'requested', 10, 100 ) |
| offsetBy( field, offset ) | Add an offset to a field | offsetBy( 'baseline', 10 ) |
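The pattern behind all of these is the same: a helper is a factory that returns a (msg) => value function. As a minimal sketch of that pattern (mirroring the documented behavior, not the library's actual source), two of the helpers could be re-implemented like this:

```javascript
// Minimal sketches of the helper pattern: each factory returns a function
// of the current message, usable wherever a static option value goes.
// These mirror the documented behavior; they are not the library source.
const lookupByField = (field, map, fallback) =>
  (msg) => (msg[field] in map ? map[msg[field]] : fallback);

const clampTo = (field, min, max) =>
  (msg) => Math.min(max, Math.max(min, msg[field]));

// Usage against sample messages:
const threshold = lookupByField('shift', { day: 35, night: 30 }, 32);
console.log(threshold({ shift: 'night' }));                     // 30
console.log(threshold({ shift: 'swing' }));                     // 32 (fallback)
console.log(clampTo('requested', 10, 100)({ requested: 250 })); // 100
```

Because each helper resolves against the message it receives, the same pipeline definition yields different effective values per message.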

Real-World Example

WiFi RSSI thresholds that vary by protocol — each message carries its own protocol type, and the threshold adapts:

import { lookupByField } from '@winkjs/composer';

.threshold('rssiAlert', 'rssi', { active: 'rssi_low' }, {
  threshold: lookupByField( 'protocolType', {
    '802.11ac': -70,
    '802.11n': -75
  }, -75 )
})

Individual node entries mark tunable-capable options with ⚡.

What Happens When a Tunable Throws

Dynamic option functions can throw at runtime — for example, if a message lacks an expected field. winkComposer handles this gracefully:

  • Last good value: The node continues with the last successfully resolved value. The throwing call is simply skipped — the node processes the message using whatever value worked last.
  • First-message edge case: If the function throws before any value has been resolved, the node falls back to a safe default — it either stays inactive until the function succeeds or marks its outputs as invalid, depending on the node.
  • Log suppression: The first error per episode is logged to console. Subsequent errors are suppressed until the function succeeds again (recovery), preventing console flooding at high message rates.
  • Automatic recovery: When the function succeeds again, the node resumes normal operation. A new error episode will be logged if the function fails again later.
  • Reset: A controller-triggered reset also clears the suppression. If the tunable still throws after the reset, a fresh error is logged.

Static values (threshold: 80) cannot throw, so the guard only activates for user-supplied functions.
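The last-good-value behavior can be pictured with a small wrapper. This is a sketch of the guard's logic under the rules listed above, not winkComposer's internal implementation (guardTunable and its fallback argument are invented names):

```javascript
// Sketch of the last-good-value guard (invented names, illustrative only).
// Wraps a user tunable so a throwing call reuses the last resolved value.
function guardTunable(fn, fallback) {
  let lastGood;
  let hasGood = false;
  return (msg) => {
    try {
      lastGood = fn(msg);
      hasGood = true;   // success — in the real node this also resets log suppression
      return lastGood;
    } catch (err) {
      return hasGood ? lastGood : fallback;   // first-message edge case: safe default
    }
  };
}

const threshold = guardTunable((msg) => {
  if (typeof msg.baseline !== 'number') throw new Error('baseline missing');
  return msg.baseline + 10;
}, 80);

console.log(threshold({ baseline: 70 })); // 80
console.log(threshold({}));               // 80 — baseline missing, last good value reused
```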


Node Names

The first argument to every node is its name — a unique identifier within the pipeline:

.esMean('smooth', 'temperature', { mean: 'tempAvg' })
//        ↑
//   node name — must be unique across the pipeline

Node names serve two purposes:

  • Identity: Each name must be unique — duplicates cause a build error
  • Targeting: Controllers reference nodes by name to send control signals (e.g., enable, disable, reset)
.esMean('smooth', 'vibration', { mean: 'smoothVib' })  // node named 'smooth'
.esStats('stats', 'vibration', { stdev: 'vibStd' })    // node named 'stats'
.controller('ctrl', [{
  when: msg => msg.anomaly === true,
  triggers: [{ control: 'reset', targets: ['smooth', 'stats'] }]
  //                                        ↑ references nodes by name
}])

Choosing What Each Node Computes and Adds to the Message

Every node can compute one or more statistics — you choose which ones you want via the stats object. Each requested stat becomes a new field on the message, available to all downstream nodes. Keys choose what to compute, values choose what to call it:

.esMean('smooth', 'temperature', { mean: 'tempAvg' })
//                                  ↑        ↑
//                           compute mean  call it 'tempAvg'

Request multiple outputs from one node:

.esStats('monitor', 'temperature', { mean: 'avg', stdev: 'std', zScore: 'z' })
// Message gets three new fields: avg, std, z

Each node reads fields already on the message — including fields written by upstream nodes — and adds its own. The message grows as it flows:

.esMean('smooth', 'temperature', { mean: 'smoothTemp' }, { halfLife: 5 })
.esStats('monitor', 'smoothTemp', { stdev: 'tempStd', zScore: 'tempZ' })
.threshold('check', 'tempZ', { active: 'anomaly' }, { mode: 'above', threshold: 3 })

Incoming:        { temperature: 92 }
After esMean:    { temperature: 92, smoothTemp: 88.5 }                // reads temperature
After esStats:   { ..., smoothTemp: 88.5, tempStd: 2.1, tempZ: 1.7 }  // reads smoothTemp
After threshold: { ..., tempZ: 1.7, anomaly: false }                  // reads tempZ

Single vs Multi-Field Processing

Nodes can process fields in two modes:

Single Field Mode (String Input)

Pass a single field name as a string. You specify the exact output name:

// Single field - YOU control the output name directly
.esMean('baseline', 'temperature', { mean: 'tempBaseline' }, { halfLife: 10 })
// Output field: tempBaseline (exactly what you specified)

.threshold('check', 'pressure', { active: 'highPressure' })
// Output field: highPressure (exactly what you specified)

Key Points:

  • Input: String field name
  • Output: Exactly what you specify in the stats object
  • Naming policy: NOT applied
  • Use when: You want precise control over output names

Multi-Field Mode (Array Input)

Pass multiple field names as an array. The system auto-generates names using naming policy:

// Multi-field - SYSTEM generates names using naming policy
.namingPolicy('{param}_{stat}')   // Define the pattern
.esMean('smooth', ['temperature', 'pressure'], { mean: 'smoothed' }, { halfLife: 10 })
// Output fields: temperature_smoothed, pressure_smoothed (auto-generated)

.sanitize('validate',
  ['inlet_pressure', 'outlet_pressure'],   // Array = multi-field
  { failureReason: 'error', failedValue: 'value' }
)
// Output fields: inlet_pressure_error, inlet_pressure_value,
//                outlet_pressure_error, outlet_pressure_value

Key Points:

  • Input: Array of field names
  • Output: Auto-generated using naming policy pattern
  • Naming policy: APPLIED
  • Use when: Processing multiple fields with consistent naming

Naming Policy (Multi-Field Only!)

The naming policy only applies to multi-field processing (array inputs):

flow('my-pipeline')
  .namingPolicy('{param}_{stat}')   // Template for multi-field auto-naming

  // Multi-field: naming policy APPLIES
  .esMean('smooth', ['temp', 'pressure'], { mean: 'avg' })
  // Generates: temp_avg, pressure_avg

  // Single field: naming policy IGNORED
  .esMean('baseline', 'temp', { mean: 'myCustomBaseline' })
  // Output: myCustomBaseline (your exact specification)

Template Variables:

  • {param} - The input field name (e.g., ‘temperature’)
  • {stat} - The name YOU provided in the stats object (e.g., ‘avg’)
  • {name} - The node instance name (e.g., ‘smooth’)
  • |dv - Drop vowels modifier (e.g., “temperature” → “tmprtr”)

Important: The {stat} variable uses the VALUE from your stats object, not the key:

.sanitize('check', ['temp'], { failureReason: 'invalid' })
// {stat} = 'invalid' (not 'failureReason')
// Generates: temp_invalid
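A sketch of how such a template might expand, including the |dv drop-vowels modifier (expandName is an invented helper for illustration; the framework's internals may differ):

```javascript
// Illustrative expansion of a naming-policy template.
// expandName is an invented helper, not the framework's implementation.
function expandName(template, vars) {
  return template.replace(/\{(\w+)(\|dv)?\}/g, (_, key, dv) => {
    const value = String(vars[key] ?? '');
    return dv ? value.replace(/[aeiou]/gi, '') : value;   // |dv drops vowels
  });
}

// {param} is the input field; {stat} is the VALUE from the stats object
console.log(expandName('{param}_{stat}', { param: 'temperature', stat: 'avg' }));
// → temperature_avg
console.log(expandName('{param|dv}_{stat}', { param: 'temperature', stat: 'avg' }));
// → tmprtr_avg
```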

Per-Field Option Customization

When using multi-field mode, options can be customized per field using object syntax:

// Same halfLife for all fields
.esMean('smooth', ['temp', 'pressure'], { mean: 'avg' }, { halfLife: 10 })

// Different halfLife per field — temperature responds faster
.esMean('smooth', ['temp', 'pressure'], { mean: 'avg' }, { halfLife: { temp: 5, pressure: 20 } })

Each field gets independent state, namespaced control signals, and isolated failure handling. A processing error on one field does not affect others.
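The scalar-or-object option resolution can be pictured like this (resolveOption is an invented helper, not part of the framework's API):

```javascript
// Sketch: resolve an option that may be a scalar (shared by all fields)
// or an object keyed by field name (per-field). Invented helper,
// illustrative of the documented behavior only.
function resolveOption(option, field, fallback) {
  if (option !== null && typeof option === 'object') {
    return option[field] ?? fallback;   // per-field object syntax
  }
  return option ?? fallback;            // shared scalar
}

console.log(resolveOption(10, 'temp', 7));                        // 10
console.log(resolveOption({ temp: 5, pressure: 20 }, 'temp', 7)); // 5
console.log(resolveOption({ pressure: 20 }, 'temp', 7));          // 7 (fallback)
```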


Node Processing Types

Beyond single/multi-field modes, nodes have different processing behaviors:

| Type | Description | Example |
| --- | --- | --- |
| Per-field | Each field processed independently | .esMean(), .threshold() |
| Field-pair | Operates between exactly two fields | .diff(), .ratio() |
| Field-group | Analyzes multiple fields together | .esPairwiseCorrelation() |
| Condition | Evaluates boolean expressions | .passIf(), .persistenceCheck() |
| Control | Orchestrates other nodes | .controller() |

Per-Field Processing

These nodes process each field independently and support both single and multi-field modes:

// Single field - direct naming
.esMean('avg', 'temperature', { mean: 'avgTemp' })
// Output: avgTemp

// Multi-field - auto-naming with policy
.esMean('avg', ['temperature', 'pressure'], { mean: 'avg' })
// Outputs: temperature_avg, pressure_avg (with policy: '{param}_{stat}')

Nodes supporting per-field processing:

  • Signal Conditioning: esMean, median3, sanitize, butterworthFilter, kernel
  • Feature Extraction: lag, momentsDigest, trend
  • Detection: threshold, pageHinkley

Field-Pair Operations

These operate between exactly two fields. Always single mode (no arrays):

// Calculate difference between two fields
.diff('delta', 'outlet_pressure', 'inlet_pressure', { diff: 'pressureDrop' })
// Output: pressureDrop

// Calculate ratio
.ratio('efficiency', 'output_power', 'input_power', { ratio: 'powerEfficiency' })
// Output: powerEfficiency

// Correlation between two fields
.esCorrelation('coupling', 'temp', 'pressure', { correlation: 'tempPressCorr' })
// Output: tempPressCorr

Field-Group Analysis

These analyze multiple fields as one group:

// Compute all pairwise correlations
.esPairwiseCorrelation('matrix',
  ['temperature', 'pressure', 'flow'],   // Analyzes as a group
  {
    correlations: 'sensorCorr',   // Vector of all correlations
    pairNames: 'corrLabels'       // "temp-pressure", "temp-flow", etc.
  }
)
// Outputs: sensorCorr (array), corrLabels (array)

// Monitor multiple states together
.stateChangeDetector('monitor',
  ['machineState', 'qualityLevel'],   // Watches as a group
  { dwellTime: 'stateTime' },
  { changeMode: 'any' }               // Trigger on any change
)
Note

Group analyzers emit shared outputs; naming policy does not apply.

Condition-Based Processing

These evaluate predicates on messages and compute stats from the result:

// Track binary conditions
.dwellTimeTracker('uptime', msg => msg.running, {
  dwellTime: 'runDuration',
  dutyCycle: 'runPercentage'
})
// Outputs: runDuration, runPercentage

// Confirm persistence (2-of-3 voting)
.persistenceCheck('stable', msg => msg.temperature > 80,
  { persistenceConfirmed: 'overheating' },
  { minVotes: 2, outOfTotal: 3 }
)
// Output: overheating

Control Flow

Orchestrate other nodes without processing data:

.controller('adaptive', [{
  when: msg => msg.vibration > 2.5,
  triggers: [
    { control: 'enable', targets: ['esStats', 'correlation'] },  // targets by node name
    { control: 'reset', targets: ['baseline'] }                  // 'baseline' is an esMean node name
  ]
}])

What Happens When a Predicate Throws

Predicate functions can throw at runtime — for example, if a message lacks a field the predicate reads. Every predicate-using node catches the exception and handles it according to its role:

| Node | On exception | Effect |
| --- | --- | --- |
| passIf | Message dropped | Same as returning false — downstream nodes never see the message |
| persistenceCheck | Treated as non-vote | The message doesn’t count toward confirmation |
| dwellTimeTracker | Outputs marked invalid | No state transition is recorded |
| controller | Condition skipped | Remaining conditions still evaluate (first-match-wins) |
| emitIf | Emission skipped | Emits a status signal to the target; recovers automatically |
| persistIf | Write skipped | First error per episode logged; subsequent suppressed until recovery |
| sanitize (custom predicate) | Value treated as invalid | Same as predicate returning false |

All predicate exceptions are logged to console.error — once per error episode, not once per message. On recovery (successful predicate call), the suppression resets so the next episode is logged. A controller-triggered reset also clears the suppression.

No predicate exception ever crashes the pipeline or affects other assets.
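The once-per-episode logging described above can be sketched as a tiny state machine. This is illustrative only; makeEpisodeLogger is an invented name, not winkComposer's internal code:

```javascript
// Sketch of per-episode error logging (invented name, illustrative only):
// log the first failure, suppress repeats, re-arm after a success.
function makeEpisodeLogger() {
  let inErrorEpisode = false;
  return {
    onError(err) {
      if (!inErrorEpisode) {
        console.error('predicate failed:', err.message);  // first error of the episode
        inErrorEpisode = true;                            // suppress the rest
      }
    },
    onSuccess() { inErrorEpisode = false; },  // recovery re-arms logging
    reset()     { inErrorEpisode = false; }   // controller reset also clears suppression
  };
}
```

Two consecutive failures log once; a success followed by a new failure starts a fresh episode and logs again.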


Which Node Type to Choose

| If you need to… | Use this node type | Example |
| --- | --- | --- |
| Process each field the same way | Per-field | esMean, threshold |
| Compare two fields | Field-pair | diff, ratio |
| Analyze relationships | Field-group | esPairwiseCorrelation |
| Filter or route messages | Condition | passIf, emitIf |
| Track binary states | dwellTimeTracker | Running/stopped |
| Track categorical changes | stateChangeDetector | State machines |
| Coordinate nodes | Control | controller |
