Source code for poseyctrl.csvw
import time
import queue
import signal
from multiprocess import Queue, Process
[docs]class CSVWriter:
def __init__(self, qin: Queue, prefix: str =""):
self.log = CSVWriterLogger()
self.process = None
self.qin = qin
self.prefix = prefix
self.quit = False
self.files = {}
[docs] def stop_gracefully(self, wait=True):
self.qin.put(('quit', time.time(), {}))
if wait:
self.process.join()
[docs] def close(self):
for id in self.files:
if self.files[id] is not None:
self.files[id].close()
self.files = {}
[docs] def loop(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
while True:
# Handle next message.
try:
msg = self.qin.get_nowait()
sig = msg[0]
if self.quit:
self.close()
break
elif sig == 'quit':
self.close()
break
else:
sig, t, data = msg
if sig not in self.files:
self.files[sig] = open(f'{self.prefix}data.{sig}.csv', 'w')
self.files[sig].write('pctime,' + ','.join(data.keys()) + '\n')
self.files[sig].write('"' + str(t) + '",' + ','.join([str(x) for x in data.values()]) + '\n')
except queue.Empty:
time.sleep(1)
self.log.info('Finished loop.')
[docs] def start(self):
self.log.info('Starting process...')
self.process = Process(target=CSVWriter.loop, args=(self,))
self.process.start()
self.log.info('Returning control.')
[docs] def stop(self):
if self.process is not None and self.process.is_alive():
self.log.info('Stopping process...')
self.process.terminate()
self.log.info('Joining...')
self.process.join(1)
self.process = None
self.log.info('Shutdown complete')
else:
self.process = None
Last update:
May 05, 2023