加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
controller.py 9.22 KB
一键复制 编辑 原始数据 按行查看 历史
tuanergou 提交于 2024-04-05 11:45 . project initialization
# -*- coding: utf-8 -*-
from PyQt5 import QtCore
from PyQt5.QtWidgets import QFileDialog
import time
import json
from PyQt5.QtCore import QThreadPool
import numpy as np
from serialx import SerialX
import constantValues as cv
from lslSender import LSLSender
import debugPrinter as dp
from lslReceiver import LSLReceiver
from edfSaver import EDFSaver
import os
class Controller():
def __init__(self,mainwindow,curForm):
self.mw = mainwindow
self.lsl = LSLSender('dummy')
self.cf = curForm
self.fs = EDFSaver('./data')
self.lslRcv = LSLReceiver()
self.mp = None
self.serial = SerialX("dummy")
self.usr_config_file = os.path.join(os.path.dirname(__file__),"ui_config", "user_config.json")
with open(self.usr_config_file, 'r') as file:
self.usr_config_json = json.load(file)
# dp.dpt(self.usr_config_json[cv.JSON_MAC_KEY_STR])
# self.mw.new_mac(self.usr_config_json[cv.JSON_MAC_KEY_STR])
self.timer_ack = QtCore.QTimer()
self.timer_ack.setInterval(1000)
self.timer_ack.timeout.connect(self.ack_timer_handler)
self.timer_ack.start()
self.timer_set_mac = QtCore.QTimer()
self.timer_set_mac.setInterval(10000)
self.timer_set_mac.timeout.connect(self.send_mac_to_recv)
self.mw.evt_win.connect(self.win_evt)
# self.mw.evt_win_data.connect(self.win_evt_data)
self.serial.evt_com_list.connect(self.serial_com_list)
self.serial.evt_serial_cmd.connect(self.serial_evt_cmd)
self.serial.find_serial_ports()
self.packet_counter=0
self.threadpool = QThreadPool()
self.last_pkn=None
self.lslRcv.evt_lslRcv.connect(self.lslRcv_new_data)
self.p = None # Default empty value.
self.evt_serial_dataConnected = False
self.init_some()
self.start_time = time.time()
self.psyco_marker_counter = 0
# def __del__(self):
# print('del Controller')
def init_some(self):
self.serial.open_serial(self.usr_config_json[cv.JSON_COM_KEY_STR])
self.mw.set_combox_item(self.usr_config_json[cv.JSON_COM_KEY_STR])
def lslRcv_new_data(self,inlet_name,ts,arr):
if (inlet_name=='mi_acc'):
self.cf.deal_with_data_acc_inlet(ts,arr)
self.fs.new_data('acc',ts,arr)
if (inlet_name=='mi_eeg'):
## how do you want to use this eeg data
self.fs.new_data('eeg',ts,arr)
# arr = self.dsp.filter(arr)
# arr = self.dsp.filter_dummy(arr)
self.cf.deal_with_data_inlet(ts,arr)
if (inlet_name=='hb_eeg'):
self.fs.use_one_channel()
self.fs.new_data_1ch('eeg',ts,arr)
self.cf.deal_with_data_inlet(ts,arr)
if (inlet_name[:13] == 'psycho_marker'):
self.fs.new_data('mar',ts,arr)
if self.mp is not None:
self.mp.new_marker(arr)
if arr[0]==markers['trial_end']:
self.psyco_marker_counter = self.psyco_marker_counter+1
dp.dpt('trials: '+ str(self.psyco_marker_counter))
def get_marker_inlet(self,wanted_marker_inlets):
tmp = wanted_marker_inlets
for i in range(30):
time.sleep(1)
print('find marker_inlet...'+str(i))
new_marker_inlets = self.lslRcv.get_additional_marker_inlet()
for inlet in new_marker_inlets:
if inlet in tmp:
tmp.remove(inlet)
if len(tmp)==0:
return 's'
return 'f'
def win_evt(self,s,s2):
if s == cv.EVT_WIN_CMD_OPEN_COM:
if (self.serial.open_serial(s2)) :
self.usr_config_json[cv.JSON_COM_KEY_STR] = s2
elif s == cv.EVT_WIN_CMD_CLOSE_COM:
self.serial.close_serial()
elif s == cv.EVT_WIN_CMD_CON_DEV:
self.usr_config_json[cv.JSON_MAC_KEY_STR] = s2
self.send_mac_to_recv()
elif s == cv.SERIAL_CMD_STOP_DISCONNECT:
self.timer_set_mac.stop()
self.start_time = time.time()
self.serial.write_port(s)
elif s == cv.SERIAL_CMD_FALSH_LED:
self.serial.write_port(s)
elif s == cv.EVT_WIN_QUIT:
self.stop_recording()
self.serial.close_serial()
self.cf.close()
del self.cf
self.timer_set_mac.stop()
self.timer_ack.stop()
with open(self.usr_config_file, "w") as outfile:
json.dump(self.usr_config_json, outfile)
elif s==cv.EVT_WIN_BTN_START_RECORDING:
self.new_recording()
elif s==cv.EVT_WIN_BTN_STOP_RECORDING:
self.stop_recording()
def new_recording(self):
a = self.fs.get_name()
b = a.split('/')[-1]
text, ok = self.mw.get_input_fileName(b)
if ok and text:
c = a[:a.rfind('/')+1] + text
print(c)
if (self.fs.make_path(c)):
self.mw.show_error("this file has been exist!")
return 'f'
else:
self.fs.setup(c,text)
return 's'
return 'f'
def stop_recording(self):
if self.fs.save_on == 1:
self.fs.flush_data()
def send_mac_to_recv(self):
json_tmp={cv.JSON_RECV_KEY_MAC:self.usr_config_json[cv.JSON_MAC_KEY_STR]}
json_tmp_str = json.dumps(json_tmp)
# dp.dpt(self.usr_config_json[cv.JSON_MAC_KEY_STR])
self.serial.write_port(json_tmp_str)
def serial_evt_cmd(self,cmd):
self.mw.serial_cmd(cmd)
if cmd == cv.EVT_SERIAL_OPEN_SUC:
if not self.evt_serial_dataConnected:
self.serial.evt_serial_data.connect(self.deal_serial_data)
self.evt_serial_dataConnected=True
# dp.dpt('evt_serial_dataConnected')
else:
pass
# dp.dpt('evt_serial_data already connected ')
self.send_mac_to_recv()
self.timer_set_mac.start()
def parse_data(self,json_data):
if (cv.JSON_RECV_KEY_PACKET_NUM) in json_data.keys():
pkn = json_data.get(cv.JSON_RECV_KEY_PACKET_NUM)
if(self.last_pkn is None):
self.last_pkn = pkn
d = pkn - self.last_pkn - 1
# insert zeros # I do not know how to transfer NAN in lsl, so I use 0
if(d>0)&(d<20):
dp.dpt('lost package')
self.last_pkn = pkn
if (cv.JSON_RECV_KEY_CH_NUM) in json_data.keys():
n = json_data.get(cv.JSON_RECV_KEY_CH_NUM)
if n=="1":
if (cv.JSON_RECV_KEY_DATA_EEG) in json_data.keys():
e = json_data.get(cv.JSON_RECV_KEY_DATA_EEG)
arr = np.array(e)
if arr.size!=16:
dp.dpt('some thing wrong')
return
for i in range(16):
self.lsl.send_eeg_hb(e[i:i+1])
elif n=="8":
if (cv.JSON_RECV_KEY_DATA_EEG) in json_data.keys():
e = json_data.get(cv.JSON_RECV_KEY_DATA_EEG)
arr = np.array(e)
if arr.size!=80:
dp.dpt('some thing wrong')
return
for i in range(10):
self.lsl.send_eeg(e[i*8:i*8+8])
if (cv.JSON_RECV_KEY_DATA_ACC) in json_data.keys():
a = json_data.get(cv.JSON_RECV_KEY_DATA_ACC)
arr = np.array(a)
if arr.size!=4:
dp.dpt('some thing wrong')
return
self.lsl.send_acc(arr)
def deal_serial_data(self,ar):
try:
json_data = json.loads(bytearray(ar))
except ValueError as e:
dp.dpt('--- json error')
dp.dpt(e)
dp.dpt('###')
dp.dpt(ar)
# may not suitable
# self.serial.write_port(cv.SERIAL_ACK_OK)
self.serial.write_port_ack_ok()
return
# self.serial.write_port(cv.SERIAL_ACK_OK)
# dp.dpt(json_data.keys())
self.serial.write_port_ack_ok()
self.timer_ack.start() # like feed wdg
if (cv.JSON_RECV_KEY_EVT) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_EVT)
self.mw.dev_evt(s)
# dp.dpt(s)
if (cv.JSON_RECV_KEY_MAC) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_MAC)
# dp.dpt(s)
self.mw.new_mac(s)
if (cv.JSON_RECV_KEY_ALPHA_AMP) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_ALPHA_AMP)
arr = np.asarray(s.split(':'),dtype=np.int32)
# I did not send decimal fraction in the esp32, so I can use int here
if self.cfa.showing_state == False:
self.cfa.show()
self.cfa.showing_state = True
self.cfa.new_data(arr)
self.parse_data(json_data)
def serial_com_list(self,li):
self.mw.add_serial_port_to_combox(li)
def ack_timer_handler(self):
self.serial.write_port_ack_ok()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化