Universal Concept
Channels are fundamental concepts in programming languages, data pipelines, and systems observability. They act as pipes for information flow between producers and consumers, simplifying system design by decoupling components and enabling asynchronous operations. Channels support safe and efficient concurrency in structured-type systems like Go or Rust, and lightweight messaging paradigms like MQTT.
Channels facilitate precise communication between distributed agents, recasting raw data into actionable insights. In data pipelines, channels maintain structured or unstructured data flow while supporting transformations, filtering, and aggregation.
In observability, channels underpin streaming architectures for real-time monitoring and alerting, enabling dynamic system reactions and scalability. Understanding and leveraging channels creates resilient, scalable, and maintainable systems, which is essential in today’s interconnected world.
Instruments provide a high-level, domain-specific interface on top of channels. Beneath the surface, they are essentially specialized wrappers around channel operations. An instrument interface defines the measurement parameters, while the instrument’s implementation specifies the measurement methods. Finally, a channel addresses the means of data transmission along pipelines and pathways.
Channeling Data
In the Substrates API, a Channel is effectively a named pipe. The Channel interface extends the Model interface, which provides access to the Subject—the reference for emitted data. Additionally, it extends the Inlet interface, which offers access to a Pipe—the means to transmit data along the Channel.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
interface Model { Subject subject (); } interface Inlet < E > { Pipe < E > pipe (); } interface Channel < E > extends Model, Inlet < E > { Path < E > path (); } |
To create a Channel, use one of the conduit methods in the Circuit interface. Usually, a Composer is passed to the conduit method to create an instrument around the Channel. However, most basic instruments, like a Counter or a Gauge, only need access to the Channel’s Pipe. Below the Pipe is the instrument returned.
1 2 3 4 5 6 7 8 9 10 |
var cortex = cortex (); var circuit = cortex.circuit (); var counters = circuit.conduit ( Integer.class, Inlet::pipe ); |
Using the ‘counters’ Conduit we can create named Pipes that only allow the emittance of Integers.
1 2 3 4 5 6 7 8 9 10 11 12 |
var counter = counters.compose ( cortex.name ( "pipe-counter" ) ); for ( int i = 1; i <= 10; i++ ) { counter.emit ( i ); } |
Data Pipelining
Most instruments aren’t focused on their channel’s subject. Instead, a subject is a concern for other observers (subscribers) who can create instruments using its name. In the code below, we create a new conduit for accumulators, but instead of returning the raw pipe, we create a pipelining path with a single accumulation stage and then the pipe. We then subscribe to the source of the counter and when a new subject is created we link the outflow from the counter pipe to the inflow of the accumulator pipe, changing the name in doing so to make it easier to sniff on the underlying flows through channels.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
var accumulators = circuit.conduit ( Integer.class, channel -> channel .pipe ( path -> path.reduce ( 0, Integer::sum ) ) ); var subscription = counters .source () .subscribe ( subject -> accumulators.compose ( subject .name () .name ( "total" ) ) ); |
Here is the console output following the execution of the loop above.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
pipe-counter -> 1 pipe-counter.total -> 1 pipe-counter -> 2 pipe-counter.total -> 3 pipe-counter -> 3 pipe-counter.total -> 6 pipe-counter -> 4 pipe-counter.total -> 10 pipe-counter -> 5 pipe-counter.total -> 15 pipe-counter -> 6 pipe-counter.total -> 21 pipe-counter -> 7 pipe-counter.total -> 28 pipe-counter -> 8 pipe-counter.total -> 36 pipe-counter -> 9 pipe-counter.total -> 45 pipe-counter -> 10 pipe-counter.total -> 55 |
Pipeline Sifting
The Path interface provides many operations commonly found in pipeline constructs, including guard (filter), map (transformer), diff (change), and sift (comparator). In the code below, we’ve enhanced the pipeline to process only values greater than zero. This could have been achieved using the guard operation in the Path interface. The Sift interface offers additional operations like min, max, high, low, and range.
1 2 3 4 5 6 7 8 9 10 11 12 |
channel -> channel .pipe ( path -> path .sift ( Integer::compareTo, sift -> sift.above ( 0 ) ) .limit ( 2 ) .reduce ( 0, Integer::sum ) ) |
It is important to note the above Path and Sift operations are not executed by the calling thread that emits a value to the pipe. These operations are executed within the Circuit, something to be discussed next.