Source code for poseyctrl.apps.posey_decode_bin

import os
import datetime as dt
import time

from multiprocess import Queue

from poseyctrl import csvw
from poseyctrl import hil

import argparse

def posey_decode_bin():
    """Command-line entry point: replay a recorded binary file into CSV output.

    Command-line arguments (parsed with ``argparse``):
        input: Path of the binary file to read.
        output: Existing directory to write into (defaults to ``"."``).
        -p / --prefix: Prefix for output names. When omitted it is derived
            from the input file's base name with ``.raw``, ``.in``, ``.out``
            and ``.bin`` removed.

    The whole input is read into memory, the process changes its working
    directory to the output directory, and the bytes are pushed through the
    ``hil.PoseyHIL`` message buffer chunk by chunk. Every message the buffer
    yields is handed to ``PoseyHIL.process_message``; the function then polls
    until the queue shared with ``csvw.CSVWriter`` reports empty.

    Raises:
        SystemExit: with status 1 if the input file or the output directory
            does not exist (after printing an error for each problem), or
            from ``argparse`` on malformed arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("input", type=str, help="Input bin.")
    parser.add_argument("output", type=str, default=".", nargs="?",
                        help="Output directory.")
    parser.add_argument("-p", "--prefix", type=str, default=None,
                        help="Output prefix.")
    args = parser.parse_args()

    # Report every problem before giving up so the user can fix them in one go.
    input_ok = os.path.isfile(args.input)
    if not input_ok:
        print(f"Error: input file does not exist! -> {args.input}")
    output_ok = os.path.isdir(args.output)
    if not output_ok:
        print(f"Error: output directory does not exist! -> {args.output}")
    if not (input_ok and output_ok):
        # Previously execution continued and died with a traceback on
        # open()/chdir(); exit cleanly with a failing status instead.
        raise SystemExit(1)

    if args.prefix is None:
        # NOTE: each token is removed wherever it occurs in the base name,
        # not only as a trailing extension (kept for output-name stability).
        prefix = os.path.basename(args.input)
        for token in (".raw", ".in", ".out", ".bin"):
            prefix = prefix.replace(token, "")
        args.prefix = prefix

    print(f"Processing {args.input} -> {args.output}/{args.prefix}.*")

    # Read the input before chdir so a relative input path still resolves.
    with open(args.input, 'rb') as infile:
        inp = infile.read()
    os.chdir(args.output)

    qin = Queue()
    qout = Queue()
    pq = Queue()

    csvwriter = csvw.CSVWriter(qin, prefix=f"{args.prefix}.")
    sensor = hil.PoseyHIL(
        args.prefix, qout, qin, pq, None, None, None, output_raw=None)

    print(f"Reading {args.input}...")
    csvwriter.start()
    try:
        try:
            total = len(inp)
            offset = 0
            # Per-message-ID counters; IDs outside the preset ones are
            # counted too instead of raising KeyError.
            rows = {1: 0, 2: 0, 200: 0, 201: 0}
            while offset < total:
                # Never write more than the buffer reports as free.
                to_read = min(total - offset, sensor.ml.free)
                if to_read > 0:
                    sensor.ml.write(inp[offset:offset + to_read])
                    offset += to_read

                # Drain the buffer; a negative ID ends this round.
                # NOTE(review): if the buffer is full and yields no message,
                # this outer loop cannot make progress - confirm the buffer
                # always frees space in that case.
                while True:
                    mid = sensor.ml.process_next()
                    if mid < 0:
                        break
                    rows[mid] = rows.get(mid, 0) + 1
                    sensor.process_message(dt.datetime.now(), mid)

            print("Dumping to CSV, this may take a while...")
            polls = 0
            while not qin.empty():
                polls += 1
                if (polls % 30) == 0:
                    print(" - Still waiting for queue to empty...")
                time.sleep(1)
        except KeyboardInterrupt:
            print("Keyboard interrupt, stopping...")

        print("Done.")
    finally:
        # Always stop the writer, even when decoding fails unexpectedly.
        csvwriter.stop_gracefully()
if __name__ == "__main__":
    # Run the decoder only when executed as a script, not on import.
    posey_decode_bin()

Last update: May 05, 2023