"""
Read raw data from an Arduino-style board and convert it into actual
resistance using the parameters provided from the top level (settings.json).
"""
from datetime import datetime
import csv
import json
import time

import numpy as np
import serial
import serial.tools.list_ports

# constant settings
SENSORS_MAX = 4  # maximum sensor ports


def _to_resistance(raw_values, sensor_ports, ref_res, v_in):
    """Convert raw readings of the selected ports into resistance values.

    The readings are divided by 1000 (the firmware comment in this file says
    they come from analogReadMiliVolts(), i.e. millivolts) and truncated to
    two decimal places to "filter" out some hardware errors.
    """
    volts = np.trunc((np.take(raw_values, sensor_ports) / 1000) * 10**2) / 10**2
    # A reading that truncates to 0 V divides by zero; the resulting inf/nan is
    # kept in the output, only the per-sample RuntimeWarning is silenced.
    with np.errstate(divide="ignore", invalid="ignore"):
        # *2 <-- change with actual formula for ammonia concentration
        return np.take(ref_res, sensor_ports) * (v_in / volts - 1)


def _format_row(resistances):
    """Return 'HH:MM:SS, r0, r1, ...' with resistances in scientific notation."""
    stamp = datetime.now().strftime('%H:%M:%S')
    # np.format_float_scientific formats a single scalar, so every value is
    # formatted on its own and the row is assembled from a plain list.
    values = [np.format_float_scientific(r) for r in np.atleast_1d(resistances)]
    return ", ".join([stamp, *values])


def _append_line(file_name, line):
    """Append one line of text to the output file."""
    with open(file_name, "a", newline="", encoding="utf-8") as out_file:
        out_file.write(line + '\n')


def read():
    """
    Read the data from a board, if any, and log the converted resistances.

    Settings are loaded from 'settings.json' in the working directory; the
    keys used are "resolution", "v_in", "refRes", "sensor_ports", "file_name",
    "port" and "delay".

    If the configured port contains "- No Device -", random values are
    generated for demo purposes: a header line is appended to the output file
    and a new row is produced every `delay` milliseconds, forever (the loop
    only ends through an exception such as KeyboardInterrupt).

    Otherwise the serial port is opened at 19200 baud and every received line
    is parsed as JSON, converted and appended to the output file until the
    port closes or the user interrupts with Ctrl-C. Lines that cannot be
    decoded are reported as 'decoder error' and skipped.

    Each row is also printed to stdout.

    Raises:
        ValueError: if any configured sensor port is outside 0..SENSORS_MAX-1.
    """
    baud = 19200
    with open('settings.json', 'r') as settings_file:
        settings = json.load(settings_file)

    # this is not in use, but just in case if we decide to switch back to
    # analogRead() instead of ESP32's analogReadMiliVolts() on the firmware
    resolution = settings["resolution"]
    v_in = settings["v_in"]
    refRes = np.array(settings["refRes"])
    sensor_ports = np.array(settings["sensor_ports"])  # ports that sensor(s) are connected to
    file_name = settings["file_name"]
    port = settings["port"]
    delay = settings["delay"]

    # negative ports would silently wrap around in np.take, so reject them too
    if np.any((sensor_ports < 0) | (sensor_ports >= SENSORS_MAX)):
        raise ValueError("Port range is 0-3!")

    if "- No Device -" in port:  # generate random value
        # open the file and add a line of header to it, then close
        with open(file_name, "a", newline="", encoding="utf-8") as out_file:
            csv.writer(out_file).writerow(['Time', 'Resistance'])

        while True:
            # create a randomized voltage data
            dat_list = np.random.randint(0, v_in * 1000, SENSORS_MAX)
            r_arr = _to_resistance(dat_list, sensor_ports, refRes, v_in)

            # write + export values as .csv format
            dat = _format_row(r_arr)
            print(dat)
            _append_line(file_name, dat)

            time.sleep(delay / 1000)
    else:
        controller = serial.Serial(port, baudrate=baud)
        try:
            while controller.isOpen():
                try:
                    read_data = controller.readline().decode("utf-8")

                    # use numpy so it can make list calculations easier (and possibly faster)
                    dat_list = np.asarray(json.loads(read_data), dtype=np.uint32)[:SENSORS_MAX]

                    # if we decided to switch back to analogRead(), the voltage
                    # would become something like:
                    #   np.take(dat_list, sensor_ports) * v_in / resolution
                    r_arr = _to_resistance(dat_list, sensor_ports, refRes, v_in)

                    # write + export values as .csv format
                    dat = _format_row(r_arr)
                    print(dat)
                    _append_line(file_name, dat)
                except KeyboardInterrupt as e:
                    print(e.__class__.__name__)
                    break
                except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                    print('decoder error')
        finally:
            # always release the serial port, even on unexpected errors
            controller.close()