55 lines
2.4 KiB
Python
55 lines
2.4 KiB
Python
|
"""
|
||
|
Read raw data from the Arduino and convert it into actual resistance values using parameters provided from the top level
|
||
|
"""
|
||
|
from datetime import datetime
|
||
|
import serial, os, json
|
||
|
import serial.tools.list_ports
|
||
|
import numpy as np
|
||
|
|
||
|
# constant settings
|
||
|
SENSORS_MAX = 4 # maximum number of sensor ports; read() rejects any configured port index >= this value
|
||
|
|
||
|
def _parse_frame(raw_line, required_len):
    """Decode one line received from the controller into an array of readings.

    raw_line is the bytes object returned by readline(); it is expected to
    contain a JSON array of non-negative integers. required_len is the minimum
    number of readings the frame must hold so that every configured sensor port
    can be indexed.

    Returns a 1-D uint32 array holding at most SENSORS_MAX readings, or None
    when the line cannot be decoded, is not a flat numeric list, or is too short.
    """
    try:
        values = np.asarray(json.loads(raw_line.decode("utf-8")), dtype=np.uint32)
    except (ValueError, TypeError, OverflowError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError (both
        # subclass it) as well as non-numeric or ragged lists; TypeError covers
        # null / dict payloads; OverflowError covers negative numbers.
        return None
    if values.ndim != 1 or values.size < required_len:
        # e.g. a truncated first line after connecting: valid JSON, but not a full frame
        return None
    return values[:SENSORS_MAX]


def _to_resistance(readings, sensor_ports, ref_res, v_in):
    """Return ref_res[port] * (v_in / v - 1) for every selected port.

    v is the reading of that port divided by 1000 and truncated to two decimal
    places (presumably mV -> V, since the firmware is said to use ESP32's
    analogReadMilliVolts() -- confirm against the firmware). A reading that
    truncates to 0 yields inf, exactly as numpy division does.
    """
    # if we decide to switch back to analogRead(), the conversion would become something like:
    # volts = np.take(readings, sensor_ports) * v_in / resolution

    # truncate to two decimal places to "filter" out some hardware errors
    volts = np.trunc((np.take(readings, sensor_ports) / 1000) * 10**2) / 10**2
    return np.take(ref_res, sensor_ports) * (v_in / volts - 1)


def read():
    """Read frames from the serial controller forever and log resistances.

    Configuration is taken from 'settings.json' in the current working
    directory (keys: "resolution", "v_in", "refRes", "sensor_ports",
    "file_name", "port"). Every valid frame is converted to resistances,
    printed, and appended to the file named by "file_name" as one
    "HH:MM:SS, r0, r1, ..." line.

    Raises:
        SystemExit: with code 0 when the configured port is the "- No Device -" placeholder.
        ValueError: when a configured sensor port is outside 0 .. SENSORS_MAX - 1.
    """
    baud = 19200
    with open('settings.json', 'r') as settings_file:
        settings = json.load(settings_file)
    resolution = settings["resolution"]  # this is not in use, but kept in case we switch back to analogRead() instead of ESP32's analogReadMilliVolts() on the firmware
    v_in = settings["v_in"]
    refRes = np.array(settings["refRes"])
    sensor_ports = np.array(settings["sensor_ports"])  # ports that sensor(s) are connected to
    file_name = settings["file_name"]
    port = settings["port"]

    if "- No Device -" in port:
        # TODO: set to writerandomvalues.py thing instead raise error
        print("this should generate random values base on # sensors given")
        # SystemExit instead of exit(): the latter is injected by the site module and is not always available
        raise SystemExit(0)

    # Validate before opening the serial port so a bad configuration never leaves
    # the port open; negative indices are rejected too because np.take would
    # silently wrap them around.
    if np.any((sensor_ports < 0) | (sensor_ports >= SENSORS_MAX)):
        raise ValueError(f"Port range is 0-{SENSORS_MAX - 1}!")

    # a frame must be long enough to index the highest configured port
    required_len = int(sensor_ports.max()) + 1 if sensor_ports.size else 0

    controller = serial.Serial(port, baudrate=baud)
    try:
        # TODO: separate data in each run but still keep them in one csv file
        while controller.isOpen():
            readings = _parse_frame(controller.readline(), required_len)
            if readings is None:
                print('decoder error')
                continue

            r_arr = _to_resistance(readings, sensor_ports, refRes, v_in)

            # write + export values as .csv format
            dat = ", ".join([datetime.now().strftime('%H:%M:%S'), *r_arr.astype(str)])
            print(dat)
            # reopen per line so every record is flushed to disk as soon as it is produced
            with open(file_name, "a", newline="", encoding="utf-8") as f:
                f.write(dat + '\n')
    finally:
        # release the port on any exit path (Ctrl-C, serial errors, ...)
        controller.close()
|