Flow Language
The flow language is how you build pipelines — a declarative DSL where everything starts with flow() and chains naturally.
Anatomy of a Flow
Every pipeline starts with flow() and ends with a terminal method. In between, there are two phases — configuration (infrastructure setup) and nodes (processing logic):
flow( 'pipeline-name' )
// ── Configuration phase (optional, must come before nodes) ──
.assetId( 'machineId' ) // each unique value gets its own pipeline instance
.source( adapter, config ) // data source
.emitter( adapter, config ) // output destination
.storage( adapter, config ) // persistence
.namingPolicy( '{param}_{stat}' ) // auto-naming for multi-field
.assetClass( definition ) // semantics for storage schema
// ── Node phase (processing, in order) ──
.esMean( ... )
.threshold( ... )
.emitIf( ... )
// ── Terminal ──
.run() // wire and start immediately

Key rules:
- Configuration methods must appear before any node — the Flow Language enforces this at build time
- Nodes execute in the order you write them — each reads fields added by earlier nodes
- .run() wires sources, emitters, storage, and starts the pipeline
The simplest possible flow has zero configuration and one node:
flow( 'minimal' )
.esMean( 'smooth', 'temperature', { mean: 'avg' } )
.run();

Semantics
As your pipeline grows, you add configuration. Semantics enter the flow through .assetClass() — they are the single source of truth for what your data means: column types, units, physical ranges, operational limits, and which columns belong to each storage table (insightType). The key design principle is that facts live in semantics, decisions live in flows — a column’s physical range is a fact; the threshold that triggers an alert is a decision. Nodes don’t consult semantics at runtime, but everything downstream does: the storage layer uses them to create schemas, the MCP server uses them to answer queries, and dashboards use them to render context-aware visualizations.
Anatomy of a Node Call
Most nodes follow this signature:
.nodeType( name, inputField, stats, options )
// ↑ ↑ ↑ ↑
// unique what to what to tuning
// name     read      compute    knobs

.esMean('smooth', 'temperature', { mean: 'tempAvg' }, { halfLife: 5 })

Variations exist — field-pair nodes take two input fields, condition nodes take a predicate instead of input/stats — but the pattern is consistent. The subsections below explain each part.
Dynamic Options
Any option that accepts a static value can also accept a function. The function receives the current message and returns the resolved value — evaluated fresh on every message:
// Static — same threshold for every message
.threshold('check', 'temp', { active: 'hot' }, { threshold: 80 })
// Tunable — threshold adapts per message
.threshold('check', 'temp', { active: 'hot' }, {
threshold: ( msg ) => msg.baseline + 10
})

This means pipelines adapt to context — operating mode, shift, sensor type, learned baselines — without restart.
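The dispatch behind this is simple to picture: if the option is a function, call it with the current message; otherwise use the value as-is. A minimal sketch of that rule (the helper name `resolveOption` is hypothetical, not winkComposer's API):

```javascript
// Sketch of static-vs-tunable option resolution. Hypothetical helper,
// not the framework's actual internals.
function resolveOption(option, msg) {
  // Functions are evaluated fresh against the current message;
  // anything else is treated as a static value.
  return typeof option === 'function' ? option(msg) : option;
}

// Static option: same value for every message
resolveOption(80, { baseline: 72 }); // → 80

// Tunable option: recomputed per message
resolveOption((msg) => msg.baseline + 10, { baseline: 72 }); // → 82
```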
Built-in Helpers
The winkComposer framework provides helpers for common patterns. Each helper returns a function that works anywhere a static value is accepted:
| Helper | What It Does | Example |
|---|---|---|
| lookupByField( field, map, default ) | Map a message field to a value | lookupByField( 'shift', { day: 35, night: 30 }, 32 ) |
| scaleBy( field, factor, offset, step ) | Scale a message field | scaleBy( 'stdev', 0.5 ) → stdev * 0.5 |
| fromField( field, default ) | Read a field directly | fromField( 'learnedBaseline', 50 ) |
| chooseWhen( predicate, ifTrue, ifFalse ) | Conditional value | chooseWhen( msg => msg.isWarmup, 100, 78 ) |
| clampTo( field, min, max ) | Clamp a field to a range | clampTo( 'requested', 10, 100 ) |
| offsetBy( field, offset ) | Add an offset to a field | offsetBy( 'baseline', 10 ) |
Real-World Example
WiFi RSSI thresholds that vary by protocol — each message carries its own protocol type, and the threshold adapts:
import { lookupByField } from '@winkjs/composer';
.threshold('rssiAlert', 'rssi', { active: 'rssi_low' }, {
threshold: lookupByField( 'protocolType', {
'802.11ac': -70,
'802.11n': -75
}, -75 )
})

Individual node entries mark tunable-capable options with ⚡.
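Each helper can be pictured as a small factory: it closes over its configuration and returns a function of the message. Illustrative sketches of three of them (not the actual @winkjs/composer source, and scaleBy's step parameter is omitted for brevity):

```javascript
// Illustrative sketches of helper factories. Each returns a function
// of the message, so it plugs in anywhere a static value is accepted.
function lookupByField(field, map, fallback) {
  // Map a message field's value through a lookup table, with a default.
  return (msg) => (msg[field] in map ? map[msg[field]] : fallback);
}

function scaleBy(field, factor, offset = 0) {
  // Linear scaling of a message field.
  return (msg) => msg[field] * factor + offset;
}

function fromField(field, fallback) {
  // Read a field directly, falling back when it is absent.
  return (msg) => (msg[field] !== undefined ? msg[field] : fallback);
}

const threshold = lookupByField('shift', { day: 35, night: 30 }, 32);
threshold({ shift: 'night' });   // → 30
threshold({ shift: 'weekend' }); // → 32 (fallback)
```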
What Happens When a Tunable Throws
Dynamic option functions can throw at runtime — for example, if a message lacks an expected field. winkComposer handles this gracefully:
- Last good value: The node continues with the last successfully resolved value. The throwing call is simply skipped — the node processes the message using whatever value worked last.
- First-message edge case: If the function throws before any value has been resolved, the node falls back to a safe default — it either stays inactive until the function succeeds or marks its outputs as invalid, depending on the node.
- Log suppression: The first error per episode is logged to console. Subsequent errors are suppressed until the function succeeds again (recovery), preventing console flooding at high message rates.
- Automatic recovery: When the function succeeds again, the node resumes normal operation. A new error episode will be logged if the function fails again later.
- Reset: A controller-triggered reset also clears the suppression. If the tunable still throws after the reset, a fresh error is logged.
Static values (threshold: 80) cannot throw, so the guard only activates for user-supplied functions.
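The last-good-value behavior can be sketched as a try/catch wrapper around the tunable function. This is a sketch of the documented behavior under a simple fallback assumption, not the framework's real guard (which also distinguishes per-node first-message handling):

```javascript
// Sketch of the last-good-value guard around a tunable option.
// Illustrates the documented behavior; not winkComposer's actual code.
function guardTunable(fn, fallback) {
  let lastGood = fallback;
  let errorLogged = false; // one log per error episode
  return (msg) => {
    try {
      lastGood = fn(msg);
      errorLogged = false; // success ends the episode; next failure logs again
      return lastGood;
    } catch (err) {
      if (!errorLogged) {
        console.error('tunable failed, using last good value:', err.message);
        errorLogged = true; // suppress repeats until recovery
      }
      return lastGood; // last good value, or the fallback before any success
    }
  };
}

const t = guardTunable((msg) => msg.stats.baseline + 10, 50);
t({});                               // throws inside → 50 (fallback, logged once)
t({ stats: { baseline: 70 } });      // → 80 (recovery)
t({});                               // throws inside → 80 (last good, new episode logged)
```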
Node Names
The first argument to every node is its name — a unique identifier within the pipeline:
.esMean('smooth', 'temperature', { mean: 'tempAvg' })
// ↑
// node name — must be unique across the pipeline

Node names serve two purposes:
- Identity: Each name must be unique — duplicates cause a build error
- Targeting: Controllers reference nodes by name to send control signals (e.g., enable, disable, reset)
.esMean('smooth', 'vibration', { mean: 'smoothVib' }) // node named 'smooth'
.esStats('stats', 'vibration', { stdev: 'vibStd' }) // node named 'stats'
.controller('ctrl', [{
when: msg => msg.anomaly === true,
triggers: [{ control: 'reset', targets: ['smooth', 'stats'] }]
// ↑ references nodes by name
}])

Choosing What Each Node Computes and Adds to the Message
Every node can compute one or more statistics — you choose which ones you want via the stats object. Each requested stat becomes a new field on the message, available to all downstream nodes. Keys choose what to compute, values choose what to call it:
.esMean('smooth', 'temperature', { mean: 'tempAvg' })
// ↑ ↑
// compute mean   call it 'tempAvg'

Request multiple outputs from one node:
.esStats('monitor', 'temperature', { mean: 'avg', stdev: 'std', zScore: 'z' })
// Message gets three new fields: avg, std, z

Each node reads fields already on the message — including fields written by upstream nodes — and adds its own. The message grows as it flows:
.esMean('smooth', 'temperature', { mean: 'smoothTemp' }, { halfLife: 5 })
.esStats('monitor', 'smoothTemp', { stdev: 'tempStd', zScore: 'tempZ' })
.threshold('check', 'tempZ', { active: 'anomaly' }, { mode: 'above', threshold: 3 })

Incoming:        { temperature: 92 }
After esMean: { temperature: 92, smoothTemp: 88.5 }
↑ reads temperature
After esStats: { ..., smoothTemp: 88.5, tempStd: 2.1, tempZ: 1.7 }
↑ reads smoothTemp
After threshold: { ..., tempZ: 1.7, anomaly: false }
                 ↑ reads tempZ

Single vs Multi-Field Processing
Nodes can process fields in two modes:
Single Field Mode (String Input)
Pass a single field name as a string. You specify the exact output name:
// Single field - YOU control the output name directly
.esMean('baseline', 'temperature', { mean: 'tempBaseline' }, { halfLife: 10 })
// Output field: tempBaseline (exactly what you specified)
.threshold('check', 'pressure', { active: 'highPressure' })
// Output field: highPressure (exactly what you specified)

Key Points:
- Input: String field name
- Output: Exactly what you specify in the stats object
- Naming policy: NOT applied
- Use when: You want precise control over output names
Multi-Field Mode (Array Input)
Pass multiple field names as an array. The system auto-generates names using naming policy:
// Multi-field - SYSTEM generates names using naming policy
.namingPolicy('{param}_{stat}') // Define the pattern
.esMean('smooth', ['temperature', 'pressure'], { mean: 'smoothed' }, { halfLife: 10 })
// Output fields: temperature_smoothed, pressure_smoothed (auto-generated)
.sanitize('validate',
['inlet_pressure', 'outlet_pressure'], // Array = multi-field
{ failureReason: 'error', failedValue: 'value' }
)
// Output fields: inlet_pressure_error, inlet_pressure_value,
// outlet_pressure_error, outlet_pressure_value

Key Points:
- Input: Array of field names
- Output: Auto-generated using naming policy pattern
- Naming policy: APPLIED
- Use when: Processing multiple fields with consistent naming
Naming Policy (Multi-Field Only!)
The naming policy only applies to multi-field processing (array inputs):
flow('my-pipeline')
.namingPolicy('{param}_{stat}') // Template for multi-field auto-naming
// Multi-field: naming policy APPLIES
.esMean('smooth', ['temp', 'pressure'], { mean: 'avg' })
// Generates: temp_avg, pressure_avg
// Single field: naming policy IGNORED
.esMean('baseline', 'temp', { mean: 'myCustomBaseline' })
// Output: myCustomBaseline (your exact specification)

Template Variables:
- {param} — The input field name (e.g., 'temperature')
- {stat} — The name YOU provided in the stats object (e.g., 'avg')
- {name} — The node instance name (e.g., 'smooth')
- |dv — Drop-vowels modifier (e.g., "temperature" → "tmprtr")
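The expansion itself can be pictured as string substitution over the template. The sketch below is hypothetical: the `expandPolicy` helper and the `{param|dv}` placement of the drop-vowels modifier are assumptions for illustration, not the framework's documented syntax:

```javascript
// Sketch of naming-policy template expansion. Hypothetical helper;
// assumes the |dv modifier attaches inside the braces, e.g. {param|dv}.
function expandPolicy(template, vars) {
  return template.replace(/\{(\w+)(\|dv)?\}/g, (_, key, dv) => {
    const value = String(vars[key]);
    // |dv drops vowels from the substituted value
    return dv ? value.replace(/[aeiou]/gi, '') : value;
  });
}

expandPolicy('{param}_{stat}', { param: 'temperature', stat: 'avg' });
// → 'temperature_avg'

expandPolicy('{param|dv}_{stat}', { param: 'temperature', stat: 'avg' });
// → 'tmprtr_avg'
```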
Important: The {stat} variable uses the VALUE from your stats object, not the key:
.sanitize('check', ['temp'], { failureReason: 'invalid' })
// {stat} = 'invalid' (not 'failureReason')
// Generates: temp_invalid

Per-Field Option Customization
When using multi-field mode, options can be customized per field using object syntax:
// Same halfLife for all fields
.esMean('smooth', ['temp', 'pressure'], { mean: 'avg' }, { halfLife: 10 })
// Different halfLife per field — temperature responds faster
.esMean('smooth', ['temp', 'pressure'], { mean: 'avg' }, {
halfLife: { temp: 5, pressure: 20 }
})

Each field gets independent state, namespaced control signals, and isolated failure handling. A processing error on one field does not affect others.
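For intuition about what a halfLife knob typically controls: in exponential smoothing, a half-life of N samples means a data point's influence halves every N messages, so temp above forgets old data four times faster than pressure. The sketch below uses the common EWMA half-life convention; it is an assumption about what halfLife means here, not a statement about winkComposer's internals:

```javascript
// EWMA smoothing parameterized by half-life (a common convention:
// alpha = 1 - 2^(-1/halfLife)). Sketch of the idea only; this is an
// assumption about winkComposer's halfLife, not its implementation.
function makeEwma(halfLife) {
  const alpha = 1 - Math.pow(2, -1 / halfLife); // per-sample weight of new data
  let state = null;
  return (x) => {
    // First sample seeds the state; afterwards, blend new data in.
    state = state === null ? x : alpha * x + (1 - alpha) * state;
    return state;
  };
}

const fast = makeEwma(5);  // old data's weight halves every 5 samples
const slow = makeEwma(20); // old data's weight halves every 20 samples
```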
Node Processing Types
Beyond single/multi-field modes, nodes have different processing behaviors:
| Type | Description | Example |
|---|---|---|
| Per-field | Each field processed independently | .esMean(), .threshold() |
| Field-pair | Operates between exactly two fields | .diff(), .ratio() |
| Field-Group | Analyzes multiple fields together | .esPairwiseCorrelation() |
| Condition | Evaluates boolean expressions | .passIf(), .persistenceCheck() |
| Control | Orchestrates other nodes | .controller() |
Per-Field Processing
These nodes process each field independently and support both single and multi-field modes:
// Single field - direct naming
.esMean('avg', 'temperature', { mean: 'avgTemp' })
// Output: avgTemp
// Multi-field - auto-naming with policy
.esMean('avg', ['temperature', 'pressure'], { mean: 'avg' })
// Outputs: temperature_avg, pressure_avg (with policy: '{param}_{stat}')

Nodes supporting per-field processing:
- Signal Conditioning: esMean, median3, sanitize, butterworthFilter, kernel
- Feature Extraction: lag, momentsDigest, trend
- Detection: threshold, pageHinkley
Field-Pair Operations
These operate between exactly two fields. Always single mode (no arrays):
// Calculate difference between two fields
.diff('delta', 'outlet_pressure', 'inlet_pressure', { diff: 'pressureDrop' })
// Output: pressureDrop
// Calculate ratio
.ratio('efficiency', 'output_power', 'input_power', { ratio: 'powerEfficiency' })
// Output: powerEfficiency
// Correlation between two fields
.esCorrelation('coupling', 'temp', 'pressure', { correlation: 'tempPressCorr' })
// Output: tempPressCorr

Field-Group Analysis
These analyze multiple fields as one group:
// Compute all pairwise correlations
.esPairwiseCorrelation('matrix',
['temperature', 'pressure', 'flow'], // Analyzes as a group
{
correlations: 'sensorCorr', // Vector of all correlations
pairNames: 'corrLabels' // "temp-pressure", "temp-flow", etc.
}
)
// Outputs: sensorCorr (array), corrLabels (array)
// Monitor multiple states together
.stateChangeDetector('monitor',
['machineState', 'qualityLevel'], // Watches as a group
{ dwellTime: 'stateTime' },
{ changeMode: 'any' } // Trigger on any change
)

Group analyzers emit shared outputs; naming policy does not apply.
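The pair labels follow directly from the field list: every unordered pair, in declaration order. A sketch of that enumeration (the `pairNames` helper is hypothetical; only the label format is taken from the comments above):

```javascript
// Enumerate all unordered field pairs, as a pairwise-correlation node
// might label its outputs. Illustrative sketch, not framework code.
function pairNames(fields) {
  const pairs = [];
  for (let i = 0; i < fields.length; i++) {
    for (let j = i + 1; j < fields.length; j++) {
      pairs.push(`${fields[i]}-${fields[j]}`);
    }
  }
  return pairs; // n fields → n * (n - 1) / 2 labels
}

pairNames(['temperature', 'pressure', 'flow']);
// → ['temperature-pressure', 'temperature-flow', 'pressure-flow']
```

Three fields yield three correlations; ten fields already yield forty-five, which is why group analyzers emit a shared vector rather than one named field per pair.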
Condition-Based Processing
These evaluate predicates on messages and compute stats from the result:
// Track binary conditions
.dwellTimeTracker('uptime', msg => msg.running, {
dwellTime: 'runDuration',
dutyCycle: 'runPercentage'
})
// Output: runDuration, runPercentage
// Confirm persistence (2-of-3 voting)
.persistenceCheck('stable',
msg => msg.temperature > 80,
{ persistenceConfirmed: 'overheating' },
{ minVotes: 2, outOfTotal: 3 }
)
// Output: overheating

Control Flow
Orchestrate other nodes without processing data:
.controller('adaptive', [{
when: msg => msg.vibration > 2.5,
triggers: [
{ control: 'enable', targets: ['esStats', 'correlation'] }, // targets by node name
{ control: 'reset', targets: ['baseline'] } // 'baseline' is an esMean node name
]
}])

What Happens When a Predicate Throws
Predicate functions can throw at runtime — for example, if a message lacks a field the predicate reads. Every predicate-using node catches the exception and handles it according to its role:
| Node | On exception | Effect |
|---|---|---|
| passIf | Message dropped | Same as returning false — downstream nodes never see the message |
| persistenceCheck | Treated as non-vote | The message doesn’t count toward confirmation |
| dwellTimeTracker | Outputs marked invalid | No state transition is recorded |
| controller | Condition skipped | Remaining conditions still evaluate (first-match-wins) |
| emitIf | Emission skipped | Emits a status signal to the target; recovers automatically |
| persistIf | Write skipped | First error per episode logged; subsequent suppressed until recovery |
| sanitize (custom predicate) | Value treated as invalid | Same as predicate returning false |
All predicate exceptions are logged to console.error — once per error episode, not once per message. On recovery (successful predicate call), the suppression resets so the next episode is logged. A controller-triggered reset also clears the suppression.
No predicate exception ever crashes the pipeline or affects other assets.
Which Node Type to Choose
| If you need to… | Use this node type | Example |
|---|---|---|
| Process each field the same way | Per-field | esMean, threshold |
| Compare two fields | Field-pair | diff, ratio |
| Analyze relationships | Field-Group | esPairwiseCorrelation |
| Filter or route messages | Condition | passIf, emitIf |
| Track binary states | dwellTimeTracker | Running/stopped |
| Track categorical changes | stateChangeDetector | State machines |
| Coordinate nodes | Control | controller |
Next Steps
- Composition Patterns — proven node combinations for common problems
- Semantics — give your computed values meaning with metadata