
Composition Patterns

Proven node combinations for common streaming problems — from noise-tolerant alarms to adaptive diagnostics and layered flows.

Pattern Catalog

Proven node combinations for common problems:

| Pattern | Pipeline | Use case |
| --- | --- | --- |
| Bearing failure | butterworthFilter → esMean → threshold | Vibration anomaly |
| Noise-tolerant alarm | median3 → esMean → threshold | Spike-resistant alerting |
| Drift detection | esMean(fast) → esMean(slow) → diff → pageHinkley | Gradual shift detection |
| Western Electric rules | esStats → persistenceCheck | 2 of 3 points beyond 2σ, via zScore |
| Correlation drift | esPairwiseCorrelation → vectorDistance → emitIf | Multi-sensor relationship change |
| State-aware persistence | dwellTimeTracker → invertFlag → persistIf | Write on state exit, not entry |
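To show why the noise-tolerant alarm orders its nodes this way, here is a minimal standalone sketch of the median3 → esMean → threshold idea in plain JavaScript. It is not the library's implementation: it assumes median3 is a 3-sample running median, that halfLife is counted in samples (giving a smoothing weight alpha = 1 − 2^(−1/halfLife)), and that threshold is a plain "above" comparison. The helpers `makeMedian3`, `makeEsMean`, and `makeAlarm` are hypothetical.

```javascript
// Hypothetical sketch of the noise-tolerant alarm: median3 -> esMean -> threshold.
// Assumptions (not the library's API): halfLife is counted in samples and
// threshold is a plain "above" comparison returning a boolean.

// 3-sample running median: once three samples are available, a one-sample spike
// is always outvoted by its two neighbours.
function makeMedian3() {
  const recent = [];
  return (x) => {
    recent.push(x);
    if (recent.length > 3) recent.shift();
    if (recent.length < 3) return x; // not enough history yet: pass the sample through
    const [, middle] = [...recent].sort((a, b) => a - b);
    return middle;
  };
}

// Exponential smoothing whose weight on old data halves every `halfLife` samples.
function makeEsMean(halfLife) {
  const alpha = 1 - Math.pow(2, -1 / halfLife);
  let mean;
  return (x) => {
    mean = mean === undefined ? x : mean + alpha * (x - mean);
    return mean;
  };
}

// Full pipeline: spike filter, then smoothing, then the threshold test.
function makeAlarm({ halfLife, threshold }) {
  const median3 = makeMedian3();
  const esMean = makeEsMean(halfLife);
  return (x) => esMean(median3(x)) > threshold;
}

// Baseline of 1 with a single spike at index 3, then a sustained rise to 5 from index 7.
const readings = [1, 1, 1, 40, 1, 1, 1, 5, 5, 5, 5, 5, 5];

const filtered = readings.map(makeAlarm({ halfLife: 5, threshold: 2.5 }));
const rawEsMean = makeEsMean(5);
const unfiltered = readings.map((x) => rawEsMean(x) > 2.5);

console.log(filtered.indexOf(true));   // 11: fires only on the sustained rise
console.log(unfiltered.indexOf(true)); // 3: without median3, the spike alone trips the alarm
```

The median costs one sample of delay and the smoothing a few more, which is the usual trade: a slightly later alarm in exchange for ignoring isolated spikes.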

Layered Flows

Flows can feed into other flows, building layers where each one produces higher-level insights. A local flow processes raw sensor data — smoothing, detecting, and emitting digests via MQTT. An aggregate flow subscribes to those digests, combines results from multiple assets, and persists to a database:

Local flow: sensor data → smooth → detect → emitIf → MQTT
Aggregate flow: MQTT digests → digestMoments → trend detection → persistIf → writes to QuestDB

Both are ordinary flows — same nodes, same configuration. Each layer can run anywhere (edge device, cloud, or both). The power is composition: each layer builds on what the previous one produced.
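The layering can be modelled without a real broker. In this sketch, a tiny in-memory topic bus stands in for MQTT: a per-asset local flow reduces raw samples to a digest every few samples, and an aggregate flow that only ever sees digests combines the latest one from each asset. `createBus`, `localFlow`, `aggregateFlow`, and the `"digests"` topic are hypothetical names for illustration, not the library's API.

```javascript
// Hypothetical in-memory stand-in for an MQTT broker: topic -> list of handlers.
function createBus() {
  const handlers = new Map();
  return {
    subscribe(topic, fn) {
      if (!handlers.has(topic)) handlers.set(topic, []);
      handlers.get(topic).push(fn);
    },
    publish(topic, payload) {
      for (const fn of handlers.get(topic) ?? []) fn(payload);
    },
  };
}

// Layer 1: per-asset flow that reduces raw samples to a digest every `windowSize` samples.
function localFlow(bus, asset, windowSize) {
  let sum = 0;
  let count = 0;
  return (value) => {
    sum += value;
    count += 1;
    if (count === windowSize) {
      bus.publish("digests", { asset, mean: sum / count, count });
      sum = 0;
      count = 0;
    }
  };
}

// Layer 2: aggregate flow that consumes digests, never raw samples.
function aggregateFlow(bus) {
  const latest = new Map(); // asset -> most recent digest
  bus.subscribe("digests", (digest) => latest.set(digest.asset, digest));
  return {
    assets: () => [...latest.keys()],
    fleetMean() {
      let total = 0;
      let n = 0;
      for (const d of latest.values()) {
        total += d.mean * d.count; // weight each asset's mean by its sample count
        n += d.count;
      }
      return n === 0 ? undefined : total / n;
    },
  };
}

const bus = createBus();
const fleet = aggregateFlow(bus); // subscribe before any local flow publishes
const pumpA = localFlow(bus, "pumpA", 4);
const pumpB = localFlow(bus, "pumpB", 4);
[1, 2, 3, 4].forEach((v) => pumpA(v)); // publishes { asset: "pumpA", mean: 2.5, count: 4 }
[5, 5, 5, 5].forEach((v) => pumpB(v)); // publishes { asset: "pumpB", mean: 5, count: 4 }
console.log(fleet.fleetMean()); // 3.75
```

Swapping the in-memory bus for a network broker changes where each layer runs, not what it computes.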


Adaptive Diagnostics

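All nodes start active. To keep expensive nodes dormant until they are needed, pair them with a controller that disables them whenever the triggering condition is absent. If the very first message is normal, the non-anomaly rule matches immediately and the controller disables the expensive nodes; they then stay dormant, with messages passing through them untouched, until an anomaly re-enables them. Depending on how costly those nodes are and how rare anomalies are, this can save up to about 95% of compute: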

```javascript
.esMean('smooth', 'vibration', { mean: 'smoothVib' }, { halfLife: 5 })
.threshold('detect', 'smoothVib', { active: 'anomaly' }, { mode: 'above', threshold: 2.5 })
.controller('adaptive', [
  {
    // Anomaly: wake the expensive nodes and clear stale statistics
    when: msg => msg.anomaly === true,
    triggers: [
      { control: 'enable', targets: ['stats', 'correlation'] },
      { control: 'reset', targets: ['stats'] }
    ]
  },
  {
    // Normal operation: put the expensive nodes to sleep
    when: msg => msg.anomaly === false,
    triggers: [{ control: 'disable', targets: ['stats', 'correlation'] }]
  }
])
.esStats('stats', 'vibration', { mean: 'vibMean', stdev: 'vibStd' }, { halfLife: 20 })
.esCorrelation('correlation', 'vibration', 'temperature', { correlation: 'vibTempCorr' })
```

The two pipeline states (see Pipeline Diagram Symbols for the visual legend):

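Normal:
msg → esMean → threshold → controller → esStats (disabled, message passes through) → esCorrelation (disabled, message passes through)
Anomaly:
msg → esMean → threshold → controller → esStats (active) → esCorrelation (active)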
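The two states can be reproduced with a small model that counts how often the expensive node actually runs. This is a sketch under stated assumptions, not the library's internals: a disabled node forwards the message unchanged, and controller signals take effect before the downstream target sees the same message. The `reset` trigger is left out to keep the model focused on enable/disable; `makeController`, `runAdaptive`, and the `stats` object are hypothetical.

```javascript
// Hypothetical model of controller-driven enable/disable (not the library's internals).
// Assumption: controller signals apply before the downstream target sees the same message.

function makeController(rules, nodesByName) {
  return (msg) => {
    for (const rule of rules) {
      if (!rule.when(msg)) continue;
      for (const { control, targets } of rule.triggers) {
        for (const name of targets) {
          const node = nodesByName.get(name);
          if (control === "enable") node.enabled = true;
          if (control === "disable") node.enabled = false;
        }
      }
    }
    return msg; // the controller reads the message but never changes it
  };
}

function runAdaptive(values) {
  // Expensive node: starts active, like every node.
  const stats = {
    enabled: true,
    runs: 0,
    process(msg) {
      this.runs += 1; // stand-in for costly work
      return { ...msg, diagnosed: true };
    },
  };
  const controller = makeController(
    [
      { when: (m) => m.anomaly === true, triggers: [{ control: "enable", targets: ["stats"] }] },
      { when: (m) => m.anomaly === false, triggers: [{ control: "disable", targets: ["stats"] }] },
    ],
    new Map([["stats", stats]]),
  );

  const out = values.map((value) => {
    const msg = controller({ value, anomaly: value > 2.5 }); // threshold, then controller
    return stats.enabled ? stats.process(msg) : msg; // disabled: message passes through unchanged
  });
  return { out, runs: stats.runs };
}

const { out, runs } = runAdaptive([1, 1, 1, 1, 1, 1, 1, 4, 4, 1]);
console.log(runs, out.length); // 2 10: every message arrives, the costly node ran twice
```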

Downsampling for Storage

Collect raw samples into compact digests, then persist only when a window completes:

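```javascript
// Accumulate 60 raw samples into one digest window
.momentsDigest('digest', 'temperature', { windowSize: 60 })
// Unpack the digest into named statistics
.digestMoments('stats', 'temperature', { mean: 'tempMean', stddev: 'tempStd', min: 'tempMin', max: 'tempMax' })
// Write only when a window has completed
.persistIf('save', msg => msg.digest === true, { storageName: 'questdb', insightType: 'minuteStats' })
```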
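To illustrate what a window is reduced to, here is a hypothetical sketch that tracks count, mean, standard deviation (via Welford's online algorithm), min, and max, and hands a summary to a callback only when `windowSize` samples have arrived. The field names mirror the example above, but `makeMomentsDigest` and its output shape are assumptions, not the node's actual format; in particular the sketch uses the population standard deviation, which may differ from what digestMoments reports.

```javascript
// Hypothetical windowed moments digest using Welford's algorithm; not the library's implementation.
function makeMomentsDigest(windowSize, onWindow) {
  let n = 0, mean = 0, m2 = 0, min = Infinity, max = -Infinity;
  return (x) => {
    n += 1;
    const delta = x - mean;
    mean += delta / n;
    m2 += delta * (x - mean); // running sum of squared deviations from the mean
    min = Math.min(min, x);
    max = Math.max(max, x);
    if (n === windowSize) {
      onWindow({ count: n, mean, stddev: Math.sqrt(m2 / n), min, max }); // population stddev
      n = 0; mean = 0; m2 = 0; min = Infinity; max = -Infinity; // start the next window
    }
  };
}

const saved = []; // stand-in for persistIf writing to storage
const digest = makeMomentsDigest(4, (summary) => saved.push(summary));
[2, 4, 4, 6, 10, 10, 10, 10].forEach((x) => digest(x));
console.log(saved.length); // 2: one summary per completed window
```

Eight raw samples become two rows, which is the storage saving the pattern is after.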

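For multi-level aggregation (seconds → minutes → hours), chain momentsDigest nodes and set cascade: true on each higher level:

```javascript
// Level 1: one digest per 100 raw samples (one second, assuming a 100 Hz sample rate)
.momentsDigest('perSecond', 'temperature', { windowSize: 100 })
// Level 2: cascade: true aggregates 60 per-second digests into a per-minute digest
.momentsDigest('perMinute', 'temperature', { windowSize: 60, cascade: true })
```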
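One way a higher level can summarize the level below without revisiting raw samples is to merge digests directly. The sketch below combines `{ count, mean, m2, min, max }` digests with the pairwise update of Chan et al.; this is an assumption about how cascading could work, not a description of momentsDigest internals, and `mergeDigests` and `digestOf` are hypothetical helpers.

```javascript
// Hypothetical: merge two moment digests without raw samples (Chan et al. pairwise update).
// Each digest is { count, mean, m2, min, max }, where m2 is the sum of squared deviations.
function mergeDigests(a, b) {
  const count = a.count + b.count;
  const delta = b.mean - a.mean;
  return {
    count,
    mean: a.mean + delta * (b.count / count),
    m2: a.m2 + b.m2 + delta * delta * ((a.count * b.count) / count),
    min: Math.min(a.min, b.min),
    max: Math.max(a.max, b.max),
  };
}

// Build a digest from raw samples (used here only to create lower-level inputs).
function digestOf(samples) {
  const count = samples.length;
  const mean = samples.reduce((s, x) => s + x, 0) / count;
  const m2 = samples.reduce((s, x) => s + (x - mean) ** 2, 0);
  return { count, mean, m2, min: Math.min(...samples), max: Math.max(...samples) };
}

// Two lower-level digests merged into one higher-level digest.
const lower = [digestOf([2, 4]), digestOf([4, 6])];
const higher = lower.reduce((acc, d) => mergeDigests(acc, d));
console.log(higher.mean, higher.m2 / higher.count); // 4 2, same as digesting [2, 4, 4, 6] directly
```

Because the merge is exact, each level only needs the digests from the level below, never the raw stream.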

Choosing the Right Tool

These four mechanisms control what happens in your pipeline. They look similar but serve different purposes:

| What you're trying to do | Wrong tool | Right tool | Why |
| --- | --- | --- | --- |
| Suspend expensive nodes during normal operation | passIf: messages vanish, downstream sees nothing | controller + disable: messages pass through, nodes sleep | Stop computation, not messages |
| Remove bad data before it propagates | controller + disable: bad data still flows through | passIf: the message is dropped entirely | Stop messages, not computation |

passIf — Drop messages that shouldn’t continue

Use when you want to remove messages from the pipeline entirely. Downstream nodes never see dropped messages.

Good for:

  • Quality gates — only pass messages with valid data
  • Sampling — pass every Nth message to reduce volume
  • Startup warmup — skip the first N messages while sensors stabilize
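```javascript
// Quality gate: keep only confident readings
.passIf('quality', msg => msg.confidence > 0.9)
// Sampling: keep every 100th message
.passIf('downsample', (msg, counter) => counter % 100 === 0)
```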
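As a plain-JavaScript model of the three uses listed above, passIf behaves like a filter: each message either continues or is gone. The sketch assumes `counter` is a zero-based count of messages seen by that node (the source does not say whether it starts at 0 or 1); the `passIf` helper here is hypothetical, not the library's node, and the warmup predicate is an illustrative guess at how that use could be written.

```javascript
// Hypothetical model of passIf: keep a message only if the predicate returns true.
// Assumption: `counter` counts messages seen by this node, starting at 0.
function passIf(predicate) {
  let counter = 0;
  return (messages) => messages.filter((msg) => predicate(msg, counter++));
}

// 250 messages; every 10th one has low confidence.
const messages = Array.from({ length: 250 }, (_, i) => ({ id: i, confidence: i % 10 === 0 ? 0.5 : 0.95 }));

const qualityGate = passIf((msg) => msg.confidence > 0.9);        // drop low-confidence readings
const downsample = passIf((msg, counter) => counter % 100 === 0); // keep every 100th message
const warmup = passIf((msg, counter) => counter >= 20);           // skip the first 20 messages

console.log(qualityGate(messages).length); // 225
console.log(downsample(messages).length);  // 3 (counters 0, 100, 200)
console.log(warmup(messages).length);      // 230
```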

emitIf — Broadcast without disrupting the flow

Use when you want to send a copy of the message to an external system (MQTT, terminal) while the original continues through the pipeline unchanged.

Good for:

  • Alerts — broadcast when a condition is detected
  • Telemetry — send periodic status updates
  • Debugging — tap into the pipeline to see what’s happening
```javascript
// Publish a faultAlert insight over MQTT whenever a fault is confirmed
.emitIf('alert', msg => msg.faultConfirmed, { target: 'mqtt', insightType: 'faultAlert' })
```
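A hypothetical model of the tap semantics: when the condition holds, a copy goes to an external sink, and in every case the original message is returned unchanged. The `emitIf` function and the `published` array are illustrative stand-ins for the node and an MQTT publisher.

```javascript
// Hypothetical model of emitIf: broadcast a copy, never alter or drop the original.
function emitIf(predicate, sink, insightType) {
  return (msg) => {
    if (predicate(msg)) sink.push({ insightType, payload: { ...msg } }); // shallow copy for the external system
    return msg; // the pipeline continues with the original message
  };
}

const published = []; // stand-in for an MQTT topic
const alert = emitIf((msg) => msg.faultConfirmed, published, "faultAlert");

const stream = [{ t: 1, faultConfirmed: false }, { t: 2, faultConfirmed: true }];
const downstream = stream.map((msg) => alert(msg));
console.log(downstream.length, published.length); // 2 1
```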

controller — Change how the pipeline behaves

Use when you want to turn nodes on or off, reset their state, or flush accumulated data. The controller reads the message but doesn’t change it — it sends signals to other nodes.

Good for:

  • Adaptive computation — enable expensive processing only during anomalies
  • State machines — switch between operational modes
  • Coordinated resets — clear state across multiple nodes at once
```javascript
.controller('adaptive', [
  {
    when: msg => msg.anomaly,
    // Wake the expensive nodes while the anomaly flag is set
    triggers: [{ control: 'enable', targets: ['stats', 'corr'] }]
  }
])
```

disable vs passIf — The key difference

| Aspect | passIf (filter) | disable (via controller) |
| --- | --- | --- |
| Scope | One message at a time | All messages while disabled |
| Message fate | Dropped, gone for good | Passes through unchanged |
| Downstream impact | Downstream sees nothing | Downstream sees the original message |
| Reversible? | No, a new message is needed | Yes, an enable signal resumes processing |
| When to use | Bad data, sampling | Suspending expensive computation |

Rule of thumb: If you want to stop messages, use passIf. If you want to stop computation, use a controller with disable.
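The rule of thumb can be checked with a small model: send the same messages through a filter that rejects them and through a node that is disabled, then count what reaches the end. Both are hypothetical stand-ins, not library code.

```javascript
// Hypothetical comparison: filtering (passIf) vs suspending a node (controller + disable).
const msgs = [{ v: 1 }, { v: 2 }, { v: 3 }];

// passIf-style: the predicate fails, so every message is dropped.
const filtered = msgs.filter(() => false);

// disable-style: the expensive node is off, so messages skip it but keep flowing.
const expensive = { enabled: false, runs: 0 };
const passedThrough = msgs.map((msg) => {
  if (!expensive.enabled) return msg; // unchanged, computation skipped
  expensive.runs += 1;
  return { ...msg, enriched: true };
});

console.log(filtered.length, passedThrough.length, expensive.runs); // 0 3 0
```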


Error Handling

Every user-supplied function — conditions and dynamic options — is guarded. A throwing function never crashes the pipeline or affects other assets. See What Happens When a Predicate Throws and What Happens When a Tunable Throws for how each node responds.
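A minimal sketch of what "guarded" can mean: each user function is wrapped in try/catch, so a throw becomes a fallback value plus a recorded error instead of an exception escaping the pipeline. The fallback each real node applies is described in the two pages referenced above; the `guard` helper and the fallback of `false` below are assumptions for illustration.

```javascript
// Hypothetical guard: a throwing user function yields a fallback instead of crashing.
function guard(fn, fallback, onError) {
  return (...args) => {
    try {
      return fn(...args);
    } catch (err) {
      onError(err); // record the failure for diagnostics
      return fallback;
    }
  };
}

const errors = [];
// This predicate throws a TypeError when msg.reading is missing.
const predicate = guard((msg) => msg.reading.value > 2, false, (err) => errors.push(err.message));

const results = [{ reading: { value: 3 } }, {}, { reading: { value: 1 } }].map((msg) => predicate(msg));
console.log(results, errors.length); // results: [true, false, false], errors.length: 1
```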


Pipeline Ordering Guidelines

The order you place nodes matters:

  1. Clean first: sanitize at the start catches bad data before it propagates
  2. Smooth before detect: esMean or median3 before threshold reduces false alarms
  3. Confirm before act: persistenceCheck before emitIf avoids alerting on transients
  4. Controller before targets: controllers can only target nodes that appear after them
  5. Observe last: emitIf and persistIf near the end see the fully enriched message
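The "confirm before act" step can be modelled as a k-of-n vote over recent values. This hypothetical sketch implements the Western Electric style rule from the pattern catalog: fire when at least 2 of the last 3 z-scores lie beyond 2σ on the same side. `makePersistenceCheck` and its `k`, `n`, and `limit` parameters are illustrative, not persistenceCheck's actual options.

```javascript
// Hypothetical k-of-n persistence check: 2 of the last 3 z-scores beyond 2σ, same side.
function makePersistenceCheck({ k = 2, n = 3, limit = 2 } = {}) {
  const recent = [];
  return (z) => {
    recent.push(z);
    if (recent.length > n) recent.shift();
    const above = recent.filter((v) => v > limit).length;
    const below = recent.filter((v) => v < -limit).length;
    return above >= k || below >= k; // with k >= 2, a single transient can never fire
  };
}

const check = makePersistenceCheck();
const zScores = [0.1, 2.6, 0.3, -0.2, 2.2, 2.4, 0.5];
const confirmed = zScores.map((z) => check(z));
console.log(confirmed.join(" ")); // false false false false false true true
```

The isolated excursion at 2.6 is ignored, while two nearby excursions confirm the condition; placing this before emitIf is what keeps transients out of alerts.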

Next Steps

  • Semantics — define what your data means so every downstream system speaks the same language
  • Use Cases — see these patterns at production scale with real datasets