Source code for poseyctrl.apps.posey_decode_bin
import os
import datetime as dt
import time
from multiprocess import Queue
from poseyctrl import csvw
from poseyctrl import hil
import argparse
[docs]def posey_decode_bin():
    parser = argparse.ArgumentParser()
    parser.add_argument("input", type=str, help="Input bin.")
    parser.add_argument("output", type=str, default=".", nargs="?",
        help="Output directory.")
    parser.add_argument("-p", "--prefix", type=str, default=None,
        help="Output prefix.")
    args = parser.parse_args()
    if not os.path.isfile(args.input):
        print(f"Error: input file does not exist! -> {args.input}")
    if not os.path.isdir(args.output):
        print(f"Error: output directory does not exist! -> {args.output}")
    if args.prefix is None:
        args.prefix = os.path.basename(args.input) \
            .replace(".raw", "") \
            .replace(".in", "") \
            .replace(".out", "") \
            .replace(".bin", "")
    print(f"Processing {args.input} -> {args.output}/{args.prefix}.*")
    inp = open(args.input, 'rb').read()
    os.chdir(args.output)
    qin = Queue()
    qout = Queue()
    pq = Queue()
    csvwriter = csvw.CSVWriter(qin, prefix=f"{args.prefix}.")
    sensor = hil.PoseyHIL(
        args.prefix, qout, qin, pq,
        None, None, None, output_raw = None)
    try:
        print(f"Reading {args.input}...")
        csvwriter.start()
        N = len(inp)
        bytes_left = N
        rows = {1: 0, 2: 0, 200: 0, 201: 0}
        while bytes_left > 0:
            to_read = min(bytes_left, sensor.ml.free)
            if to_read > 0:
                si = N - bytes_left
                ei = si + to_read
                data = bytes(inp[si:ei])
                sensor.ml.write(data)
                bytes_left -= to_read
            while True:
                mid = sensor.ml.process_next()
                if mid >= 0:
                    rows[mid] += 1
                    sensor.process_message(dt.datetime.now(), mid)
                else:
                    break
        print("Dumping to CSV, this may take a while...")
        iter = 0
        while not qin.empty():
            iter += 1
            if (iter % 30) == 0:
                print(" - Still waiting for queue to empty...")
            time.sleep(1)
    except KeyboardInterrupt:
        print("Keyboard interrupt, stopping...")
    print("Done.")
    csvwriter.stop_gracefully()
if __name__ == "__main__":
    posey_decode_bin()
  
    
      Last update:
      May 05, 2023