# Source code for poseyctrl.csvw
import time
import queue
import signal
from multiprocess import Queue, Process
class CSVWriter:
    """Drain a message queue into per-signal CSV files in a child process.

    Each message taken from ``qin`` is a 3-tuple ``(sig, t, data)`` where
    ``data`` is a mapping of column name to value. The first message seen
    for a given ``sig`` creates ``f'{prefix}data.{sig}.csv'`` and writes a
    header row; every message appends one data row. A message whose first
    element is ``'quit'`` ends the loop.
    """

    # Seconds to block on the queue before re-checking ``self.quit``.
    POLL_INTERVAL = 1.0

    def __init__(self, qin: Queue, prefix: str = "") -> None:
        """Store the input queue and the output file-name prefix.

        Args:
            qin: Queue the ``(sig, t, data)`` messages are read from.
            prefix: Text prepended to every output file name.
        """
        self.log = CSVWriterLogger()
        # Worker process handle; None until start() is called.
        self.process = None
        self.qin = qin
        self.prefix = prefix
        # Set by exit_gracefully() to ask loop() to finish.
        self.quit = False
        # Maps signal name -> open file object.
        self.files = {}

    def exit_gracefully(self, signum=None, frame=None) -> None:
        """Signal handler: request that :meth:`loop` exits.

        Args:
            signum: Signal number supplied by the ``signal`` module (unused).
            frame: Interrupted stack frame (unused).
        """
        self.quit = True

    def stop_gracefully(self, wait=True) -> None:
        """Ask the worker to finish by enqueueing a ``'quit'`` message.

        Args:
            wait: When true, block until the worker process has exited.
        """
        self.qin.put(('quit', time.time(), {}))
        # Nothing to join if the worker was never started (or already reset).
        if wait and self.process is not None:
            self.process.join()

    def close(self) -> None:
        """Close every open output file and forget all of them."""
        for handle in self.files.values():
            if handle is not None:
                handle.close()
        self.files = {}

    def _write_row(self, sig, t, data) -> None:
        """Append one row for ``sig``, creating the file and header if new."""
        out = self.files.get(sig)
        if out is None:
            out = open(f'{self.prefix}data.{sig}.csv', 'w')
            self.files[sig] = out
            out.write('pctime,' + ','.join(data.keys()) + '\n')
        # NOTE: values are written with str() and are not quoted or escaped,
        # so a value containing a comma or newline yields a malformed row.
        out.write('"' + str(t) + '",' + ','.join([str(x) for x in data.values()]) + '\n')

    def loop(self) -> None:
        """Process queue messages until a quit request arrives.

        Installs :meth:`exit_gracefully` for SIGINT and SIGTERM, then handles
        messages until either a ``'quit'`` message is received or
        ``self.quit`` becomes true. Open files are always closed on exit,
        including when writing raises.
        """
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
        try:
            # Checking the flag here lets a signal end the loop even when
            # no further message ever arrives.
            while not self.quit:
                try:
                    msg = self.qin.get(timeout=self.POLL_INTERVAL)
                except queue.Empty:
                    continue
                if self.quit or msg[0] == 'quit':
                    break
                sig, t, data = msg
                self._write_row(sig, t, data)
        finally:
            self.close()
        self.log.info('Finished loop.')

    def start(self) -> None:
        """Launch :meth:`loop` in a new process and return immediately."""
        self.log.info('Starting process...')
        self.process = Process(target=CSVWriter.loop, args=(self,))
        self.process.start()
        self.log.info('Returning control.')

    def stop(self) -> None:
        """Terminate the worker process if it is running and drop the handle."""
        if self.process is not None and self.process.is_alive():
            self.log.info('Stopping process...')
            self.process.terminate()
            self.log.info('Joining...')
            # Bounded wait so a stuck worker cannot hang the caller.
            self.process.join(1)
            self.process = None
            self.log.info('Shutdown complete')
        else:
            self.process = None
  
    
# Last update: May 05, 2023