Changelog
- 11/23/2020: Initial draft
- 10/06/2022: Introduce plugin system based on hashicorp/go-plugin
- 10/14/2022:
- Add
ListenCommit
, flatten the state writes in a block to a single batch. - Remove listeners from cache stores, should only listen to
rootmulti.Store
. - Remove
HaltAppOnDeliveryError()
, the errors are propagated by default, the implementations should return nil if don’t want to propogate errors.
- Add
Status
ProposedAbstract
This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.Context
Currently, KVStore data can be remotely accessed through Queries which proceed either through Tendermint and the ABCI, or through the gRPC server. In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.Decision
We will modify theCommitMultiStore
interface and its concrete (rootmulti
) implementations and introduce a new listenkv.Store
to allow listening to state changes in underlying KVStores. We don’t need to listen to cache stores, because we can’t be sure that the writes will be committed eventually, and the writes are duplicated in rootmulti.Store
eventually, so we should only listen to rootmulti.Store
.
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.
Listening
In a new file,store/types/listening.go
, we will create a MemoryListener
struct for streaming out protobuf encoded KV pairs state changes from a KVStore.
The MemoryListener
will be used internally by the concrete rootmulti
implementation to collect state changes from KVStores.
ListenKVStore
We will create a newStore
type listenkv.Store
that the rootmulti
store will use to wrap a KVStore
to enable state listening.
We will configure the Store
with a MemoryListener
which will collect state changes for output to specific destinations.
MultiStore interface updates
We will update theCommitMultiStore
interface to allow us to wrap a Memorylistener
to a specific KVStore
.
Note that the MemoryListener
will be attached internally by the concrete rootmulti
implementation.
MultiStore implementation updates
We will adjust therootmulti
GetKVStore
method to wrap the returned KVStore
with a listenkv.Store
if listening is turned on for that Store
.
AddListeners
to manage KVStore listeners internally and implement PopStateCache
for a means of retrieving the current state.
rootmulti
CacheMultiStore
and CacheMultiStoreWithVersion
methods to enable listening in
the cache layer.
Exposing the data
Streaming Service
We will introduce a newABCIListener
interface that plugs into the BaseApp and relays ABCI requests and responses
so that the service can group the state changes with the ABCI requests.
BaseApp Registration
We will add a new method to theBaseApp
to enable the registration of StreamingService
s:
BaseApp
struct:
ABCI Event Hooks
We will modify theBeginBlock
, EndBlock
, DeliverTx
and Commit
methods to pass ABCI requests and responses
to any streaming service hooks registered with the BaseApp
.
Go Plugin System
We propose a plugin architecture to load and runStreaming
plugins and other types of implementations. We will introduce a plugin
system over gRPC that is used to load and run Cosmos-SDK plugins. The plugin system uses hashicorp/go-plugin.
Each plugin must have a struct that implements the plugin.Plugin
interface and an Impl
interface for processing messages over gRPC.
Each plugin must also have a message protocol defined for the gRPC service:
plugin.Plugin
interface has two methods Client
and Server
. For our GRPC service these are GRPCClient
and GRPCServer
The Impl
field holds the concrete implementation of our baseapp.ABCIListener
interface written in Go.
Note: this is only used for plugin implementations written in Go.
The advantage of having such a plugin system is that within each plugin authors can define the message protocol in a way that fits their use case.
For example, when state change listening is desired, the ABCIListener
message protocol can be defined as below (for illustrative purposes only).
When state change listening is not desired than ListenCommit
can be omitted from the protocol.
Impl
(this is only used for plugins that are written in Go):
(interface{}, error)
.
This provides the advantage of using versioned plugins where the plugin interface and gRPC protocol change over time.
In addition, it allows for building independent plugin that can expose different parts of the system over gRPC.
RegisterStreamingPlugin
function for the App to register NewStreamingPlugin
s with the App’s BaseApp.
Streaming plugins can be of Any
type; therefore, the function takes in an interface vs a concrete type.
For example, we could have plugins of ABCIListener
, WasmListener
or IBCListener
. Note that RegisterStreamingPluing
function
is helper function and not a requirement. Plugin registration can easily be moved from the App to the BaseApp directly.
NewStreamingPlugin
and RegisterStreamingPlugin
functions are used to register a plugin with the App’s BaseApp.
e.g. in NewSimApp
:
Configuration
The plugin system will be configured within an App’s TOML configuration files.ABCIListener
plugin: streaming.abci.plugin
, streaming.abci.keys
, streaming.abci.async
and streaming.abci.stop-node-on-err
.
streaming.abci.plugin
is the name of the plugin we want to use for streaming, streaming.abci.keys
is a set of store keys for stores it listens to,
streaming.abci.async
is bool enabling asynchronous listening and streaming.abci.stop-node-on-err
is a bool that stops the node when true and when operating
on synchronized mode streaming.abci.async=false
. Note that streaming.abci.stop-node-on-err=true
will be ignored if streaming.abci.async=true
.
The configuration above support additional streaming plugins by adding the plugin to the [streaming]
configuration section
and registering the plugin with RegisterStreamingPlugin
helper function.
Note the that each plugin must include streaming.{service}.plugin
property as it is a requirement for doing the lookup and registration of the plugin
with the App. All other properties are unique to the individual services.
Encoding and decoding streams
ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format. Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format. We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example, the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.Consequences
These changes will provide a means of subscribing to KVStore state changes in real time.Backwards Compatibility
- This ADR changes the
CommitMultiStore
interface, implementations supporting the previous version of this interface will not support the new one
Positive
- Ability to listen to KVStore state changes in real time and expose these events to external consumers
Negative
- Changes
CommitMultiStore
interface and its implementations
Neutral
- Introduces additional- but optional- complexity to configuring and running a cosmos application
- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application