Subscriber
The Substrates API is an example of an Event-Driven Architecture (EDA) applied to Observability. An Instrument emits a data value into the Inlet part of a Conduit connected to a Circuit. If Subscribers are registered against the Source associated with the Conduit, the data value becomes the emission property of an Event, and the Event is dispatched to all Outlets registered by a Subscriber for the Subject (Instrument). Below is an example of boilerplate code for Event pipeline processing.
    var circuit =
      cortex.circuit (
        "counters"
      );

    var conduit =
      circuit.conduit (
        Counters.counters ()
      );

    var subscription =
      conduit
        .source ()
        .subscribe (
          /* ... */
        );
Note that the Event object reference is valid only for the duration of the call to an Outlet. This allows the underlying Substrates service provider interface (SPI) implementation to minimize object allocation and optimize memory footprint.
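The practical consequence of this lifetime rule can be sketched in plain Java. The types below (`ReusableEvent`, `Snapshot`, `dispatch`) are hypothetical stand-ins, not Substrates types, and the sketch assumes a provider that recycles one event instance across dispatches, which is only one way an implementation might save allocations. It contrasts an outlet that keeps the reference with one that copies the values it needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; these are NOT the Substrates types.
final class EventReuseSketch {

  // A mutable event that a provider might recycle across dispatches (assumption).
  static final class ReusableEvent {
    String name;
    long emission;
  }

  // An immutable copy that is safe to keep after the callback returns.
  record Snapshot(String name, long emission) {}

  // Pushes every value through the same event instance.
  static void dispatch(String name, long[] values, Consumer<ReusableEvent> outlet) {
    var event = new ReusableEvent();
    for (long value : values) {
      event.name = name;
      event.emission = value;
      outlet.accept(event);
    }
  }

  // Anti-pattern: the outlet stores the event reference itself.
  static List<ReusableEvent> retainReferences(long... values) {
    var kept = new ArrayList<ReusableEvent>();
    dispatch("name", values, kept::add);
    return kept;
  }

  // Safe pattern: the outlet copies what it needs during the call.
  static List<Snapshot> retainCopies(long... values) {
    var kept = new ArrayList<Snapshot>();
    dispatch("name", values, e -> kept.add(new Snapshot(e.name, e.emission)));
    return kept;
  }

  public static void main(String[] args) {
    // All retained references point at one object, so each shows the last value.
    retainReferences(1, 2, 3).forEach(e -> System.out.println("reference: " + e.emission));
    // Copies preserve the value seen at dispatch time.
    retainCopies(1, 2, 3).forEach(s -> System.out.println("copy: " + s.emission()));
  }
}
```

Under the stated assumption, the retained references all report the final value, while the copies report each value as it was dispatched. An Outlet that needs data beyond the callback should therefore extract it during the call.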
With an event-driven architectural approach such as the above, whenever a value needs to be calculated from a series of Events, a stateful consumer, typically an Outlet, must invariably use another Circuit Component, such as a Conduit or Adjunct, to publish the derived result, and must do so continuously, after receiving and processing each Source Event. But there is an alternative option in the Source interface.
Reservoir
Instead of registering a Subscriber with a Source, the reservoir method can be called on a Source to create a Reservoir. A Reservoir temporarily stores Events in an order-maintaining buffer between calls to the drain method. Each call to drain returns a Stream of the Events published since the previous call to drain or, for the first call, since the creation of the Reservoir instance.
    var reservoir =
      conduit
        .source ()
        .reservoir ();

    var stream =
      reservoir.drain ();
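The buffering contract described above can be modeled in a few lines of plain Java. This is a minimal sketch of the idea only: `ReservoirSketch` and its `accept` method are hypothetical names, and the actual Substrates implementation may work quite differently. It shows an order-preserving buffer whose `drain` hands back everything collected since the previous drain and starts over with an empty buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical model of the drain contract; NOT the Substrates Reservoir.
final class ReservoirSketch<E> {

  // Insertion-ordered buffer holding events received since the last drain.
  private List<E> buffer = new ArrayList<>();

  // Would be called by a source for each published event (assumed hook).
  synchronized void accept(E event) {
    buffer.add(event);
  }

  // Returns the buffered events in arrival order and swaps in a fresh buffer.
  synchronized Stream<E> drain() {
    var drained = buffer;
    buffer = new ArrayList<>();
    return drained.stream();
  }

  public static void main(String[] args) {
    var reservoir = new ReservoirSketch<String>();
    reservoir.accept("a");
    reservoir.accept("b");
    System.out.println(reservoir.drain().toList()); // [a, b]
    reservoir.accept("c");
    System.out.println(reservoir.drain().toList()); // [c]
    System.out.println(reservoir.drain().toList()); // []
  }
}
```

Swapping the buffer rather than clearing it means a returned Stream is never affected by events that arrive after the drain call.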
Because the Circuit turns an Instrument’s data emission into an Event, it is typical to call await on the Circuit’s Current before draining a Reservoir. Each call to the drain method removes the queued Events from the buffer and returns them as a Stream; the buffer continues to fill after the drain method returns. Below is an example of such Substrates API usage.
    var counter =
      counters.instrument (
        "name"
      );

    for ( int i = 10; i > 0; i-- )
      counter.inc ();

    circuit
      .current ()
      .await ();

    var stream =
      reservoir.drain ();
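Why waiting before draining matters can be illustrated with a self-contained model. The sketch below assumes, purely for illustration, that a circuit processes emissions on its own worker thread; it uses a single-threaded `ExecutorService` as a stand-in, and every name in it (`AwaitBeforeDrainSketch`, `emit`, `await`, `drain`) is hypothetical rather than part of the Substrates API. Here `await` submits a no-op task and blocks until it completes, which on a single FIFO worker guarantees all earlier emissions have reached the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of "await, then drain"; NOT the Substrates Circuit.
final class AwaitBeforeDrainSketch {

  // Assumption: emissions are turned into buffered events on one worker thread, in order.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final ConcurrentLinkedQueue<Long> buffer = new ConcurrentLinkedQueue<>();

  // Hands the value to the worker; returns before the value is buffered.
  void emit(long value) {
    worker.execute(() -> buffer.add(value));
  }

  // Blocks until every task submitted before this call has run.
  void await() {
    try {
      worker.submit(() -> {}).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  // Removes and returns everything currently buffered, in arrival order.
  List<Long> drain() {
    var drained = new ArrayList<Long>();
    Long value;
    while ((value = buffer.poll()) != null) drained.add(value);
    return drained;
  }

  void close() {
    worker.shutdown();
  }

  // Mirrors the loop above: emit, await, then drain and count.
  static int emitAwaitDrain(int count) {
    var circuit = new AwaitBeforeDrainSketch();
    try {
      for (int i = count; i > 0; i--) circuit.emit(1L);
      circuit.await();
      return circuit.drain().size();
    } finally {
      circuit.close();
    }
  }

  public static void main(String[] args) {
    System.out.println(emitAwaitDrain(10)); // 10
  }
}
```

Without the `await` call in this model, `drain` could run before the worker has buffered every emission and return fewer than ten values.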
Stream
Now that the buffered Events are represented as a Stream, the standard functional operations provided by the Java standard library, such as grouping and summing, can be easily applied, as shown in the following code. Much of the unit test code uses Reservoirs in this way.
    stream.collect (
      Collectors.groupingBy (
        event ->
          event
            .subject ()
            .reference ()
            .name (),
        Collectors.summingLong (
          Event::emission
        )
      )
    ).forEach (
      ( name, total ) ->
        System.out.printf (
          "%s : %d%n",
          name,
          total
        )
    );
Running the above code prints the following to a terminal.
name : 10
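For readers who want to try the grouping and summing step without the Substrates runtime, here is a self-contained sketch using only `java.util.stream`. The `Event` record is a hypothetical stand-in with a flat `name` field instead of the subject and reference chain, and the sketch assumes each `inc` call emits the value 1, which is consistent with the total printed above but not confirmed by the text.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-alone illustration of the collector pipeline; Event is a hypothetical stand-in.
final class GroupAndSumSketch {

  // Simplified event: a subject name and the emitted value.
  record Event(String name, long emission) {}

  // Groups events by name and sums their emissions per group.
  static Map<String, Long> totals(Stream<Event> stream) {
    return stream.collect(
        Collectors.groupingBy(Event::name, Collectors.summingLong(Event::emission)));
  }

  public static void main(String[] args) {
    // Ten events of value 1 for one subject (assumed to mirror ten inc calls).
    var stream = Stream.generate(() -> new Event("name", 1L)).limit(10);
    totals(stream).forEach((name, total) -> System.out.printf("%s : %d%n", name, total));
  }
}
```

Because `groupingBy` returns a `Map`, the same result can be asserted on directly in a unit test instead of being printed.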