document code and minor bug fix
This commit is contained in:
parent
4613f9dcea
commit
54c05753c9
|
@ -10,6 +10,9 @@ import numpy as np
|
|||
SENSORS_MAX = 4 # maximum sensor ports
|
||||
|
||||
def read():
|
||||
"""
|
||||
read the data from a board, if any. If "no device" selected generate random values for demo
|
||||
"""
|
||||
baud = 19200
|
||||
settings = json.load(open('settings.json', 'r'))
|
||||
resolution = settings["resolution"] # this is not in use, but just in case if we decide to switch back to analogRead() instead of ESP32's analogReadMiliVolts() on the firmware
|
||||
|
@ -23,8 +26,9 @@ def read():
|
|||
if np.any(sensor_ports >= SENSORS_MAX):
|
||||
raise ValueError("Port range is 0-3!")
|
||||
if "- No Device -" in port:
|
||||
# generate random value
|
||||
while True:
|
||||
dat_list = np.random.randint(0, 3300, len(sensor_ports)) # create a randomized voltage data
|
||||
dat_list = np.random.randint(0, v_in * 1000, SENSORS_MAX) # create a randomized voltage data
|
||||
# take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors
|
||||
dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2
|
||||
r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1)
|
||||
|
@ -39,7 +43,6 @@ def read():
|
|||
else:
|
||||
controller = serial.Serial(port, baudrate=baud)
|
||||
|
||||
# TODO: separate data in each run but still keep them in one csv file
|
||||
while controller.isOpen():
|
||||
try:
|
||||
read_data = controller.readline().decode("utf-8")
|
||||
|
@ -58,8 +61,8 @@ def read():
|
|||
f = open(file_name, "a", newline="", encoding="utf-8")
|
||||
f.write(dat + '\n')
|
||||
f.close()
|
||||
# except KeyboardInterrupt as e:
|
||||
# print(e.__class__.__name__)
|
||||
# break
|
||||
except KeyboardInterrupt as e:
|
||||
print(e.__class__.__name__)
|
||||
break
|
||||
except (json.decoder.JSONDecodeError, UnicodeDecodeError):
|
||||
print('decoder error')
|
||||
|
|
|
@ -1,13 +1,22 @@
|
|||
from datetime import datetime
|
||||
from matplotlib.animation import FuncAnimation
|
||||
import os, json, traceback, wx
|
||||
import numpy as np
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
import os, json, traceback, ui, wx
|
||||
matplotlib.use("WXAgg") # for JetBrains IDE to force use the wx backend
|
||||
|
||||
matplotlib.use("WXAgg") # for JetBrains IDE to force use wxPython as backend UI for plotting
|
||||
|
||||
|
||||
class SerialPlotter:
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, parent: wx.Frame = None) -> None:
|
||||
"""
|
||||
Dynamically plot a graph with moving window, if a finite window size set by the user. It reads the .csv file
|
||||
read_arduino generated, use the last line as the latest data and update the graph accordingly. It will plot
|
||||
multiple lines on the graph if there are more than one values on the last row of the .csv file
|
||||
:param parent: optional, parent frame
|
||||
"""
|
||||
self.parent = parent
|
||||
self.settings = json.load(open('settings.json', 'r'))
|
||||
self.sensors = len(self.settings['sensor_ports'])
|
||||
self.windowsize = self.settings['winSize']
|
||||
|
@ -15,47 +24,63 @@ class SerialPlotter:
|
|||
|
||||
self.colors = ['blue', 'orange', 'green', 'yellow']
|
||||
|
||||
self.fig, self.axs = plt.subplots(1, 1, figsize=(7,5)) # TODO: make the figure size an UI option and pass into the settings.json
|
||||
# TODO: make the figure size an UI option and pass into the settings.json
|
||||
self.fig, self.axs = plt.subplots(1, 1, figsize=(9, 6))
|
||||
|
||||
self.fig.canvas.mpl_connect('close_event', self._close_event)
|
||||
self.fig.canvas.mpl_connect('close_event', self.event_close)
|
||||
|
||||
self.timeStamps = {}
|
||||
self.sensorsData = {}
|
||||
self.timeElapsed = 0 # time have passed since the graph started, in seconds
|
||||
|
||||
for i in range(self.sensors):
|
||||
self.timeStamps[i] = ['']#*self.windowsize
|
||||
self.sensorsData[i] = [0]#*self.windowsize
|
||||
self.timeStamps[i] = ['']
|
||||
self.sensorsData[i] = [0]
|
||||
|
||||
plt.tight_layout()
|
||||
|
||||
|
||||
def _close_event(self, event) -> None:
|
||||
""" Actiions need to be executed when the graph has closed. Start a new .csv file to get read for new graph and bring back the UI, if hidden """
|
||||
def event_close(self, event) -> None:
|
||||
"""
|
||||
Actiions need to be executed when the graph has closed. Start a new .csv file to get read for new graph and
|
||||
bring back the UI, if exist
|
||||
"""
|
||||
file = self.settings["file_name"]
|
||||
wx.MessageBox(f"File has saved as {os.path.split(file)[1]} under {os.path.split(file)[0]} directory!\n")
|
||||
if self.parent:
|
||||
self.parent.Show()
|
||||
|
||||
def animation(self, t:int) -> None:
|
||||
""" render a frame of the animated graph """
|
||||
def animation(self, t: int) -> None:
|
||||
"""
|
||||
render a frame of the animated graph
|
||||
"""
|
||||
try:
|
||||
plt.cla() # clear previous frame
|
||||
# read the last line from the .csv file, the data start from the second colunm so omit index # 0
|
||||
# read the last line from the .csv file, the data start from the second column so omit index #0
|
||||
file = open(self.settings["file_name"], "r")
|
||||
ndata = np.array([np.asarray(line.split(", ")[1:], dtype=np.float32) for line in file])
|
||||
if len(ndata) > 0:
|
||||
row = ndata[-1]
|
||||
i = 0
|
||||
while i < self.sensors:
|
||||
# shift all data left by 1 index, pop out the leftmost value
|
||||
if self.windowsize > 0 and len(self.timeStamps[i]) > self.windowsize: # TODO: make sure the two lists have the same size
|
||||
# shift all data left by 1 index, pop out the leftmost value, if windowsize is not 0 (infinite)
|
||||
# TODO: make sure the two lists have the same size
|
||||
# if self.windowsize > 0 and len(self.timeStamps[i]) > self.windowsize
|
||||
if 0 < self.windowsize < len(self.timeStamps[i]):
|
||||
self.timeStamps[i].pop(0)
|
||||
self.sensorsData[i].pop(0)
|
||||
|
||||
self.timeStamps[i].append(datetime.now().strftime('%H:%M:%S'))
|
||||
# self.timeStamps[i].append(datetime.now().strftime('%H:%M:%S')) # version 1
|
||||
|
||||
# version 2, if we decide to go with this one change the list type to list[int] from list[str]
|
||||
self.timeStamps[i].append(str(self.timeElapsed))
|
||||
|
||||
self.sensorsData[i].append(row[i])
|
||||
# plot a line
|
||||
# TODO: round the number to 1-2- decimal places
|
||||
self.axs.plot(self.timeStamps[i] ,self.sensorsData[i], color=self.colors[i], label=f'sensor {i + 1}, latest: {np.int32(self.sensorsData[i][-1])}')
|
||||
self.axs.plot(self.timeStamps[i], self.sensorsData[i], color=self.colors[i],
|
||||
label=f'sensor {i + 1}, latest: {np.floor(self.sensorsData[i][-1])} $\Omega$')
|
||||
self.axs.set_xlabel('Time (seconds)')
|
||||
self.axs.set_ylabel(u'Resistance ($\Omega$)')
|
||||
i += 1
|
||||
self.timeElapsed += 1 # increment time
|
||||
|
||||
# Acknowledgement: https://stackoverflow.com/a/13589144
|
||||
handles, labels = self.axs.get_legend_handles_labels()
|
||||
|
@ -65,6 +90,6 @@ class SerialPlotter:
|
|||
traceback.print_exc()
|
||||
|
||||
def plotting(self) -> FuncAnimation:
|
||||
ani = FuncAnimation(self.fig, self.animation, blit=False)
|
||||
""" animate the dynamic plot """
|
||||
ani = FuncAnimation(self.fig, self.animation, blit=False, interval=self.delay * 1000)
|
||||
return ani
|
||||
|
||||
|
|
42
test.py
42
test.py
|
@ -1,8 +1,8 @@
|
|||
from ui import Frame
|
||||
from multiprocessing import *
|
||||
from read_arduino import *
|
||||
from serial_plotter import *
|
||||
import os, sys, json, wx,importlib, warnings
|
||||
from multiprocessing import *
|
||||
import os, sys, json, wx, importlib, warnings
|
||||
import serial.tools.list_ports
|
||||
import numpy as np
|
||||
|
||||
|
@ -11,6 +11,7 @@ warnings.filterwarnings("ignore", category=DeprecationWarning)
|
|||
# acknowledgement: https://stackoverflow.com/a/68666505. handles splash screens
|
||||
if '_PYIBoot_SPLASH' in os.environ and importlib.util.find_spec("pyi_splash"):
|
||||
import pyi_splash
|
||||
|
||||
pyi_splash.update_text('UI Loaded ...')
|
||||
pyi_splash.close()
|
||||
|
||||
|
@ -22,9 +23,11 @@ global ani, t1
|
|||
|
||||
def main():
|
||||
#################### USER INPUTS ###########################
|
||||
resistors = [float(frame.r_ref_1.GetValue()), float(frame.r_ref_2.GetValue()), float(frame.r_ref_3.GetValue()),
|
||||
float(frame.r_ref_4.GetValue()),
|
||||
0] # resisters for each An port, where n is an integer from 0-3. Use 0 if none. in Ohms
|
||||
r_ref = np.array(
|
||||
[frame.r_ref_1.GetValue(), frame.r_ref_2.GetValue(), frame.r_ref_3.GetValue(), frame.r_ref_4.GetValue(),
|
||||
0]) # resisters for each An port, where n is an integer from 0-3. Use 0 if none. in Ohms
|
||||
r_ref[r_ref == ''] = '0' # correcting the emply values
|
||||
resistors = r_ref.astype(np.float32).tolist() # convert string to numbers
|
||||
input_voltage = float(frame.input_voltage.GetValue())
|
||||
bit_rate = float(frame.adjusted_volt.GetValue())
|
||||
port = frame.dev_list.GetValue()
|
||||
|
@ -38,7 +41,8 @@ def main():
|
|||
dat_folder = "RecordedData"
|
||||
os.makedirs(dat_folder, exist_ok=True)
|
||||
filename = os.path.join(os.getcwd(), dat_folder, f"{datetime.now().strftime('%Y-%m-%d_%H%M%S')}.csv")
|
||||
delay = 1000 # millisec pe r data, defined in the firmware
|
||||
delay = 1000 # millisec per data point, defined in the firmware
|
||||
# for now we have 5 reference resistors, there is an extra one (we set 4 on the UI) just in case if needed in the future
|
||||
if not (len(resistors) == 5):
|
||||
raise ValueError(f"expecting 5 resistor values, but got {len(resistors)}!!!")
|
||||
|
||||
|
@ -48,6 +52,13 @@ def main():
|
|||
def gen_settings(resistors, input_voltage, bits, port, filename, window_size, delay):
|
||||
"""
|
||||
export all inputs from main() to a .json file
|
||||
:param resistors: list of reference resistances of the sensors
|
||||
:param input_voltage: Vin of the voltage divider
|
||||
:param bits: ADC resolution for the board. Usualy 12-bits for ESP32 (and 10 bits for Arduino Mega)
|
||||
:param port: serial port the board is connected to on the computer
|
||||
:param filename: file name and absolute directory which the serial data will be recorded to
|
||||
:param window_size: numbers of x values to display on the graph, or the x interval
|
||||
:param delay: amount of time to wait for a task's next start. 1000 ms for now to be consistent with the firmware
|
||||
"""
|
||||
name = "settings.json"
|
||||
settings = {}
|
||||
|
@ -77,7 +88,7 @@ def run(e):
|
|||
t1.start()
|
||||
# run the plotter. Note that we should not put the plotter class, or function, in another process since
|
||||
# matplot's FuncAnimation doesn't like that
|
||||
plotter = SerialPlotter()
|
||||
plotter = SerialPlotter(frame)
|
||||
ani = plotter.plotting()
|
||||
plotter.fig.show()
|
||||
|
||||
|
@ -87,19 +98,27 @@ def run(e):
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.stdout.write("this is an alpha version of the design. This debug terminal will be gone on official release\n")
|
||||
console_title = 'Program Crashed! Here are the details' # this is there for now to redirect any errors
|
||||
|
||||
# Acknowledgement: https://stackoverflow.com/a/27694505
|
||||
if sys.platform.startswith('win'):
|
||||
# On Windows calling this function is necessary.
|
||||
freeze_support()
|
||||
global t1
|
||||
app = wx.App()
|
||||
app = wx.App(useBestVisual=True)
|
||||
frame = Frame(None)
|
||||
app.SetTopWindow(frame)
|
||||
# set where the uncaught errors should be displayed
|
||||
console = wx.PyOnDemandOutputWindow(console_title)
|
||||
console.SetParent(frame)
|
||||
sys.stderr = console
|
||||
sys.stdout.write("this is an alpha version of the design. This debug terminal will be gone on official release\n")
|
||||
ports = [comport.device for comport in serial.tools.list_ports.comports()] # get all available ports
|
||||
frame.dev_list.AppendItems(ports)
|
||||
frame.SetTitle("Cease your resistance! - alpha 0.2.0")
|
||||
frame.SetTitle("SeeDatResistance - Beta 0.1.0 RC2")
|
||||
frame.btLaunch.Bind(wx.EVT_BUTTON, run)
|
||||
if os.path.isfile("settings.json"):
|
||||
try:
|
||||
sys.stdout.write("Found existing settings.json, auto-fill previous inputs!\n")
|
||||
settings = json.load(open('settings.json', 'r'))
|
||||
frame.r_ref_1.SetValue(str(settings["refRes"][0]))
|
||||
|
@ -111,7 +130,8 @@ if __name__ == '__main__':
|
|||
frame.m_textCtrl26.SetValue(str(settings["winSize"]))
|
||||
if settings["port"] in ports: # auto-select device port if exist
|
||||
frame.dev_list.SetValue(settings["port"])
|
||||
|
||||
except json.decoder.JSONDecodeError: # invalid settings file, ignore
|
||||
pass
|
||||
frame.Show()
|
||||
app.MainLoop()
|
||||
if 't1' in globals():
|
||||
|
|
2
ui.py
2
ui.py
|
@ -16,7 +16,7 @@ import wx
|
|||
class Frame ( wx.Frame ):
|
||||
|
||||
def __init__( self, parent ):
|
||||
wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 470,330 ), style = wx.DEFAULT_FRAME_STYLE )
|
||||
wx.Frame.__init__ ( self, parent, id = wx.ID_ANY, title = wx.EmptyString, pos = wx.DefaultPosition, size = wx.Size( 470,337 ), style = wx.DEFAULT_FRAME_STYLE )
|
||||
|
||||
self.SetSizeHintsSz( wx.Size( 470,330 ), wx.DefaultSize )
|
||||
self.SetForegroundColour( wx.SystemSettings.GetColour( wx.SYS_COLOUR_BTNTEXT ) )
|
||||
|
|
Loading…
Reference in New Issue