"""
A session maintains information about the currently visible aircraft.
"""
import asyncio
import collections
import datetime
import logging
import os
import pickle
from . import client
from . import archive
from . import aircraft
from . import message
from asyncio import AbstractEventLoop
from ..mypy_types import PositionType
logger = logging.getLogger(__name__)
[docs]class Session(object):
"""
A session maintains information about aircraft received from a SBS Client.
A session is connected to a SBS source to receive a stream of messages or
can operate in a replay mode where it is fed messages from a session log
archive.
"""
def __init__(
self,
record: bool = False,
record_file: str = None,
cache_enabled: bool = True,
cache_file: str = "session_cache.pickle",
session_threshold_minutes: int = 2,
check_interval: float = 5.0,
origin: PositionType = None,
loop: AbstractEventLoop = None,
):
"""
:param record: a boolean flag to enable recording messages to a
session log file. By default this is False. The *record_file*
argument must be supplied is this argument is True.
:param record_file: The file name to use for recorded messages.
By default this is None.
:param cache_enabled: A boolean flag that determines whether
the session aircraft should be cached into a pickle file.
This allows the session to quickly recover state after a brief
shutdown and restart. The default value is True.
:param cache_file: A filename to use for the cache file. The
default value is 'session_cache.pickle'.
:param session_threshold_minutes: The number of minutes to retain
an aircraft in the session after the last message received
from the aircraft.
:param check_interval: The number of seconds between checking
if aircraft in the cache should be discarded.
:param origin: a tuple of (lat, lon) representing a reference
location to use as the origin for distance calculations.
"""
self.loop = loop or asyncio.get_event_loop()
self.origin = origin
self.client = None # type client.Client
# `aircraft` is a dict of {'icao24': :class:`Aircraft`}
self.aircraft = {}
# Aircraft are considered lost after a period of time without
# receiving any updates. When aircraft are lost they are removed
# from the aircraft dict.
self.expiry_threshold = datetime.timedelta(minutes=session_threshold_minutes)
self.logfile = None # type: archive.RotatingArchiveFileHandler
if record and record_file is None:
raise Exception("Record is enabled but no record_file is specified!")
self.record_file = record_file
self.archiving_enabled = False
if record:
self.start_recording(self.session_file)
# Aircraft can be reloaded from a previous session cache (pickle)
# to quickly re-establish aircraft details. This is useful in
# situations where there is a brief period between application
# stop and subsequent start. If the interval is longer than the
# aircraft expiry threshold then all cache aircraft will be dropped.
self.cache_enabled = cache_enabled
self.cache_file = cache_file
if self.cache_enabled and os.path.exists(self.cache_file):
self.load_aircraft_cache()
# A monitor function scans over the aircraft at periodic intervals
# in order to discard aircraft that have not been updated recently.
# The aircraft expiry threshold is configurable.
# Start the monitor task to discard aircraft from the session
# if they have not had an update in a while.
self.cache_monitor_interval = check_interval
self.session_monitor_task = asyncio.Task(self.manage_session())
[docs] async def connect(self, host, port=30003):
""" Connect the session to a SBS interface """
self.client = client.Client(
host=host, port=port, on_raw_msg_callback=self.on_sbs_message
)
await self.client.start()
[docs] async def disconnect(self):
""" Disconnect the session from a SBS interface """
if self.client:
await self.client.stop()
self.client = None
[docs] async def close(self):
""" Stop the session """
await self.disconnect()
self.stop_recording()
if self.cache_enabled:
logger.info("Saving aircraft to session cache")
with open(self.cache_file, "wb") as cache_fd:
assert isinstance(self.aircraft, dict)
pickle.dump(self.aircraft, cache_fd)
self.session_monitor_task.cancel()
# async def replay(self, session_file, replay_rate=1):
# '''
# Replay messages from a session log file.
# A session log file line contains a timestamp and then a raw SBS
# message line. Each line is terminated with a newline character.
# :param session_file: A session log file
# :param replay_rate: A time multiplier factor to adjust the
# message replay rate. Default is 1.
# '''
# previous_timestamp = None
# if os.path.exists(session_file):
# # Adjust the cache monitor interval so that planes expire out
# # of the cache at a duration appropriate for the replay rate.
# cache_monitor_interval_orig = self.cache_monitor_interval
# self.cache_monitor_interval = (
# self.cache_monitor_interval / replay_rate)
# with open(session_file, 'r') as f:
# for line in f:
# line = line.rstrip()
# timestamp, msg = line.split(',', 1)
# timestamp = datetime.datetime.strptime(
# timestamp, "%Y%m%dT%H%M%S.%f")
# adjusted_delay = 0.01
# if previous_timestamp:
# interval = timestamp - previous_timestamp
# delay = interval.total_seconds()
# adjusted_delay = delay / replay_rate
# previous_timestamp = timestamp
# # Don't bother delaying for very small time intervals.
# if adjusted_delay > 0.04:
# await asyncio.sleep(adjusted_delay)
# self.on_sbs_message(msg)
# self.cache_monitor_interval = cache_monitor_interval_orig
[docs] def start_recording(
self, record_file: str = None, maxBytes: int = 2 ** 23, backupCount: int = 3
):
"""
Start recording session messages to log file.
"""
record_file = record_file or self.record_file
if record_file is None:
raise Exception("No session log file specified")
if not self.archiving_enabled:
self.logfile = archive.RotatingArchiveFileHandler(
record_file, maxBytes=maxBytes, backupCount=backupCount
)
self.archiving_enabled = True
else:
logger.warning(
"Attempted to start session recording but session is "
"already being recorded!"
)
[docs] def stop_recording(self):
"""
Stop recording session messages to file.
"""
if self.archiving_enabled:
self.archiving_enabled = False
if self.logfile:
self.logfile.close()
self.logfile = None
[docs] def on_sbs_message(self, msg_str: str):
"""
Process a SBS message line string.
This method is typically called by a SBS client. However, it may
also be called when replaying a session file.
"""
now = datetime.datetime.now(tz=datetime.timezone.utc)
msg = message.fromString(msg_str)
if msg.hex_ident == "000000":
logger.warning("Invalid ICAO code detected: {}".format(msg.hex_ident))
return
if msg.message_type == message.MessageType.Transmission:
if self.archiving_enabled:
self.logfile.emit(msg_str)
if msg.hex_ident not in self.aircraft:
ac = aircraft.Aircraft(msg.hex_ident)
# Add origin info to the aircraft so distance calculations can be
# performed.
ac.origin = self.origin
self.aircraft[msg.hex_ident] = ac
logger.info("New session aircraft: %s", msg.hex_ident)
ac = self.aircraft[msg.hex_ident]
ac.last_seen = now
ac.msg_count += 1
if (
msg.transmission_type
== message.TransmissionType.ES_IDENT_AND_CATEGORY.value
):
if ac.callsign != msg.callsign:
timestamp = now
ac.update_ident(msg.callsign, timestamp)
elif msg.transmission_type in [
message.TransmissionType.ES_SURFACE_POS.value,
message.TransmissionType.ES_AIRBORNE_POS.value,
]:
timestamp = now
ac.update_position(msg.altitude, msg.lat, msg.lon, timestamp)
elif (
msg.transmission_type == message.TransmissionType.ES_AIRBORNE_VEL.value
):
timestamp = now
ac.update_motion(
msg.ground_speed, msg.track, msg.vertical_rate, timestamp
)
elif msg.transmission_type in [
message.TransmissionType.AIR_TO_AIR.value,
message.TransmissionType.SURVEILLANCE_ALT.value,
]:
timestamp = now
ac.update_altitude(msg.altitude, timestamp)
[docs] def load_aircraft_cache(self):
""" Initialise the aircraft cache using the cache file. """
with open(self.cache_file, "rb") as cache_fd:
cached_aircraft = pickle.load(cache_fd)
self.discard_lost_aircraft(cached_aircraft)
if cached_aircraft:
logger.info(
"Recovered {} aircraft from session cache".format(
len(cached_aircraft)
)
)
assert isinstance(cached_aircraft, dict)
# for icao_id, ac in cached_aircraft.items():
# if not ac.details:
# self.enqueue_lookup(icao_id)
self.aircraft = cached_aircraft
[docs] def discard_lost_aircraft(self, aircraft_dict):
"""
Remove aircraft that were last seen beyond the expiry threshold.
The expiry threshold is a configurable session attribute.
:param aircraft_dict: A dict of aircraft participating in this
session.
"""
lost_aircraft = []
now = datetime.datetime.now(tz=datetime.timezone.utc)
for icao_id, aircraft_item in aircraft_dict.items():
x = now - aircraft_item.last_seen
if x > self.expiry_threshold:
lost_aircraft.append(icao_id)
if lost_aircraft:
logger.debug(
"dropping {} aircraft from session due to inactivity: "
"{}".format(len(lost_aircraft), lost_aircraft)
)
for icao_id in lost_aircraft:
del aircraft_dict[icao_id]
[docs] async def manage_session(self):
""" Perform periodic session management actions.
"""
logger.debug("starting session management task")
while True:
self.discard_lost_aircraft(self.aircraft)
await asyncio.sleep(self.cache_monitor_interval)