Hello Flow!
A flow chains composable building blocks — called nodes — to process streaming data. Each node reads fields from the incoming message, computes something, and adds new fields for the next node to use. The message grows richer as it moves through the chain.
In this example we build a simple temperature monitor. The input is a stream of noisy sensor readings that fluctuate around 70°F — sometimes drifting above safe limits, sometimes dropping back. The flow uses four nodes to smooth the noise, detect when the temperature stays too high, and broadcast an alert.
Step 1 — Smooth noisy data
.esMean('smooth', 'temperature', { mean: 'smoothTemp' }, { halfLife: 5 })Reads temperature from each message, computes an exponentially
smoothed mean, and adds smoothTemp to the message. Noisy spikes
flatten out; the underlying trend emerges.
Step 2 — Detect when it’s too hot
.threshold('check', 'smoothTemp', { active: 'overheating' },
{ mode: 'above', threshold: 80 })Reads smoothTemp (added by step 1) and adds overheating: true
when the value crosses above 80.
Step 3 — Confirm it’s not a fluke
.persistenceCheck('confirm',
msg => msg.overheating === true,
{ persistenceConfirmed: 'confirmed' },
{ minVotes: 2, outOfTotal: 3 })A single spike shouldn’t trigger an alert. This node sets
confirmed: true only if overheating persists in at least 2 of the
last 3 messages.
Step 4 — Broadcast the alert
.emitIf('alert',
msg => msg.confirmed === true,
{ target: 'mqtt', insightType: 'tempAlert' })Sends a copy of the message to MQTT when confirmed. The message
continues through the flow — emitIf is a side effect, not a stop.
See it run
Click each step below to add a node and watch the flow come alive — these are real winkComposer nodes running in your browser.
Put it together
import { flow } from '@winkjs/composer';
flow('temperature-monitor')
.esMean('smooth', 'temperature', { mean: 'smoothTemp' }, { halfLife: 5 })
.threshold('check', 'smoothTemp', { active: 'overheating' },
{ mode: 'above', threshold: 80 })
.persistenceCheck('confirm',
msg => msg.overheating === true,
{ persistenceConfirmed: 'confirmed' },
{ minVotes: 2, outOfTotal: 3 })
.emitIf('alert',
msg => msg.confirmed === true,
{ target: 'mqtt', insightType: 'tempAlert' })
.run();Each node reads what previous nodes added and contributes its own fields — the message grows as it flows through:
Next Steps
- Under the Hood — understand what happens inside the pipeline you just built
- Detecting Bearing Failure — watch a real failure detected in real time