import PyQt5.QtWidgets as qtw from PyQt5.QtCore import Qt, QObject, QThread, pyqtSignal, pyqtSlot from PyQt5.QtGui import QIntValidator from matplotlib.backends.backend_qt5agg import FigureCanvas import matplotlib as mpl import numpy as np import serial import serial.tools.list_ports import struct import sys from scipy import signal, fft import pandas as pd import time MAX_LEN_TX_SIGNAL = 2000 len_tx_signal = 0 MAX_SAMPLE_VALUE = np.uint16(4095) MAX_VOLTAGE = 30 MAX_LEN_RX_SIGNAL= 4000 SAMPLING_RATE = 2e6 BAUD_RATE = 1e6 do_terminate_app = False MODE_PULSE_ECHO = 0 MODE_CONTINUOUS = 1 # App und UI Handling class MainWindow(qtw.QWidget): # Initializer def __init__(self): global MAX_LEN_RX_SIGNAL, MAX_LEN_TX_SIGNAL, len_tx_signal global MAX_SAMPLE_VALUE, MAX_VOLTAGE super().__init__() self.setWindowTitle("LCMUT measurement UI") # Oberer Teil # COM Port, Tx Signal, Controls layout_top = qtw.QHBoxLayout() # Serial Port com_panel = qtw.QVBoxLayout() temp_lbl = qtw.QLabel('Serial Port') temp_lbl.setMaximumHeight(20) com_panel.addWidget(temp_lbl) self.combo_com_ports = qtw.QComboBox() com_panel.addWidget(self.combo_com_ports) self.btn_com_connect = qtw.QPushButton('Connect') self.btn_com_connect.clicked.connect(self.on_com_connect) com_panel.addWidget(self.btn_com_connect) layout_top.addLayout(com_panel, 1) # TX Signal plot self.fig_tx = FigureCanvas() layout_top.addWidget(self.fig_tx, 2) tx_panel = qtw.QVBoxLayout() f0_panel = qtw.QHBoxLayout() f0_panel.addWidget(qtw.QLabel('f0')) self.inp_f0 = qtw.QLineEdit('60000') f0_panel.addWidget(self.inp_f0) tx_panel.addLayout(f0_panel) f1_panel = qtw.QHBoxLayout() f1_panel.addWidget(qtw.QLabel('f1')) self.inp_f1 = qtw.QLineEdit('100000') f1_panel.addWidget(self.inp_f1) tx_panel.addLayout(f1_panel) per_panel = qtw.QHBoxLayout() per_panel.addWidget(qtw.QLabel('# Periods/Chirplen')) self.inp_periodes = qtw.QLineEdit('10') per_panel.addWidget(self.inp_periodes) tx_panel.addLayout(per_panel) vpp_panel = qtw.QHBoxLayout() vpp_panel.addWidget(qtw.QLabel('Vpeak-peak')) self.inp_vpp = qtw.QLineEdit('10') vpp_panel.addWidget(self.inp_vpp) tx_panel.addLayout(vpp_panel) signal_panel = qtw.QHBoxLayout() self.btn_signal_sine = qtw.QPushButton('Sine') self.btn_signal_sine.clicked.connect(self.on_signal_sine) self.btn_signal_chirp = qtw.QPushButton('Chirp') self.btn_signal_chirp.clicked.connect(self.on_signal_chirp) self.btn_signal_step = qtw.QPushButton('Step') self.btn_signal_step.clicked.connect(self.on_signal_step) self.btn_signal_multifreq = qtw.QPushButton('MultiFreq') self.btn_signal_multifreq.clicked.connect(self.on_signal_multifreq) signal_panel.addWidget(self.btn_signal_sine) signal_panel.addWidget(self.btn_signal_chirp) signal_panel.addWidget(self.btn_signal_step) signal_panel.addWidget(self.btn_signal_multifreq) tx_panel.addLayout(signal_panel) mesmode_panel = qtw.QHBoxLayout() self.btn_mode_pulseecho = qtw.QPushButton('Pulse-Echo') self.btn_mode_pulseecho.clicked.connect(self.on_mode_pulseecho) self.btn_mode_cont = qtw.QPushButton('Continuous') self.btn_mode_cont.clicked.connect(self.on_mode_continuous) mesmode_panel.addWidget(self.btn_mode_pulseecho) mesmode_panel.addWidget(self.btn_mode_cont) tx_panel.addLayout(mesmode_panel) layout_top.addLayout(tx_panel, 1) # Unterer Teil # Messdatenplot, Daten speichern, ... layout_bottom = qtw.QVBoxLayout() # Datenplot self.fig_measure = FigureCanvas() layout_bottom.addWidget(self.fig_measure) self._ax_measure = self.fig_measure.figure.subplots(1, 1) # self.fig_measure.figure.tight_layout() self._ax_measure.set_xlabel('Sample #') self._ax_measure.set_ylabel('Amplitude') self._ax_measure.set_xlim([0, MAX_LEN_RX_SIGNAL]) self._ax_measure.set_ylim([0, MAX_SAMPLE_VALUE]) self.line_measure, = self._ax_measure.plot(np.zeros(MAX_LEN_RX_SIGNAL)) self.line_measure2, = self._ax_measure.plot(np.zeros(MAX_LEN_RX_SIGNAL)) self.line_measure3, = self._ax_measure.plot(np.zeros(MAX_LEN_RX_SIGNAL)) self.line_measure4, = self._ax_measure.plot(np.zeros(MAX_LEN_RX_SIGNAL)) # Save save_panel = qtw.QHBoxLayout() self.btn_toggle_measurement = qtw.QPushButton('Start recording') self.btn_toggle_measurement.clicked.connect(self.on_toggle_recording) self.lbl_num_data = qtw.QLabel('Data #') self.btn_save_data = qtw.QPushButton('Save Data') self.btn_save_data.clicked.connect(self.on_save_data) save_panel.addWidget(self.btn_toggle_measurement, 1) save_panel.addWidget(self.lbl_num_data, 1) save_panel.addWidget(self.btn_save_data, 1) layout_bottom.addLayout(save_panel) # Layouts zusammenfügen layout = qtw.QVBoxLayout() layout.addLayout(layout_top, 1) layout.addLayout(layout_bottom, 1) self.setLayout(layout) # Serielle Ports scannen self.com_ports = [] self.com_names = [] for x in serial.tools.list_ports.comports(): self.com_ports.append(x[0]) self.com_names.append(x[1]) self.combo_com_ports.addItem(x[0] + ' - ' + x[1]) # DAQ Thread starten self.daq = DAQThread() self.thread = QThread() self.daq.moveToThread(self.thread) self.thread.started.connect(self.daq.run) self.daq.finished.connect(self.thread.quit) self.thread.finished.connect(self.plot_data) self.thread.start() # Tx Signal Plot self.tx_signal = np.ones(MAX_LEN_TX_SIGNAL, dtype=np.uint16) * 2048 self.tx_signal_len = MAX_LEN_TX_SIGNAL self.tx_signal_x = np.arange(0, MAX_LEN_TX_SIGNAL, 1) / SAMPLING_RATE self.tx_signal_y = MAX_VOLTAGE * (np.linspace(0, MAX_SAMPLE_VALUE) / MAX_SAMPLE_VALUE) self._ax_tx = self.fig_tx.figure.subplots(1, 1) # self.fig_tx.figure.tight_layout() self._ax_tx.set_xlim([0, self.tx_signal_x[-1]]) self._ax_tx.set_ylim([0, MAX_VOLTAGE]) self.line_tx, = self._ax_tx.plot(self.tx_signal_x, MAX_VOLTAGE*(self.tx_signal/MAX_SAMPLE_VALUE)) self.daq.update_tx_signal(self.tx_signal, self.tx_signal_len) # Daten aufnehmen self.data_df = None self.do_record_data = False self.data1_df = None self.do_record_data1 = False self.data2_df = None self.do_record_data2 = False self.data3_df = None self.do_record_data3 = False self.data4_df = None self.do_record_data4 = False def closeEvent(self, event): global do_terminate_app self.daq.com_disconnect() do_terminate_app = True if self.thread.isRunning(): self.thread.exit() qtw.QApplication.quit() @pyqtSlot() def on_com_connect(self): if self.combo_com_ports.currentIndex() >= 0: self.daq.com_connect(self.com_ports[self.combo_com_ports.currentIndex()]) @pyqtSlot() def on_signal_calculate(self, sigtype): global MAX_LEN_TX_SIGNAL, len_tx_signal pass def plot_data(self): if do_terminate_app is False: if self.do_record_data == True: ndf = pd.DataFrame(columns=['RX', 'TX'], data=[[self.daq.data, self.tx_signal[0:self.tx_signal_len]]]) self.data_df = pd.concat([self.data_df, ndf], ignore_index=True) dnum = int(len(self.data_df)) self.lbl_num_data.setText(f'Recorded data: {dnum}') else: self.lbl_num_data.setText('Not recording data') self.line_measure.set_ydata(self.daq.data) self._ax_measure.draw_artist(self._ax_measure.patch) self._ax_measure.draw_artist(self.line_measure) self.fig_measure.update() self.thread.start() # Aktualisieren der Messdatenanzeige für das erste Signal self.line_measure.set_ydata(self.daq.data1) self._ax_measure.draw_artist(self._ax_measure.patch) self._ax_measure.draw_artist(self.line_measure) # Aktualisieren der Messdatenanzeige für das zweite Signal self.line_measure2.set_ydata(self.daq.data2) self._ax_measure.draw_artist(self.line_measure2) # Aktualisieren der Messdatenanzeige für das dritte Signal self.line_measure3.set_ydata(self.daq.data3) self._ax_measure.draw_artist(self.line_measure3) # Aktualisieren der Messdatenanzeige für das vierte Signal self.line_measure4.set_ydata(self.daq.data4) self._ax_measure.draw_artist(self.line_measure4) # Aktualisieren der Anzeige self.fig_measure.update() self.thread.start() @pyqtSlot() def on_mode_pulseecho(self): global MODE_PULSE_ECHO self.daq.update_tx_signal(self.tx_signal, self.tx_signal_len, MODE_PULSE_ECHO) @pyqtSlot() def on_mode_continuous(self): global MODE_CONTINUOUS self.daq.update_tx_signal(self.tx_signal, self.tx_signal_len, MODE_CONTINUOUS) @pyqtSlot() def on_signal_sine(self): global MAX_LEN_TX_SIGNAL, SAMPLING_RATE p = int(self.inp_vpp.text()) / 2 f = int(self.inp_f0.text()) per = int(self.inp_periodes.text()) siglen = int(np.ceil(per * SAMPLING_RATE / f)) if siglen > MAX_LEN_TX_SIGNAL: print('len > maxlen') return temp_signal = np.ones(MAX_LEN_TX_SIGNAL, dtype=np.uint16) * (MAX_VOLTAGE / 2) for i in range(siglen): temp_signal[i] = np.sin(2 * np.pi * f * (i / SAMPLING_RATE)) * p + (MAX_VOLTAGE / 2) temp_signal[temp_signal > MAX_VOLTAGE] = MAX_VOLTAGE temp_signal[temp_signal < 0] = 0 self.line_tx.set_ydata(temp_signal) self._ax_tx.set_xlim([0, siglen / SAMPLING_RATE]) self._ax_tx.draw_artist(self._ax_tx.patch) self._ax_tx.draw_artist(self._ax_tx.xaxis) self._ax_tx.draw_artist(self.line_tx) self.fig_tx.update() self.tx_signal_len = siglen self.tx_signal1 = np.uint16((temp_signal / 10) / 3.3 * 4095) self.tx_signal2 = np.uint16((temp_signal / 10) / 3.3 * 4095) self.tx_signal3 = np.uint16((temp_signal / 10) / 3.3 * 4095) self.tx_signal4 = np.uint16((temp_signal / 10) / 3.3 * 4095) @pyqtSlot() def on_signal_chirp(self): global MAX_LEN_TX_SIGNAL, SAMPLING_RATE p = int(self.inp_vpp.text()) / 2 f0 = int(self.inp_f0.text()) f1 = int(self.inp_f1.text()) siglen = int(self.inp_periodes.text()) if siglen > MAX_LEN_TX_SIGNAL: print('len > maxlen') return temp_signal = np.ones(MAX_LEN_TX_SIGNAL, dtype=np.uint16) * (MAX_VOLTAGE / 2) #temp_signal[0:siglen] = signal.chirp(window.tx_signal_x[0:siglen], f0, window.tx_signal_x[siglen-1], f1) * p + (MAX_VOLTAGE / 2) temp_signal[0:siglen] = np.sin(2*np.pi*(f0 + (f1-f0)*self.tx_signal_x[0:siglen]/self.tx_signal_x[siglen])*self.tx_signal_x[0:siglen]) * p + (MAX_VOLTAGE / 2) temp_signal[temp_signal > MAX_VOLTAGE] = MAX_VOLTAGE temp_signal[temp_signal < 0] = 0 self.line_tx.set_ydata(temp_signal) self._ax_tx.set_xlim([0, siglen / SAMPLING_RATE]) self._ax_tx.draw_artist(self._ax_tx.patch) self._ax_tx.draw_artist(self.line_tx) self.fig_tx.update() self.tx_signal_len = siglen self.tx_signal = np.uint16((temp_signal / 10) / 3.3 * 4095) @pyqtSlot() def on_signal_step(self): global MAX_LEN_TX_SIGNAL, SAMPLING_RATE p = int(self.inp_vpp.text()) / 2 f0 = int(self.inp_f0.text()) f1 = int(self.inp_f1.text()) steps = int(self.inp_periodes.text()) per = 5 gsiglen = 0 fsteps = np.linspace(f0, f1, steps) for f in fsteps: gsiglen = gsiglen + int(np.ceil(per * SAMPLING_RATE / f)) if gsiglen > MAX_LEN_TX_SIGNAL: print('len > maxlen') return temp_signal = np.ones(MAX_LEN_TX_SIGNAL, dtype=np.uint16) * (MAX_VOLTAGE / 2) offset = 0 for f in fsteps: siglen = int(np.ceil(per * SAMPLING_RATE / f)) for i in range(siglen-1): temp_signal[i + offset] = np.sin(2 * np.pi * f * (i / SAMPLING_RATE)) * p + (MAX_VOLTAGE / 2) offset = offset + i self.line_tx.set_ydata(temp_signal) self._ax_tx.set_xlim([0, gsiglen / SAMPLING_RATE]) self._ax_tx.draw_artist(self._ax_tx.patch) self._ax_tx.draw_artist(self.line_tx) self.fig_tx.update() self.tx_signal_len = gsiglen self.tx_signal = np.uint16((temp_signal / 10) / 3.3 * 4095) @pyqtSlot() def on_signal_multifreq(self): global MAX_LEN_TX_SIGNAL, SAMPLING_RATE, MAX_VOLTAGE p = int(self.inp_vpp.text()) / 2 buf = np.zeros(MAX_LEN_TX_SIGNAL) f_res = (SAMPLING_RATE / 2) / (MAX_LEN_TX_SIGNAL / 2) # freq_list = [10e3, 20e3, 30e3, 40e3, 50e3, 60e3, 70e3, 80e3, 90e3, 100e3] # freq_weight = [2, 2, 1, 1, 4, 1, 3, 4, 4, 2, 3] freq_list = [40e3, 60e3, 65e3] freq_weight = [1, 1, 2] ''' for f in freq_list: buf[int(f / f_res)] = 1 ''' for i in range(len(freq_list)): j = int(freq_list[i] / f_res) buf[j] = freq_weight[i] x = fft.ifft(buf) temp_signal = (np.real(x) * (p / np.abs(x))) + (MAX_VOLTAGE / 2) temp_signal[np.isnan(temp_signal)] = (MAX_VOLTAGE / 2) temp_signal[temp_signal > MAX_VOLTAGE] = MAX_VOLTAGE temp_signal[temp_signal < 0] = (MAX_VOLTAGE / 2) self.line_tx.set_ydata(temp_signal) self._ax_tx.set_xlim([0, MAX_LEN_TX_SIGNAL / SAMPLING_RATE]) self._ax_tx.draw_artist(self._ax_tx.patch) self._ax_tx.draw_artist(self.line_tx) self.fig_tx.update() self.tx_signal_len = MAX_LEN_TX_SIGNAL self.tx_signal = np.uint16((temp_signal / 10) / 3.3 * 4095) @pyqtSlot() def on_toggle_recording(self): if self.do_record_data == False: # Datenaufnahme starten self.btn_toggle_measurement.setText('Stop recording') self.data_df = pd.DataFrame(columns=['RX', 'TX']) self.do_record_data = True else: # Datenaufnahme stoppen self.btn_toggle_measurement.setText('Start recording') self.do_record_data = False @pyqtSlot() def on_save_data(self): fn, _ = qtw.QFileDialog.getSaveFileName(None, 'Save File', '', 'Pandas DataFrame Pickle(*.pickle)') if fn: self.data_df.to_pickle(fn) # DAQ Thread class DAQThread(QObject): finished = pyqtSignal() def __init__(self): global MAX_LEN_RX_SIGNAL, MAX_LEN_TX_SIGNAL, SAMPLING_RATE super().__init__() self.serial = None self.data = np.zeros(MAX_LEN_RX_SIGNAL) self.data1 = np.zeros(MAX_LEN_RX_SIGNAL) self.data2 = np.zeros(MAX_LEN_RX_SIGNAL) self.data3 = np.zeros(MAX_LEN_RX_SIGNAL) self.data4 = np.zeros(MAX_LEN_RX_SIGNAL) self.tx_signal = np.ones(MAX_LEN_TX_SIGNAL, dtype=np.uint16) * 1024 self.tx_signal_len = 0 self.tx_mode = 0 self.update_tx = False # Filter self.filt_a = 1 self.filt_b = signal.firwin(31, 250e3, pass_zero='lowpass', fs=SAMPLING_RATE, window='hann') def com_disconnect(self): if self.serial is not None: self.serial.close() self.serial = None def com_connect(self, port): global BAUD_RATE if self.serial is None: self.serial = serial.Serial(port=port, baudrate=BAUD_RATE) def update_tx_signal(self, txsignal, siglen, mode=0): self.tx_signal = txsignal self.tx_signal_len = siglen self.tx_mode = mode self.update_tx = True def run(self): global MAX_LEN_TX_SIGNAL global MODE_PULSE_ECHO, MODE_CONTINUOUS sync = bytearray(1) dlen_b = bytearray(2) if self.serial is None: self.finished.emit() return # Neues Tx Signal schicken if self.update_tx == True: self.serial.write(b'-') temp = struct.pack(str(MAX_LEN_TX_SIGNAL) + 'H', *self.tx_signal) self.serial.write(temp) temp = struct.pack('H', self.tx_signal_len) self.serial.write(temp) if self.tx_mode == MODE_PULSE_ECHO: self.serial.write(b'#') elif self.tx_mode == MODE_CONTINUOUS: self.serial.write(b'*') self.update_tx = False # Daten abrufen else: self.serial.write(b'+') time.sleep(1) self.serial.readinto(sync) if sync == b'+': self.serial.readinto(dlen_b) self.data_len = struct.unpack('