Source code for adsb.sbs.client
import asyncio
import logging
from asyncio import AbstractEventLoop
from . import archive, message, protocol
from typing import Callable
RawMessageType = bytes
RawMessageHandlerType = Callable[[RawMessageType], None]
MessageType = message.SBSMessage
MessageHandlerType = Callable[[MessageType], None]
logger = logging.getLogger(__name__)
[docs]class Client(object):
""" A SBS message client.
A Client connects to a SBS server to obtain ADSB messages in the
SBS format. A Client can record messages to a file if configured
to do so.
Users of the Client would typically provide a callback function
to receive either raw message string or SBSMessage objects.
"""
def __init__(
self,
host: str = "localhost",
port: int = 30003,
on_raw_msg_callback: RawMessageHandlerType = None,
on_msg_callback: MessageHandlerType = None,
record: bool = False,
record_file: str = None,
loop: AbstractEventLoop = None,
):
"""
:param host: The SBS server host to connect to.
:param port: The SBS server port to connect to.
:param on_raw_msg_callback: a function that will be called whenever
a new SBS message is received. The callback is expected to
take one argument which is the message text.
:param on_msg_callback: a user function that will be called whenever
a new SBS message is received. The callback is expected to
take one argument which is a SBSMessage object.
:param record: a boolean flag to enable recording messages to a file.
By default this is False. The *record_file* argument must be
supplied is this argument is True.
:param record_file: The file name to use for recorded messages.
By default this is sbs_messages.txt.
:param loop: The event loop to run in.
"""
self.loop = loop or asyncio.get_event_loop()
self.host = host
self.port = port
self._on_message_received = on_msg_callback
self._on_raw_message_received = on_raw_msg_callback
self.record = record
if record and record_file is None:
raise Exception("Record is enabled but no record_file is specified!")
self.record_file = record_file
self.archiving_enabled = False
self.logfile = None
self.protocol = None
[docs] async def start(self) -> None:
""" Start the client """
if self.protocol:
raise Exception("Client is already running!")
if self.record:
self.start_recording(self.record_file)
await self.connect(self.host, self.port)
[docs] async def stop(self) -> None:
""" Stop the client """
self.stop_recording()
await self.disconnect()
[docs] async def connect(self, host: str, port=30003) -> None:
""" Connect to a SBS interface.
:param host: The SBS server host to connect to.
:param port: The SBS server port to connect to.
"""
self.protocol = protocol.SBSProtocol(on_msg_callback=self._on_sbs_message)
t, p = await self.loop.create_connection(lambda: self.protocol, host, port)
await p.wait_connected
[docs] async def disconnect(self) -> None:
""" Disconnect from a SBS interface """
if self.protocol:
self.protocol.close()
await self.protocol.wait_closed
self.protocol = None
[docs] def start_recording(
self, record_file: str = None, maxBytes: int = 2 ** 23, backupCount: int = 3
) -> None:
""" Start recording messages to a log file """
record_file = record_file or self.record_file
if record_file is None:
raise Exception("No recording log file specified")
if not self.archiving_enabled:
self.logfile = archive.RotatingArchiveFileHandler(
record_file, maxBytes=maxBytes, backupCount=backupCount
)
self.archiving_enabled = True
else:
logger.warning(
"Attempted to start recording messages but session is "
"already being recorded!"
)
[docs] def stop_recording(self) -> None:
""" Stop recording messages to file """
if self.archiving_enabled:
self.archiving_enabled = False
if self.logfile:
self.logfile.close()
self.logfile = None
def _on_sbs_message(self, msg_data: bytes) -> None:
""" Handle a raw SBS message line from the protocol.
This callback function is provided to the protocol so it can
provide messages to the client.
:param msg_data: A bytes object representing the raw SBS format ADSB
message line which has had the trailing '\r\n' delimiter discarded.
"""
msg_str = msg_data.decode()
if self.archiving_enabled:
self.logfile.emit(msg_str)
if self._on_raw_message_received:
self._on_raw_message_received(msg_data)
if self._on_message_received:
try:
msg = message.fromString(msg_str)
except Exception:
logger.exception("Error parsing message string")
return
if msg.hex_ident == "000000":
logger.warning("Invalid ICAO code detected: {}".format(msg.hex_ident))
return
self._on_message_received(msg)