Source code for adsb.sbs.archive

"""
This module implements a SBS message archive writing and reading capability.
It can be used to save messages to file so that they can be replayed or
analysed at a later time.

An archive is simply a recording of the message lines that were received
from the server. Each message line item is prefixed with a timestamp and
terminated with a new line. By associating a timestamp with the message in the
log file the messages can then be replayed at different rates.
"""

import datetime
import logging
import os


logger = logging.getLogger(__name__)


[docs]class RotatingArchiveFileHandler(object): """ Base class for handlers that record to disk files and rotate log files at a certain point. Handler for logging to a set of files, which switches from one file to the next when the current file reaches a certain size. Shamelessly duplicated from the Python standard library logging module with deep class hierarchy condensed and stripped of logger record specifics. """ terminator = "\r\n" # Use the same delimiter as used on SBS stream def __init__(self, filename, mode="a", maxBytes=0, backupCount=0, encoding=None): """ Open the specified file and use it as the stream for logging. By default, the file grows indefinitely. You can specify particular values of maxBytes and backupCount to allow the file to rollover at a predetermined size. Rollover occurs whenever the current log file is nearly maxBytes in length. If backupCount is >= 1, the system will successively create new files with the same pathname as the base file, but with extensions ".1", ".2" etc. appended to it. For example, with a backupCount of 5 and a base file name of "app.log", you would get "app.log", "app.log.1", "app.log.2", ... through to "app.log.5". The file being written to is always "app.log" - when it gets filled up, it is closed and renamed to "app.log.1", and if files "app.log.1", "app.log.2" etc. exist, then they are renamed to "app.log.2", "app.log.3" etc. respectively. If maxBytes is zero, rollover never occurs. """ # If rotation/rollover is wanted, it doesn't make sense to use another # mode. If for example 'w' were specified, then if there were multiple # runs of the calling application, the logs from previous runs would be # lost if the 'w' is respected, because the log file would be truncated # on each run. if maxBytes > 0: mode = "a" self.baseFilename = os.path.abspath(filename) self.mode = mode self.encoding = encoding self.stream = self._open() self.namer = None self.rotator = None self.maxBytes = maxBytes self.backupCount = backupCount
[docs] def flush(self): """ Flushes the stream. """ if self.stream and hasattr(self.stream, "flush"): self.stream.flush()
[docs] def close(self): """ Closes the stream. """ if self.stream: self.flush() if hasattr(self.stream, "close"): self.stream.close() self.stream = None
def _open(self): """ Open the base file with the (original) mode and encoding. Return the resulting stream. """ return open(self.baseFilename, self.mode, encoding=self.encoding)
[docs] def emit(self, record: str): """ Emit a record to the log file, catering for rollover as described in doRollover(). In this case the record is a SBS format message line. It does not require any extra formatting. This method overrides the default logging behaviour so that SBS messsages can be written to file directly. A timestamp, in UTC, is added to facilitate replaying a session log at a rate faster than real time while keeping the relative spacing between the messages. """ if self.stream is None: logger.warning("Attempted to write to archive but no stream exists") return try: if self.shouldRollover(record): self.doRollover() stream = self.stream msg = record timestamp = datetime.datetime.now(tz=datetime.timezone.utc) stream.write(f"{timestamp.isoformat()},{msg}{self.terminator}") self.flush() except Exception: logger.exception("Problem storing message to session archive")
[docs] def rotation_filename(self, default_name): """ Modify the filename of a log file when rotating. This is provided so that a custom filename can be provided. The default implementation calls the 'namer' attribute of the handler, if it's callable, passing the default name to it. If the attribute isn't callable (the default is None), the name is returned unchanged. :param default_name: The default name for the log file. """ if not callable(self.namer): result = default_name else: result = self.namer(default_name) return result
[docs] def rotate(self, source, dest): """ When rotating, rotate the current log. The default implementation calls the 'rotator' attribute of the handler, if it's callable, passing the source and dest arguments to it. If the attribute isn't callable (the default is None), the source is simply renamed to the destination. :param source: The source filename. This is normally the base filename, e.g. 'test.log' :param dest: The destination filename. This is normally what the source is rotated to, e.g. 'test.log.1'. """ if not callable(self.rotator): if os.path.exists(source): os.rename(source, dest) else: self.rotator(source, dest)
[docs] def doRollover(self): """ Do a rollover, as described in __init__(). """ if self.stream: self.stream.close() self.stream = None if self.backupCount > 0: for i in range(self.backupCount - 1, 0, -1): sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i)) dfn = self.rotation_filename("%s.%d" % (self.baseFilename, i + 1)) if os.path.exists(sfn): if os.path.exists(dfn): os.remove(dfn) os.rename(sfn, dfn) dfn = self.rotation_filename(self.baseFilename + ".1") if os.path.exists(dfn): os.remove(dfn) self.rotate(self.baseFilename, dfn) self.stream = self._open()
[docs] def shouldRollover(self, record): """ Determine if rollover should occur. Basically, see if the supplied record would cause the file to exceed the size limit we have. """ if self.stream is None: # delay was set... self.stream = self._open() if self.maxBytes > 0: # are we rolling over? msg = "{}{}".format(record, self.terminator) self.stream.seek(0, 2) # due to non-posix-compliant Windows feature if self.stream.tell() + len(msg) >= self.maxBytes: return 1 return 0
[docs]def read_archive(archive_file): """ This generator yields lines from a session archive log file. Each line is returned as a 2-tuple containing a UTC timestamp and the SBS format message bytes. :param archive_file: An archive file """ if os.path.exists(archive_file): with open(archive_file, "rb") as f: for line in f: timestamp, msg_bytes = line.split(b",", 1) yield timestamp.decode(), msg_bytes