Source code for poseyctrl.hil
from inspect import trace
import traceback
import time
import datetime as dt
import logging
import math
import os
from typing import Optional
from multiprocess import Queue
import numpy as np
import pyposey as pyp
[docs]class PoseyHILStats:
def __init__(self, log, delay=3):
self.log = log
self.start_time = time.time()
self.last_update = self.start_time
self.last_timestamp = 0
self.last_Vbatt = 0
self.delay = delay
self.bytes = 0
self.task = 0
self.datasummary = 0
self.imu = 0
self.ble = 0
[docs] def add_task(self, timestamp, bytes, Vbatt):
self.bytes += bytes
self.task += 1
self.last_timestamp = timestamp
self.last_Vbatt = Vbatt/255.0*4.2 + 3.2
[docs] def runtime(self):
rt = time.time() - self.start_time
return f'{str(dt.timedelta(seconds=math.floor(rt)))} / MCU {str(dt.timedelta(seconds=math.floor(self.last_timestamp*1e-6)))}'
[docs] def log_stats(self):
now = time.time()
dt = now - self.last_update
if dt >= self.delay:
batt_pct = (self.last_Vbatt - 3.3)/(4.2 - 3.3)*100.0
self.log.info(f'RT: [{self.runtime()}] Batt: {self.last_Vbatt:.2f}V ({batt_pct:.0f}%) Rates: [{self.stats("T", self.task, dt)} {self.stats("I", self.imu, dt)} {self.stats("B", self.ble, dt, postfix="dps")} {self.stats("BW", self.bytes/1024.0, dt, postfix="KBps")}]')
self.bytes = 0
self.task = 0
self.datasummary = 0
self.imu = 0
self.ble = 0
self.last_update = now
[docs]class PoseyHILReceiveMessages:
def __init__(self):
self.taskwaist = pyp.tasks.TaskWaistTelemetryMessage()
self.taskwatch = pyp.tasks.TaskWatchTelemetryMessage()
self.command = pyp.control.CommandMessage()
self.datasummary = pyp.control.DataSummaryMessage()
self.imu = pyp.platform.sensors.IMUMessage()
self.ble = pyp.platform.sensors.BLEMessage()
[docs] def register_listeners(self, ml: pyp.platform.io.MessageListener):
ml.add_listener(self.taskwatch)
ml.add_listener(self.taskwaist)
ml.add_listener(self.command)
ml.add_listener(self.datasummary)
ml.add_listener(self.imu)
ml.add_listener(self.ble)
[docs]class PoseyHIL:
def __init__(self,
name: str,
qin: Queue, qout: Queue, pq: Queue,
adv, connection, service,
output_raw: Optional[str] = None):
self.log = logging.getLogger(f'posey.{name}')
self.stats = PoseyHILStats(self.log)
self.last_ping = 0
self.adv = adv
self.uart_conn = connection
self.uart_service = service
self.qin = qin
self.qout = qout
self.pq = pq
self.name = name
if output_raw is not None:
self.output_raw = output_raw
self.raw_serial_in_fn = f'{self.output_raw}.in.bin'
self.raw_serial_in = open(self.raw_serial_in_fn, 'wb')
self.raw_serial_out_fn = f'{self.output_raw}.out.bin'
self.raw_serial_out = open(self.raw_serial_out_fn, 'wb')
else:
self.raw_serial_in = None
self.raw_serial_out = None
self.messages = PoseyHILReceiveMessages()
self.ml = pyp.platform.io.MessageListener()
self.messages.register_listeners(self.ml)
[docs] def process_message(self, time: dt.datetime, mid: int):
sig = None
data = None
send_to_pq = False
if mid == pyp.tasks.TaskWaistTelemetry.message_id:
sig = 'taskwaist'
if self.messages.taskwaist.valid_checksum:
self.stats.add_task(
self.messages.taskwaist.message.t_start,
15 + 3,
self.messages.taskwaist.message.Vbatt)
self.messages.taskwaist.deserialize()
data = {
'sensor': self.name,
't_start': self.messages.taskwaist.message.t_start,
't_end': self.messages.taskwaist.message.t_end,
'invalid_checksum': self.messages.taskwaist.message.invalid_checksum,
'missed_deadline': self.messages.taskwaist.message.missed_deadline,
'Vbatt': self.messages.taskwaist.message.Vbatt
}
else:
self.log.error('Invalid TaskWaist checkum.')
elif mid == pyp.tasks.TaskWatchTelemetry.message_id:
sig = 'taskwatch'
if self.messages.taskwatch.valid_checksum:
self.stats.add_task(
self.messages.taskwatch.message.t_start,
12 + 3,
self.messages.taskwatch.message.Vbatt)
self.messages.taskwatch.deserialize()
data = {
'sensor': self.name,
't_start': self.messages.taskwatch.message.t_start,
't_end': self.messages.taskwatch.message.t_end,
'invalid_checksum': self.messages.taskwatch.message.invalid_checksum,
'missed_deadline': self.messages.taskwatch.message.missed_deadline,
'Vbatt': self.messages.taskwatch.message.Vbatt
}
else:
self.log.error('Invalid TaskWatch checkum.')
elif mid == pyp.control.Command.message_id:
# If we get a command message, it must be an acknowledgement. Store
# in a queue to be retrieved later.
send_to_pq = True
sig = 'command'
if self.messages.command.valid_checksum:
self.messages.command.deserialize()
data = {
'sensor': self.name,
'command': self.messages.command.message.command,
'payload': self.messages.command.message.payload,
'ack': self.messages.command.message.ack}
else:
self.log.error('Invalid Command checkum.')
elif mid == pyp.control.DataSummary.message_id:
# If we get a data summary message, we're going to want to use it
# to provide information on our download.
send_to_pq = True
sig = 'datasummary'
if self.messages.datasummary.valid_checksum:
self.messages.datasummary.deserialize()
data = {
'sensor': self.name,
'datetime': self.messages.datasummary.message.datetime.tobytes().decode('UTF-8'),
'start_ms': self.messages.datasummary.message.start_ms,
'end_ms': self.messages.datasummary.message.end_ms,
'bytes': self.messages.datasummary.message.bytes}
else:
self.log.error('Invalid DataSummary checkum.')
elif mid == pyp.platform.sensors.IMUData.message_id:
sig = 'imu'
if self.messages.imu.valid_checksum:
self.stats.add_imu()
self.messages.imu.deserialize()
data = {
'sensor': self.name,
'time': self.messages.imu.message.time,
'Ax': self.messages.imu.message.Ax,
'Ay': self.messages.imu.message.Ay,
'Az': self.messages.imu.message.Az,
'Qi': self.messages.imu.message.Qi,
'Qj': self.messages.imu.message.Qj,
'Qk': self.messages.imu.message.Qk,
'Qr': self.messages.imu.message.Qr
}
else:
self.log.error('Invalid IMU checkum.')
elif mid == pyp.platform.sensors.BLEData.message_id:
sig = 'ble'
if self.messages.ble.valid_checksum:
self.stats.add_ble()
self.messages.ble.deserialize()
data = {
'sensor': self.name,
'time': self.messages.ble.message.time,
'uuid': '{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}'.format(
*self.messages.ble.message.uuid[::-1]),
'major': self.messages.ble.message.major,
'minor': self.messages.ble.message.minor,
'power': self.messages.ble.message.power,
'rssi': self.messages.ble.message.rssi}
else:
self.log.error('Invalid BLE checkum.')
else:
self.log.error(f'Invalid message ID: {mid}')
if sig is not None:
self.qout.put((sig, time, data))
if send_to_pq:
self.pq.put((sig, time, data))
[docs] def send(self, cmd):
try:
if hasattr(cmd, 'buffer'):
tx = cmd.buffer.buffer.tobytes()
else:
tx = cmd
if self.raw_serial_out is not None:
self.raw_serial_out.write(tx)
if self.uart_conn.connected:
self.uart_service.write(tx)
return True
except:
self.log.error('Sending failed')
self.log.error(traceback.format_exc())
return False
[docs] def close(self):
if self.raw_serial_in is not None:
self.raw_serial_in.close()
self.raw_serial_in = None
if os.path.getsize(self.raw_serial_in_fn) == 0:
self.log.warning(f'Input file {self.raw_serial_in_fn} is empty, removing...')
os.remove(self.raw_serial_in_fn)
if self.raw_serial_out is not None:
self.raw_serial_out.close()
self.raw_serial_out = None
if os.path.getsize(self.raw_serial_out_fn) == 0:
self.log.warning(f'Output file {self.raw_serial_out_fn} is empty, removing...')
os.remove(self.raw_serial_out_fn)
[docs] def read_uart(self, size: int = -1):
if size < 0:
size = self.uart_service.in_waiting
data = self.uart_service.read(size)
if data is not None:
data = bytes(data)
return data
[docs] def process_uart(self, decode_messages=True):
to_read = 0
data = None
if self.uart_conn and self.uart_conn.connected:
if decode_messages:
to_read = min(self.uart_service.in_waiting, self.ml.free)
else:
to_read = self.uart_service.in_waiting
if to_read > 0:
data = self.read_uart(to_read)
if (data is not None) and (self.raw_serial_in is not None):
self.raw_serial_in.write(data)
if decode_messages:
if data is not None:
self.ml.write(data)
mid = self.ml.process_next()
if mid >= 0:
self.process_message(dt.datetime.now(), mid)
# This is unnecessary, but just in case we want it sometime in the future.
# self.keep_alive()
return to_read
[docs] def decode_buffer(self, buffer):
try:
N = len(buffer)
bytes_left = N
while bytes_left > 0:
to_read = min(bytes_left, self.ml.free)
if to_read > 0:
si = N - bytes_left
ei = si + to_read
data = bytes(buffer[si:ei])
self.ml.write(data)
bytes_left -= to_read
while True:
mid = self.ml.process_next()
if mid >= 0:
self.process_message(dt.datetime.now(), mid)
else:
break
self.log.info("Dumping to CSV, this may take a while...")
iter = 0
while not self.qin.empty():
iter += 1
if (iter % 30) == 0:
self.log.info(" - Still waiting for queue to empty...")
time.sleep(1)
except KeyboardInterrupt:
self.log.info("Keyboard interrupt, stopping...")
[docs] def keep_alive(self):
if self.uart_conn and self.uart_conn.connected:
t = time.time()
if (t - self.last_ping) > 1:
# self.log.info("Sending ping.")
self.last_ping = t
self.send(bytes("Stayin alive!\n", "utf-8"))
Last update:
May 05, 2023