Under the Hood
In Hello Flow you built a temperature monitor — four nodes that smooth, detect, confirm, and broadcast. This page explains what happens inside that pipeline and what the framework does to keep things running when data is messy or code throws errors.
How Messages Flow
Every message visits each node in order, gaining new fields at each stop.
Pipeline Diagram Symbols
Pipeline diagrams throughout the docs use colored borders to show each node’s role.
How Each Node Enriches the Message
A message enters the pipeline and visits each node in order. Each node reads fields from the message, computes something, and adds new fields for downstream nodes to use:
emitIf broadcasts a copy to an external system but does not stop the flow — if more nodes followed, the message would continue through them unchanged.
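The copy-and-continue behavior can be sketched in plain JavaScript; emitIfNode here is an illustrative stand-in, not winkComposer’s implementation:

```javascript
// Sketch: an emit node sends a copy to an external sink when its test
// passes, but returns the message unchanged so downstream nodes still see it.
function emitIfNode(test, sink) {
  return (msg) => {
    if (test(msg)) sink({ ...msg }); // copy goes out; original keeps flowing
    return msg;
  };
}

const sent = [];
const broadcast = emitIfNode((m) => m.alarmConfirmed, (m) => sent.push(m));
const out = broadcast({ alarmConfirmed: true, temp: 95 });
// out is the same message, ready for any nodes that follow;
// sent now holds one independent copy
```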
Three Things That Can Happen
| What happens | When | What downstream nodes see |
|---|---|---|
| Process | Node is active | Message gets new fields, continues forward |
| Disabled | Node is turned off (via controller) | Message passes through unchanged — node does nothing |
| Filtered | passIf condition returns false | Message stops — downstream nodes never see it |
How a Disabled Node Differs from a Filter
Disabled nodes pass messages through unchanged. Filters (passIf) drop messages entirely. See disable vs passIf for when to use each.
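The three outcomes can be sketched in a few lines of JavaScript; runPipeline and the node shapes here are illustrative, not winkComposer internals:

```javascript
// Disabled nodes pass messages through; filters stop them entirely.
function runPipeline(message, nodes) {
  for (const node of nodes) {
    if (node.disabled) continue;                           // pass through unchanged
    if (node.passIf && !node.passIf(message)) return null; // filtered: stops here
    if (node.process) message = node.process(message);     // active: adds fields
  }
  return message;
}

const nodes = [
  { disabled: true, process: (m) => ({ ...m, smoothed: m.value }) }, // turned off
  { passIf: (m) => m.value < 100 },                                  // filter
];

runPipeline({ value: 42 }, nodes);  // passes the filter; no smoothed field added
runPipeline({ value: 150 }, nodes); // dropped by the filter → null
```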
Timestamps
Every timestamp in winkComposer must be an integer representing milliseconds since January 1, 1970 (UTC). Getting this wrong will silently produce incorrect durations, dwell times, and storage records.
Why Milliseconds Since Epoch
A millisecond epoch timestamp is a single number — like 1735500000000 — that pins down an exact moment. No timezone, no format ambiguity, no parsing. The same number means the same instant whether the pipeline runs in Tokyo, London, or New York. Timezones only matter when you display a timestamp to a person; the pipeline never does that.

This is also what JavaScript’s Date.now() returns, so it works naturally with the language.
What You Need to Do
If your messages include a timestamp field (for dwell time tracking, state change detection, time-based slope, or storage), make sure the value is milliseconds since epoch:
```javascript
// Already correct — no conversion needed
{ ts: Date.now() }        // 1735500000000 (13 digits)
{ ts: 1735500000000 }     // From a system that already uses epoch ms

// Needs conversion — seconds (10 digits) must be multiplied by 1000
{ ts: 1735500000 }        // ✗ This is seconds, not milliseconds
{ ts: 1735500000 * 1000 } // ✓ Now it's milliseconds

// Needs conversion — ISO string must be parsed
{ ts: '2024-12-30T00:00:00Z' }             // ✗ String, not a number
{ ts: Date.parse('2024-12-30T00:00:00Z') } // ✓ Parsed to milliseconds
```

Quick check: if your timestamp has 13 digits, it’s milliseconds. If it has 10 digits, it’s seconds — multiply by 1000.
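If inputs arrive in mixed shapes, a small normalizer helps; toEpochMs is an illustrative helper, not part of winkComposer:

```javascript
// Normalize common timestamp shapes to milliseconds since epoch.
// Heuristic: numbers below 1e11 (10 digits or fewer) are treated as seconds.
function toEpochMs(ts) {
  if (typeof ts === 'string') return Date.parse(ts); // ISO string → epoch ms
  if (ts < 1e11) return ts * 1000;                   // seconds → milliseconds
  return ts;                                         // already milliseconds
}

toEpochMs(1735500000);             // → 1735500000000
toEpochMs(1735500000000);          // → 1735500000000
toEpochMs('2024-12-30T00:00:00Z'); // → 1735516800000
```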
What Happens If You Don’t Provide a Timestamp
Nodes that need timing — like dwellTimeTracker, stateChangeDetector, and persistIf — accept an optional timestampField option. When omitted, they use the system clock (Date.now()) automatically. This is fine when messages arrive in real time. But if you’re replaying historical data or processing batched records, you must provide a timestamp field so that durations reflect the original event times, not the replay clock.
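The replay case is where the wall clock goes wrong. A sketch of dwell-time accumulation over replayed records (illustrative, not the dwellTimeTracker implementation):

```javascript
// Accumulate dwell time from each record's own ts field, so replayed
// history yields the original durations, not the replay clock's.
function dwellMs(records, isActive) {
  let dwell = 0;
  let prev = null;
  for (const rec of records) {
    if (prev !== null && isActive(prev)) dwell += rec.ts - prev.ts;
    prev = rec;
  }
  return dwell;
}

const replay = [
  { ts: 1735500000000, temp: 95 }, // over threshold
  { ts: 1735500060000, temp: 96 }, // still over: +60 s
  { ts: 1735500120000, temp: 80 }, // back to normal: +60 s counted, then stops
];
dwellMs(replay, (r) => r.temp > 90); // → 120000 (two minutes of dwell)
```

Had these three records been replayed in one second with the system clock, the computed dwell would be roughly 1000 ms instead of 120000 ms.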
Where Timestamps Are Used
| Node / Component | What It Uses the Timestamp For |
|---|---|
| dwellTimeTracker | How long a condition has been active (dwell time, duty cycle) |
| stateChangeDetector | How long the current state has lasted (dwell time since last transition) |
| lag | Time-normalized slope: (value change) ÷ (time change) |
| persistIf | Row timestamp written to storage (QuestDB) |
| Time-sliding windows | Evicting entries older than the configured duration |
All of these compute differences between timestamps — subtracting one from another to get a duration. If timestamps are in the wrong unit, every duration will be off by a factor of 1000.
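A quick arithmetic check shows the factor-of-1000 failure:

```javascript
// Two readings one minute apart, expressed in both units.
const t1s = 1735500000;  // seconds since epoch (10 digits)
const t2s = 1735500060;
const t1ms = t1s * 1000; // milliseconds since epoch (13 digits)
const t2ms = t2s * 1000;

t2ms - t1ms; // 60000 → correctly interpreted as 60 seconds
t2s - t1s;   // 60    → misinterpreted as 60 ms: durations shrink 1000×
```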
What Happens When Things Go Wrong
winkComposer handles bad inputs and throwing user functions, and isolates state per asset — no crashes, no manual resets.
Bad Input
Sensors go offline, PLCs emit sentinel values (-9999, 65535), division by zero produces meaningless results, and the first few readings after startup can’t be trusted. winkComposer handles this at two levels:
First line of defense — the sanitize node. Place it at the start of your pipeline to catch bad data before it reaches the rest of the chain. It validates against configured ranges, value lists, or custom checks — and marks failures as invalid for all downstream nodes:

```javascript
.sanitize('check', 'temperature', { failureReason: 'tempErr' }, {
  ranges: { temperature: { min: -40, max: 150 } } // PLC sentinel -9999 → marked invalid
})
```

Safety net — automatic invalid-value propagation. Every computational node validates its input. When a node sees a bad value (missing, out of range, or the result of a failed upstream computation), it skips its own computation and marks all its outputs as invalid. Downstream nodes see the invalid marker, trigger their own validation, and propagate it further — a controlled cascade, not a crash. On the next valid message, nodes resume normal computation automatically. No reset needed.
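The cascade can be sketched with a single smoothing node; the INVALID marker and esMeanNode here are illustrative, not winkComposer’s internal representation:

```javascript
// A node that skips computation on invalid input and marks its own
// output invalid, then resumes automatically on the next valid value.
const INVALID = Symbol('invalid');

function esMeanNode(state, msg, inField, outField, alpha = 0.3) {
  const v = msg[inField];
  if (v === INVALID || typeof v !== 'number' || Number.isNaN(v)) {
    return { ...msg, [outField]: INVALID }; // controlled cascade, no throw
  }
  state.mean = state.mean === undefined ? v : alpha * v + (1 - alpha) * state.mean;
  return { ...msg, [outField]: state.mean };
}

const state = {};
esMeanNode(state, { temperature: 92.5 }, 'temperature', 'smoothTemp');
// → { temperature: 92.5, smoothTemp: 92.5 }
esMeanNode(state, { temperature: INVALID }, 'temperature', 'smoothTemp');
// → smoothTemp: INVALID; smoothing history untouched, no reset needed
```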
In multi-field mode, each field is processed independently. If one sensor produces bad data, only that sensor’s outputs are marked invalid — the other sensors keep computing normally:
```
temperature: 92.5      → esMean computes → smoothTemp: 88.3      ✓
pressure:    (invalid) → esMean skips    → smoothPres: (invalid) ✗
vibration:   0.42      → esMean computes → smoothVib:  0.41      ✓
```

User Functions That Throw
You pass functions into the pipeline in two ways: as conditions that decide what happens (like the test in emitIf or passIf) and as dynamic options that adapt values at runtime (like a threshold that varies by operating mode). Both are guarded — every node catches the exception, continues processing, and logs once per error episode.
Each node handles the error according to its role: a filter drops the message, an emitter skips the emission, a controller skips the failing condition and tries the next one. See What Happens When a Predicate Throws for the per-node breakdown.
When a dynamic option function throws, the node continues with the last value that worked. If there is no previous value (first message), it falls back to a safe default. See What Happens When a Tunable Throws for details.
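The last-good-value behavior can be sketched as a wrapper; guardTunable is an illustrative name, not a winkComposer API:

```javascript
// Wrap a user-supplied dynamic option: on throw, reuse the last value
// that worked; with no prior value, fall back to a safe default.
function guardTunable(fn, fallback) {
  let lastGood;
  return (msg) => {
    try {
      lastGood = fn(msg);
    } catch {
      if (lastGood === undefined) return fallback; // first message: safe default
    }
    return lastGood; // otherwise: the last value that worked
  };
}

const threshold = guardTunable((msg) => msg.limits.maxTemp, 100);
threshold({ limits: { maxTemp: 85 } });         // → 85
threshold({});                                  // throws inside → reuses 85
guardTunable((m) => m.limits.maxTemp, 100)({}); // first call throws → 100
```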
When a controller resets a node, the error suppression clears too. If the same error happens again after the reset, a fresh log entry appears — confirming that the reset did not fix the problem.
Each Asset Gets Its Own State
When you configure .assetId( 'machineId' ), each unique asset gets its own complete pipeline instance — with independent buffers, counters, smoothing history, and threshold state. You write one flow definition; the framework creates isolated instances as new assets appear:

```
machineId: 'pump-A' → [esMean state A] → [threshold state A] → ...
machineId: 'pump-B' → [esMean state B] → [threshold state B] → ...
machineId: 'pump-C' → [esMean state C] → [threshold state C] → ...
```

Each asset lives in its own world — independent smoothing history, independent threshold state, independent controller decisions. A misbehaving sensor on one asset cannot corrupt another asset’s results. A single message stream can carry data from hundreds of assets, and each one gets clean, dedicated processing as if it had its own pipeline.
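Per-asset isolation amounts to keying every piece of node state by the asset id; a sketch (illustrative, not winkComposer internals):

```javascript
// One state object per unique assetId, created lazily as assets appear.
const perAsset = new Map();

function stateFor(assetId) {
  if (!perAsset.has(assetId)) {
    perAsset.set(assetId, { count: 0 }); // fresh, independent pipeline state
  }
  return perAsset.get(assetId);
}

function processMsg(msg) {
  const state = stateFor(msg.machineId);
  state.count += 1; // pump-A's counter never sees pump-B's messages
  return { ...msg, seen: state.count };
}

processMsg({ machineId: 'pump-A' }); // seen: 1
processMsg({ machineId: 'pump-B' }); // seen: 1 — independent state
processMsg({ machineId: 'pump-A' }); // seen: 2
```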
One Flow, One Process
A flow owns its process. When you call .run(), it registers signal handlers for graceful shutdown. One running flow per process:

- Use .assetId() for per-asset isolation within one flow
- Use .switch() / .case() for specialized sub-pipelines within one flow
- Use separate processes for truly independent flows
Next Steps
- Flow Language — the full reference for building pipelines
- Composition Patterns — proven node combinations for common problems