""" Read raw data from Arduino and then converted into actual resistance using parameters provided from top level """ from datetime import datetime import serial, time, json import serial.tools.list_ports import numpy as np import csv # constant settings SENSORS_MAX = 4 # maximum sensor ports def read(): """ read the data from a board, if any. If "no device" selected generate random values for demo """ baud = 19200 settings = json.load(open('settings.json', 'r')) resolution = settings["resolution"] # this is not in use, but just in case if we decide to switch back to analogRead() instead of ESP32's analogReadMiliVolts() on the firmware v_in = settings["v_in"] refRes = np.array(settings["refRes"]) sensor_ports = np.array(settings["sensor_ports"]) # ports that sensor(s) are connected to file_name = settings["file_name"] port = settings["port"] delay = settings["delay"] if np.any(sensor_ports >= SENSORS_MAX): raise ValueError("Port range is 0-3!") if "- No Device -" in port: # generate random value # open the file and add a line of header to it, then close f = open(file_name, "a", newline="", encoding="utf-8") writer = csv.writer(f) # headers based on number of ports used if len(sensor_ports) == 1: header = ['Time', 'Resistance'] if len(sensor_ports)== 2: header = ['Time', 'Resistance1', 'Resistance2'] if len(sensor_ports) == 3: header = ['Time', 'Resistance1', 'Resistance2', 'Resistance3'] if len(sensor_ports) == 4: header = ['Time', 'Resistance1', 'Resistance2', 'Resistance3', 'Resistance4'] writer.writerow(header) f.close() while True: dat_list = np.random.randint(0, v_in * 1000, SENSORS_MAX) # create a randomized voltage data # take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2 r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1) # *2 <-- change with actual formula for ammonia concentration # created a new array to convert resistance values to sci notation r_arr2 = np.empty(len(r_arr), dtype=object) for i in range(len(r_arr)) : a = np.format_float_scientific(r_arr[i], precision = 2) r_arr2[i] = a # write + export values as .csv format # converted resistance values in array to scientific notation dat = f", ".join(np.insert(r_arr2.astype(str), 0, datetime.now().strftime('%H:%M:%S'))) #np.format_float_scientific() print(dat) f = open(file_name, "a", newline="", encoding="utf-8") f.write(dat + '\n') f.close() time.sleep(delay / 1000) exit(0) else: controller = serial.Serial(port, baudrate=baud) # open the file and add a line of header to it, then close f = open(file_name, "a", newline="", encoding="utf-8") writer = csv.writer(f) # headers based on number of ports used if len(sensor_ports) == 1: header = ['Time', 'Resistance'] if len(sensor_ports)== 2: header = ['Time', 'Resistance1', 'Resistance2'] if len(sensor_ports) == 3: header = ['Time', 'Resistance1', 'Resistance2', 'Resistance3'] if len(sensor_ports) == 4: header = ['Time', 'Resistance1', 'Resistance2', 'Resistance3', 'Resistance4'] writer.writerow(header) f.close() while controller.isOpen(): try: read_data = controller.readline().decode("utf-8") # use numpy so it can make list calculations easier (and possibly faster) dat_list = np.asarray(json.loads(read_data), dtype=np.float32)[:SENSORS_MAX] dat_sel = np.take(dat_list, sensor_ports) # if we decided to switch back to analogRead(), replace this line of comment to the algorithm to something like the commented line below # dat_sel = np.take(dat_list, sensor_ports) * v_in / resolution # take only the nonzero indices, and truncated to two decimal places to "filter" out some hardware errors #dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2 r_arr = np.take(refRes, sensor_ports) * (v_in / dat_sel - 1) # *2 <-- change with actual formula for ammonia concentration # created a new array to convert resistance values to sci notation r_arr2 = np.empty(len(r_arr), dtype=object) for i in range(len(r_arr)) : a = np.format_float_scientific(r_arr[i], precision = 2) r_arr2[i] = a # write + export values as .csv format # converted resistance values in array to scientific notation dat = f", ".join(np.insert(r_arr2.astype(str), 0, datetime.now().strftime('%H:%M:%S'))) print(dat) f = open(file_name, "a", newline="", encoding="utf-8") f.write(dat + '\n') f.close() except KeyboardInterrupt as e: print(e.__class__.__name__) break except (json.decoder.JSONDecodeError, UnicodeDecodeError): print('decoder error')