From 54c05753c9029c3f57261da699af812c93eb0f5f Mon Sep 17 00:00:00 2001 From: Eric Yu Date: Thu, 18 Aug 2022 02:56:58 -0700 Subject: [PATCH] document code and minor bug fix --- read_arduino.py | 13 ++++---- serial_plotter.py | 75 +++++++++++++++++++++++++++++++---------------- test.py | 66 ++++++++++++++++++++++++++--------------- ui.py | 2 +- 4 files changed, 102 insertions(+), 54 deletions(-) diff --git a/read_arduino.py b/read_arduino.py index 3323a70..fbb03a8 100644 --- a/read_arduino.py +++ b/read_arduino.py @@ -10,6 +10,9 @@ import numpy as np SENSORS_MAX = 4 # maximum sensor ports def read(): + """ + read the data from a board, if any. If "no device" selected generate random values for demo + """ baud = 19200 settings = json.load(open('settings.json', 'r')) resolution = settings["resolution"] # this is not in use, but just in case if we decide to switch back to analogRead() instead of ESP32's analogReadMiliVolts() on the firmware @@ -23,8 +26,9 @@ def read(): if np.any(sensor_ports >= SENSORS_MAX): raise ValueError("Port range is 0-3!") if "- No Device -" in port: + # generate random value while True: - dat_list = np.random.randint(0, 3300, len(sensor_ports)) # create a randomized voltage data + dat_list = np.random.randint(0, v_in * 1000, SENSORS_MAX) # create a randomized voltage data # take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2 r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1) @@ -39,7 +43,6 @@ def read(): else: controller = serial.Serial(port, baudrate=baud) - # TODO: separate data in each run but still keep them in one csv file while controller.isOpen(): try: read_data = controller.readline().decode("utf-8") @@ -58,8 +61,8 @@ def read(): f = open(file_name, "a", newline="", encoding="utf-8") f.write(dat + '\n') f.close() - # except KeyboardInterrupt as e: - # print(e.__class__.__name__) - # break + except KeyboardInterrupt as e: + print(e.__class__.__name__) + break except (json.decoder.JSONDecodeError, UnicodeDecodeError): print('decoder error') diff --git a/serial_plotter.py b/serial_plotter.py index fc5c5e6..2ceef0b 100644 --- a/serial_plotter.py +++ b/serial_plotter.py @@ -1,62 +1,87 @@ from datetime import datetime +from matplotlib.animation import FuncAnimation +import os, json, traceback, wx import numpy as np import matplotlib import matplotlib.pyplot as plt -from matplotlib.animation import FuncAnimation -import os, json, traceback, ui, wx -matplotlib.use("WXAgg") # for JetBrains IDE to force use the wx backend + +matplotlib.use("WXAgg") # for JetBrains IDE to force use wxPython as backend UI for plotting + class SerialPlotter: - def __init__(self) -> None: + def __init__(self, parent: wx.Frame = None) -> None: + """ + Dynamically plot a graph with moving window, if a finite window size set by the user. It reads the .csv file + read_arduino generated, use the last line as the latest data and update the graph accordingly. It will plot + multiple lines on the graph if there are more than one values on the last row of the .csv file + :param parent: optional, parent frame + """ + self.parent = parent self.settings = json.load(open('settings.json', 'r')) self.sensors = len(self.settings['sensor_ports']) - self.windowsize = self.settings['winSize'] + self.windowsize = self.settings['winSize'] self.delay = self.settings["delay"] / 1000 - + self.colors = ['blue', 'orange', 'green', 'yellow'] - self.fig, self.axs = plt.subplots(1, 1, figsize=(7,5)) # TODO: make the figure size an UI option and pass into the settings.json + # TODO: make the figure size an UI option and pass into the settings.json + self.fig, self.axs = plt.subplots(1, 1, figsize=(9, 6)) - self.fig.canvas.mpl_connect('close_event', self._close_event) + self.fig.canvas.mpl_connect('close_event', self.event_close) self.timeStamps = {} self.sensorsData = {} + self.timeElapsed = 0 # time have passed since the graph started, in seconds for i in range(self.sensors): - self.timeStamps[i] = ['']#*self.windowsize - self.sensorsData[i] = [0]#*self.windowsize + self.timeStamps[i] = [''] + self.sensorsData[i] = [0] - plt.tight_layout() - - - def _close_event(self, event) -> None: - """ Actiions need to be executed when the graph has closed. Start a new .csv file to get read for new graph and bring back the UI, if hidden """ + def event_close(self, event) -> None: + """ + Actiions need to be executed when the graph has closed. Start a new .csv file to get read for new graph and + bring back the UI, if exist + """ file = self.settings["file_name"] wx.MessageBox(f"File has saved as {os.path.split(file)[1]} under {os.path.split(file)[0]} directory!\n") + if self.parent: + self.parent.Show() - def animation(self, t:int) -> None: - """ render a frame of the animated graph """ + def animation(self, t: int) -> None: + """ + render a frame of the animated graph + """ try: plt.cla() # clear previous frame - # read the last line from the .csv file, the data start from the second colunm so omit index # 0 + # read the last line from the .csv file, the data start from the second column so omit index #0 file = open(self.settings["file_name"], "r") ndata = np.array([np.asarray(line.split(", ")[1:], dtype=np.float32) for line in file]) if len(ndata) > 0: row = ndata[-1] i = 0 while i < self.sensors: - # shift all data left by 1 index, pop out the leftmost value - if self.windowsize > 0 and len(self.timeStamps[i]) > self.windowsize: # TODO: make sure the two lists have the same size + # shift all data left by 1 index, pop out the leftmost value, if windowsize is not 0 (infinite) + # TODO: make sure the two lists have the same size + # if self.windowsize > 0 and len(self.timeStamps[i]) > self.windowsize + if 0 < self.windowsize < len(self.timeStamps[i]): self.timeStamps[i].pop(0) self.sensorsData[i].pop(0) - self.timeStamps[i].append(datetime.now().strftime('%H:%M:%S')) + # self.timeStamps[i].append(datetime.now().strftime('%H:%M:%S')) # version 1 + + # version 2, if we decide to go with this one change the list type to list[int] from list[str] + self.timeStamps[i].append(str(self.timeElapsed)) + self.sensorsData[i].append(row[i]) # plot a line # TODO: round the number to 1-2- decimal places - self.axs.plot(self.timeStamps[i] ,self.sensorsData[i], color=self.colors[i], label=f'sensor {i + 1}, latest: {np.int32(self.sensorsData[i][-1])}') + self.axs.plot(self.timeStamps[i], self.sensorsData[i], color=self.colors[i], + label=f'sensor {i + 1}, latest: {np.floor(self.sensorsData[i][-1])} $\Omega$') + self.axs.set_xlabel('Time (seconds)') + self.axs.set_ylabel(u'Resistance ($\Omega$)') i += 1 - + self.timeElapsed += 1 # increment time + # Acknowledgement: https://stackoverflow.com/a/13589144 handles, labels = self.axs.get_legend_handles_labels() by_label = dict(zip(labels, handles)) @@ -65,6 +90,6 @@ class SerialPlotter: traceback.print_exc() def plotting(self) -> FuncAnimation: - ani = FuncAnimation(self.fig, self.animation, blit=False) + """ animate the dynamic plot """ + ani = FuncAnimation(self.fig, self.animation, blit=False, interval=self.delay * 1000) return ani - diff --git a/test.py b/test.py index 00e7c63..94459ed 100644 --- a/test.py +++ b/test.py @@ -1,8 +1,8 @@ from ui import Frame -from multiprocessing import * from read_arduino import * from serial_plotter import * -import os, sys, json, wx,importlib, warnings +from multiprocessing import * +import os, sys, json, wx, importlib, warnings import serial.tools.list_ports import numpy as np @@ -11,6 +11,7 @@ warnings.filterwarnings("ignore", category=DeprecationWarning) # acknowledgement: https://stackoverflow.com/a/68666505. handles splash screens if '_PYIBoot_SPLASH' in os.environ and importlib.util.find_spec("pyi_splash"): import pyi_splash + pyi_splash.update_text('UI Loaded ...') pyi_splash.close() @@ -22,9 +23,11 @@ global ani, t1 def main(): #################### USER INPUTS ########################### - resistors = [float(frame.r_ref_1.GetValue()), float(frame.r_ref_2.GetValue()), float(frame.r_ref_3.GetValue()), - float(frame.r_ref_4.GetValue()), - 0] # resisters for each An port, where n is an integer from 0-3. Use 0 if none. in Ohms + r_ref = np.array( + [frame.r_ref_1.GetValue(), frame.r_ref_2.GetValue(), frame.r_ref_3.GetValue(), frame.r_ref_4.GetValue(), + 0]) # resisters for each An port, where n is an integer from 0-3. Use 0 if none. in Ohms + r_ref[r_ref == ''] = '0' # correcting the emply values + resistors = r_ref.astype(np.float32).tolist() # convert string to numbers input_voltage = float(frame.input_voltage.GetValue()) bit_rate = float(frame.adjusted_volt.GetValue()) port = frame.dev_list.GetValue() @@ -38,7 +41,8 @@ def main(): dat_folder = "RecordedData" os.makedirs(dat_folder, exist_ok=True) filename = os.path.join(os.getcwd(), dat_folder, f"{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.csv") - delay = 1000 # millisec pe r data, defined in the firmware + delay = 1000 # millisec per data point, defined in the firmware + # for now we have 5 reference resistors, there is an extra one (we set 4 on the UI) just in case if needed in the future if not (len(resistors) == 5): raise ValueError(f"expecting 5 resistor values, but got {len(resistors)}!!!") @@ -48,6 +52,13 @@ def main(): def gen_settings(resistors, input_voltage, bits, port, filename, window_size, delay): """ export all inputs from main() to a .json file + :param resistors: list of reference resistances of the sensors + :param input_voltage: Vin of the voltage divider + :param bits: ADC resolution for the board. Usualy 12-bits for ESP32 (and 10 bits for Arduino Mega) + :param port: serial port the board is connected to on the computer + :param filename: file name and absolute directory which the serial data will be recorded to + :param window_size: numbers of x values to display on the graph, or the x interval + :param delay: amount of time to wait for a task's next start. 1000 ms for now to be consistent with the firmware """ name = "settings.json" settings = {} @@ -71,13 +82,13 @@ def run(e): main() if 't1' in globals(): t1.terminate() # end the previous serial reads, if there is any - + # place the read() function from read_arduino into another process to run it in background t1 = Process(target=read, args=()) t1.start() # run the plotter. Note that we should not put the plotter class, or function, in another process since # matplot's FuncAnimation doesn't like that - plotter = SerialPlotter() + plotter = SerialPlotter(frame) ani = plotter.plotting() plotter.fig.show() @@ -87,31 +98,40 @@ def run(e): if __name__ == '__main__': - sys.stdout.write("this is an alpha version of the design. This debug terminal will be gone on official release\n") + console_title = 'Program Crashed! Here are the details' # this is there for now to redirect any errors + # Acknowledgement: https://stackoverflow.com/a/27694505 if sys.platform.startswith('win'): # On Windows calling this function is necessary. freeze_support() global t1 - app = wx.App() + app = wx.App(useBestVisual=True) frame = Frame(None) + app.SetTopWindow(frame) + # set where the uncaught errors should be displayed + console = wx.PyOnDemandOutputWindow(console_title) + console.SetParent(frame) + sys.stderr = console + sys.stdout.write("this is an alpha version of the design. This debug terminal will be gone on official release\n") ports = [comport.device for comport in serial.tools.list_ports.comports()] # get all available ports frame.dev_list.AppendItems(ports) - frame.SetTitle("Cease your resistance! - alpha 0.2.0") + frame.SetTitle("SeeDatResistance - Beta 0.1.0 RC2") frame.btLaunch.Bind(wx.EVT_BUTTON, run) if os.path.isfile("settings.json"): - sys.stdout.write("Found existing settings.json, auto-fill previous inputs!\n") - settings = json.load(open('settings.json', 'r')) - frame.r_ref_1.SetValue(str(settings["refRes"][0])) - frame.r_ref_2.SetValue(str(settings["refRes"][1])) - frame.r_ref_3.SetValue(str(settings["refRes"][2])) - frame.r_ref_4.SetValue(str(settings["refRes"][3])) - frame.input_voltage.SetValue(str(settings["v_in"])) - frame.adjusted_volt.SetValue(str(settings["resolution"])) - frame.m_textCtrl26.SetValue(str(settings["winSize"])) - if settings["port"] in ports: # auto-select device port if exist - frame.dev_list.SetValue(settings["port"]) - + try: + sys.stdout.write("Found existing settings.json, auto-fill previous inputs!\n") + settings = json.load(open('settings.json', 'r')) + frame.r_ref_1.SetValue(str(settings["refRes"][0])) + frame.r_ref_2.SetValue(str(settings["refRes"][1])) + frame.r_ref_3.SetValue(str(settings["refRes"][2])) + frame.r_ref_4.SetValue(str(settings["refRes"][3])) + frame.input_voltage.SetValue(str(settings["v_in"])) + frame.adjusted_volt.SetValue(str(settings["resolution"])) + frame.m_textCtrl26.SetValue(str(settings["winSize"])) + if settings["port"] in ports: # auto-select device port if exist + frame.dev_list.SetValue(settings["port"]) + except json.decoder.JSONDecodeError: # invalid settings file, ignore + pass frame.Show() app.MainLoop() if 't1' in globals(): diff --git a/ui.py b/ui.py index 2dd0dba..40975f1 100644 --- a/ui.py +++ b/ui.py @@ -16,7 +16,7 @@ import wx class Frame ( wx.Frame ): def __init__( self, parent ): - wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 470,330 ), style = wx.DEFAULT_FRAME_STYLE ) + wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 470,337 ), style = wx.DEFAULT_FRAME_STYLE ) self.SetSizeHintsSz( wx.Size( 470,330 ), wx.DefaultSize ) self.SetForegroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_BTNTEXT ) )