Source code for adsb.sbs.message
"""
This module implements classes and functions to assist with parsing
a SBS format message line into a :class:`SBSMessage` object and back
again.
"""
import collections
import datetime
import enum
import json
import logging
from typing import Union
logger = logging.getLogger(__name__)
DELIMITER = "\r\n"
[docs]class MessageType(object):
"""
This class defines the different SBS message types.
Transmission messages contain information sent by aircraft. All others
are produced by the application supplying the SBS feed.
"""
# Generated when the user changes the selected aircraft in
# BaseStation.
Selection_Change = "SEL"
# Generated when an aircraft being tracked sets or changes its
# callsign.
New_Id = "ID"
# Generated when the SBS picks up a signal for an aircraft that it
# isn't currently tracking.
New_Aircraft = "AIR"
# Generated when an aircraft's status changes according to the
# time-out values in the SBS1 Data Settings menu.
Status_Aircraft = "STA"
# Generated when the user double-clicks on an aircraft (i.e.
# to bring up the aircraft details window).
Click = "CLK"
# Messages generated by the aircraft. There are eight different
# MSG transmission types, see `TransmissionType`.
Transmission = "MSG"
[docs]class TransmissionType(enum.Enum):
"""
Only `ES_SURFACE_POS` and `ES_AIRBORNE_POS` transmissions have
position (latitude and longitude) information.
ES = Entended Squitter
DF = Downlink Format
BDS = B-Definition Subfield
"""
ES_IDENT_AND_CATEGORY = 1
ES_SURFACE_POS = 2
ES_AIRBORNE_POS = 3
ES_AIRBORNE_VEL = 4
# Triggered by ground radar.
SURVEILLANCE_ALT = 5
SURVEILLANCE_ID = 6
# Triggered by TCAS.
AIR_TO_AIR = 7
# Broadcast. Also triggered by ground radar.
ALL_CALL_REPLY = 8
[docs]class Fields(enum.Enum):
""" SBS protocol message fields.
Fields a declared in the order they appear in a message.
"""
# See :class:`MessageType`
message_type = 0
# See :class:`TransmissionType`
transmission_type = 1
# SBS Database session record number.
session_id = 2
# SBS Database aircraft record number.
aircraft_id = 3
# 24-bit ICAO ID in hex.
hex_ident = 4
# SBS Database flight record number.
flight_id = 5
# Date the message was generated.
generated_date = 6
# Time the message was generated.
generated_time = 7
# Date the message was logged by SBS
logged_date = 8
# Time the message was logged by SBS.
logged_time = 9
# Eight character flight ID or callsign.
callsign = 10
# Altitude (ft) relative to 1013 mb (29.92" Hg).
altitude = 11
# Speed over ground (kt)
ground_speed = 12
# ground heading
track = 13
# Latitude in degrees
lat = 14
# Longitude in degrees
lon = 15
# Rate of climb
vertical_rate = 16
# Squawk
squawk = 17
# Squawk flag - indicating squawk has changed.
alert = 18
# Squawk flag indicating emergency code has been set.
emergency = 19
# Flag indicating the Special Position Indicator has been set.
spi = 20
# Flag indicating whether aircraft is on the ground
is_on_ground = 21
# Allocate fields into groups that can use the same parser function
IntegerFields = [
Fields.transmission_type.name,
Fields.session_id.name,
Fields.aircraft_id.name,
Fields.flight_id.name,
Fields.altitude.name,
Fields.ground_speed.name,
Fields.track.name,
Fields.vertical_rate.name,
Fields.is_on_ground.name,
]
BooleanFields = [
Fields.spi.name,
Fields.alert.name,
Fields.emergency.name,
Fields.is_on_ground.name,
]
DateFields = [Fields.generated_date.name, Fields.logged_date.name]
FloatFields = [Fields.lat.name, Fields.lon.name]
StringFields = [Fields.hex_ident.name, Fields.callsign.name]
TimeFields = [Fields.generated_time.name, Fields.logged_time.name]
FieldNames = list(name for name in Fields.__members__)
SBSMessage = collections.namedtuple("SBSMessage", FieldNames)
[docs]def fromString(line: Union[bytes, str]) -> SBSMessage:
"""
Parse a SBS format message string into a SBSMessage object.
:param line: A bytes or string object representing a SBS format message.
"""
if isinstance(line, bytes):
line = line.decode()
values = line.rstrip(DELIMITER).split(",")
if len(FieldNames) != len(values):
raise Exception(
"Incorrect number of msg fields. "
f"Expected {len(FieldNames)}, got {len(values)}. "
f"values={values}, line={line}"
)
attrs = {}
for k, v in zip(FieldNames, values):
v = v.strip() # remove any surrounding spaces
if v:
# perform type conversion if necessary
if k in IntegerFields:
v = int(v)
elif k in FloatFields:
v = float(v)
elif k in BooleanFields:
v = True if v == "1" else False
elif k in DateFields:
Y, M, D = [int(i) for i in v.split("/")]
v = datetime.date(Y, M, D)
elif k in TimeFields:
H, M, S = v.split(":")
S, F = S.split(".")
microsecond = int(int(F) * 1e3)
v = datetime.time(
hour=int(H), minute=int(M), second=int(S), microsecond=microsecond
)
# elif k in StringFields:
# v = v.strip()
# else:
# # field is expected to be a string field
# logger.warning(
# 'Unexpected field name: {}'.format(k))
else:
v = None
attrs[k] = v
return SBSMessage(**attrs)
[docs]def toString(m: SBSMessage) -> bytes:
""" Convert a SBS message object to s SBS format string """
o = []
for field, v in zip(FieldNames, m):
# perform conversion if necessary
if field in (Fields.lat.name, Fields.lon.name):
o.append("" if v is None else f"{v:.5f}")
elif field in DateFields:
o.append(datetime.date.strftime(v, "%Y/%m/%d"))
elif field in TimeFields:
o.append(
datetime.time.strftime(v, "%H:%M:%S")
+ f".{int(v.microsecond/1000):03d}"
)
elif field in BooleanFields:
if v is None:
v = ""
else:
v = "1" if v is True else "0"
o.append(v)
else:
o.append("" if v is None else str(v))
return ",".join(o).encode() + DELIMITER.encode()