Source code for adsb.sbs.protocol

"""
This module implements a protocol for receiving messages in the SBS
format.
"""

import asyncio
import logging

from asyncio import AbstractEventLoop
from typing import Callable, Tuple

MessageType = str
MessageHandlerType = Callable[[MessageType], None]


logger = logging.getLogger(__name__)


DELIMITER = b"\r\n"


[docs]class SBSProtocol(asyncio.Protocol): """ A simple line based protocol to extract SBS messages from a stream. """ def __init__( self, on_msg_callback: MessageHandlerType = None, loop: AbstractEventLoop = None ): """ :param on_msg_callback: a function that will be called whenever a new message line is received. The callback is expected to take one argument which is the message text. :param loop: The event loop to run in. """ self.loop = loop or asyncio.get_event_loop() self.on_message_received = on_msg_callback self.transport = None self.buf = None # type: bytearray self.wait_connected = self.loop.create_future() self.wait_closed = self.loop.create_future()
[docs] def connection_made(self, transport: asyncio.BaseTransport) -> None: """ React to a new connection being made """ self.remote_addr = transport.get_extra_info("peername") self.transport = transport self.buf = bytearray() logger.debug("SBSProtocol connected to {}".format(self.remote_addr)) self.wait_connected.set_result(None)
[docs] def connection_lost(self, reason: str) -> None: """ React to an existing connection being lost """ logger.debug( "SBSProtocol disconnected from {}.{}".format( self.remote_addr, " Reason: {}".format(reason) if reason else "" ) ) self.buf = None self.wait_closed.set_result(None)
[docs] def data_received(self, data: bytes) -> None: """ Process a raw data stream. Accumulate message chunks until a complete message can be extracted from the buffer. Messages are delimited by the characters \\r\\n. Sources such as mutability/dump1090 also send a heartbeat message, containing only \\r\\n, if there is no ADSB activity. These need to be handled gracefully. Each SBS message has the trailing \\r\\n delimiter discarded and is converted to a string before being passed to the message handler. """ self.buf.extend(data) while self.buf: msg, sep, remainder = self.buf.partition(DELIMITER) # If sep contains a delimiter then we may have extracted a msg, # otherwise we only have a chunk and need to accumulate more # bytes until a complete message can be extracted. if not sep: break self.buf = remainder if msg and self.on_message_received: self.on_message_received(bytes(msg))
[docs] def close(self) -> None: """ Close the connection """ if self.transport: self.transport.abort() # don't wait for unsent buffered data
[docs]class SBSServerProtocol(asyncio.Protocol): """ A SBSProtocol instance specifically for use in a SBS server. The main use for this class is in unit tests. """ def __init__(self, server: "SBSServer"): self.server = server
[docs] def connection_made(self, transport: asyncio.BaseTransport) -> None: self.transport = transport self.peer = transport.get_extra_info("peername") # type: Tuple[str, int] logger.debug(f"{self.__class__.__name__} connected to {self.peer}") self.server.register_protocol(self.peer, self)
[docs] def connection_lost(self, reason: str) -> None: logger.debug( f"{self.__class__.__name__} disconnected from {self.peer}. " f"Reason: {reason}" if reason else "" ) self.server.deregister_protocol(self.peer)
[docs] def close(self) -> None: """ Close the connection """ if self.transport: self.transport.abort() # don't wait for unsent buffered data
[docs] def data_received(self, data: bytes) -> None: """ The server does not expect to receive data from clients. It is effectively a one-way publisher socket. """ logger.warning( "Received unexpected data from client %s: %s".format( self.peer, data.decode() ) )
[docs] def send_message(self, data: bytes, add_delimiter: bool = False) -> None: """ Send message to client. :param bytes: A SBS format message string encoded into bytes to send to clients. :param add_delimiter: A boolean flag that determines if a message delimiter should be added to the message bytes. By default this is False. """ if add_delimiter: data = data + DELIMITER self.transport.write(data)