Read-Sensor-Resistances/read_arduino.py

120 lines
5.8 KiB
Python
Raw Normal View History

"""
Read raw data from the Arduino and convert it into actual resistance values using the parameters provided by the top-level settings.
"""
from datetime import datetime
import serial, time, json
import serial.tools.list_ports
import numpy as np
2023-04-14 11:20:34 +00:00
import csv, traceback
# constant settings
SENSORS_MAX = 4  # maximum sensor ports


def _build_header(sensor_count):
    """Return the CSV header row for ``sensor_count`` resistance columns."""
    if sensor_count == 1:
        return ['Time', 'Resistance']
    return ['Time'] + [f'Resistance{i}' for i in range(1, sensor_count + 1)]


def _write_header(file_name, sensor_count):
    """Append the header row to ``file_name`` and close the file again."""
    with open(file_name, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(_build_header(sensor_count))


def _format_row(resistances):
    """Return ``"HH:MM:SS, r1, r2, ..."`` with resistances in scientific notation."""
    stamp = datetime.now().strftime('%H:%M:%S')
    values = [np.format_float_scientific(r, precision=2) for r in resistances]
    # Join plain Python strings: inserting the timestamp into a fixed-width
    # NumPy string array would truncate it whenever every value is shorter
    # than 8 characters (e.g. '1.e+03' or 'inf').
    return ", ".join([stamp, *values])


def _append_line(file_name, line):
    """Append one line of text to ``file_name``, closing the file afterwards."""
    with open(file_name, "a", newline="", encoding="utf-8") as f:
        f.write(line + '\n')


def read():
    """
    Read the data from a board, if any, and log the resistances to a file.

    The configuration is loaded from ``settings.json`` in the current working
    directory (keys: ``v_in``, ``refRes``, ``sensor_ports``, ``file_name``,
    ``port``, ``delay``). A header row is appended to ``file_name`` and then
    one ``"HH:MM:SS, r1, r2, ..."`` line is appended per sample.

    If the configured port contains "- No Device -", random voltages are
    generated for demo purposes, every ``delay`` milliseconds, forever (stop
    it with Ctrl+C). Otherwise JSON lists of voltages are read line by line
    from the serial port until the port is closed or Ctrl+C is pressed.

    Raises:
        ValueError: if no sensor port is selected or a port is outside
            0..SENSORS_MAX-1.
        KeyError: if a required key is missing from ``settings.json``.
    """
    baud = 19200
    with open('settings.json', 'r') as settings_file:
        settings = json.load(settings_file)
    # resolution = settings["resolution"] # this is not in use, but just in case if we decide to switch back to analogRead()
    v_in = settings["v_in"]
    refRes = np.array(settings["refRes"])
    sensor_ports = np.array(settings["sensor_ports"])  # ports that sensor(s) are connected to
    file_name = settings["file_name"]
    port = settings["port"]
    delay = settings["delay"]

    if sensor_ports.size == 0:
        raise ValueError("No sensor port selected!")
    # Negative indices would silently wrap around in np.take, so reject them too.
    if np.any(sensor_ports >= SENSORS_MAX) or np.any(sensor_ports < 0):
        raise ValueError("Port range is 0-3!")

    # Reference resistors of the selected ports; constant for the whole run.
    ref_sel = np.take(refRes, sensor_ports)

    if "- No Device -" in port:
        # generate random values
        _write_header(file_name, len(sensor_ports))
        full_scale_mv = v_in * 1000
        while True:
            dat_list = np.random.randint(0, full_scale_mv, SENSORS_MAX)  # create a randomized voltage data (in mV)
            # According to the actual req, the data should be (v_in - data).
            # The random data is in millivolts, so subtract from v_in in mV.
            dat_list = full_scale_mv - dat_list
            # take only the selected ports, convert to volts and truncate to
            # two decimal places to "filter" out some hardware errors
            # NOTE: a value truncated to 0.00 V yields an infinite resistance.
            dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2
            r_arr = ref_sel * (v_in / dat_sel - 1)  # *2 <-- change with actual formula for ammonia concentration
            # write + export values as .csv format
            dat = _format_row(r_arr)
            print(dat)
            _append_line(file_name, dat)
            time.sleep(delay / 1000)
    else:
        controller = serial.Serial(port, baudrate=baud)  # TODO: sometimes serial port is in use, make it prompt user when it happens
        try:
            _write_header(file_name, len(sensor_ports))
            while controller.isOpen():
                try:
                    read_data = controller.readline().decode("utf-8")
                    print(read_data)  # Debug
                    # use numpy so it can make list calculations easier (and possibly faster)
                    dat_list = np.asarray(json.loads(read_data), dtype=np.float32)[:SENSORS_MAX]
                    # According to the actual req, the data should be (voltage - data)
                    dat_sel = np.take(v_in - dat_list, sensor_ports)
                    # if we decided to switch back to analogRead(), replace the line above with something like the commented line below
                    # dat_sel = np.take(dat_list, sensor_ports) * v_in / resolution
                    # below line is the actual formula for the voltage divider from voltage to resistance. See report's formula for detail
                    r_arr = ref_sel * (v_in / dat_sel - 1)  # *2 <-- change with actual formula for ammonia concentration
                    # write + export values as .csv format
                    _append_line(file_name, _format_row(r_arr))
                except KeyboardInterrupt as e:
                    controller.close()
                    print(e.__class__.__name__)
                    break
                except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                    print('decoder error')
                except Exception:
                    traceback.print_exc()  # debug, printout error
        finally:
            # Release the serial port even if something unexpected escapes the loop.
            controller.close()