"""
Read raw data from an Arduino and convert it into actual resistance using
parameters provided by the top level (``settings.json``).
"""
from datetime import datetime
import json
import time

import numpy as np
import serial
import serial.tools.list_ports

# constant settings
SENSORS_MAX = 4  # maximum sensor ports


def _to_resistance(dat_list, sensor_ports, refRes, v_in):
    """Convert raw per-port readings into resistances for the selected ports.

    Args:
        dat_list: array of raw readings, indexed by port number. The firmware
            is noted as using ESP32's analogReadMiliVolts(), so the values are
            treated as millivolts and divided by 1000 here.
        sensor_ports: array of port indices to pick out of ``dat_list``.
        refRes: array of reference resistances, indexed by port number.
        v_in: input voltage used in the resistance formula.

    Returns:
        Array with ``refRes[port] * (v_in / volts[port] - 1)`` for each
        selected port. A reading that truncates to 0.00 V divides by zero,
        which numpy reports as ``inf`` (with a RuntimeWarning), not an error.
    """
    # NOTE: if the firmware ever switches back to analogRead(), the voltage
    # would instead be: np.take(dat_list, sensor_ports) * v_in / resolution
    # take only the selected ports, truncated to two decimal places to
    # "filter" out some hardware errors
    dat_sel = np.trunc((np.take(dat_list, sensor_ports) / 1000) * 10**2) / 10**2
    return np.take(refRes, sensor_ports) * (v_in / dat_sel - 1)


def _log_row(file_name, r_arr):
    """Print one timestamped, comma-separated row and append it to the CSV."""
    timestamp = datetime.now().strftime('%H:%M:%S')
    dat = ", ".join([timestamp, *r_arr.astype(str)])
    print(dat)
    # reopen per row so every sample is on disk even if the run is interrupted
    with open(file_name, "a", newline="", encoding="utf-8") as f:
        f.write(dat + '\n')


def read():
    """Continuously log sensor resistances to the CSV named in the settings.

    Loads ``settings.json`` from the working directory, then loops forever:
    either over randomly generated readings (when the configured port contains
    "- No Device -") or over JSON lines read from the serial port. Every sample
    is converted to resistance, printed, and appended to ``file_name``.

    This function does not return normally; it ends when an exception (for
    example KeyboardInterrupt) propagates or the serial port stops being open.

    Raises:
        ValueError: if any configured sensor port is outside 0..SENSORS_MAX-1.
        KeyError: if a required key is missing from ``settings.json``.
    """
    baud = 19200

    with open('settings.json', 'r') as settings_file:
        settings = json.load(settings_file)

    # this is not in use, but kept just in case we decide to switch back to
    # analogRead() instead of ESP32's analogReadMiliVolts() on the firmware
    resolution = settings["resolution"]
    v_in = settings["v_in"]
    refRes = np.array(settings["refRes"])
    sensor_ports = np.array(settings["sensor_ports"])  # ports that sensor(s) are connected to
    file_name = settings["file_name"]
    port = settings["port"]
    delay = settings["delay"]

    # negative indices would silently wrap around in np.take, so reject them
    # together with indices past the last port
    if np.any((sensor_ports < 0) | (sensor_ports >= SENSORS_MAX)):
        raise ValueError("Port range is 0-3!")

    if "- No Device -" in port:
        while True:
            # create randomized voltage data for every port (not only the
            # selected ones) so that indexing by port number stays in range
            dat_list = np.random.randint(0, 3300, SENSORS_MAX)
            _log_row(file_name, _to_resistance(dat_list, sensor_ports, refRes, v_in))
            time.sleep(delay / 1000)

    controller = serial.Serial(port, baudrate=baud)
    # TODO: separate data in each run but still keep them in one csv file
    try:
        while controller.isOpen():
            try:
                read_data = controller.readline().decode("utf-8")
                raw = json.loads(read_data)
            except (json.decoder.JSONDecodeError, UnicodeDecodeError):
                # skip frames that cannot be decoded or parsed
                print('decoder error')
                continue

            # use numpy so it can make list calculations easier (and possibly faster)
            dat_list = np.asarray(raw, dtype=np.uint32)[:SENSORS_MAX]
            _log_row(file_name, _to_resistance(dat_list, sensor_ports, refRes, v_in))
    finally:
        # release the serial port even when the loop is left by an exception
        controller.close()