OpenBCI introduction¶
System architecture¶
Server, Broker and Peers¶
- Server
To use OpenBCI a single instance of Server must be started on each host you are planning to run the experiment on.
Server is started with
obci srv
command.Server is responsible for:
- managing experiments and peers from other experiments running on its machine
- autodiscovery of other OpenBCI experiments running on the same VLAN
- detection of amplifiers connected to host and broadcasting this information to other Servers
- Broker
Broker is started once per experiment. Broker acts as a message proxy for peers in a single experiment.
Every Peer connects to Broker on initialization.
- Peer
Peer is a worker unit, which communicates with other running peers and Broker.
Peer can be started with
obci run_peer peer.py
command, but it is strongly recommended to start peers automatically, as a part of experiment.Typical examples of peers are:
- EEG signal amplifiers
- EEG signal filters
- signal displays
Each peer has its own configuration file. It stores necessary parameters and defines dependencies on other peers in the system.
Experiment¶
To perform some useful work you need to create and run an experiment.
Experiment is a set of running Peers and a Broker. Each peer can
be identified by its assigned peer_id
, thus allowing for launching multiple
instances of the same peer.
Peer IDs, peer program paths and peer configurations are predefined in experiment definition file or scenario file*.
Experiment definition file is a text file for defining an experiment. It
contains a list of peers, assignments of peer_id
and paths to peer
configurations.
OpenBCI commands¶
Following commands are available from command line:
obci srv
- start OpenBCI Serverobci srv_kill
- shutdown OpenBCI Server and all experiments it is runningobci gui
- GUI for starting/stopping and monitoring experimentsobci launch
- launch experiment by specifying scenario fileobci kill
- shutdown OpenBCI experiment
- additional
--force
flag shutdowns experiment immediately, peers hove no chance to react and finalize their work
obci info
- display information about OpenBCI Servers and experiments running on current machine and on nearby hostsobci report_error
Show window for reporting error to Braintech.
New peer architecture¶
Peer
and Broker
derive from
BasePeer
class.
BasePeer
is derived from
ZmqAsyncioTaskManager
.
ZmqAsyncioTaskManager
is derived
from AsyncioTaskManager
.
AsyncioTaskManager
manages a set of
tasks running inside asyncio
message loop. Message loop can be owned by
AsyncioTaskManager
or borrowed. When
message loop is owned new message loop is started in a new thread.
ZmqAsyncioTaskManager
adds ZMQ
asyncio context lifetime management to
AsyncioTaskManager
. ZMQ context can be
borrowed or owned.
BasePeer
extends
ZmqAsyncioTaskManager
by adding
three ZMQ sockets:
- SUB used to receive broadcast messages, connects to Broker’s XPUB
- PUB used to send broadcast messages, connects to Broker’s XSUB
- REP used to answer synchronous messages (messages requiring an answer)
In ZMQ 3+ messages are filtered on PUB socket, so no redundant messages will be sent to clients.
Broker
extends
BasePeer
by adding:
- XPUB/XSUB message proxy
- centralized peer registration authority
- query authority
- heartbeat monitor
Peer
extends BasePeer
by
adding:
- initialization code/Broker connection code
- heartbeat message sending
Peer configuration¶
Peer configuration is stored in one or more files, in INI-like format. The
most important (and mandatory) config file is the one stored in the same
directory as the peer source code file. Its name is the same as the peer it
configures: for a peer implemented in file peer_a.py
you define
peer_a.ini
. This file is called basic config. Treat it as a part of
peer implementation. Basic config is always parsed and is the reference for
validation of any configuration overrides.
Configuration file contains four sections:
[config_sources]
[local_params]
[external_params]
[launch_dependencies]
Parameters defined in these sections can be overwritten in scenario file and/or in additional configuration file loaded at the scenario start.
[local_params]
is the most straightforward section. Here we define config
parameters which are owned by the peer – its properties. For an amplifier
such properties would be sampling rate or number of channels.:
[local_params]
sampling_rate = 128
number_of_channels = 8
super_important_param = 0
A peer may be bound to parameters from other peers. For example a
signal filter would need sampling rate of an amplifier from which it takes the
signal. Each peer is identified by peer_id
, which must be unique in the
scope of a running experiment. Configuration file though has to be usable in
many experiments, so we cannot hardcode exact peer ID’s there. Instead we
define symbolic names for peers in [config_sources]
section, from which
configured peer should take parameters.:
[config_sources]
signal_properties_source=
; or maybe just
amplifier=
Here we just define internal (for this peer) names of configuration sources, hence empty spaces after equals sign. In run-time real ``peer_id``s are assigned to these config sources using information stored in scenario file.
In a similar fashion we define [launch_dependencies]
. Launch dependencies
are peers we need to synchronize with: a peer will not start its actual work
until all the peers it depends on report that they are ready.:
[launch_dependencies]
amplifier=
Both config and launch dependencies’ names are visible in the scope of the
configuration file. So ‘amplifier’ from the example above is the same peer as
the one in [config_sources]
.
Now the [external_params]
section. Let’s say a peer needs a parameter
‘sampling_rate’ from some other peer which we symbolically named ‘amplifier’.
It will store the parameter as ‘amp_sampling_rate’. We can represent this as
follows:
[external_params]
amp_sampling_rate = amplifier.sampling_rate
All parameter names we define in a config file should be unique. You can move
parameters between [external_params]
and [local_params]
in config files
loaded after the basic config. This may be useful during module development
when we want to quickly test how the module works without loading other peers.
Also the other way may be useful if in some experiment we want to change the
way parameters are passed.
Config overriding¶
As mentioned above, you can pass custom configuration files to the peer. The
one restriction is that you cannot define any new parameter names that are not defined in
basic config. You can move a parameter form local to external section, add a
launch or config dependency. If a config dependency is not referenced in
[external_params]
configuration, peer will not require passing dependency’s
real peer_id
on launch.
Command line - peer invocation¶
Below is the somewhat messy usage help generated by the config processing module using argparse:
usage: some_peer.py peer_id [options]
positional arguments:
peer_id Unique name for this instance of this peer
optional arguments:
-h, --help show this help message and exit
-p LOCAL_PARAMS LOCAL_PARAMS, --local_params LOCAL_PARAMS LOCAL_PARAMS
Local parameter override value: param_name, value.
-e EXTERNAL_PARAMS EXTERNAL_PARAMS, --external_params EXTERNAL_PARAMS EXTERNAL_PARAMS
External parameter override value: param_name value_def .
-c CONFIG_SOURCES CONFIG_SOURCES, --config_sources CONFIG_SOURCES CONFIG_SOURCES
Config source ID assignment: src_name peer_id
-d LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES, --launch_dependencies LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES
Launch dependency ID assignment: dep_name peer_id
-f CONFIG_FILE, --config_file CONFIG_FILE
Additional configuration file (overrides):
path_to_file.
When starting a peer with config support you need to provide a peer_id for it and peer_ids of its launch/config dependecies. Custom config files – option -f. You can also override some parameters: using option -p / –local-params param_name value or -e / –external-params param_name value_definitions.
Assume we want to run two peers: peer_a.py
with default configuration
(defined in peer_a.ini in source directory)
[config_sources]
amp1_signal=
peerb=
[launch_dependencies]
peerb=
[external_params]
ext_txt = peerb.text
[local_params]
my_param = 1234
p = some text here
and peer_b.py
with configuration peer_b.ini
[config_sources]
some_peer=
[external_params]
ext_p = some_peer.p
[launch_dependencies]
[local_params]
text = text text tralala
peer_a
takes parameter ‘text’ from a peer ‘peerb’, and peer_b
takes
parameter ‘p’ from a peer ‘some_peer’. Peer_a also waits for ‘peerb’ readiness.
We want them to take those parameters from each other.
Invocation of those peers with just assigning them peer_id’s would look like this:
python peer_a.py i_am_roger -c peerb sue
python peer_b.py sue -c some_peer i_am_roger
We do not need to provide launch dependency ID for peer_a because this time it’s the same peer as in config dependencies - assignment will be automatic.
Programming OpenBCI peers with configuration support¶
To enable configuration processing in a peer, your peer must inherit
ConfiguredPeer
class:
from obci.core.configured_peer import ConfiguredPeer
class MyPeer(ConfiguredPeer)
Then you can use variables defined in config to initialize your peer in
Peer._connections_established
coroutine:
class MyPeer(ConfiguredPeer):
async def _connections_established(self):
await super()._connections_established()
# use variables from config: ::
self.needed_to_work = self.config.get_param('important_setting') # here we read parameter from configuration
# Letting everyone know that MyPeer is configured and ready to work.
# Other peers which have MyPeer in launch_dependencies will wait on their :meth:`await self.ready()`
# until MyPeer invokes next line:
await self.ready()
alternatively you can use param_property as class variable:
class MyPeer(ConfiguredPeer):
wait_time = param_property("wait_time", float) # declaration that we want to read config parameter "wait_time" as float
async def _connections_established(self):
await super()._connections_established()
# use variables from config: ::
if self.wait_time is not None: # if the wait_time parameter was left empty in the config it will change into None
time.sleep(self.wait_time)
# Letting everyone know that MyPeer is configured and ready to work.
# Other peers which have MyPeer in launch_dependencies will wait on their :meth:`await self.ready()`
# until MyPeer invokes next line:
await self.ready()
Messages¶
BaseMessage
is a base class for all OBCI messages.
Every message should inherit this class. Every message has a :attribute:`sender` field.
To add more fields to your message you have to define them using a Field
class:
class BrokerHelloMsg(BaseMessage):
__TYPE__ = 'BROKER_HELLO'
peer_url = Field(List[str], str)
broker_url = Field(str)
Field
class accepts a number of arguments: accepted types of this field. If you add None
to the types it will mean that this field is optional. During instantialization of the message fields are checked
for validity. All types should be serializable to JSON (the default serializer). If you need to have a message with
custom data you should provide serialize_data()
and deserialize_data()
methods.
__TYPE__ is an optional attribute which you can provide if you need specific network header in the message (ex. external Peers in other langueges, etc). If you don’t provide it __TYPE__ will be autogenerated by converting message class name from CamelCase to snake_case.
eg: BrokerHelloMsg would be: broker_hello_msg
On the networking level every message is sent in 2 parts as ZMQ message/packet. First part is the header (ASCII): type_string^sender_id_string^, second part is the serialized data (JSON or your custom serialized data).
OBCI peers can send any message in one of two modes:
- broadcast mode - message is sent using
Peer.send_message
and delivered only to Peers that subscribed to such messages. Peer.query
- direct mode - message is send directly (using REQ-REP sockets) to some other peer or Broker.
Peers can subscribe to message type and register a handler for it by calling
Peer.subscribe_for_specific_msg_subtype
method,
Peer.subscribe_for_all_msg_subtype
method
or by decorating handler method with subscribe_message_handler()
.
Technically message is send using PUB socket connected to Broker’s XSUB socket and can received by SUB sockets inside peers that connect to Broker’s XPUB.
OBCI programming and directory guide¶
Peers¶
Every Peer (that is a child of ConfiguredPeer
or Peer
)
which does some useful work is contained in obci.peers
module.
File containing peer class code should be snake_case_named
after the peer it provides. Peer classes should use
CamelCase
with last word being Peer.
Example:
We have Peer which does something really important:
class VeryImportantPeer(Peer):
pass
Which should be placed in appropriate sub-module inside module obci.peers
inside file named:
very_important_peer.py
Additionally such module should export that peer using __all__
mechanism, at the start of file there should be code:
__all__ = ('VeryImportantPeer', )
In order to extend peer functionality you have to do the following:
- Create
Peer
subclass
- Override
__init__()
to perform some basic initialization, without outside communication- Override (async)
_connection_established()
to perform custom initialization, which might take some time ie. communicate with other peers (like inConfiguredPeer
), or initialize resources. After this method finishes, peer is considered to be ready to work. If your additional configuration requires talking to other peers, especialy own launch_dependencies, you should use_dependencies_are_ready()
.- Override (async)
_shutting_down()
to perform long cleaunup task ie. inform other peers about something, send messages.- Override
_cleanup()
free up acquired resources.- Override (async)
_start()
to initializePeer
main task (ie start generating and sending data or save data to file)- Override (async)
_stop()
to stopPeer
main task (ie turn off amplifier, close file on disk)
- Write your message handlers methods
- message handlers should be coroutines
- message handlers for queries must return response messages
- message handlers can use (async)
_send_message()
for immediate message sending (send_message()
puts messages on the queue)
- Register message handlers and subscribe
- You should register message handler for message types you want to receive by using
register_message_handler()
decorator on handler method. After that you will be able to handle sync (your method must return response message) and async messages- In order to receive all async messages of that type messages you should use
subscribe_message_handler()
on handler method- If you only want to receive messages from certain peer you have to use
subscribe_for_specific_msg_subtype()
method in_connections_established()
You can also use peers without subclassing, just instantiate it, wait for connection and use methods:
- register_message_handler()
- subscribe()
- send_message()
- to send messages through pub and sub
- create_task()
- to run background asyncio tasks
See Peer States for more info.
Example:
Simple peer which averages signal across channels:
class AveragingPeer(ConfiguredPeer):
async def _connections_established(self):
await super()._connections_established()
# subscribe to signal going from amplifier peer
# for every SignalMessage self.signal_message_handler function will be called
self.subscribe_for_specific_msg_subtype(SignalMessage, 'amplifier', self.signal_message_handler)
await self.ready()
async def signal_message_handler(self, msg):
input = msg.data # retrieve SamplePacket
# create new SamplePacket averaged across channels
output = SamplePacket(ts=input.ts, samples=numpy.mean(input.samples, axis=1, keepdims=True))
# send created SamplePacket in a SignalMessage
msg = SignalMessage(data=output)
await self._send_message(msg) # every peer subscribed to AveragingPeers SignalMessages will receive this
# new message and could for example display it
# such peer could be running on a different computer.
Amplifier drivers¶
Drivers for amplifiers are split between amplifier classes, which derive from
EEGAmplifier
, and amplifier peers which
utilize those classes derive from AmplifierPeer
.
Drivers should be placed inside obci.drivers.eeg
module and corresponding peers
inside obci.peers.drivers.amplifiers
module.
If your driver implements EEGAmplifier
API and doesn’t need any
additional external parameters it is very likely that code for the peer will very concise:
from obci.peers.drivers.amplifiers.amplifier_peer import AmplifierPeer
from obci.drivers.eeg.my_great_amp import MyGreatAmplifier
__all__ = ('MyGreatAmplifierPeer',)
class MyGreatAmplifierPeer(AmplifierPeer):
AmplifierClass = RandomAmplifier
AmplifierPeer class has its own underlying mechanisms to retrieve samples from EEGAmplifier type classes and send those samples as messages.
OBCI driver discovery¶
To make your driver discoverable by OBCI driver discovery (i.e. visible in Svarog) you should do following:
- add amplifier class inside
obci.drivers.eeg.driver_discovery.get_amp_classes_defs()
- additionally add path to the amplifier peer
- add path to the template scenario which can run your amplifier
Scenarios¶
Scenarios internally for OBCI should abide by these rules:
- Scenarios for amplifiers and peers should be located inside obci/scenarios/
- Scenarios which run only amplifiers in different configurations
(ex to be viewed in Svarog) should be placed inobci/scenarios/amplifier/amp_name/
- Scenarios which save data from your amplifier should be placed in
obci/scenarios/acquisition/amp_name/
- If you want those scenarios to be visible in obci_gui you should edit
obci/control/gui/presets/default.ini
file appropriately.
For example scenario which saves signal from TMSI amplifier connected to USB port:
[peers]
scenario_dir=
;***********************************************
;***********************************************
[peers.config_server]
path=peers/control/config_server.py
;***********************************************
; here peers.SOMETHING - SOMETHING is the peer_id for the loaded peer.
[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py
;***********************************************
[peers.signal_saver]
; path to peers are looked up:
; - first: peer .py files:
; * first in the main obci directory
; * directory relative to scenario location
; * global path (including ~ expansion)
; - next: importable Python 3 path:
; * like this: path=obci.peers.acquisition.signal_saver_peer
path=peers/acquisition/signal_saver_peer.py
[peers.signal_saver.launch_dependencies]
; signal saver has external params depending on signal_source
; signal source could be any peer, in this case it is tmsi_amplifier_peer
signal_source=amplifier