Server, Broker and Peers¶
To use OpenBCI a single instance of Server must be started on each host you are planning to run the experiment on.
Server is started with the obci srv command.
Server is responsible for:
- managing experiments and peers from other experiments running on its machine
- autodiscovery of other OpenBCI experiments running on the same VLAN
- detection of amplifiers connected to host and broadcasting this information to other Servers
Broker is started once per experiment. Broker acts as a message proxy for peers in a single experiment.
Every Peer connects to Broker on initialization.
Peer is a worker unit, which communicates with other running peers and Broker.
A Peer can be started with the obci run_peer peer.py command, but it is strongly recommended to start peers automatically, as part of an experiment.
Typical examples of peers are:
- EEG signal amplifiers
- EEG signal filters
- signal displays
Each peer has its own configuration file. It stores necessary parameters and defines dependencies on other peers in the system.
To perform some useful work you need to create and run an experiment.
An experiment is a set of running Peers and a Broker. Each peer is identified by its assigned peer_id, which allows launching multiple instances of the same peer.
Peer IDs, peer program paths and peer configurations are predefined in the experiment definition file or scenario file.
The experiment definition file is a text file for defining an experiment. It contains a list of peers, assignments of peer_id and paths to peer programs.
The following commands are available from the command line:
- obci srv - start OpenBCI Server
- obci srv_kill - shut down OpenBCI Server and all experiments it is running
- obci gui - GUI for starting/stopping and monitoring experiments
- obci launch - launch an experiment by specifying a scenario file
- obci kill - shut down an OpenBCI experiment
  - the --force flag shuts down the experiment immediately; peers have no chance to react and finalize their work
- obci info - display information about OpenBCI Servers and experiments running on the current machine and on nearby hosts
- obci report_error - show a window for reporting an error to Braintech
New peer architecture¶
Peer and Broker derive from BasePeer.
BasePeer is derived from ZmqAsyncioTaskManager.
ZmqAsyncioTaskManager is derived from AsyncioTaskManager.
AsyncioTaskManager manages a set of tasks running inside an asyncio message loop. The message loop can be owned by AsyncioTaskManager or borrowed. When the message loop is owned, a new message loop is started in a new thread.
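The owned-versus-borrowed distinction can be illustrated with a small stdlib-only sketch. The class name LoopOwnerSketch and its methods are hypothetical and are not part of the OBCI API; the sketch only assumes that an owned loop runs in its own thread, as described above.

```python
import asyncio
import threading


class LoopOwnerSketch:
    """Hypothetical helper: runs coroutines on an owned or a borrowed asyncio loop."""

    def __init__(self, loop=None):
        self.owns_loop = loop is None
        if self.owns_loop:
            # Owned: create a fresh loop and run it in a dedicated thread.
            self.loop = asyncio.new_event_loop()
            self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
            self._thread.start()
        else:
            # Borrowed: someone else created the loop and keeps it running.
            self.loop = loop
            self._thread = None

    def submit(self, coro):
        """Schedule a coroutine on the loop from any thread; returns a concurrent Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def close(self):
        """Stop and close the loop, but only if this object owns it."""
        if self.owns_loop:
            self.loop.call_soon_threadsafe(self.loop.stop)
            self._thread.join()
            self.loop.close()


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


manager = LoopOwnerSketch()  # no loop passed in, so the manager owns one
result = manager.submit(add(2, 3)).result(timeout=5)
manager.close()
```

A borrowed loop is left running on close(), because its lifetime belongs to whoever created it.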
ZmqAsyncioTaskManager adds ZMQ asyncio context lifetime management to AsyncioTaskManager. The ZMQ context can be borrowed or owned.
BasePeer extends ZmqAsyncioTaskManager by adding three ZMQ sockets:
- SUB used to receive broadcast messages, connects to Broker’s XPUB
- PUB used to send broadcast messages, connects to Broker’s XSUB
- REP used to answer synchronous messages (messages requiring an answer)
In ZMQ 3+ messages are filtered on PUB socket, so no redundant messages will be sent to clients.
Broker extends BasePeer by adding:
- XPUB/XSUB message proxy
- centralized peer registration authority
- query authority
- heartbeat monitor
Peer extends BasePeer by adding:
- initialization code/Broker connection code
- heartbeat message sending
Peer configuration is stored in one or more files, in INI-like format. The
most important (and mandatory) config file is the one stored in the same
directory as the peer source code file. Its name is the same as the peer it
configures: for a peer implemented in file
peer_a.py you define
peer_a.ini. This file is called basic config. Treat it as a part of
peer implementation. Basic config is always parsed and is the reference for
validation of any configuration overrides.
A configuration file contains four sections:
- [local_params]
- [config_sources]
- [launch_dependencies]
- [external_params]
Parameters defined in these sections can be overridden in the scenario file and/or in an additional configuration file loaded at scenario start.
[local_params] is the most straightforward section. Here we define config
parameters which are owned by the peer, i.e. its properties. For an amplifier
such properties would be sampling rate or number of channels:

    [local_params]
    sampling_rate = 128
    number_of_channels = 8
    super_important_param = 0
A peer may be bound to parameters from other peers. For example, a
signal filter would need the sampling rate of the amplifier from which it takes the
signal. Each peer is identified by a peer_id, which must be unique in the
scope of a running experiment. A configuration file, though, has to be usable in
many experiments, so we cannot hardcode exact peer IDs there. Instead, in the
[config_sources] section we define symbolic names for the peers from which
the configured peer should take parameters:

    [config_sources]
    signal_properties_source=
    ; or maybe just
    amplifier=

Here we only define internal (for this peer) names of configuration sources, hence nothing follows the equals sign. At run time real peer_ids are assigned to these config sources using information stored in the scenario file.
In a similar fashion we define [launch_dependencies]. Launch dependencies
are peers we need to synchronize with: a peer will not start its actual work
until all the peers it depends on report that they are ready.
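The waiting behaviour of launch dependencies can be pictured with plain asyncio events. This is only an in-process sketch: ReadinessSketch, mark_ready and wait_for are hypothetical names, and real peers exchange readiness over the Broker rather than through shared objects.

```python
import asyncio


class ReadinessSketch:
    """Hypothetical registry: one asyncio.Event per peer_id marks readiness."""

    def __init__(self):
        self._ready = {}

    def _event(self, peer_id):
        return self._ready.setdefault(peer_id, asyncio.Event())

    def mark_ready(self, peer_id):
        self._event(peer_id).set()

    async def wait_for(self, peer_ids):
        # Block until every listed dependency has reported readiness.
        await asyncio.gather(*(self._event(p).wait() for p in peer_ids))


async def demo():
    registry = ReadinessSketch()
    log = []

    async def filter_peer():
        await registry.wait_for(["amplifier"])  # launch dependency
        log.append("filter started")
        registry.mark_ready("filter")

    async def amplifier_peer():
        await asyncio.sleep(0.01)  # pretend to initialise hardware
        log.append("amplifier ready")
        registry.mark_ready("amplifier")

    await asyncio.gather(filter_peer(), amplifier_peer())
    return log


log = asyncio.run(demo())
```

The filter coroutine is started first but does no work until the amplifier has reported readiness.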
Both config and launch dependencies’ names are visible in the scope of the
configuration file, so ‘amplifier’ from the example above is the same peer as
the one referenced in the [external_params] section. Let’s say a peer needs a parameter
‘sampling_rate’ from some other peer which we symbolically named ‘amplifier’.
It will store the parameter as ‘amp_sampling_rate’. We can represent this as:

    [external_params]
    amp_sampling_rate = amplifier.sampling_rate
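How symbolic sources and external parameters fit together can be sketched with the standard configparser module. The function resolve_params, the peer_id amp_1 and the cutoff_hz parameter are illustrative assumptions; the actual OBCI config processing exchanges values between running peers and may work differently.

```python
import configparser

BASIC_CONFIG = """
[config_sources]
amplifier=

[launch_dependencies]

[external_params]
amp_sampling_rate = amplifier.sampling_rate

[local_params]
cutoff_hz = 30
"""


def resolve_params(config_text, source_ids, published):
    """Return local params plus external params looked up in other peers.

    source_ids: symbolic source name -> real peer_id (from the scenario)
    published:  peer_id -> dict of that peer's parameters
    """
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    params = dict(parser["local_params"])
    for name, reference in parser["external_params"].items():
        # A reference has the form <symbolic source>.<remote parameter name>.
        source, _, remote_name = reference.partition(".")
        if source not in parser["config_sources"]:
            raise KeyError(f"unknown config source: {source}")
        params[name] = published[source_ids[source]][remote_name]
    return params


resolved = resolve_params(
    BASIC_CONFIG,
    source_ids={"amplifier": "amp_1"},
    published={"amp_1": {"sampling_rate": "128"}},
)
```

The config file itself never mentions amp_1; the symbolic name amplifier is bound to a real peer_id only at launch.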
All parameter names we define in a config file should be unique. You can move
parameters from [external_params] to [local_params] in config files
loaded after the basic config. This may be useful during module development,
when we want to quickly test how the module works without loading other peers.
The other direction may also be useful if in some experiment we want to change the
way parameters are passed.
As mentioned above, you can pass custom configuration files to the peer. The
one restriction is that you cannot define any parameter names that are not defined in
the basic config. You can move a parameter from the local to the external section, or add a
launch or config dependency. If a config dependency is not referenced in the
[external_params] configuration, the peer will not require passing the dependency’s
peer_id on launch.
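The restriction on overrides can be expressed as a simple set comparison. The sketch below uses the standard configparser module; load, param_names and unknown_params are hypothetical helpers written for illustration and do not reproduce the OBCI validation code.

```python
import configparser


def load(text):
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser


def param_names(parser):
    """All parameter names, regardless of being local or external."""
    names = set()
    for section in ("local_params", "external_params"):
        if parser.has_section(section):
            names.update(parser[section])
    return names


def unknown_params(basic, override):
    """Names used by the override that the basic config never declared."""
    return sorted(param_names(override) - param_names(basic))


basic = load("""
[config_sources]
amplifier=
[local_params]
gain = 1
[external_params]
amp_sampling_rate = amplifier.sampling_rate
""")

# Allowed: moves amp_sampling_rate from external to local for a standalone test.
ok_override = load("""
[local_params]
amp_sampling_rate = 128
""")

# Rejected: 'colour' does not exist in the basic config.
bad_override = load("""
[local_params]
colour = red
""")

ok_result = unknown_params(basic, ok_override)
bad_result = unknown_params(basic, bad_override)
```

Because names are compared across both parameter sections, moving a parameter between local and external passes the check, while introducing a new name does not.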
Command line - peer invocation¶
Below is the somewhat messy usage help generated by the config processing module using argparse:
    usage: some_peer.py peer_id [options]

    positional arguments:
      peer_id               Unique name for this instance of this peer

    optional arguments:
      -h, --help            show this help message and exit
      -p LOCAL_PARAMS LOCAL_PARAMS, --local_params LOCAL_PARAMS LOCAL_PARAMS
                            Local parameter override value: param_name, value.
      -e EXTERNAL_PARAMS EXTERNAL_PARAMS, --external_params EXTERNAL_PARAMS EXTERNAL_PARAMS
                            External parameter override value: param_name value_def .
      -c CONFIG_SOURCES CONFIG_SOURCES, --config_sources CONFIG_SOURCES CONFIG_SOURCES
                            Config source ID assignment: src_name peer_id
      -d LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES, --launch_dependencies LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES
                            Launch dependency ID assignment: dep_name peer_id
      -f CONFIG_FILE, --config_file CONFIG_FILE
                            Additional configuration file (overrides): path_to_file.
When starting a peer with config support you need to provide a peer_id for it and the peer_ids of its launch/config dependencies. Custom config files are passed with option -f. You can also override some parameters using option -p / --local_params param_name value or -e / --external_params param_name value_definition.
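An argparse setup that accepts the same option shapes might look roughly as follows. This is a reconstruction from the usage text only: the use of nargs=2 with action="append", the metavar names and the helper add_pair_option are assumptions, not the actual OBCI parser.

```python
import argparse


def add_pair_option(parser, short, long, metavar):
    # Each occurrence takes exactly two values and is collected into a list of pairs.
    parser.add_argument(short, long, nargs=2, action="append", default=[], metavar=metavar)


def build_parser():
    parser = argparse.ArgumentParser(prog="some_peer.py")
    parser.add_argument("peer_id", help="Unique name for this instance of this peer")
    add_pair_option(parser, "-p", "--local_params", ("NAME", "VALUE"))
    add_pair_option(parser, "-e", "--external_params", ("NAME", "VALUE_DEF"))
    add_pair_option(parser, "-c", "--config_sources", ("SRC_NAME", "PEER_ID"))
    add_pair_option(parser, "-d", "--launch_dependencies", ("DEP_NAME", "PEER_ID"))
    parser.add_argument("-f", "--config_file", action="append", default=[])
    return parser


args = build_parser().parse_args(
    ["i_am_roger", "-c", "peerb", "sue", "-p", "my_param", "42"]
)
config_sources = dict(args.config_sources)  # symbolic source name -> peer_id
local_params = dict(args.local_params)      # parameter name -> override value
```

Collecting pairs into dictionaries keeps the later lookup of a symbolic name straightforward.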
Assume we want to run two peers:

peer_a.py with default configuration (defined in peer_a.ini in the source directory):

    [config_sources]
    amp1_signal=
    peerb=

    [launch_dependencies]
    peerb=

    [external_params]
    ext_txt = peerb.text

    [local_params]
    my_param = 1234
    p = some text here

peer_b.py with configuration peer_b.ini:

    [config_sources]
    some_peer=

    [external_params]
    ext_p = some_peer.p

    [launch_dependencies]

    [local_params]
    text = text text tralala
peer_a takes parameter ‘text’ from a peer ‘peerb’, and peer_b takes
parameter ‘p’ from a peer ‘some_peer’. peer_a also waits for ‘peerb’ readiness.
We want them to take those parameters from each other.
Invocation of those peers with just their peer_ids assigned would look like this:

    python peer_a.py i_am_roger -c peerb sue
    python peer_b.py sue -c some_peer i_am_roger
We do not need to provide launch dependency ID for peer_a because this time it’s the same peer as in config dependencies - assignment will be automatic.
Programming OpenBCI peers with configuration support¶
To enable configuration processing in a peer, your peer must inherit from ConfiguredPeer:

    from obci.core.configured_peer import ConfiguredPeer

    class MyPeer(ConfiguredPeer):
        ...
Then you can use variables defined in config to initialize your peer in _connections_established():

    class MyPeer(ConfiguredPeer):

        async def _connections_established(self):
            await super()._connections_established()
            # use variables from config:
            # here we read a parameter from configuration
            self.needed_to_work = self.config.get_param('important_setting')

            # Letting everyone know that MyPeer is configured and ready to work.
            # Other peers which have MyPeer in launch_dependencies will wait on their
            # `await self.ready()` until MyPeer invokes the next line:
            await self.ready()
Alternatively, you can use param_property as a class variable:

    import asyncio

    class MyPeer(ConfiguredPeer):
        # declaration that we want to read config parameter "wait_time" as float
        wait_time = param_property("wait_time", float)

        async def _connections_established(self):
            await super()._connections_established()
            # use variables from config:
            # if the wait_time parameter was left empty in the config it becomes None
            if self.wait_time is not None:
                # asyncio.sleep does not block the message loop, unlike time.sleep
                await asyncio.sleep(self.wait_time)

            # Letting everyone know that MyPeer is configured and ready to work.
            # Other peers which have MyPeer in launch_dependencies will wait on their
            # `await self.ready()` until MyPeer invokes the next line:
            await self.ready()
BaseMessage is a base class for all OBCI messages.
Every message should inherit from this class. Every message has a sender field.
To add more fields to your message you have to define them using the Field class:

    class BrokerHelloMsg(BaseMessage):
        __TYPE__ = 'BROKER_HELLO'
        peer_url = Field(List[str], str)
        broker_url = Field(str)
The Field class accepts a number of arguments: the accepted types of this field. If you add None
to the types, the field is optional. During instantiation of the message, fields are checked
for validity. All types should be serializable to JSON (the default serializer). If you need to have a message with
custom data you should provide custom serialization for it.
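The idea of checking field types at construction time can be sketched without OBCI. FieldSketch and HelloSketch below are hypothetical stand-ins for Field and a message class; unlike the real Field they only handle plain Python types (not typing generics such as List[str]) and they do no serialization.

```python
class FieldSketch:
    """Hypothetical stand-in for Field: stores accepted types, None means optional."""

    def __init__(self, *types):
        self.optional = None in types
        self.types = tuple(t for t in types if t is not None)

    def check(self, name, value):
        if value is None:
            if not self.optional:
                raise TypeError(f"{name} is required")
            return
        if not isinstance(value, self.types):
            raise TypeError(f"{name} must be one of {self.types}, got {type(value)}")


class HelloSketch:
    peer_url = FieldSketch(str, list)
    broker_url = FieldSketch(str, None)  # None in the types makes this field optional

    def __init__(self, **values):
        # Validate every declared field, then store the value on the instance.
        for name, field in vars(type(self)).items():
            if isinstance(field, FieldSketch):
                value = values.get(name)
                field.check(name, value)
                setattr(self, name, value)


msg = HelloSketch(peer_url=["tcp://127.0.0.1:5000"])

try:
    HelloSketch(broker_url="tcp://127.0.0.1:5555")  # peer_url is missing
    missing_rejected = False
except TypeError:
    missing_rejected = True
```

Validation happens in the constructor, so an invalid message cannot exist long enough to be sent.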
__TYPE__ is an optional attribute which you can provide if you need a specific network header in the message (e.g. for external Peers written in other languages). If you don’t provide it, __TYPE__ will be autogenerated by converting the message class name from CamelCase to snake_case.
For example, BrokerHelloMsg would become broker_hello_msg.
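A CamelCase to snake_case conversion of this kind can be written with two regular expression passes. The function camel_to_snake is an illustrative sketch; the conversion used inside OBCI may treat edge cases such as acronyms differently.

```python
import re


def camel_to_snake(name):
    """Convert a CamelCase class name to snake_case."""
    # Put '_' between a lowercase letter or digit and the capital that follows it.
    step1 = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Put '_' between a run of capitals and a capital that starts a new word.
    step2 = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", step1)
    return step2.lower()


autogenerated_type = camel_to_snake("BrokerHelloMsg")
```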
On the networking level every message is sent as a two-part ZMQ message. The first part is the header (ASCII): type_string^sender_id_string^; the second part is the serialized data (JSON or your custom serialized data).
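The two-part layout can be reproduced with plain bytes and the json module. encode_message and decode_message are hypothetical helpers, and the sketch assumes that neither the type nor the sender ID contains the ^ separator.

```python
import json


def encode_message(msg_type, sender_id, data):
    """Return the two frames: ASCII header and JSON body."""
    header = f"{msg_type}^{sender_id}^".encode("ascii")
    body = json.dumps(data).encode("utf-8")
    return [header, body]


def decode_message(frames):
    """Split the header on '^' and parse the JSON body."""
    header, body = frames
    # The trailing '^' leaves an empty third element, which is discarded.
    msg_type, sender_id, _ = header.decode("ascii").split("^")
    return msg_type, sender_id, json.loads(body.decode("utf-8"))


frames = encode_message("broker_hello_msg", "peer_1", {"broker_url": "tcp://127.0.0.1:5555"})
decoded = decode_message(frames)
```

With pyzmq, a list of frames like this would typically be passed to a socket's send_multipart() method.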
OBCI peers can send any message in one of two modes:
- broadcast mode - the message is sent using Peer.send_message and delivered only to Peers that subscribed to such messages.
- direct mode - the message is sent using Peer.query directly (using REQ-REP sockets) to some other peer or Broker.
Peers can subscribe to a message type and register a handler for it either by calling a subscription method or by decorating the handler method.
Technically, a message is sent using a PUB socket connected to Broker’s XSUB socket and can be received by SUB sockets inside peers that connect to Broker’s XPUB.
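The subscribe-and-handle pattern can be shown in a single process, without any sockets. DispatcherSketch and its handler decorator are hypothetical and only mimic the idea of type-based delivery; in OBCI the filtering is done by ZMQ subscriptions between processes.

```python
import asyncio
from collections import defaultdict


class DispatcherSketch:
    """Hypothetical in-process stand-in for subscribe-and-handle dispatch."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def handler(self, msg_type):
        """Decorator registering a coroutine for one message type."""
        def decorate(func):
            self._handlers[msg_type].append(func)
            return func
        return decorate

    async def publish(self, msg_type, payload):
        # Only handlers subscribed to this type are called; others never see it.
        await asyncio.gather(*(h(payload) for h in self._handlers.get(msg_type, [])))


bus = DispatcherSketch()
seen = []


@bus.handler("signal")
async def on_signal(payload):
    seen.append(("signal", payload))


@bus.handler("tag")
async def on_tag(payload):
    seen.append(("tag", payload))


asyncio.run(bus.publish("signal", [1, 2, 3]))
```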
OBCI programming and directory guide¶
Every Peer (that is, a child of Peer) which does some useful work is contained in obci.peers.
The file containing the peer class code should be snake_case named after the peer it provides. Peer classes should use
CamelCase, with the last word being Peer.
For example, we have a Peer which does something really important:

    class VeryImportantPeer(Peer):
        pass

It should be placed in the appropriate sub-module inside module obci.peers, in a file named very_important_peer.py.
Additionally, such a module should export that peer using the __all__ mechanism; at the start of the file there should be code:

    __all__ = ('VeryImportantPeer', )
In order to extend peer functionality you have to do the following:
- Override __init__() to perform some basic initialization, without outside communication
- Override (async) _connections_established() to perform custom initialization which might take some time, e.g. communicating with other peers (as in ConfiguredPeer) or initializing resources. After this method finishes, the peer is considered ready to work. If your additional configuration requires talking to other peers, especially your own launch_dependencies, you should use
- Override (async) _shutting_down() to perform long cleanup tasks, e.g. informing other peers about something or sending messages.
- Override _cleanup() to free up acquired resources.
- Override the (async) method that starts the Peer main task (e.g. start generating and sending data, or saving data to a file)
- Override the (async) method that stops the Peer main task (e.g. turn off the amplifier, close a file on disk)
- Write your message handler methods
  - message handlers should be coroutines
  - message handlers for queries must return response messages
  - message handlers can use (async) _send_message() for immediate message sending (send_message() puts messages on the queue)
- Register message handlers and subscribe
  - You should register a message handler for the message types you want to receive by using the register_message_handler() decorator on the handler method. After that you will be able to handle sync messages (your method must return a response message) and async messages
  - In order to receive all async messages of that type you should use subscribe_message_handler() on the handler method
  - If you only want to receive messages from a certain peer you have to use subscribe_for_specific_msg_subtype()
You can also use peers without subclassing: just instantiate one, wait for the connection and use its methods:
- send_message() - to send messages through pub and sub
- create_task() - to run background asyncio tasks
See Peer States for more info.
A simple peer which averages the signal across channels:

    import numpy

    class AveragingPeer(ConfiguredPeer):

        async def _connections_established(self):
            await super()._connections_established()
            # subscribe to signal going from amplifier peer
            # for every SignalMessage self.signal_message_handler function will be called
            self.subscribe_for_specific_msg_subtype(SignalMessage, 'amplifier', self.signal_message_handler)
            await self.ready()

        async def signal_message_handler(self, msg):
            packet = msg.data  # retrieve SamplePacket
            # create new SamplePacket averaged across channels
            output = SamplePacket(ts=packet.ts, samples=numpy.mean(packet.samples, axis=1, keepdims=True))
            # send created SamplePacket in a SignalMessage
            msg = SignalMessage(data=output)
            await self._send_message(msg)
            # every peer subscribed to AveragingPeer's SignalMessages will receive this
            # new message and could, for example, display it;
            # such a peer could be running on a different computer.
Drivers for amplifiers are split between amplifier classes, which derive from
EEGAmplifier, and amplifier peers, which utilize those classes and derive from AmplifierPeer.
Drivers should be placed inside the obci.drivers.eeg module and the corresponding peers
inside obci.peers.drivers.amplifiers.
If your driver implements the EEGAmplifier API and doesn’t need any
additional external parameters, it is very likely that the code for the peer will be very concise:

    from obci.peers.drivers.amplifiers.amplifier_peer import AmplifierPeer
    from obci.drivers.eeg.my_great_amp import MyGreatAmplifier

    __all__ = ('MyGreatAmplifierPeer',)


    class MyGreatAmplifierPeer(AmplifierPeer):
        # the amplifier class imported above
        AmplifierClass = MyGreatAmplifier
AmplifierPeer class has its own underlying mechanisms to retrieve samples from EEGAmplifier type classes and send those samples as messages.
OBCI driver discovery¶
To make your driver discoverable by OBCI driver discovery (i.e. visible in Svarog) you should do the following:
- add amplifier class inside
- additionally add path to the amplifier peer
- add path to the template scenario which can run your amplifier
Scenarios kept inside OBCI itself should abide by these rules:
- Scenarios for amplifiers and peers should be located inside
- Scenarios which run only amplifiers in different configurations
(ex to be viewed in Svarog) should be placed in
- Scenarios which save data from your amplifier should be placed in
- If you want those scenarios to be visible in obci_gui you should edit
For example, a scenario which saves the signal from a TMSI amplifier connected to a USB port:

    [peers]
    scenario_dir=
    ;***********************************************
    ;***********************************************
    [peers.config_server]
    path=peers/control/config_server.py

    ;***********************************************
    ; here peers.SOMETHING - SOMETHING is the peer_id for the loaded peer.
    [peers.amplifier]
    path=peers/drivers/amplifiers/tmsi_amplifier_peer.py

    ;***********************************************
    [peers.signal_saver]
    ; path to peers are looked up:
    ; - first: peer .py files:
    ;   * first in the main obci directory
    ;   * directory relative to scenario location
    ;   * global path (including ~ expansion)
    ; - next: importable Python 3 path:
    ;   * like this: path=obci.peers.acquisition.signal_saver_peer
    path=peers/acquisition/signal_saver_peer.py

    [peers.signal_saver.launch_dependencies]
    ; signal saver has external params depending on signal_source
    ; signal source could be any peer, in this case it is tmsi_amplifier_peer
    signal_source=amplifier
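Since the scenario is INI-like, its structure can be explored with the standard configparser module. The function read_scenario is a hypothetical sketch that only extracts peer IDs, paths and launch dependencies from section names; the real OBCI launcher also resolves paths and handles further subsections.

```python
import configparser

SCENARIO = """
[peers]
scenario_dir=

[peers.config_server]
path=peers/control/config_server.py

[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py

[peers.signal_saver]
path=peers/acquisition/signal_saver_peer.py

[peers.signal_saver.launch_dependencies]
; symbolic name -> peer_id
signal_source=amplifier
"""


def read_scenario(text):
    """Map each peer_id to its program path and launch dependencies."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    peers = {}
    for section in parser.sections():
        parts = section.split(".")
        if parts[0] != "peers" or len(parts) < 2:
            continue  # skips the top-level [peers] section
        peer = peers.setdefault(parts[1], {"path": None, "launch_dependencies": {}})
        if len(parts) == 2:
            peer["path"] = parser[section]["path"]
        elif parts[2] == "launch_dependencies":
            peer["launch_dependencies"] = dict(parser[section])
    return peers


peers = read_scenario(SCENARIO)
```

The second component of each section name is the peer_id, so the same peer program can appear several times under different IDs.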