71 lines
3.4 KiB
Python
71 lines
3.4 KiB
Python
"""
|
|
Read raw data from Arduino and then converted into actual resistance using parameters provided from top level
|
|
"""
|
|
from datetime import datetime
|
|
import serial, time, json
|
|
import serial.tools.list_ports
|
|
import numpy as np
|
|
|
|
# constant settings
|
|
SENSORS_MAX = 4 # maximum sensor ports
|
|
|
|
def read():
|
|
"""
|
|
read the data from a board, if any. If "no device" selected generate random values for demo
|
|
"""
|
|
baud = 19200
|
|
settings = json.load(open('settings.json', 'r'))
|
|
resolution = settings["resolution"] # this is not in use, but just in case if we decide to switch back to analogRead() instead of ESP32's analogReadMiliVolts() on the firmware
|
|
v_in = settings["v_in"]
|
|
refRes = np.array(settings["refRes"])
|
|
sensor_ports = np.array(settings["sensor_ports"]) # ports that sensor(s) are connected to
|
|
file_name = settings["file_name"]
|
|
port = settings["port"]
|
|
delay = settings["delay"]
|
|
|
|
if np.any(sensor_ports >= SENSORS_MAX):
|
|
raise ValueError("Port range is 0-3!")
|
|
if "- No Device -" in port:
|
|
# generate random value
|
|
while True:
|
|
dat_list = np.random.randint(0, v_in * 1000, SENSORS_MAX) # create a randomized voltage data
|
|
# take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors
|
|
dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2
|
|
r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1) # *2 <-- change with actual formula for ammonia concentration
|
|
# write + export values as .csv format
|
|
# converted resistance values in array to scientific notation
|
|
dat = f", ".join(np.insert(np.format_float_scientific(r_arr.astype(str)), 0, datetime.now().strftime('%H:%M:%S')))
|
|
print(dat)
|
|
f = open(file_name, "a", newline="", encoding="utf-8")
|
|
f.write(dat + '\n')
|
|
f.close()
|
|
time.sleep(delay / 1000)
|
|
exit(0)
|
|
else:
|
|
controller = serial.Serial(port, baudrate=baud)
|
|
|
|
while controller.isOpen():
|
|
try:
|
|
read_data = controller.readline().decode("utf-8")
|
|
# use numpy so it can make list calculations easier (and possibly faster)
|
|
dat_list = np.asarray(json.loads(read_data), dtype=np.uint32)[:SENSORS_MAX]
|
|
|
|
# if we decided to switch back to analogRead(), replace this line of comment to the algorithm to something like the commented line below
|
|
# dat_sel = np.take(dat_list, sensor_ports) * v_in / resolution
|
|
|
|
# take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors
|
|
dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2
|
|
r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1) # *2 <-- change with actual formula for ammonia concentration
|
|
# write + export values as .csv format
|
|
# converted resistance values in array to scientific notation
|
|
dat = f", ".join(np.insert(np.format_float_scientific(r_arr.astype(str)), 0, datetime.now().strftime('%H:%M:%S')))
|
|
print(dat)
|
|
f = open(file_name, "a", newline="", encoding="utf-8")
|
|
f.write(dat + '\n')
|
|
f.close()
|
|
except KeyboardInterrupt as e:
|
|
print(e.__class__.__name__)
|
|
break
|
|
except (json.decoder.JSONDecodeError, UnicodeDecodeError):
|
|
print('decoder error')
|