Source code for adsb.sbs.server


import asyncio
import datetime
import logging
import socket

from . import protocol

from typing import Tuple
from asyncio import AbstractEventLoop


logger = logging.getLogger(__name__)


[docs]class Server(object): def __init__( self, host: str = "localhost", port: int = 30003, backlog=100, loop: AbstractEventLoop = None, ) -> None: self.loop = loop or asyncio.get_event_loop() self.host = host self._requested_port = port self.port = None self.backlog = backlog self.listener = None self.protocols = {}
[docs] async def start(self) -> None: """ Start the server """ try: self.listener = await self.loop.create_server( lambda: protocol.SBSServerProtocol(self), self.host, self._requested_port, family=socket.AF_INET, backlog=self.backlog, ) # type: asyncio.Server # Fetch actual port in use. This can be different from the # specified port if the port was passed as 0 which means use # an ephemeral port. assert len(self.listener.sockets) == 1 _, self.port = self.listener.sockets[0].getsockname() except asyncio.CancelledError: logger.exception("Connection waiter Future was cancelled") except Exception: logger.exception("An error occurred in start")
[docs] async def stop(self) -> None: """ Stop the server """ if self.listener: # Avoid iterating over the protocols dict which may change size # while it is being iterating over. peers = list(self.protocols) for peer in peers: prot = self.protocols.get(peer) if prot: prot.close() self.listener.close()
[docs] def register_protocol( self, peer: Tuple[str, int], prot: "SBSServerProtocol" ) -> None: """ Register a protocol instance with the server. :param peer: Tuple of (host:str, port:int). :param prot: a SBSServerProtocol instance. """ self.protocols[peer] = prot
[docs] def deregister_protocol(self, peer: Tuple[str, int]) -> None: """ De-register a protocol instance from the server. This peer will no longer receive messages. :param peer: Tuple of (host:str, port:int). """ del self.protocols[peer]
[docs] def send_message(self, msg: bytes, peer: Tuple[str, int] = None) -> None: """ Send a message. :param msg: A bytes object representing the SBS format message to send to peers. The message is assumed to include the end of message delimiter. :param peer: A specific peer to send the message to. Peer is a Tuple of (host:str, port:int). If not specified then the message is broadcast to all peers. """ if self.protocols: if peer: prot = self.protocols.get(peer) if prot: prot.send_message(msg) else: raise Exception( f"Server can't send msg to non-existant peer: {peer}" ) else: # broadcast message to all peers for peer, prot in self.protocols.items(): prot.send_message(msg) else: raise Exception("Server can't send msg, no peers available")