OpenBCI introduction

System architecture

OpenBCI system architecture diagram

Server, Broker and Peers

Server

To use OpenBCI a single instance of Server must be started on each host you are planning to run the experiment on.

Server is started with the obci srv command.

Server is responsible for:

  • managing experiments and peers from other experiments running on its machine
  • autodiscovery of other OpenBCI experiments running on the same VLAN
  • detection of amplifiers connected to the host and broadcasting this information to other Servers

Broker

Broker is started once per experiment. Broker acts as a message proxy for peers in a single experiment.

Every Peer connects to Broker on initialization.

Peer

Peer is a worker unit, which communicates with other running peers and Broker.

A Peer can be started with the obci run_peer peer.py command, but it is strongly recommended to start peers automatically, as part of an experiment.

Typical examples of peers are:

  • EEG signal amplifiers
  • EEG signal filters
  • signal displays

Each peer has its own configuration file. It stores necessary parameters and defines dependencies on other peers in the system.

Experiment

To perform some useful work you need to create and run an experiment.

Experiment is a set of running Peers and a Broker. Each peer can be identified by its assigned peer_id, thus allowing for launching multiple instances of the same peer.

Peer IDs, peer program paths and peer configurations are predefined in experiment definition file or scenario file.

Experiment definition file is a text file for defining an experiment. It contains a list of peers, assignments of peer_id and paths to peer configurations.

OpenBCI commands

The following commands are available from the command line:

  • obci srv - start OpenBCI Server
  • obci srv_kill - shutdown OpenBCI Server and all experiments it is running
  • obci gui - GUI for starting/stopping and monitoring experiments
  • obci launch - launch experiment by specifying scenario file
  • obci kill - shutdown OpenBCI experiment; the additional --force flag shuts the experiment down immediately, so peers have no chance to react and finalize their work
  • obci info - display information about OpenBCI Servers and experiments running on current machine and on nearby hosts
  • obci report_error - show a window for reporting an error to Braintech

New peer architecture

Peer and Broker derive from BasePeer class. BasePeer is derived from ZmqAsyncioTaskManager. ZmqAsyncioTaskManager is derived from AsyncioTaskManager.

AsyncioTaskManager manages a set of tasks running inside an asyncio message loop. The message loop can be owned by AsyncioTaskManager or borrowed. When the message loop is owned, a new message loop is started in a new thread.
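
The owned versus borrowed distinction can be illustrated with plain asyncio. This is only a minimal sketch: LoopHolder and its methods are hypothetical names, not the AsyncioTaskManager API, and the real class may manage its loop differently.

```python
import asyncio
import threading


class LoopHolder:
    """Hypothetical helper, not part of OBCI: owns or borrows an asyncio loop."""

    def __init__(self, loop=None):
        self.owns_loop = loop is None
        if self.owns_loop:
            # Owned: create a fresh loop and run it in a dedicated thread.
            self.loop = asyncio.new_event_loop()
            self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
            self._thread.start()
        else:
            # Borrowed: the caller is responsible for running and closing it.
            self.loop = loop
            self._thread = None

    def submit(self, coro):
        """Schedule a coroutine on the loop from any thread."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def shutdown(self):
        """Stop and close the loop only if this object owns it."""
        if self.owns_loop:
            self.loop.call_soon_threadsafe(self.loop.stop)
            self._thread.join()
            self.loop.close()


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


holder = LoopHolder()  # no loop passed in, so a new one is started in a new thread
result = holder.submit(add(2, 3)).result(timeout=5)
holder.shutdown()
```

A borrowed loop is left untouched by shutdown(), which is the main practical difference between the two modes in this sketch.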

ZmqAsyncioTaskManager adds ZMQ asyncio context lifetime management to AsyncioTaskManager. ZMQ context can be borrowed or owned.

BasePeer extends ZmqAsyncioTaskManager by adding three ZMQ sockets:

  • SUB used to receive broadcast messages, connects to Broker’s XPUB
  • PUB used to send broadcast messages, connects to Broker’s XSUB
  • REP used to answer synchronous messages (messages requiring an answer)

In ZMQ 3+ messages are filtered on the PUB socket (the publisher side), so no redundant messages will be sent to clients.

Broker extends BasePeer by adding:

  • XPUB/XSUB message proxy
  • centralized peer registration authority
  • query authority
  • heartbeat monitor

Peer extends BasePeer by adding:

  • initialization code/Broker connection code
  • heartbeat message sending

Peer configuration

Peer configuration is stored in one or more files, in INI-like format. The most important (and mandatory) config file is the one stored in the same directory as the peer source code file. Its name is the same as the peer it configures: for a peer implemented in file peer_a.py you define peer_a.ini. This file is called basic config. Treat it as a part of peer implementation. Basic config is always parsed and is the reference for validation of any configuration overrides.

Configuration file contains four sections:

  • [config_sources]
  • [local_params]
  • [external_params]
  • [launch_dependencies]

Parameters defined in these sections can be overwritten in scenario file and/or in additional configuration file loaded at the scenario start.

[local_params] is the most straightforward section. Here we define config parameters which are owned by the peer: its properties. For an amplifier such properties would be sampling rate or number of channels:

[local_params]

sampling_rate = 128
number_of_channels = 8
super_important_param = 0

A peer may be bound to parameters from other peers. For example, a signal filter needs the sampling rate of the amplifier from which it takes the signal. Each peer is identified by its peer_id, which must be unique in the scope of a running experiment. A configuration file, though, has to be usable in many experiments, so we cannot hardcode exact peer IDs there. Instead, in the [config_sources] section we define symbolic names for the peers from which the configured peer should take parameters:

[config_sources]
signal_properties_source=
; or maybe just
amplifier=

Here we only define names of configuration sources that are internal to this peer, hence the empty values after the equals signs. At run time, real peer_ids are assigned to these config sources using information stored in the scenario file.

In a similar fashion we define [launch_dependencies]. Launch dependencies are peers we need to synchronize with: a peer will not start its actual work until all the peers it depends on report that they are ready:

[launch_dependencies]
amplifier=

Both config and launch dependencies’ names are visible in the scope of the configuration file. So ‘amplifier’ from the example above is the same peer as the one in [config_sources].
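
The waiting behaviour of a launch dependency can be pictured with a plain asyncio.Event. This is a sketch under the assumption that readiness is a one-shot signal. The coroutine names are hypothetical, and in OBCI the readiness report travels through the Broker rather than through a shared in-process object.

```python
import asyncio


async def amplifier(ready, log):
    await asyncio.sleep(0.01)   # pretend to initialise hardware
    log.append("amplifier ready")
    ready.set()                 # report readiness to dependants


async def signal_filter(dependency_ready, log):
    await dependency_ready.wait()   # do no real work until the dependency is ready
    log.append("filter started")


async def main():
    log = []
    ready = asyncio.Event()
    # The filter is scheduled first, yet it still starts after the amplifier.
    await asyncio.gather(signal_filter(ready, log), amplifier(ready, log))
    return log


log = asyncio.run(main())
```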

Now the [external_params] section. Let’s say a peer needs a parameter ‘sampling_rate’ from some other peer which we symbolically named ‘amplifier’. It will store the parameter as ‘amp_sampling_rate’. We can represent this as follows:

[external_params]
amp_sampling_rate = amplifier.sampling_rate
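
How such a definition could be resolved at run time is sketched below with the standard configparser module. The peer_id amp_1, the source_to_peer_id mapping and the published_params dictionary are made up for illustration. OBCI's own config processing is not shown in this guide and may work differently.

```python
import configparser

FILTER_INI = """
[config_sources]
amplifier=

[external_params]
amp_sampling_rate = amplifier.sampling_rate

[local_params]
"""

# Hypothetical run-time data: symbolic source name -> real peer_id,
# and the local parameters published by each running peer.
source_to_peer_id = {"amplifier": "amp_1"}
published_params = {"amp_1": {"sampling_rate": "128", "number_of_channels": "8"}}

config = configparser.ConfigParser()
config.read_string(FILTER_INI)

resolved = {}
for name, definition in config["external_params"].items():
    # "amplifier.sampling_rate" -> source name "amplifier", parameter "sampling_rate"
    source, param = definition.split(".", 1)
    peer_id = source_to_peer_id[source]
    resolved[name] = published_params[peer_id][param]
```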

All parameter names we define in a config file should be unique. You can move parameters between [external_params] and [local_params] in config files loaded after the basic config. This may be useful during module development when we want to quickly test how the module works without loading other peers. Also the other way may be useful if in some experiment we want to change the way parameters are passed.

Config overriding

As mentioned above, you can pass custom configuration files to the peer. The one restriction is that you cannot define any new parameter names that are not defined in the basic config. You can move a parameter from the local to the external section, or add a launch or config dependency. If a config dependency is not referenced in the [external_params] configuration, the peer will not require the dependency's real peer_id to be passed on launch.
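
The restriction on new parameter names can be checked mechanically. The following is a minimal sketch using configparser. The helper names param_names and unknown_params are hypothetical, and the actual validation in OBCI may be stricter.

```python
import configparser

PARAM_SECTIONS = ("local_params", "external_params")


def param_names(cfg):
    """Collect parameter names from both parameter sections."""
    names = set()
    for section in PARAM_SECTIONS:
        if cfg.has_section(section):
            names.update(cfg[section].keys())
    return names


def unknown_params(basic_text, override_text):
    """Return names the override defines that the basic config does not."""
    basic = configparser.ConfigParser()
    basic.read_string(basic_text)
    override = configparser.ConfigParser()
    override.read_string(override_text)
    return param_names(override) - param_names(basic)


BASIC = """
[local_params]
sampling_rate = 128
[external_params]
"""
# Moving a parameter from local to external is allowed.
MOVED = """
[external_params]
sampling_rate = amplifier.sampling_rate
"""
# Introducing a name unknown to the basic config is not.
INVALID = """
[local_params]
brand_new_param = 1
"""

ok = unknown_params(BASIC, MOVED)
bad = unknown_params(BASIC, INVALID)
```

An empty result means the override only touches names known to the basic config.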

Command line - peer invocation

Below is the somewhat messy usage help generated by the config processing module using argparse:

usage: some_peer.py peer_id [options]

positional arguments:
  peer_id               Unique name for this instance of this peer

optional arguments:
  -h, --help            show this help message and exit
  -p LOCAL_PARAMS LOCAL_PARAMS, --local_params LOCAL_PARAMS LOCAL_PARAMS
                        Local parameter override value: param_name, value.
  -e EXTERNAL_PARAMS EXTERNAL_PARAMS, --external_params EXTERNAL_PARAMS EXTERNAL_PARAMS
                        External parameter override value: param_name value_def .
  -c CONFIG_SOURCES CONFIG_SOURCES, --config_sources CONFIG_SOURCES CONFIG_SOURCES
                        Config source ID assignment: src_name peer_id
  -d LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES, --launch_dependencies LAUNCH_DEPENDENCIES LAUNCH_DEPENDENCIES
                        Launch dependency ID assignment: dep_name peer_id
  -f CONFIG_FILE, --config_file CONFIG_FILE
                        Additional configuration file (overrides):
                        path_to_file.

When starting a peer with config support you need to provide a peer_id for it and the peer_ids of its launch/config dependencies. Custom config files are passed with option -f. You can also override individual parameters using -p / --local_params param_name value or -e / --external_params param_name value_definition.

Assume we want to run two peers: peer_a.py with default configuration (defined in peer_a.ini in the source directory):

[config_sources]
amp1_signal=
peerb=

[launch_dependencies]
peerb=

[external_params]
ext_txt = peerb.text

[local_params]
my_param = 1234
p = some text here

and peer_b.py with configuration peer_b.ini

[config_sources]
some_peer=

[external_params]
ext_p = some_peer.p

[launch_dependencies]

[local_params]
text = text text tralala

peer_a takes parameter ‘text’ from a peer ‘peerb’, and peer_b takes parameter ‘p’ from a peer ‘some_peer’. Peer_a also waits for ‘peerb’ readiness. We want them to take those parameters from each other.

Invocation of those peers with just their peer_ids assigned would look like this:

python peer_a.py i_am_roger -c peerb sue
python peer_b.py sue -c some_peer i_am_roger

We do not need to provide launch dependency ID for peer_a because this time it’s the same peer as in config dependencies - assignment will be automatic.
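
For readers who want to see how options of this shape behave, here is an argparse sketch reconstructed from the usage help above. It is not the parser from the config processing module. In particular, making every option repeatable (action="append") is an assumption.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(usage="some_peer.py peer_id [options]")
    parser.add_argument("peer_id", help="Unique name for this instance of this peer")
    # Each pair option takes exactly two values (name and value or peer_id).
    # Assumption: options may be repeated, so results are collected in a list.
    for short, name in (("-p", "local_params"), ("-e", "external_params"),
                        ("-c", "config_sources"), ("-d", "launch_dependencies")):
        parser.add_argument(short, "--" + name, nargs=2, action="append", default=[])
    parser.add_argument("-f", "--config_file", action="append", default=[])
    return parser


# Same arguments as in: python peer_a.py i_am_roger -c peerb sue
args = build_parser().parse_args(["i_am_roger", "-c", "peerb", "sue"])
config_sources = dict(args.config_sources)
```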

Programming OpenBCI peers with configuration support

To enable configuration processing in a peer, your peer must inherit from the ConfiguredPeer class:

from obci.core.configured_peer import ConfiguredPeer

class MyPeer(ConfiguredPeer):
    ...

Then you can use variables defined in config to initialize your peer in Peer._connections_established coroutine:

class MyPeer(ConfiguredPeer):

    async def _connections_established(self):
        await super()._connections_established()

        # use variables from config:
        # here we read a parameter from the configuration
        self.needed_to_work = self.config.get_param('important_setting')

        # Letting everyone know that MyPeer is configured and ready to work.
        # Other peers which have MyPeer in launch_dependencies will wait on their
        # await self.ready() until MyPeer invokes the next line:
        await self.ready()

Alternatively, you can use param_property as a class variable:

import asyncio

class MyPeer(ConfiguredPeer):

    # declaration that we want to read config parameter "wait_time" as float
    wait_time = param_property("wait_time", float)

    async def _connections_established(self):
        await super()._connections_established()

        # use variables from config:
        # if the wait_time parameter was left empty in the config, it becomes None
        if self.wait_time is not None:
            # asyncio.sleep is used because time.sleep would block the event loop
            await asyncio.sleep(self.wait_time)

        # Letting everyone know that MyPeer is configured and ready to work.
        # Other peers which have MyPeer in launch_dependencies will wait on their
        # await self.ready() until MyPeer invokes the next line:
        await self.ready()
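
The idea behind a typed parameter declared as a class variable can be sketched with a Python descriptor. The class below is a hypothetical stand-in, not OBCI's param_property. It reads from a plain dict instead of a real peer configuration.

```python
class param_property_sketch:
    """Hypothetical stand-in for param_property; not OBCI's implementation."""

    def __init__(self, name, convert=str):
        self.name = name
        self.convert = convert

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        raw = instance.params.get(self.name, "")
        # An empty value in the config is reported as None.
        return None if raw == "" else self.convert(raw)


class FakePeer:
    wait_time = param_property_sketch("wait_time", float)
    label = param_property_sketch("label")

    def __init__(self, params):
        self.params = params  # plain dict standing in for the parsed config


peer = FakePeer({"wait_time": "0.5", "label": ""})
```

Here peer.wait_time is converted to a float on every access, and peer.label, left empty, reads as None.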

Messages

BaseMessage is a base class for all OBCI messages. Every message should inherit from this class. Every message has a sender field. To add more fields to your message you have to define them using the Field class:

class BrokerHelloMsg(BaseMessage):
     __TYPE__ = 'BROKER_HELLO'
     peer_url = Field(List[str], str)
     broker_url = Field(str)

The Field class accepts a number of arguments: the accepted types of this field. If you add None to the types, the field is optional. During instantiation of the message, fields are checked for validity. All types should be serializable to JSON (the default serializer). If you need to have a message with custom data you should provide serialize_data() and deserialize_data() methods.
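
What "checked for validity" could mean for a declaration such as Field(List[str], str) is sketched below. The functions is_valid and check_field are hypothetical and cover only plain classes, None and List[X]. The real Field class may accept more and may check differently.

```python
from typing import List, get_args, get_origin


def is_valid(value, accepted):
    """Check a value against one accepted type: None, a class, or List[X]."""
    if accepted is None:
        return value is None
    if get_origin(accepted) is list:
        (item_type,) = get_args(accepted)
        return isinstance(value, list) and all(isinstance(v, item_type) for v in value)
    return isinstance(value, accepted)


def check_field(value, *accepted_types):
    """A value is valid if it matches at least one of the accepted types."""
    return any(is_valid(value, t) for t in accepted_types)


list_ok = check_field(["tcp://a", "tcp://b"], List[str], str)
str_ok = check_field("tcp://a", List[str], str)
wrong_items = check_field([1, 2], List[str], str)
missing_required = check_field(None, str)
missing_optional = check_field(None, str, None)
```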

__TYPE__ is an optional attribute which you can provide if you need a specific network header in the message (e.g. for external Peers written in other languages). If you don't provide it, __TYPE__ will be autogenerated by converting the message class name from CamelCase to snake_case.

For example, without an explicit __TYPE__, BrokerHelloMsg would become broker_hello_msg.
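
One common way to do such a conversion is a single regular expression. This is a sketch only: the function name camel_to_snake is hypothetical, and how OBCI treats acronyms or digits in class names is not covered by this guide.

```python
import re


def camel_to_snake(name):
    """Insert '_' before every capital letter except the first, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


type_name = camel_to_snake("BrokerHelloMsg")
```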

On the networking level every message is sent as a two-part ZMQ message. The first part is the header (ASCII): type_string^sender_id_string^; the second part is the serialized data (JSON or your custom serialized data).
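
The two parts can be built and taken apart with the standard library alone. This sketch assumes UTF-8 encoded JSON for the body. The helper names, the sender id "broker" and the URL are made up for illustration. Actually sending the frames over a ZMQ socket is left out.

```python
import json


def encode(msg_type, sender, data):
    """Return the two frames: ASCII header and JSON-serialized body."""
    header = "{}^{}^".format(msg_type, sender).encode("ascii")
    body = json.dumps(data).encode("utf-8")
    return [header, body]


def decode(frames):
    """Split the header on '^' and parse the JSON body."""
    header, body = frames
    msg_type, sender, _ = header.decode("ascii").split("^")
    return msg_type, sender, json.loads(body.decode("utf-8"))


frames = encode("BROKER_HELLO", "broker", {"broker_url": "tcp://127.0.0.1:5555"})
decoded = decode(frames)
```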

OBCI peers can send any message in one of two modes:

  • broadcast mode - message is sent using Peer.send_message and delivered only to Peers that subscribed to such messages.
  • direct mode - message is sent using Peer.query directly (over REQ-REP sockets) to some other peer or Broker.

Peers can subscribe to message type and register a handler for it by calling Peer.subscribe_for_specific_msg_subtype method, Peer.subscribe_for_all_msg_subtype method or by decorating handler method with subscribe_message_handler().

Technically, a broadcast message is sent using a PUB socket connected to Broker's XSUB socket and can be received by SUB sockets inside peers that connect to Broker's XPUB.
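
ZMQ SUB sockets filter by prefix: a message is delivered when a subscription is a prefix of its first frame. With the type^sender^ header this makes both "all messages of a type" and "messages of a type from one sender" expressible as prefixes. The sketch below only simulates the matching rule. That OBCI builds its subscriptions exactly like this is an assumption, and the type and sender names are illustrative.

```python
def matches(header, subscriptions):
    """ZMQ-style filtering: deliver if any subscription is a prefix of the header."""
    return any(header.startswith(prefix) for prefix in subscriptions)


all_signal = [b"signal_message^"]                 # every sender of this type
only_amplifier = [b"signal_message^amplifier^"]   # one specific sender

header_amp = b"signal_message^amplifier^"
header_other = b"signal_message^filter^"

delivered = {
    "all/amp": matches(header_amp, all_signal),
    "all/other": matches(header_other, all_signal),
    "specific/amp": matches(header_amp, only_amplifier),
    "specific/other": matches(header_other, only_amplifier),
}
```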

OBCI programming and directory guide

Peers

Every Peer (that is a child of ConfiguredPeer or Peer) which does some useful work is contained in obci.peers module.

File containing peer class code should be snake_case_named after the peer it provides. Peer classes should use CamelCase with last word being Peer.

Example:

We have Peer which does something really important:

class VeryImportantPeer(Peer):
    pass

Which should be placed in appropriate sub-module inside module obci.peers inside file named:

very_important_peer.py

Additionally such module should export that peer using __all__ mechanism, at the start of file there should be code:

__all__ = ('VeryImportantPeer', )

In order to extend peer functionality you have to do the following:

  1. Create a Peer subclass
       • Override __init__() to perform some basic initialization, without outside communication.
       • Override (async) _connections_established() to perform custom initialization which might take some time, i.e. communicate with other peers (like in ConfiguredPeer) or initialize resources. After this method finishes, the peer is considered to be ready to work. If your additional configuration requires talking to other peers, especially your own launch_dependencies, you should use _dependencies_are_ready().
       • Override (async) _shutting_down() to perform long cleanup tasks, i.e. inform other peers about something or send messages.
       • Override _cleanup() to free up acquired resources.
       • Override (async) _start() to initialize the Peer main task (i.e. start generating and sending data or save data to file).
       • Override (async) _stop() to stop the Peer main task (i.e. turn off the amplifier, close a file on disk).
  2. Write your message handler methods
       • Message handlers should be coroutines.
       • Message handlers for queries must return response messages.
       • Message handlers can use (async) _send_message() for immediate message sending (send_message() puts messages on the queue).
  3. Register message handlers and subscribe
       • Register a message handler for the message types you want to receive by using the register_message_handler() decorator on the handler method. After that you will be able to handle sync messages (your method must return a response message) and async messages.
       • To receive all async messages of that type, use subscribe_message_handler() on the handler method.
       • If you only want to receive messages from a certain peer, use the subscribe_for_specific_msg_subtype() method in _connections_established().

You can also use peers without subclassing: just instantiate a peer, wait for the connection and use these methods:

  • register_message_handler()
  • subscribe()
  • send_message() - to send messages through pub and sub
  • create_task() - to run background asyncio tasks

See Peer States for more info.

Example:

Simple peer which averages signal across channels:

import numpy

from obci.core.configured_peer import ConfiguredPeer
# SignalMessage and SamplePacket are OBCI classes and must be imported as well;
# their module paths are not given in this guide.


class AveragingPeer(ConfiguredPeer):

    async def _connections_established(self):
        await super()._connections_established()

        # subscribe to signal going from amplifier peer
        # for every SignalMessage self.signal_message_handler function will be called
        self.subscribe_for_specific_msg_subtype(SignalMessage, 'amplifier', self.signal_message_handler)

        await self.ready()

    async def signal_message_handler(self, msg):
        packet = msg.data  # retrieve SamplePacket

        # create new SamplePacket averaged across channels
        output = SamplePacket(ts=packet.ts, samples=numpy.mean(packet.samples, axis=1, keepdims=True))

        # send created SamplePacket in a SignalMessage
        msg = SignalMessage(data=output)
        # every peer subscribed to AveragingPeer's SignalMessages will receive this
        # new message and could for example display it;
        # such a peer could be running on a different computer.
        await self._send_message(msg)

Amplifier drivers

Drivers for amplifiers are split between amplifier classes, which derive from EEGAmplifier, and amplifier peers, which utilize those classes and derive from AmplifierPeer.

Drivers should be placed inside obci.drivers.eeg module and corresponding peers inside obci.peers.drivers.amplifiers module.

If your driver implements the EEGAmplifier API and doesn't need any additional external parameters, it is very likely that the code for the peer will be very concise:

from obci.peers.drivers.amplifiers.amplifier_peer import AmplifierPeer
from obci.drivers.eeg.my_great_amp import MyGreatAmplifier

__all__ = ('MyGreatAmplifierPeer',)


class MyGreatAmplifierPeer(AmplifierPeer):
    AmplifierClass = MyGreatAmplifier

AmplifierPeer class has its own underlying mechanisms to retrieve samples from EEGAmplifier type classes and send those samples as messages.

OBCI driver discovery

To make your driver discoverable by OBCI driver discovery (i.e. visible in Svarog) you should do the following:

  • add amplifier class inside obci.drivers.eeg.driver_discovery.get_amp_classes_defs()
  • additionally add path to the amplifier peer
  • add path to the template scenario which can run your amplifier

Scenarios

Scenarios internal to OBCI should abide by these rules:

  • Scenarios for amplifiers and peers should be located inside obci/scenarios/
  • Scenarios which run only amplifiers in different configurations (e.g. to be viewed in Svarog) should be placed in obci/scenarios/amplifier/amp_name/
  • Scenarios which save data from your amplifier should be placed in obci/scenarios/acquisition/amp_name/
  • If you want those scenarios to be visible in obci_gui you should edit obci/control/gui/presets/default.ini file appropriately.

For example, a scenario which saves the signal from a TMSI amplifier connected to a USB port:

[peers]
scenario_dir=
;***********************************************


;***********************************************
[peers.config_server]
path=peers/control/config_server.py


;***********************************************
; here peers.SOMETHING - SOMETHING is the peer_id for the loaded peer.
[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py


;***********************************************
[peers.signal_saver]
; path to peers are looked up:
; - first: peer .py files:
;   * first in the main obci directory
;   * directory relative to scenario location
;   * global path (including ~ expansion)
; - next: importable Python 3 path:
;   * like this: path=obci.peers.acquisition.signal_saver_peer
path=peers/acquisition/signal_saver_peer.py

[peers.signal_saver.launch_dependencies]
; signal saver has external params depending on signal_source
; signal source could be any peer, in this case it is tmsi_amplifier_peer
signal_source=amplifier
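
Because a scenario file is INI-like, its structure can be explored with the standard configparser module. The sketch below lists peer_ids, paths and launch dependencies from a trimmed copy of the scenario above. It is only an illustration of the file layout, not the loader OBCI uses, and it ignores path lookup rules.

```python
import configparser

SCENARIO = """
[peers]
scenario_dir=

[peers.config_server]
path=peers/control/config_server.py

; here peers.SOMETHING - SOMETHING is the peer_id for the loaded peer.
[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py

[peers.signal_saver]
path=peers/acquisition/signal_saver_peer.py

[peers.signal_saver.launch_dependencies]
signal_source=amplifier
"""

parser = configparser.ConfigParser()
parser.read_string(SCENARIO)

peers = {}
for section in parser.sections():
    parts = section.split(".")
    if parts[0] != "peers" or len(parts) < 2:
        continue  # skip the top-level [peers] section
    peer = peers.setdefault(parts[1], {"path": None, "launch_dependencies": {}})
    if len(parts) == 2:
        peer["path"] = parser[section]["path"]
    elif parts[2] == "launch_dependencies":
        peer["launch_dependencies"] = dict(parser[section])
```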