Source code for poseyctrl.apps.posey_listen

from logging import getLogger

import argparse
import logging
import traceback
from multiprocess import Queue
import bleak
import datetime as dt

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import Advertisement

from poseyctrl.sensor import PoseySensor

def posey_listen():
    """Command-line entry point: scan for a Posey BLE sensor and listen to it.

    Parses ``sys.argv`` (positional ``sensor`` plus ``--timeout``,
    ``--debug``, ``--log`` and ``--min-rssi``), configures the root logger,
    scans BLE advertisements for a device whose complete name contains both
    ``"posey"`` and the requested sensor name (case-insensitive), connects to
    it, and then loops forever calling ``sensor.hil.process_uart()`` and
    ``sensor.hil.stats.log_stats()``. If the sensor reports itself as
    disconnected, a reconnect is attempted before any further processing.

    The loop ends on ``KeyboardInterrupt`` or on an unexpected exception
    (whose traceback is printed); in every case the sensor is disconnected
    and ``sensor.hil`` is closed before returning.

    Raises:
        RuntimeError: No matching sensor was found before the scan ended, or
            the initial ``sensor.connect()`` returned a falsy value.
        SystemExit: Raised by ``argparse`` for ``--help`` or invalid
            arguments.
    """
    # Process arguments.
    parser = argparse.ArgumentParser(
        "posey-listen", description="Listens to a posey device.")
    parser.add_argument(
        "sensor", type=str, help="Sensor to connect to.")
    parser.add_argument(
        "-t", "--timeout", type=float, default=10,
        help="Timeout (seconds) to scan for BLE sensor devices.")
    parser.add_argument(
        "-d", "--debug", action="store_true", default=False,
        help="Enable debug logging.")
    parser.add_argument(
        "-l", "--log", action="store_true", default=False,
        help="Output log to file.")
    parser.add_argument(
        "-r", "--min-rssi", type=float, default=-100,
        help="Minimum device RSSI to connect to.")
    args = parser.parse_args()

    # Configure logger. The same stamp names the optional log file and is
    # handed to PoseySensor below.
    handlers = [logging.StreamHandler()]
    dtnow = dt.datetime.now().strftime('%Y%m%d_%H%M%S')
    nowstamp = f"posey-listen-{args.sensor}-{dtnow}"
    if args.log:
        handlers.append(logging.FileHandler(f"{nowstamp}.log"))
    logging.basicConfig(
        handlers=handlers,
        datefmt='%H:%M:%S',
        format='{name:.<15} {asctime}: [{levelname}] {message}',
        style='{',
        level=logging.DEBUG if args.debug else logging.INFO)
    log = getLogger("main")
    # Only CRITICAL records from the asyncio logger are let through.
    getLogger("asyncio").setLevel(logging.CRITICAL)

    device_name = args.sensor
    start_time = dt.datetime.now().astimezone().replace(microsecond=0)
    log.info(f"Start time: {start_time.isoformat()}")
    log.info(f"Sensor: {device_name}")
    log.info(f"Scan timeout: {args.timeout}")

    # Config: queues handed to PoseySensor.
    # NOTE(review): qout is passed before qin below; their exact roles are
    # defined by PoseySensor, which is not visible here -- confirm the order.
    qin = Queue()
    qout = Queue()
    pq = Queue()

    # Find sensors: take the first advertisement whose name matches.
    log.info(f"Scanning for Posey sensor {device_name}...")
    ble = BLERadio()
    device_adv = None
    wanted = device_name.lower()
    for adv in ble.start_scan(
            Advertisement, timeout=args.timeout, minimum_rssi=args.min_rssi):
        if adv.complete_name is None:
            continue
        cn = adv.complete_name.lower()
        if ("posey" in cn) and (wanted in cn):
            name = adv.complete_name
            log.info(f"Found Posey {name} (Address: {adv.address.string})")
            device_adv = adv
            ble.stop_scan()
            break

    if device_adv is None:
        log.error("Device not found!")
        raise RuntimeError("Could not find Posey sensor!")

    # From here on use the full advertised name, not the user's fragment.
    device_name = device_adv.complete_name
    log.info(f"Connecting to {device_adv.complete_name}.")
    sensor = PoseySensor(
        device_name, ble, device_adv, qout, qin, pq, nowstamp)

    log.info(f"Connecting to device {sensor}")
    if sensor.connect():
        log.info("  - Connected.")
    else:
        log.error("  - Failed to connect to BLE device.")
        raise RuntimeError("Could not connect to Posey sensor!")

    try:
        while True:
            # Connected?
            if not sensor.connected:
                log.warning(
                    f"Sensor {sensor.name} disconnected. Reconnecting...")
                try:
                    sensor.connect()
                except bleak.exc.BleakError as e:
                    msg = getattr(e, 'message', e)
                    log.warning(f"Bleak error: {msg}")
                except Exception:
                    # KeyboardInterrupt/SystemExit are not Exception
                    # subclasses, so they propagate to the outer handler.
                    log.info("Exception on connect:")
                    traceback.print_exc()

                if sensor.connected:
                    log.info("Reconnect successful")
                else:
                    log.error(f"Could not reconnect to {sensor.name}!")
                    # Skip processing and retry on the next iteration.
                    continue

            # Collect data.
            sensor.hil.process_uart()

            # If time, print statistics.
            sensor.hil.stats.log_stats()

    except KeyboardInterrupt:
        log.info("Keyboard interrupt, breaking.")
    except Exception:
        traceback.print_exc()
    finally:
        # Always release the sensor, whatever ended the loop.
        log.info("Disconnecting sensor...")
        sensor.disconnect()
        sensor.hil.close()
# Script entry point: start the listener only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    posey_listen()

Last update: May 05, 2023