Przykład prezentuje szkielet implementacji prostego systemu do rejestracji i analizy w czasie rzeczywistym ruchu gałki ocznej w systemie BCI-Framework. Program w założeniu ma wykrywać sekwencje ruchu gałki ocznej w czasie rzeczywistym, a wynik detekcji wypisywać na konsolę.
W celu uruchomienia przykładu należy dopisać klikanaście linijek kodu do modułu analizy sygnału ~/ex2/analisys/eog_realtime.py. Miejsce, w którym umieścisz swój kod zaznaczono na zielono. Poniższa instrukcja omawia krok po kroku sposób wykorzystania BCI Framework do stworzenia własnego scenariusza realizującego analizę w czasie rzeczywistym.
mkdir ~/ex2
mkdir ~/ex2/analysis
mkdir ~/ex2/scenarios
gedit ~ex2/scenarios/eog.ini
o zawartości:
;***********************************************
;***************** MAIN ************************
;***********************************************
[peers]
scenario_dir=
;***********************************************
[peers.config_server]
path=peers/control/config_server.py
;***********************************************
;***************** EOG *************************
;***********************************************
[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py
config=eog_cap.ini
;************ signal proccesing *****************
[peers.analysis]
path=~/ex2/analysis/eog_realtime.py
[peers.analysis.local_params]
signal_source=amplifier
[peers.analysis.config_sources]
signal_source=amplifier
W pliku wskazano na:
a. plik ~/ex2/scenarios/eog_cap.ini jako miejsce, w którym znajdują się
parametry wzmacniacza (numery i nazwy kanałów, częstość próbkowania, ...)
b. plik ~/ex2/analysis/eof_realtime.py jako miejsce, w którym znajduje się
moduł do analizy sygnału.
Należy te pliki stworzyć (patrz poniżej).
gedit ~/ex2/scenarios/eog_cap.ini
o zawartości:
[local_params]
channel_names=gora;dol;lewa;prawa
active_channels=0;1;2;3
sampling_rate=256
samples_per_packet=32
gedit ~/ex2/analysis/eog_realtime.py
W tym pliku umieść swój detektor opakowując go w poniższy kod. Moduł odbiera kolejne pakiety z próbkami. Każdy pakiet zawiera listę 32 znaczników czasu oraz macierz z 32x4 próbkami (32 próbki dla każdego z 4 kanałów). Twoje zadanie polega na dopisaniu kodu, który wypisze na konsolę odpowiedni komunikat, jeżeli w sygnale (tzn. w otrzymanym pakiecie) wystąpi określony wzorzec.
#!/usr/bin/env python3
import random
import time
from obci.core.configured_peer import ConfiguredPeer, SignalReceiverMixin
from obci.core.messages.types import DecisionMsg
__all__ = ('SampleAnalysis',)
class SampleAnalysis(SignalReceiverMixin, ConfiguredPeer):
"""Klasa odpowiedzialna za przetwarzenie sygnalu.
Z uwagi na przejrzystosc kodu, techniczne szczegoly
implementacji zostały przeniesione do nadklas."""
ALLOW_EMPTY_SOURCE = False
async def _signal_message_handler(self, msg):
"""Docelowa funkcja, ktora bedzie wywolywana ZA KAZDYM
razem kiedy zostanie odebrana wiadomosc."""
# Otrzymana wiadomosc nazywamy pakietem.
# Kazdy pakiet to dwuelementowy słownik o kluczach:
# samples - macierz (numpy.array), ktorej kolumny sa kanalami
# wzmacniacza (u nas pakiet zawiera 4 kanaly),
# a wiersze numerami kolejnych probek
# (u nas pakiet zawiera 32 probki)
# ts - lista (numpy.array) znacznikow czasu (timestamp)
# powstania kolejnych probek (u nas 32 znaczniki)
# Otrzymalismy pakiet. Nazwijmy go "packet".
packet = msg.data
# Dostep do probek w pakiecie zapewnimy iterujac po kolejnych
# wierszach listy "ts" tego pakietu.
for idx in range(0, len(packet.ts)):
# Weżmy znacznik czasu wiersza o numerze "idx"
# i nazwijmy go "timestamp".
timestamp = packet.ts[idx]
# Zalogujmy go do specjalnego loggera.
self._logger.debug("Got sample with timestamp: "+str(timestamp))
# Wezmy probki z wiersza o numerze "idx" macierzy samples
# otrzymanego pakietu. Liste tych probek nazwijmy "samples".
samples = packet.samples[idx]
# Co z tą listą zrobimy ?
# ============================================================ #
# ========= TU WPISZ SWOJ KOD BUFOROWANIA i ANALIZY ========= #
# ============================================================ #
print(samples) # na dobry poczatek wypisz te probki
# Mozesz po nich iterowac
for item in samples:
self._logger.debug(item)
# Albo zrealizowac swoj autorski pomysl na analize.
# ============================================================ #
# ============================================================ #
# ============================================================ #
# Wynikiem analizy jest zaklasyfikowanie sygnalu do jednej
# z kilku grup. Ponizej zamieszczono przyklad prostego, losowego
# klasyfikatora. Na kazda otrzymana wiadomosc generowana jest
# losowa liczba z przedzialu [0,1]. Jezeli wylosowana liczba
# jest wieksza niz 0.99 (czyli w 1% przypadkow), wewnatrz systemu
# BCI framework rozeslana zostaje wiadomosc-decyzja.
if random.random() > 0.99:
# Jesli wylosowana z przedzialu [0,1] liczba jest wieksza
# od 0.99 tworzymy wiadomosc-decyzje typu "thetarget"
# z wynikiem, ktory jest losowa liczba calkowita z przedzialu
# [0, 7] i dwoma znacznikami czasu - start_timestamp
# i end_timestamp
msg = DecisionMsg(decision="classification",
score=random.randint(0, 7),
decision_type='thetarget',
start_timestamp=time.time(),
end_timestamp=time.time())
# Stworzona wiadomosc rozsylamy w systemie BCI framework
await self._send_message(msg)
# W tym cwiczeniu wystarczy jezeli wynik klasyfikacji zostanie
# wypisany na ekran.
# POWODZENIA!
Dla zachowania konwencji w tym samym katalogu stwórz plik o tej samej nazwie, z rozszerzeniem .ini, z parametrami dla modułu eog_realtime.py (patrz poniżej).
gedit ~/ex2/analysis/eog_realtime.ini
o zawartości:
[local_params]
signal_source=
save_file_name=
autoshutdown=1
autostart=1
[config_sources]
signal_source=
[external_params]
sampling_rate=signal_source.sampling_rate
sample_type=signal_source.sample_type
samples_per_packet=signal_source.samples_per_packet
channel_names=signal_source.channel_names
channel_gains=signal_source.channel_gains
channel_offsets=signal_source.channel_offsets
active_channels=signal_source.active_channels
Następnie uruchom serwer BCI framework komendą:
obci srv
Aby uruchomić swój scenariusz wpisz w konsoli:
obci launch ~/ex2/scenarios/eog.ini
Aby zatrzymać swój scenariusz wpisz w konsoli:
obci kill eog
Do wypisywania komunikatów wykorzystaj obiekt "LOGGER" zamiast funkcji "print". Tak wypisywane komunikaty opatrzone będą informacją o źródle komunikatu i czasie wystąpienia. Poza Twoimi komunikatami na konsoli wyświetlać się będą komunikaty ze wszystkich uruchomionych modulów BCI framework .
Jeżeli w jednym module BCI framework wystąpi błąd, to wszystkie inne moduły zostaną zamknięte. Jeden błąd powoduje najczęściej wystąpienie serii innych błędów. Przejrzyj komunikaty w konsoli. Przyczynę błędu znajdziesz w komunikacie błędu, który wystąpił najwcześniej. Komunikaty błędu opatrzone są napisem CRITCAL, ERROR lub Exception. Ustaw wcześniej bufor konsoli na nieograniczony: Edycja->Preferencje profilu->Przewijanie->Nieograniczone.
Urządzenie śledzące ruch gałki ocznej to tzw. okulograf (ang. eye tracker). Jeżeli pokonałeś wszystkie napotkane trudności, to właśnie zaimplementowaś swój pierwszy elektrookulograf.
GRATULACJE!