Source code for adsb.sbs.message

"""
This module implements classes and functions to assist with parsing
a SBS format message line into a :class:`SBSMessage` object and back
again.
"""

import collections
import datetime
import enum
import json
import logging

from typing import Union

logger = logging.getLogger(__name__)


DELIMITER = "\r\n"


[docs]class MessageType(object): """ This class defines the different SBS message types. Transmission messages contain information sent by aircraft. All others are produced by the application supplying the SBS feed. """ # Generated when the user changes the selected aircraft in # BaseStation. Selection_Change = "SEL" # Generated when an aircraft being tracked sets or changes its # callsign. New_Id = "ID" # Generated when the SBS picks up a signal for an aircraft that it # isn't currently tracking. New_Aircraft = "AIR" # Generated when an aircraft's status changes according to the # time-out values in the SBS1 Data Settings menu. Status_Aircraft = "STA" # Generated when the user double-clicks on an aircraft (i.e. # to bring up the aircraft details window). Click = "CLK" # Messages generated by the aircraft. There are eight different # MSG transmission types, see `TransmissionType`. Transmission = "MSG"
[docs]class TransmissionType(enum.Enum): """ Only `ES_SURFACE_POS` and `ES_AIRBORNE_POS` transmissions have position (latitude and longitude) information. ES = Entended Squitter DF = Downlink Format BDS = B-Definition Subfield """ ES_IDENT_AND_CATEGORY = 1 ES_SURFACE_POS = 2 ES_AIRBORNE_POS = 3 ES_AIRBORNE_VEL = 4 # Triggered by ground radar. SURVEILLANCE_ALT = 5 SURVEILLANCE_ID = 6 # Triggered by TCAS. AIR_TO_AIR = 7 # Broadcast. Also triggered by ground radar. ALL_CALL_REPLY = 8
[docs]class Fields(enum.Enum): """ SBS protocol message fields. Fields a declared in the order they appear in a message. """ # See :class:`MessageType` message_type = 0 # See :class:`TransmissionType` transmission_type = 1 # SBS Database session record number. session_id = 2 # SBS Database aircraft record number. aircraft_id = 3 # 24-bit ICAO ID in hex. hex_ident = 4 # SBS Database flight record number. flight_id = 5 # Date the message was generated. generated_date = 6 # Time the message was generated. generated_time = 7 # Date the message was logged by SBS logged_date = 8 # Time the message was logged by SBS. logged_time = 9 # Eight character flight ID or callsign. callsign = 10 # Altitude (ft) relative to 1013 mb (29.92" Hg). altitude = 11 # Speed over ground (kt) ground_speed = 12 # ground heading track = 13 # Latitude in degrees lat = 14 # Longitude in degrees lon = 15 # Rate of climb vertical_rate = 16 # Squawk squawk = 17 # Squawk flag - indicating squawk has changed. alert = 18 # Squawk flag indicating emergency code has been set. emergency = 19 # Flag indicating the Special Position Indicator has been set. spi = 20 # Flag indicating whether aircraft is on the ground is_on_ground = 21
# Allocate fields into groups that can use the same parser function IntegerFields = [ Fields.transmission_type.name, Fields.session_id.name, Fields.aircraft_id.name, Fields.flight_id.name, Fields.altitude.name, Fields.ground_speed.name, Fields.track.name, Fields.vertical_rate.name, Fields.is_on_ground.name, ] BooleanFields = [ Fields.spi.name, Fields.alert.name, Fields.emergency.name, Fields.is_on_ground.name, ] DateFields = [Fields.generated_date.name, Fields.logged_date.name] FloatFields = [Fields.lat.name, Fields.lon.name] StringFields = [Fields.hex_ident.name, Fields.callsign.name] TimeFields = [Fields.generated_time.name, Fields.logged_time.name] FieldNames = list(name for name in Fields.__members__) SBSMessage = collections.namedtuple("SBSMessage", FieldNames)
[docs]def fromString(line: Union[bytes, str]) -> SBSMessage: """ Parse a SBS format message string into a SBSMessage object. :param line: A bytes or string object representing a SBS format message. """ if isinstance(line, bytes): line = line.decode() values = line.rstrip(DELIMITER).split(",") if len(FieldNames) != len(values): raise Exception( "Incorrect number of msg fields. " f"Expected {len(FieldNames)}, got {len(values)}. " f"values={values}, line={line}" ) attrs = {} for k, v in zip(FieldNames, values): v = v.strip() # remove any surrounding spaces if v: # perform type conversion if necessary if k in IntegerFields: v = int(v) elif k in FloatFields: v = float(v) elif k in BooleanFields: v = True if v == "1" else False elif k in DateFields: Y, M, D = [int(i) for i in v.split("/")] v = datetime.date(Y, M, D) elif k in TimeFields: H, M, S = v.split(":") S, F = S.split(".") microsecond = int(int(F) * 1e3) v = datetime.time( hour=int(H), minute=int(M), second=int(S), microsecond=microsecond ) # elif k in StringFields: # v = v.strip() # else: # # field is expected to be a string field # logger.warning( # 'Unexpected field name: {}'.format(k)) else: v = None attrs[k] = v return SBSMessage(**attrs)
[docs]def toString(m: SBSMessage) -> bytes: """ Convert a SBS message object to s SBS format string """ o = [] for field, v in zip(FieldNames, m): # perform conversion if necessary if field in (Fields.lat.name, Fields.lon.name): o.append("" if v is None else f"{v:.5f}") elif field in DateFields: o.append(datetime.date.strftime(v, "%Y/%m/%d")) elif field in TimeFields: o.append( datetime.time.strftime(v, "%H:%M:%S") + f".{int(v.microsecond/1000):03d}" ) elif field in BooleanFields: if v is None: v = "" else: v = "1" if v is True else "0" o.append(v) else: o.append("" if v is None else str(v)) return ",".join(o).encode() + DELIMITER.encode()