Composition Patterns
Proven node combinations for common streaming problems — from noise-tolerant alarms to adaptive diagnostics and layered flows.
Pattern Catalog
Proven node combinations for common problems:
| Pattern | Pipeline | Use Case |
|---|---|---|
| Bearing failure | butterworthFilter → esMean → threshold | Vibration anomaly |
| Noise-tolerant alarm | median3 → esMean → threshold | Spike-resistant alerting |
| Drift detection | esMean(fast) → esMean(slow) → diff → pageHinkley | Gradual shift detection |
| Western Electric Rules | esStats → persistenceCheck | 2-of-3 beyond 2σ via zScore |
| Correlation drift | esPairwiseCorrelation → vectorDistance → emitIf | Multi-sensor relationship change |
| State-aware persistence | dwellTimeTracker → invertFlag → persistIf | Write on state exit, not entry |
Layered Flows
Flows can feed into other flows, building layers where each one produces higher-level insights. A local flow processes raw sensor data — smoothing, detecting, and emitting digests via MQTT. An aggregate flow subscribes to those digests, combines results from multiple assets, and persists to a database:
Both are ordinary flows — same nodes, same configuration. Each layer can run anywhere (edge device, cloud, or both). The power is composition: each layer builds on what the previous one produced.
Adaptive Diagnostics
All nodes start active. To keep expensive nodes dormant until needed, pair them with a controller that disables them when the triggering condition is absent. On the very first message, the non-anomaly condition matches and the controller disables the expensive nodes — saving up to ~95% of compute:
.esMean('smooth', 'vibration', { mean: 'smoothVib' }, { halfLife: 5 })
.threshold('detect', 'smoothVib', { active: 'anomaly' },
{ mode: 'above', threshold: 2.5 })
.controller('adaptive', [
{
when: msg => msg.anomaly === true,
triggers: [
{ control: 'enable', targets: ['stats', 'correlation'] },
{ control: 'reset', targets: ['stats'] }
]
},
{
when: msg => msg.anomaly === false,
triggers: [{ control: 'disable', targets: ['stats', 'correlation'] }]
}
])
.esStats('stats', 'vibration',
{ mean: 'vibMean', stdev: 'vibStd' }, { halfLife: 20 })
.esCorrelation('correlation', 'vibration', 'temperature',
{ correlation: 'vibTempCorr' })The two pipeline states (see Pipeline Diagram Symbols for the visual legend):
Downsampling for Storage
Collect raw samples into compact digests, then persist only when a window completes:
.momentsDigest('digest', 'temperature', { windowSize: 60 })
.digestMoments('stats', 'temperature', {
mean: 'tempMean', stddev: 'tempStd', min: 'tempMin', max: 'tempMax'
})
.persistIf('save', msg => msg.digest === true, {
storageName: 'questdb', insightType: 'minuteStats'
})For multi-level aggregation (seconds → minutes → hours), chain with
cascade: true:
.momentsDigest('perSecond', 'temperature', { windowSize: 100 })
.momentsDigest('perMinute', 'temperature', { windowSize: 60, cascade: true })Choosing the Right Tool
These four mechanisms control what happens in your pipeline. They look similar but serve different purposes:
| What you’re trying to do | Wrong tool | Right tool | Why |
|---|---|---|---|
| Suspend expensive nodes during normal operation | passIf — messages vanish, downstream sees nothing | controller + disable — messages pass through, nodes sleep | Stop computation, not messages |
| Remove bad data before it propagates | controller + disable — bad data still flows through | passIf — message is dropped entirely | Stop messages, not computation |
passIf — Drop messages that shouldn’t continue
Use when you want to remove messages from the pipeline entirely. Downstream nodes never see dropped messages.
Good for:
- Quality gates — only pass messages with valid data
- Sampling — pass every Nth message to reduce volume
- Startup warmup — skip the first N messages while sensors stabilize
.passIf('quality', msg => msg.confidence > 0.9)
.passIf('downsample', ( msg, counter ) => counter % 100 === 0)emitIf — Broadcast without disrupting the flow
Use when you want to send a copy of the message to an external system (MQTT, terminal) while the original continues through the pipeline unchanged.
Good for:
- Alerts — broadcast when a condition is detected
- Telemetry — send periodic status updates
- Debugging — tap into the pipeline to see what’s happening
.emitIf('alert', msg => msg.faultConfirmed, {
target: 'mqtt', insightType: 'faultAlert'
})controller — Change how the pipeline behaves
Use when you want to turn nodes on or off, reset their state, or flush accumulated data. The controller reads the message but doesn’t change it — it sends signals to other nodes.
Good for:
- Adaptive computation — enable expensive processing only during anomalies
- State machines — switch between operational modes
- Coordinated resets — clear state across multiple nodes at once
.controller('adaptive', [{
when: msg => msg.anomaly,
triggers: [{ control: 'enable', targets: ['stats', 'corr'] }]
}])disable vs passIf — The key difference
passIf (filter) | disable (via controller) | |
|---|---|---|
| Scope | One message at a time | All messages while disabled |
| Message fate | Dropped — gone forever | Passes through unchanged |
| Downstream impact | Downstream sees nothing | Downstream sees the original message |
| Reversible? | No — need a new message | Yes — enable signal resumes processing |
| When to use | Bad data, sampling | Suspend expensive computation |
Rule of thumb: If you want to stop messages, use passIf. If
you want to stop computation, use a controller with disable.
Error Handling
Every user-supplied function — conditions and dynamic options — is guarded. A throwing function never crashes the pipeline or affects other assets. See What Happens When a Predicate Throws and What Happens When a Tunable Throws for how each node responds.
Pipeline Ordering Guidelines
The order you place nodes matters:
- Clean first —
sanitizeat the start catches bad data before it propagates - Smooth before detect —
esMeanormedian3beforethresholdreduces false alarms - Confirm before act —
persistenceCheckbeforeemitIfavoids alerting on transients - Controller before targets — controllers can only target nodes that appear after them
- Observe last —
emitIfandpersistIfnear the end see the fully enriched message