代码拉取完成,页面将自动刷新
# -*- coding: utf-8 -*-
from PyQt5 import QtCore
from PyQt5.QtWidgets import QFileDialog
import time
import json
from PyQt5.QtCore import QThreadPool
import numpy as np
from serialx import SerialX
import constantValues as cv
from lslSender import LSLSender
import debugPrinter as dp
from lslReceiver import LSLReceiver
from edfSaver import EDFSaver
import os
class Controller():
def __init__(self,mainwindow,curForm):
self.mw = mainwindow
self.lsl = LSLSender('dummy')
self.cf = curForm
self.fs = EDFSaver('./data')
self.lslRcv = LSLReceiver()
self.mp = None
self.serial = SerialX("dummy")
self.usr_config_file = os.path.join(os.path.dirname(__file__),"ui_config", "user_config.json")
with open(self.usr_config_file, 'r') as file:
self.usr_config_json = json.load(file)
# dp.dpt(self.usr_config_json[cv.JSON_MAC_KEY_STR])
# self.mw.new_mac(self.usr_config_json[cv.JSON_MAC_KEY_STR])
self.timer_ack = QtCore.QTimer()
self.timer_ack.setInterval(1000)
self.timer_ack.timeout.connect(self.ack_timer_handler)
self.timer_ack.start()
self.timer_set_mac = QtCore.QTimer()
self.timer_set_mac.setInterval(10000)
self.timer_set_mac.timeout.connect(self.send_mac_to_recv)
self.mw.evt_win.connect(self.win_evt)
# self.mw.evt_win_data.connect(self.win_evt_data)
self.serial.evt_com_list.connect(self.serial_com_list)
self.serial.evt_serial_cmd.connect(self.serial_evt_cmd)
self.serial.find_serial_ports()
self.packet_counter=0
self.threadpool = QThreadPool()
self.last_pkn=None
self.lslRcv.evt_lslRcv.connect(self.lslRcv_new_data)
self.p = None # Default empty value.
self.evt_serial_dataConnected = False
self.init_some()
self.start_time = time.time()
self.psyco_marker_counter = 0
# def __del__(self):
# print('del Controller')
def init_some(self):
self.serial.open_serial(self.usr_config_json[cv.JSON_COM_KEY_STR])
self.mw.set_combox_item(self.usr_config_json[cv.JSON_COM_KEY_STR])
def lslRcv_new_data(self,inlet_name,ts,arr):
if (inlet_name=='mi_acc'):
self.cf.deal_with_data_acc_inlet(ts,arr)
self.fs.new_data('acc',ts,arr)
if (inlet_name=='mi_eeg'):
## how do you want to use this eeg data
self.fs.new_data('eeg',ts,arr)
# arr = self.dsp.filter(arr)
# arr = self.dsp.filter_dummy(arr)
self.cf.deal_with_data_inlet(ts,arr)
if (inlet_name=='hb_eeg'):
self.fs.use_one_channel()
self.fs.new_data_1ch('eeg',ts,arr)
self.cf.deal_with_data_inlet(ts,arr)
if (inlet_name[:13] == 'psycho_marker'):
self.fs.new_data('mar',ts,arr)
if self.mp is not None:
self.mp.new_marker(arr)
if arr[0]==markers['trial_end']:
self.psyco_marker_counter = self.psyco_marker_counter+1
dp.dpt('trials: '+ str(self.psyco_marker_counter))
def get_marker_inlet(self,wanted_marker_inlets):
tmp = wanted_marker_inlets
for i in range(30):
time.sleep(1)
print('find marker_inlet...'+str(i))
new_marker_inlets = self.lslRcv.get_additional_marker_inlet()
for inlet in new_marker_inlets:
if inlet in tmp:
tmp.remove(inlet)
if len(tmp)==0:
return 's'
return 'f'
def win_evt(self,s,s2):
if s == cv.EVT_WIN_CMD_OPEN_COM:
if (self.serial.open_serial(s2)) :
self.usr_config_json[cv.JSON_COM_KEY_STR] = s2
elif s == cv.EVT_WIN_CMD_CLOSE_COM:
self.serial.close_serial()
elif s == cv.EVT_WIN_CMD_CON_DEV:
self.usr_config_json[cv.JSON_MAC_KEY_STR] = s2
self.send_mac_to_recv()
elif s == cv.SERIAL_CMD_STOP_DISCONNECT:
self.timer_set_mac.stop()
self.start_time = time.time()
self.serial.write_port(s)
elif s == cv.SERIAL_CMD_FALSH_LED:
self.serial.write_port(s)
elif s == cv.EVT_WIN_QUIT:
self.stop_recording()
self.serial.close_serial()
self.cf.close()
del self.cf
self.timer_set_mac.stop()
self.timer_ack.stop()
with open(self.usr_config_file, "w") as outfile:
json.dump(self.usr_config_json, outfile)
elif s==cv.EVT_WIN_BTN_START_RECORDING:
self.new_recording()
elif s==cv.EVT_WIN_BTN_STOP_RECORDING:
self.stop_recording()
def new_recording(self):
a = self.fs.get_name()
b = a.split('/')[-1]
text, ok = self.mw.get_input_fileName(b)
if ok and text:
c = a[:a.rfind('/')+1] + text
print(c)
if (self.fs.make_path(c)):
self.mw.show_error("this file has been exist!")
return 'f'
else:
self.fs.setup(c,text)
return 's'
return 'f'
def stop_recording(self):
if self.fs.save_on == 1:
self.fs.flush_data()
def send_mac_to_recv(self):
json_tmp={cv.JSON_RECV_KEY_MAC:self.usr_config_json[cv.JSON_MAC_KEY_STR]}
json_tmp_str = json.dumps(json_tmp)
# dp.dpt(self.usr_config_json[cv.JSON_MAC_KEY_STR])
self.serial.write_port(json_tmp_str)
def serial_evt_cmd(self,cmd):
self.mw.serial_cmd(cmd)
if cmd == cv.EVT_SERIAL_OPEN_SUC:
if not self.evt_serial_dataConnected:
self.serial.evt_serial_data.connect(self.deal_serial_data)
self.evt_serial_dataConnected=True
# dp.dpt('evt_serial_dataConnected')
else:
pass
# dp.dpt('evt_serial_data already connected ')
self.send_mac_to_recv()
self.timer_set_mac.start()
def parse_data(self,json_data):
if (cv.JSON_RECV_KEY_PACKET_NUM) in json_data.keys():
pkn = json_data.get(cv.JSON_RECV_KEY_PACKET_NUM)
if(self.last_pkn is None):
self.last_pkn = pkn
d = pkn - self.last_pkn - 1
# insert zeros # I do not know how to transfer NAN in lsl, so I use 0
if(d>0)&(d<20):
dp.dpt('lost package')
self.last_pkn = pkn
if (cv.JSON_RECV_KEY_CH_NUM) in json_data.keys():
n = json_data.get(cv.JSON_RECV_KEY_CH_NUM)
if n=="1":
if (cv.JSON_RECV_KEY_DATA_EEG) in json_data.keys():
e = json_data.get(cv.JSON_RECV_KEY_DATA_EEG)
arr = np.array(e)
if arr.size!=16:
dp.dpt('some thing wrong')
return
for i in range(16):
self.lsl.send_eeg_hb(e[i:i+1])
elif n=="8":
if (cv.JSON_RECV_KEY_DATA_EEG) in json_data.keys():
e = json_data.get(cv.JSON_RECV_KEY_DATA_EEG)
arr = np.array(e)
if arr.size!=80:
dp.dpt('some thing wrong')
return
for i in range(10):
self.lsl.send_eeg(e[i*8:i*8+8])
if (cv.JSON_RECV_KEY_DATA_ACC) in json_data.keys():
a = json_data.get(cv.JSON_RECV_KEY_DATA_ACC)
arr = np.array(a)
if arr.size!=4:
dp.dpt('some thing wrong')
return
self.lsl.send_acc(arr)
def deal_serial_data(self,ar):
try:
json_data = json.loads(bytearray(ar))
except ValueError as e:
dp.dpt('--- json error')
dp.dpt(e)
dp.dpt('###')
dp.dpt(ar)
# may not suitable
# self.serial.write_port(cv.SERIAL_ACK_OK)
self.serial.write_port_ack_ok()
return
# self.serial.write_port(cv.SERIAL_ACK_OK)
# dp.dpt(json_data.keys())
self.serial.write_port_ack_ok()
self.timer_ack.start() # like feed wdg
if (cv.JSON_RECV_KEY_EVT) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_EVT)
self.mw.dev_evt(s)
# dp.dpt(s)
if (cv.JSON_RECV_KEY_MAC) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_MAC)
# dp.dpt(s)
self.mw.new_mac(s)
if (cv.JSON_RECV_KEY_ALPHA_AMP) in json_data.keys():
s = json_data.get(cv.JSON_RECV_KEY_ALPHA_AMP)
arr = np.asarray(s.split(':'),dtype=np.int32)
# I did not send decimal fraction in the esp32, so I can use int here
if self.cfa.showing_state == False:
self.cfa.show()
self.cfa.showing_state = True
self.cfa.new_data(arr)
self.parse_data(json_data)
def serial_com_list(self,li):
self.mw.add_serial_port_to_combox(li)
def ack_timer_handler(self):
self.serial.write_port_ack_ok()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。