Brain4edu - przykład użycia II

Interfejs użytkownika – elektrookulograf

Przykład prezentuje szkielet implementacji prostego systemu do rejestracji i analizy w czasie rzeczywistym ruchu gałki ocznej w systemie BCI-Framework. Program w założeniu ma wykrywać sekwencje ruchu gałki ocznej w czasie rzeczywistym, a wynik detekcji wypisywać na konsolę.

W celu uruchomienia przykładu należy dopisać klikanaście linijek kodu do modułu analizy sygnału ~/ex2/analisys/eog_realtime.py. Miejsce, w którym umieścisz swój kod zaznaczono na zielono. Poniższa instrukcja omawia krok po kroku sposób wykorzystania BCI Framework do stworzenia własnego scenariusza realizującego analizę w czasie rzeczywistym.

mkdir ~/ex2
mkdir ~/ex2/analysis
mkdir ~/ex2/scenarios
gedit ~ex2/scenarios/eog.ini

o zawartości:

;***********************************************
;***************** MAIN ************************
;***********************************************

[peers]
scenario_dir=

;***********************************************
[peers.config_server]
path=peers/control/config_server.py

;***********************************************
;***************** EOG *************************
;***********************************************

[peers.amplifier]
path=peers/drivers/amplifiers/tmsi_amplifier_peer.py
config=eog_cap.ini

;************ signal proccesing *****************
[peers.analysis]
path=~/ex2/analysis/eog_realtime.py

[peers.analysis.local_params]
signal_source=amplifier

[peers.analysis.config_sources]
signal_source=amplifier

W pliku wskazano na:
      a. plik ~/ex2/scenarios/eog_cap.ini jako miejsce, w którym znajdują się
parametry wzmacniacza (numery i nazwy kanałów, częstość próbkowania, ...)
      b. plik ~/ex2/analysis/eof_realtime.py jako miejsce, w którym znajduje się
moduł do analizy sygnału.

Należy te pliki stworzyć (patrz poniżej).

gedit ~/ex2/scenarios/eog_cap.ini

o zawartości:

[local_params]
channel_names=gora;dol;lewa;prawa
active_channels=0;1;2;3
sampling_rate=256
samples_per_packet=32
gedit ~/ex2/analysis/eog_realtime.py

W tym pliku umieść swój detektor opakowując go w poniższy kod. Moduł odbiera kolejne pakiety z próbkami. Każdy pakiet zawiera listę 32 znaczników czasu oraz macierz z 32x4 próbkami (32 próbki dla każdego z 4 kanałów). Twoje zadanie polega na dopisaniu kodu, który wypisze na konsolę odpowiedni komunikat, jeżeli w sygnale (tzn. w otrzymanym pakiecie) wystąpi określony wzorzec.

#!/usr/bin/env python3

import random
import time

from obci.core.configured_peer import ConfiguredPeer, SignalReceiverMixin
from obci.core.messages.types import DecisionMsg

__all__ = ('SampleAnalysis',)


class SampleAnalysis(SignalReceiverMixin, ConfiguredPeer):
    """Klasa odpowiedzialna za przetwarzenie sygnalu.

    Z uwagi na przejrzystosc kodu, techniczne szczegoly
    implementacji zostały przeniesione do nadklas."""

    ALLOW_EMPTY_SOURCE = False

    async def _signal_message_handler(self, msg):
        """Docelowa funkcja, ktora bedzie wywolywana ZA KAZDYM 
           razem kiedy zostanie odebrana wiadomosc."""

        # Otrzymana wiadomosc nazywamy pakietem.
        # Kazdy pakiet to dwuelementowy słownik o kluczach:
        # samples - macierz (numpy.array), ktorej kolumny sa kanalami
        # wzmacniacza (u nas pakiet zawiera 4 kanaly),
        # a wiersze numerami kolejnych probek
        # (u nas pakiet zawiera 32 probki)
        # ts - lista (numpy.array) znacznikow czasu (timestamp)
        # powstania kolejnych probek (u nas 32 znaczniki)

        # Otrzymalismy pakiet. Nazwijmy go "packet".
        packet = msg.data

        # Dostep do probek w pakiecie zapewnimy iterujac po kolejnych
        # wierszach listy "ts" tego pakietu.
        for idx in range(0, len(packet.ts)):

            # Weżmy znacznik czasu wiersza o numerze "idx"
            # i nazwijmy go "timestamp".
            timestamp = packet.ts[idx]

            # Zalogujmy go do specjalnego loggera.
            self._logger.debug("Got sample with timestamp: "+str(timestamp))

            # Wezmy probki z wiersza o numerze "idx" macierzy samples
            # otrzymanego pakietu. Liste tych probek nazwijmy "samples".
            samples = packet.samples[idx]

            # Co z tą listą zrobimy ?

            # ============================================================ #
            # ========= TU WPISZ SWOJ KOD BUFOROWANIA i ANALIZY  ========= #
            # ============================================================ #

            print(samples)  # na dobry poczatek wypisz te probki

            # Mozesz po nich iterowac
            for item in samples:
                self._logger.debug(item)

            # Albo zrealizowac swoj autorski pomysl na analize.

            # ============================================================ #
            # ============================================================ #
            # ============================================================ #

            # Wynikiem analizy jest zaklasyfikowanie sygnalu do jednej
            # z kilku grup. Ponizej zamieszczono przyklad prostego, losowego
            # klasyfikatora. Na kazda otrzymana wiadomosc generowana jest
            # losowa liczba z przedzialu [0,1]. Jezeli wylosowana liczba
            # jest wieksza niz 0.99 (czyli w 1% przypadkow), wewnatrz systemu
            # BCI framework rozeslana zostaje wiadomosc-decyzja.
            if random.random() > 0.99:
                # Jesli wylosowana z przedzialu [0,1] liczba jest wieksza
                # od 0.99 tworzymy wiadomosc-decyzje typu "thetarget"
                # z wynikiem, ktory jest losowa liczba calkowita z przedzialu
                # [0, 7] i dwoma znacznikami czasu - start_timestamp
                # i end_timestamp
                msg = DecisionMsg(decision="classification",
                                  score=random.randint(0, 7),
                                  decision_type='thetarget',
                                  start_timestamp=time.time(),
                                  end_timestamp=time.time())

                # Stworzona wiadomosc rozsylamy w systemie BCI framework
                await self._send_message(msg)

            # W tym cwiczeniu wystarczy jezeli wynik klasyfikacji zostanie
            # wypisany na ekran.

            # POWODZENIA!

Dla zachowania konwencji w tym samym katalogu stwórz plik o tej samej nazwie, z rozszerzeniem .ini, z parametrami dla modułu eog_realtime.py (patrz poniżej).

gedit ~/ex2/analysis/eog_realtime.ini

o zawartości:

[local_params]
signal_source=
save_file_name=
autoshutdown=1
autostart=1

[config_sources]
signal_source=

[external_params]
sampling_rate=signal_source.sampling_rate
sample_type=signal_source.sample_type
samples_per_packet=signal_source.samples_per_packet
channel_names=signal_source.channel_names
channel_gains=signal_source.channel_gains
channel_offsets=signal_source.channel_offsets
active_channels=signal_source.active_channels

Następnie uruchom serwer BCI framework komendą:

obci srv

Aby uruchomić swój scenariusz wpisz w konsoli:

obci launch ~/ex2/scenarios/eog.ini

Aby zatrzymać swój scenariusz wpisz w konsoli:

obci kill eog

Do wypisywania komunikatów wykorzystaj obiekt "LOGGER" zamiast funkcji "print". Tak wypisywane komunikaty opatrzone będą informacją o źródle komunikatu i czasie wystąpienia. Poza Twoimi komunikatami na konsoli wyświetlać się będą komunikaty ze wszystkich uruchomionych modulów BCI framework .

Jeżeli w jednym module BCI framework wystąpi błąd, to wszystkie inne moduły zostaną zamknięte. Jeden błąd powoduje najczęściej wystąpienie serii innych błędów. Przejrzyj komunikaty w konsoli. Przyczynę błędu znajdziesz w komunikacie błędu, który wystąpił najwcześniej. Komunikaty błędu opatrzone są napisem CRITCAL, ERROR lub Exception. Ustaw wcześniej bufor konsoli na nieograniczony: Edycja->Preferencje profilu->Przewijanie->Nieograniczone.

Urządzenie śledzące ruch gałki ocznej to tzw. okulograf (ang. eye tracker). Jeżeli pokonałeś wszystkie napotkane trudności, to właśnie zaimplementowaś swój pierwszy elektrookulograf.



GRATULACJE!