Skip to Content
DocsConceptsUnder the Hood

Under the Hood

In Hello Flow you built a temperature monitor — four nodes that smooth, detect, confirm, and broadcast. This page explains what happens inside that pipeline and what the framework does to keep things running when data is messy or code throws errors.

How Messages Flow

Every message visits each node in order, gaining new fields at each stop.

Pipeline Diagram Symbols

Pipeline diagrams throughout the docs use colored borders to show each node’s role:

ActiveprocessingDisabledturned offEmitterexternal outputControllerorchestrationFilterdrops messages

How Each Node Enriches the Message

A message enters the pipeline and visits each node in order. Each node reads fields from the message, computes something, and adds new fields for downstream nodes to use:

msgesMeanreads: tempadds: avgthresholdreads: avgadds: overheatingpersistenceCheckreads: overheatingadds: confirmedemitIfreads: confirmedbroadcasts to MQTT
Note

emitIf broadcasts a copy to an external system but does not stop the flow — if more nodes followed, the message would continue through them unchanged.

Three Things That Can Happen

What happensWhenWhat downstream nodes see
ProcessNode is activeMessage gets new fields, continues forward
DisabledNode is turned off (via controller)Message passes through unchanged — node does nothing
FilteredpassIf condition returns falseMessage stops — downstream nodes never see it

How a Disabled Node Differs from a Filter

Disabled nodes pass messages through unchanged. Filters (passIf) drop messages entirely. See disable vs passIf for when to use each.


Timestamps

Important

Every timestamp in winkComposer must be an integer representing milliseconds since January 1, 1970 (UTC). Getting this wrong will silently produce incorrect durations, dwell times, and storage records.

Why Milliseconds Since Epoch

A millisecond epoch timestamp is a single number — like 1735500000000 — that pins down an exact moment. No timezone, no format ambiguity, no parsing. The same number means the same instant whether the pipeline runs in Tokyo, London, or New York. Timezones only matter when you display a timestamp to a person; the pipeline never does that.

This is also what JavaScript’s Date.now() returns, so it works naturally with the language.

What You Need to Do

If your messages include a timestamp field (for dwell time tracking, state change detection, time-based slope, or storage), make sure the value is milliseconds since epoch:

// Already correct — no conversion needed { ts: Date.now() } // 1735500000000 (13 digits) { ts: 1735500000000 } // From a system that already uses epoch ms // Needs conversion — seconds (10 digits) must be multiplied by 1000 { ts: 1735500000 } // ✗ This is seconds, not milliseconds { ts: 1735500000 * 1000 } // ✓ Now it's milliseconds // Needs conversion — ISO string must be parsed { ts: '2024-12-30T00:00:00Z' } // ✗ String, not a number { ts: Date.parse( '2024-12-30T00:00:00Z' ) } // ✓ Parsed to milliseconds

Quick check: if your timestamp has 13 digits, it’s milliseconds. If it has 10 digits, it’s seconds — multiply by 1000.

What Happens If You Don’t Provide a Timestamp

Nodes that need timing — like dwellTimeTracker, stateChangeDetector, and persistIf — accept an optional timestampField option. When omitted, they use the system clock (Date.now()) automatically. This is fine when messages arrive in real time. But if you’re replaying historical data or processing batched records, you must provide a timestamp field so that durations reflect the original event times, not the replay clock.

Where Timestamps Are Used

Node / ComponentWhat It Uses the Timestamp For
dwellTimeTrackerHow long a condition has been active (dwell time, duty cycle)
stateChangeDetectorHow long the current state has lasted (dwell time since last transition)
lagTime-normalized slope: (value change) ÷ (time change)
persistIfRow timestamp written to storage (QuestDB)
Time-sliding windowsEvicting entries older than the configured duration

All of these compute differences between timestamps — subtracting one from another to get a duration. If timestamps are in the wrong unit, every duration will be off by a factor of 1000.


What Happens When Things Go Wrong

winkComposer handles bad inputs, throwing functions, and multi-asset isolation — no crashes, no manual resets.

Bad Input

Sensors go offline, PLCs emit sentinel values (-9999, 65535), division by zero produces meaningless results, and the first few readings after startup can’t be trusted. winkComposer handles this at two levels:

First line of defense — the sanitize node. Place it at the start of your pipeline to catch bad data before it reaches the rest of the chain. It validates against configured ranges, value lists, or custom checks — and marks failures as invalid for all downstream nodes:

.sanitize('check', 'temperature', { failureReason: 'tempErr' }, { ranges: { temperature: { min: -40, max: 150 } } // PLC sentinel -9999 → marked invalid })

Safety net — automatic invalid-value propagation. Every computational node validates its input. When a node sees a bad value (missing, out of range, or the result of a failed upstream computation), it skips its own computation and marks all its outputs as invalid. Downstream nodes see the invalid marker, trigger their own validation, and propagate it further — a controlled cascade, not a crash. On the next valid message, nodes resume normal computation automatically. No reset needed.

In multi-field mode, each field is processed independently. If one sensor produces bad data, only that sensor’s outputs are marked invalid — the other sensors keep computing normally:

temperature: 92.5 → esMean computes → smoothTemp: 88.3 ✓ pressure: (invalid) → esMean skips → smoothPres: (invalid) ✗ vibration: 0.42 → esMean computes → smoothVib: 0.41 ✓

User Functions That Throw

You pass functions into the pipeline in two ways: as conditions that decide what happens (like the test in emitIf or passIf) and as dynamic options that adapt values at runtime (like a threshold that varies by operating mode). Both are guarded — every node catches the exception, continues processing, and logs once per error episode.

Each node handles the error according to its role: a filter drops the message, an emitter skips the emission, a controller skips the failing condition and tries the next one. See What Happens When a Predicate Throws for the per-node breakdown.

When a dynamic option function throws, the node continues with the last value that worked. If there is no previous value (first message), it falls back to a safe default. See What Happens When a Tunable Throws for details.

When a controller resets a node, the error suppression clears too. If the same error happens again after the reset, a fresh log entry appears — confirming that the reset did not fix the problem.

Each Asset Gets Its Own State

When you configure .assetId( 'machineId' ), each unique asset gets its own complete pipeline instance — with independent buffers, counters, smoothing history, and threshold state. You write one flow definition; the framework creates isolated instances as new assets appear:

machineId: 'pump-A' → [esMean state A] → [threshold state A] → ... machineId: 'pump-B' → [esMean state B] → [threshold state B] → ... machineId: 'pump-C' → [esMean state C] → [threshold state C] → ...

Each asset lives in its own world — independent smoothing history, independent threshold state, independent controller decisions. A misbehaving sensor on one asset cannot corrupt another asset’s results. A single message stream can carry data from hundreds of assets, and each one gets clean, dedicated processing as if it had its own pipeline.


One Flow, One Process

A flow owns its process. When you call .run(), it registers signal handlers for graceful shutdown. One running flow per process:

  • Use .assetId() for per-asset isolation within one flow
  • Use .switch()/.case() for specialized sub-pipelines within one flow
  • Use separate processes for truly independent flows

Next Steps

Last updated on