125 lines
6.0 KiB
Python
125 lines
6.0 KiB
Python
"""
|
|
Read raw data from the Arduino and convert it into actual resistance values using the parameters provided by the top level
|
|
"""
|
|
from datetime import datetime
|
|
import serial, time, json
|
|
import serial.tools.list_ports
|
|
import numpy as np
|
|
import csv
|
|
|
|
# constant settings
SENSORS_MAX = 4  # maximum sensor ports; valid port indices are 0 .. SENSORS_MAX - 1


def _csv_header(sensor_count):
    """Build the CSV header row for the given number of resistance columns."""
    if sensor_count == 1:
        return ['Time', 'Resistance']
    return ['Time'] + [f'Resistance{n}' for n in range(1, sensor_count + 1)]


def _demo_readings(v_in):
    """Return SENSORS_MAX random readings in [0, v_in) for running without a board."""
    millivolts = np.random.randint(0, int(v_in * 1000), SENSORS_MAX)
    # keep only two decimal places of the scaled-down value
    return np.trunc((millivolts / 1000) * 10**2) / 10**2


def _serial_readings(controller):
    """Read one JSON-encoded line from the board and return at most SENSORS_MAX values."""
    line = controller.readline().decode("utf-8")
    # use numpy so it can make list calculations easier (and possibly faster)
    return np.asarray(json.loads(line), dtype=np.float32)[:SENSORS_MAX]


def _format_row(resistances):
    """Format the current time plus the resistances (scientific notation) as one line."""
    stamp = datetime.now().strftime('%H:%M:%S')
    values = [np.format_float_scientific(r, precision=2) for r in resistances]
    # Join plain Python strings: inserting the timestamp into a NumPy string
    # array would cast it to the array's item width and could cut it short.
    return ", ".join([stamp, *values])


def read(settings_path='settings.json', max_samples=None):
    """
    Read the data from a board, if any, and log the resulting resistances.

    The settings file supplies "v_in", "refRes", "sensor_ports", "file_name",
    "port" and "delay". If the port contains "- No Device -", random values
    are generated for demo purposes and the loop sleeps ``delay`` milliseconds
    between samples; otherwise JSON lines are read from the serial port.

    Each sample is converted with ``refRes * (v_in / reading - 1)``, printed,
    and appended to ``file_name`` as ``HH:MM:SS, r1, r2, ...``. A header row is
    appended to the file once per call, before the first sample.

    Parameters:
        settings_path: path of the JSON settings file (default 'settings.json').
        max_samples: stop after this many logged samples; ``None`` (default)
            keeps sampling until interrupted or the serial port closes.

    Raises:
        ValueError: if no sensor port is selected or a port is outside 0-3.
    """
    baud = 19200
    with open(settings_path, 'r') as settings_file:
        settings = json.load(settings_file)
    # settings["resolution"] is deliberately not read: it is only needed if the
    # firmware switches back to analogRead() instead of ESP32's
    # analogReadMilliVolts(), in which case the readings would have to be
    # scaled by v_in / resolution before the conversion below.
    v_in = settings["v_in"]
    ref_res = np.array(settings["refRes"])
    sensor_ports = np.array(settings["sensor_ports"])  # ports that sensor(s) are connected to
    file_name = settings["file_name"]
    port = settings["port"]
    delay = settings["delay"]

    if sensor_ports.size == 0:
        raise ValueError("No sensor port selected!")
    # negative indices would silently wrap around in np.take, so reject them too
    if np.any(sensor_ports >= SENSORS_MAX) or np.any(sensor_ports < 0):
        raise ValueError("Port range is 0-3!")

    demo = "- No Device -" in port
    controller = None if demo else serial.Serial(port, baudrate=baud)
    try:
        # add a line of header to the file, with one column per port in use
        with open(file_name, "a", newline="", encoding="utf-8") as out:
            csv.writer(out).writerow(_csv_header(len(sensor_ports)))

        port_ref_res = np.take(ref_res, sensor_ports)  # loop-invariant
        logged = 0
        while max_samples is None or logged < max_samples:
            if controller is not None and not controller.isOpen():
                break
            try:
                if demo:
                    readings = _demo_readings(v_in)
                else:
                    readings = _serial_readings(controller)
            except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                print('decoder error')
                continue

            selected = np.take(readings, sensor_ports)
            # A reading of 0 yields inf here (NumPy emits a RuntimeWarning).
            # TODO: replace with the actual formula for ammonia concentration
            resistances = port_ref_res * (v_in / selected - 1)

            row = _format_row(resistances)
            print(row)
            # NOTE: rows keep the ", " separator and '\n' ending that existing
            # consumers of the printed/logged lines may rely on.
            # Re-open per sample so every row is flushed to disk immediately.
            with open(file_name, "a", newline="", encoding="utf-8") as out:
                out.write(row + '\n')
            logged += 1

            if demo:
                time.sleep(delay / 1000)
    except KeyboardInterrupt as e:
        print(e.__class__.__name__)
    finally:
        # release the serial port even when the loop ends with an error
        if controller is not None:
            controller.close()
|