# Source code for poseyctrl.apps.posey_cmd
from logging import getLogger
import time
import argparse
import logging
import traceback
from multiprocess import Queue
import queue
import bleak
import datetime as dt
from enum import Enum
import numpy as np
import json
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import Advertisement
from poseyctrl.sensor import PoseySensor
from pyposey import MessageAck
from pyposey.control import CommandType, CommandMessage
# Default time (seconds) to wait for an ordinary acknowledgement or summary.
_ACK_TIMEOUT = 60

# Time (seconds) to wait for the completion ack of operations that erase the
# flash. The tool itself tells the user an erase "may take up to a few
# minutes", so the ordinary 60 s ack timeout would be too short here.
_ERASE_TIMEOUT = 600

# Confirmation prompts per command: command -> (warning line, info lines).
# Commands that are absent here (noop, datasummary) need no confirmation.
_CONFIRMATIONS = {
    "flasherase": (
        "This will erase all data on the device!",
        (),
    ),
    "download": (
        "This will stop recording to download data!",
        (
            "If you haven't already stopped recording, you should do",
            "that instead, otherwise the end timestamp is sometimes invalid.",
        ),
    ),
    "startrecording": (
        "Starting a new recording will delete any existing data!",
        ("You may want to download the existing data first.",),
    ),
    "stoprecording": (
        "Are you sure you want to stop recording?",
        (),
    ),
    "reboot": (
        "This will stop recording and invalidate the end timestamp!",
        ("If the device is recording stop it first.",),
    ),
}


def _confirm() -> bool:
    """
    Ask user to enter Y or N (case-insensitive) until a valid answer is given.

    :return: True if the answer is Y.
    """
    answer = ""
    while answer not in ("y", "n"):
        answer = input("Continue? [Y/N]? ").strip().lower()
    return answer == "y"


def _build_parser():
    """
    Build the command-line argument parser for ``posey-cmd``.

    :return: The configured :class:`argparse.ArgumentParser`.
    """
    parser = argparse.ArgumentParser(
        "posey-cmd",
        description="Send a command to a posey hub device.")
    parser.add_argument("sensor",
        type=str, help="Sensor to connect to.")
    parser.add_argument("command",
        type=str, help="Command to issue.",
        choices=["noop", "reboot", "startrecording", "stoprecording",
                 "datasummary", "download", "flasherase"])
    parser.add_argument("-t", "--timeout",
        type=float, default=10,
        help="Timeout (seconds) to scan for BLE sensor devices.")
    parser.add_argument("-d", "--debug",
        action="store_true", default=False,
        help="Enable debug logging.")
    parser.add_argument("-l", "--log",
        action="store_true", default=False,
        help="Output log to file.")
    parser.add_argument("-f", "--force",
        action="store_true", default=False,
        help="Force command without confirmation.")
    return parser


def _wait_for_signal(pq, signal, label, process_uart,
                     timeout=_ACK_TIMEOUT, poll_interval=0.1):
    """
    Poll a queue until a message with the given signal name arrives.

    Messages are ``(signal, time, data)`` tuples. Messages with any other
    signal name are discarded. Between polls, ``process_uart`` is called and
    the loop sleeps for ``poll_interval`` seconds.

    :param pq: Queue to poll; must raise ``queue.Empty`` from ``get_nowait``.
    :param signal: Signal name to wait for (first tuple element).
    :param label: Human-readable name used in log messages.
    :param process_uart: Zero-argument callable invoked once per poll.
    :param timeout: Seconds to wait, or None to wait forever.
    :param poll_interval: Seconds to sleep between polls.
    :return: The data element of the first matching message.
    :raises TimeoutError: If no matching message arrives within ``timeout``.
    """
    log = getLogger("main")
    log.info("Waiting for %s...", label)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            sig, _, data = pq.get_nowait()
        except queue.Empty:
            pass
        else:
            if sig == signal:
                return data
        # The deadline is checked on every iteration, including when the
        # queue is empty, so a silent device cannot hang the tool forever.
        if deadline is not None and time.monotonic() > deadline:
            log.error("Timeout while waiting for %s!", label)
            raise TimeoutError("Timeout")
        process_uart()
        time.sleep(poll_interval)


def _wait_for_download(buffer, read_uart, stall_timeout=60,
                       report_interval=10, poll_interval=0.1):
    """
    Fill ``buffer`` with raw bytes read from the UART.

    Reads chunks until the buffer is full. Data beyond the end of the buffer
    is dropped. Progress is logged every ``report_interval`` seconds and once
    more when the buffer is full.

    :param buffer: Pre-allocated 1-D uint8 array that receives the data.
    :param read_uart: Zero-argument callable returning a bytes-like chunk, or
        None when no data is available.
    :param stall_timeout: Give up after this many seconds without any data;
        None waits forever.
    :param report_interval: Seconds between progress log lines.
    :param poll_interval: Seconds to sleep when no data was available.
    :return: Number of bytes actually stored in ``buffer``.
    """
    log = getLogger("main")
    mega = 1024.0 * 1024.0
    total_bytes = len(buffer)
    log.info("Waiting for %.2f MB...", total_bytes / mega)
    received = 0
    reported = 0
    last_report = last_data = time.monotonic()
    while received < total_bytes:
        chunk = read_uart()
        chunk_len = 0 if chunk is None else len(chunk)
        now = time.monotonic()
        if chunk_len:
            # Clamp to the end of the buffer; surplus bytes are discarded.
            end = min(received + chunk_len, total_bytes)
            buffer[received:end] = np.frombuffer(chunk, 'u1')[:end - received]
            received = end
            last_data = now
        elif stall_timeout is not None and (now - last_data) > stall_timeout:
            log.error("No data received for %.0f s, giving up on download.",
                stall_timeout)
            break
        elapsed = now - last_report
        if (elapsed > report_interval) or (received >= total_bytes):
            remaining = total_bytes - received
            rate = (received - reported) / 1024.0 / elapsed if elapsed > 0 else 0.0
            log.info("Waiting for %.2f/%.2f MB (%.2f%%, %.2f KBps)",
                remaining / mega,
                total_bytes / mega,
                100.0 * remaining / total_bytes,
                rate)
            last_report = now
            reported = received
        if not chunk_len:
            time.sleep(poll_interval)
    return received


def _build_command(command):
    """
    Create the initial command message for a command-line command name.

    :param command: One of the ``command`` choices of the argument parser.
    :return: Tuple ``(cmd, expected_ack)`` with the un-serialized command
        message and the ack value expected in direct response to it.
    """
    log = getLogger("main")
    command_types = {
        'noop': CommandType.NoOp,
        'reboot': CommandType.Reboot,
        'startrecording': CommandType.StartCollecting,
        'flasherase': CommandType.FullFlashErase,
        'stoprecording': CommandType.StopCollecting,
        # A download starts by requesting the data summary.
        'download': CommandType.GetDataSummary,
        'datasummary': CommandType.GetDataSummary,
    }
    cmd = CommandMessage()
    cmd.message.ack = MessageAck.Expected
    cmd.message.command = command_types[command]
    expected_ack = MessageAck.OK
    if command == 'startrecording':
        # The payload carries the local start time as text.
        cmd.message.payload = np.frombuffer(
            dt.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z %z").encode('UTF-8'),
            dtype='u1')
        expected_ack = MessageAck.Working
        log.info("Data recording will start after flash erase. This may take up to a few minutes.")
    elif command == 'flasherase':
        expected_ack = MessageAck.Working
        log.info("A full flash erase may take up to a few minutes (typical 2m30s).")
    return cmd, expected_ack


def _execute_command(sensor, pq, command, dtnow, device_name):
    """
    Send a command to a connected sensor and handle its responses.

    :param sensor: Connected sensor object; its ``hil`` attribute is used to
        send commands and read/process UART data.
    :param pq: Queue on which ``(signal, time, data)`` messages arrive.
    :param command: Command name as given on the command line.
    :param dtnow: Timestamp string used in the download file name.
    :param device_name: Device name used in the download file name.
    :raises TimeoutError: If an expected response does not arrive in time.
    """
    log = getLogger("main")

    def wait_for_ack(timeout=_ACK_TIMEOUT):
        data = _wait_for_signal(
            pq, 'command', 'ack', sensor.hil.process_uart, timeout)
        log.info(f"Found ack: 0x{data['ack']:02x}")
        return data

    def wait_for_datasummary(timeout=_ACK_TIMEOUT):
        summary = _wait_for_signal(
            pq, 'datasummary', 'datasummary', sensor.hil.process_uart, timeout)
        log.info("Got DataSummary message:")
        log.info(json.dumps(summary, indent=4))
        return summary

    # Send command.
    cmd, expected_ack = _build_command(command)
    cmd.serialize()
    sensor.hil.send(cmd)
    log.info(f'Sent init command for {command}: 0x{cmd.message.command:02x} {cmd.message.command_str()}')

    # Wait for ack.
    data = wait_for_ack()
    if data['ack'] != expected_ack:
        log.error(f"Bad ack returned after init: 0x{data['ack']:02x}")
        return

    if command == 'flasherase':
        log.info('Waiting for acknowledgement that flash erase completed...')
        data = wait_for_ack(_ERASE_TIMEOUT)
        if data['ack'] != MessageAck.OK:
            log.error(f"Unexpected ack in response to flash erase: 0x{data['ack']:02x}")
    elif command == 'startrecording':
        log.info('Waiting for acknowledgement that recording started...')
        data = wait_for_ack(_ERASE_TIMEOUT)
        if data['ack'] != MessageAck.OK:
            log.error(f"Unexpected ack in response to start recording: 0x{data['ack']:02x}")
    elif command == 'datasummary':
        if wait_for_datasummary() is None:
            log.error("No DataSummary returned!")
    elif command == 'download':
        data_summary = wait_for_datasummary()
        if data_summary is None:
            log.error("No DataSummary returned!")
            return
        total_bytes = data_summary['bytes']
        log.info("Allocating download buffer for %.2f MB...",
            total_bytes/1024.0/1024.0)
        # Zero-filled so that an incomplete download leaves no random bytes.
        download_buffer = np.zeros(total_bytes, 'u1')
        cmd.message.command = CommandType.DownloadData
        cmd.serialize()
        sensor.hil.send(cmd)
        log.info(f'Sent download command: 0x{cmd.message.command:02x} {cmd.message.command_str()}')
        data = wait_for_ack()
        if data['ack'] != MessageAck.Working:
            log.error(f"Unexpected ack in response to download: 0x{data['ack']:02x}")
            return
        # Wait for download.
        bytes_read = _wait_for_download(download_buffer, sensor.hil.read_uart)
        if bytes_read < total_bytes:
            log.error("Only read %d of %d bytes!", bytes_read, total_bytes)
        fn = f"{dtnow}-{device_name.replace(' ', '')}-download.npz"
        log.info("Dumping downloaded data to file: %s", fn)
        # NOTE(review): on NumPy versions whose savez() has no allow_pickle
        # parameter this keyword is presumably stored as an extra array
        # named "allow_pickle" -- confirm against the NumPy version in use.
        np.savez(fn, summary=data_summary, data=download_buffer, allow_pickle=True)
        log.info("Done!")


def posey_cmd():
    """
    Command-line entry point: send a single command to a Posey hub device.

    Parses the command line, configures logging, asks for confirmation where
    the command calls for it (unless ``--force`` is given), scans for the
    sensor over BLE, connects, runs the command and disconnects again. The
    sensor is disconnected even when the command fails with an exception.

    :raises RuntimeError: If the sensor is not found or cannot be connected.
    :raises TimeoutError: If the device does not respond in time.
    """
    # Process arguments.
    args = _build_parser().parse_args()

    # Configure logger.
    dtnow = dt.datetime.now().strftime('%Y%m%d_%H%M%S')
    nowstamp = f"{dtnow}-posey-cmd-{args.sensor}-{args.command}"
    handlers = [logging.StreamHandler()]
    if args.log:
        handlers.append(logging.FileHandler(f"{nowstamp}.log"))
    logging.basicConfig(
        handlers=handlers,
        datefmt='%H:%M:%S',
        format='{name:.<15} {asctime}: [{levelname}] {message}',
        style='{', level=logging.DEBUG if args.debug else logging.INFO)
    log = getLogger("main")
    getLogger("asyncio").setLevel(logging.CRITICAL)

    device_name = args.sensor
    log.info(f"Start time: {dt.datetime.now().astimezone().replace(microsecond=0).isoformat()}")
    log.info(f"Sensor: {device_name}")
    log.info(f"Scan timeout: {args.timeout}")

    # Confirmation.
    if not args.force and args.command in _CONFIRMATIONS:
        warning, notes = _CONFIRMATIONS[args.command]
        log.warning(warning)
        for note in notes:
            log.info(note)
        if not _confirm():
            log.info("Aborting.")
            return

    # Find sensors.
    log.info(f"Scanning for Posey sensor {device_name}...")
    ble = BLERadio()
    device_adv = None
    wanted = device_name.lower()
    for adv in ble.start_scan(Advertisement, timeout=args.timeout):
        if adv.complete_name is None:
            continue
        cn = adv.complete_name.lower()
        if ("posey" in cn) and (wanted in cn):
            log.info(f"Found Posey {adv.complete_name} (Address: {adv.address.string})")
            device_adv = adv
            ble.stop_scan()
            break

    if device_adv is None:
        log.error("Device not found!")
        raise RuntimeError("Could not find Posey sensor!")

    device_name = device_adv.complete_name
    log.info(f"Connecting to {device_adv.complete_name}.")
    # Queues handed to the sensor; pq is the one polled for responses.
    qin = Queue()
    qout = Queue()
    pq = Queue()
    sensor = PoseySensor(device_name, ble, device_adv, qout, qin, pq, nowstamp)
    log.info(f"Connecting to device {sensor}")
    if not sensor.connect():
        log.error(" - Failed to connect to BLE device.")
        raise RuntimeError("Could not connect to Posey sensor!")
    log.info(" - Connected.")

    try:
        _execute_command(sensor, pq, args.command, dtnow, device_name)
    finally:
        # Always release the connection, also after a timeout or error.
        log.info("Disconnecting sensor...")
        sensor.disconnect()
        sensor.hil.close()
# Run the command-line tool when this file is executed as a script.
if __name__ == "__main__":
    posey_cmd()
  
    
# Last update: May 05, 2023