Source code for poseyctrl.apps.posey_cmd

from logging import getLogger

import time
import argparse
import logging
import traceback
from multiprocess import Queue
import queue
import bleak
import datetime as dt
from enum import Enum
import numpy as np
import json

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import Advertisement

from poseyctrl.sensor import PoseySensor

from pyposey import MessageAck
from pyposey.control import CommandType, CommandMessage


[docs]def posey_cmd(): def confirm() -> bool: """ Ask user to enter Y or N (case-insensitive). :param return: True if the answer is Y. """ answer = "" while answer not in ["y", "n"]: answer = input("Continue? [Y/N]? ").lower() return answer == "y" # Process arguments. parser = argparse.ArgumentParser( "posey-cmd", description="Send a command to a posey hub device.") parser.add_argument("sensor", type=str, help="Sensor to connect to.") parser.add_argument("command", type=str, help="Command to issue.", choices=["noop", "reboot", "startrecording", "stoprecording", "datasummary", "download", "flasherase"]) parser.add_argument("-t", "--timeout", type=float, default=10, help="Timeout (seconds) to scan for BLE sensor devices.") parser.add_argument("-d", "--debug", action="store_true", default=False, help="Enable debug logging.") parser.add_argument("-l", "--log", action="store_true", default=False, help="Output log to file.") parser.add_argument("-f", "--force", action="store_true", default=False, help="Force command without confirmation.") args = parser.parse_args() # Configure logger. dtnow = dt.datetime.now().strftime('%Y%m%d_%H%M%S') nowstamp = f"{dtnow}-posey-cmd-{args.sensor}-{args.command}" handlers = [logging.StreamHandler()] if args.log: handlers.append(logging.FileHandler(f"{nowstamp}.log")) logging.basicConfig( handlers=handlers, datefmt='%H:%M:%S', format='{name:.<15} {asctime}: [{levelname}] {message}', style='{', level=logging.DEBUG if args.debug else logging.INFO) log = getLogger("main") getLogger("asyncio").setLevel(logging.CRITICAL) device_name = args.sensor log.info(f"Start time: {dt.datetime.now().astimezone().replace(microsecond=0).isoformat()}") log.info(f"Sensor: {device_name}") log.info(f"Scan timeout: {args.timeout}") # Config. qin = Queue() qout = Queue() pq = Queue() # Confirmation. if not args.force: if args.command == "flasherase": log.warning("This will erase all data on the device!") if not confirm(): log.info("Aborting.") return elif args.command == "download": log.warning("This will stop recording to download data!") log.info("If you haven't already stopped recording, you should do") log.info("that instead, otherwise the end timestamp is sometimes invalid.") if not confirm(): log.info("Aborting.") return elif args.command == "startrecording": log.warning("Starting a new recording will delete any existing data!") log.info("You may want to download the existing data first.") if not confirm(): log.info("Aborting.") return elif args.command == "stoprecording": log.warning("Are you sure you want to stop recording?") if not confirm(): log.info("Aborting.") return elif args.command == "reboot": log.warning("This will stop recording and invalidate the end timestamp!") log.info("If the device is recording stop it first.") if not confirm(): log.info("Aborting.") return # Find sensors. log.info(f"Scanning for Posey sensor {device_name}...") ble = BLERadio() device_adv = None for adv in ble.start_scan(Advertisement, timeout=args.timeout): if adv.complete_name is None: continue cn = adv.complete_name.lower() if ("posey" in cn) and (device_name.lower() in cn): name = adv.complete_name log.info(f"Found Posey {name} (Address: {adv.address.string})") device_adv = adv ble.stop_scan() break if device_adv is None: log.error("Device not found!") raise RuntimeError("Could not find Posey sensor!") device_name = device_adv.complete_name log.info(f"Connecting to {device_adv.complete_name}.") sensor = PoseySensor(device_name, ble, device_adv, qout, qin, pq, nowstamp) log.info(f"Connecting to device {sensor}") if sensor.connect(): log.info(" - Connected.") else: log.error(" - Failed to connect to BLE device.") raise RuntimeError("Could not connect to Posey sensor!") def wait_for_ack(timeout=60): log.info("Waiting for ack...") # Check for ack. t0 = time.time() while True: try: (sig, _, data) = pq.get_nowait() if sig == 'command': log.info(f"Found ack: 0x{data['ack']:02x}") return data if (timeout != None) and ((time.time() - t0) > timeout): log.error("Timeout while waiting for ack!") raise Exception("Timeout") except queue.Empty: pass sensor.hil.process_uart() time.sleep(0.1) def wait_for_datasummary(timeout=60): log.info("Waiting for datasummary...") # Check for ack. t0 = time.time() data_summary = None while True: try: (sig, sig_time, data) = pq.get_nowait() if sig == 'datasummary': data_summary = data log.info("Got DataSummary message:") log.info(json.dumps( data_summary, indent=4)) return data_summary if (timeout != None) and ((time.time() - t0) > timeout): log.error("Timeout while waiting for ack!") raise Exception("Timeout") except queue.Empty: pass sensor.hil.process_uart() time.sleep(0.1) def wait_for_download(buffer): bytes = len(buffer) log.info("Waiting for %.2f MB...", bytes/1024.0/1024.0) bytes_left = bytes t0 = time.time() tu = t0 bu = 0 di = 0 while bytes_left > 0: data = sensor.hil.read_uart() if data is not None: data_len = len(data) bytes_left -= data_len de = di + data_len if de > bytes: de = bytes data = data[:(de - di)] buffer[di:de] = np.frombuffer(data, 'u1') di = de else: data_len = 0 dt = 1.0*(time.time() - tu) if (dt > 10) or (bytes_left <= 0): bytes_read = bytes - bytes_left log.info("Waiting for %.2f/%.2f MB (%.2f%%, %.2f KBps)", bytes_left/1024.0/1024.0, bytes/1024.0/1024.0, 100.0*bytes_left/bytes, (bytes_read - bu)/1024.0/dt) tu = time.time() bu = bytes_read if data_len == 0: time.sleep(0.1) return bytes_read # Send command. cmd = CommandMessage() cmd.message.ack = MessageAck.Expected expected_ack = MessageAck.OK if args.command == 'noop': cmd.message.command = CommandType.NoOp elif args.command == 'reboot': cmd.message.command = CommandType.Reboot elif args.command == 'startrecording': cmd.message.command = CommandType.StartCollecting cmd.message.payload = np.frombuffer( dt.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z %z").encode('UTF-8'), dtype='u1') expected_ack = MessageAck.Working log.info("Data recording will start after flash erase. This may take up to a few minutes.") elif args.command == 'flasherase': cmd.message.command = CommandType.FullFlashErase expected_ack = MessageAck.Working log.info("A full flash erase may take up to a few minutes (typical 2m30s).") elif (args.command == 'stoprecording'): cmd.message.command = CommandType.StopCollecting elif (args.command == 'download') or (args.command == 'datasummary'): cmd.message.command = CommandType.GetDataSummary cmd.serialize() sensor.hil.send(cmd) log.info(f'Sent init command for {args.command}: 0x{cmd.message.command:02x} {cmd.message.command_str()}') # Wait for ack. data = wait_for_ack() if data['ack'] != expected_ack: log.error(f"Bad ack returned after init: 0x{data['ack']:02x}") elif args.command == 'flasherase': log.info('Waiting for acknowledgement that flash erase completed...') data = wait_for_ack() if data['ack'] != MessageAck.OK: log.error(f"Unexpected ack in response to flash erase: 0x{data['ack']:02x}") elif args.command == 'startrecording': log.info('Waiting for acknowledgement that recording started...') data = wait_for_ack() if data['ack'] != MessageAck.OK: log.error(f"Unexpected ack in response to start recording: 0x{data['ack']:02x}") elif args.command == 'datasummary': data_summary = wait_for_datasummary() if data_summary is None: log.error("No DataSummary returned!") elif args.command == 'download': data_summary = wait_for_datasummary() if data_summary is None: log.error("No DataSummary returned!") else: bytes = data_summary['bytes'] log.info("Allocating download buffer for %2f MB...", bytes/1024.0/1024.0) download_buffer = np.empty(bytes, 'u1') cmd.message.command = CommandType.DownloadData cmd.serialize() sensor.hil.send(cmd) log.info(f'Sent download command: 0x{cmd.message.command:02x} {cmd.message.command_str()}') # Wait on data summary and ack. data = wait_for_ack() if data['ack'] != MessageAck.Working: log.error(f"Unexpected ack in response to download: 0x{data['ack']:02x}") else: # Wait for download. bytes_read = wait_for_download(download_buffer) if bytes_read < bytes: log.error("Only read %d of %d bytes!", bytes_read, bytes) fn = f"{dtnow}-{device_name.replace(' ', '')}-download.npz" log.info("Dumping downloaded data to file: %s", fn) np.savez(fn, summary=data_summary, data=download_buffer, allow_pickle=True) log.info("Done!") # elif args.command == 'record': # # Wait for keyboard interrupt, then send stop. # log.info("Data is recording. Terminate recording with Ctrl+C (keyboard interrupt)") # try: # while True: # # Connected? # if not sensor.connected: # log.warning(f"Sensor {sensor.name} disconnected. Reconnecting...") # try: # sensor.connect() # except KeyboardInterrupt: # raise # except bleak.exc.BleakError as e: # msg = e.message if hasattr(e, 'message') else e # log.warning(f"Bleak error: {msg}") # except: # log.info("Exception on connect:") # traceback.print_exc() # if sensor.connected: # log.info("Reconnect successful") # else: # log.error(f"Could not reconnect to {sensor.name}!") # continue # # Collect data. # sensor.hil.process_uart() # # If time, print statistics. # sensor.hil.stats.log_stats() # time.sleep(0.2) # except KeyboardInterrupt: # log.info("Keyboard interrupt, stopping record.") # except: # traceback.print_exc() # # Send stop command. # cmd.message.command = CommandType.StopCollecting # cmd.serialize() # sensor.hil.send(cmd) # log.info(f'Sent stop command: 0x{cmd.message.command:02x} {cmd.message.command_str()}') # # Wait for ack. # data = wait_for_ack() # if data['ack'] != MessageAck.OK: # log.error(f"Bad ack returned from stop: 0x{data['ack']:02x}") log.info("Disconnecting sensor...") sensor.disconnect() sensor.hil.close()
if __name__ == "__main__": posey_cmd()

Last update: May 05, 2023