This post walks through one of the Substrates showcase examples hosted on GitHub, demonstrating two of the most critical aspects of observability pipelines – the production and consumption of captured measurement data and published events.
Modeling the Game
The code introduced in multiple parts below is based on the Game of Ping Pong, where a Ball is passed back and forth between two Players. In the Game, a Player is both a producer and consumer of an Event, the Event being the serving of the Ball from one Player who previously received the Ball from the other Player.
In the code, a Player is modeled as an Instrument that emits Events where the emission property of an Event is a Ball. The Player interface acts as a protocol decorator around an Inlet provided by the Substrates framework. Typically, such interface methods would capture, transform, and validate data before passing it onto the Inlet and the pipeline.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
interface Ball {} record Player( Inlet< Player, Ball > inlet ) implements Instrument< Ball > { void serve ( Ball ball ) { inlet.emit ( ball ); } boolean is ( Subject subject ) { return subject == inlet.subject (); } } |
A Game is modeled as a Subscriber. Whenever a Player is created, the accept method of the Game interface is called, and an Outlet is registered. The Outlet created within the bind method, shown later, will call serve on the Player for the Ball received. So whenever the Subject is pong, the Outlet forwards the Ball onto ping. When the Subject is ping, the Outlet forwards onto pong. A Game here acts as an observer and a controller, wiring up pipelined events with actions.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
record Game( long serves, Player ping, Player pong ) implements Subscriber< Ball > { @Override public void accept ( Subject subject, Registrar< Ball > registrar ) { if ( pong.is ( subject ) ) { bind ( registrar, ping ); } else if ( ping.is ( subject ) ) { bind ( registrar, pong ); } } } |
Within the bind method of the Game class, the Game uses the registrar of a Player to register (link) an Outlet that extracts the emission from an Event and forwards it to another Player via the serve method. The Outlet is decorated with a filter that forwards the Ball as long as the Game is not over, which is determined by the serves property.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
void bind ( Registrar< Ball > registrar, Player player ) { registrar.register ( Outlet.of ( player::serve ) .filter ( this::isNotOver ) ); } |
The count of serves could have been tracked in the Ball or Game types, but instead, the Event sequence property is used.
1 2 3 4 5 6 7 8 9 10 |
boolean isNotOver ( Event< Ball > event ) { return event.sequence () <= serves; } |
One last method, start, needs to be added to the Game type that starts the Ball rolling by serving to ping first.
1 2 3 4 5 6 7 8 9 |
void start () { ping.serve ( new Ball () {} ); } |
That is it for the model in which Ball was mapped to Event emission, Player to Instrument, and Game to Subscriber.
Wiring the Circuitry
The Cortex interface is the surface of the pipeline fabric from where all circuitry and referent types are assembled. In the code below, the lifetime of the Circuit (executor) created by the Cortex is scoped to a try-finally block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
var cortex = Substrates.cortex (); try ( var scope = cortex.scope ( cortex.circuit ( cortex.name ( PingPong.class ) ) ) ) { /* create a conduit and instruments */ } |
With a Circuit in hand, a Conduit (channel) can now be created for the Player Instruments to transfer Event emissions between Inlets (producers) and Outlets (consumers). Creating a Conduit requires a Cartridge, an interface that is an abstraction for interfacing with Instrument factories (suppliers). A Cartridge is created from the Player constructor.
1 2 3 4 5 6 7 8 9 10 11 |
var circuit = scope.get (); var conduit = circuit.conduit ( Cartridge.of ( Player::new ) ); |
Calling the instrument method on the Conduit, the two Player instances required for the Game are automatically created.
1 2 3 4 5 6 7 8 9 10 11 12 |
var game = new Game ( 1_000_000_000L, conduit.instrument ( cortex.name ( "Bill" ) ), conduit.instrument ( cortex.name ( "Alice" ) ) ); |
Before starting the Game, it must subscribe to the Conduit‘s Source component to receive and react to Events.
1 2 3 4 5 |
conduit .source () .subscribe ( game ); |
Running the Game
Finally, the Game can be started. But to prevent the main Thread from exiting the try block and closing the Circuit Resource immediately, the Queue component is used to await the delivery of all Ball serve Events emitted by Players.
1 2 3 4 5 6 7 |
game.start (); circuit .queue () .flush (); |
Wrapping it up
The showcase example above covers several key concepts within the Substrates API. However, unless one is developing a new Instrument, as in the Player above, knowledge of the Circuit, Conduit, and Cartridge interfaces is sufficient for most purposes.
In a future post, the ability and benefits of pipelining multiple instances of Circuit and Conduit will be explored.