Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Здраво Хабр. Вероватно је свако ко је икада срео или испратио рођаке или пријатеље у авиону користио бесплатну услугу Флигхтрадар24. Ово је веома згодан начин за праћење положаја авиона у реалном времену.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

В први део Описан је принцип рада овакве онлајн услуге. Сада ћемо кренути даље и открити који се подаци шаљу и примају од авиона до пријемне станице и сами их декодирати користећи Питхон.

Прича

Очигледно, подаци о авионима се не преносе да би их корисници видели на својим паметним телефонима. Систем се зове АДС-Б (Аутоматиц депендент супервеилланце—броадцаст), и користи се за аутоматски пренос информација о летелици до контролног центра – преносе се његов идентификатор, координате, правац, брзина, висина и други подаци. Раније, пре појаве оваквих система, диспечер је могао да види само тачку на радару. Ово више није било довољно када је било превише авиона.

Технички, АДС-Б се састоји од предајника на авиону који периодично шаље пакете информација на прилично високој фреквенцији од 1090 МХз (постоје и други режими, али они нас не занимају толико, пошто се координате преносе само овде). Наравно, поред предајника, негде на аеродрому постоји и пријемник, али нама, као корисницима, интересантан је сопствени пријемник.

Иначе, за поређење, први такав систем, Аирнав Радарбок, дизајниран за обичне кориснике, појавио се 2007. године и коштао је око 900 долара, а претплата на мрежне услуге коштала је још 250 долара годишње.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Рецензије тих првих руских власника могу се прочитати на форуму радиосцаннер. Сада када су РТЛ-СДР пријемници постали широко доступни, сличан уређај се може склопити за 30 долара; више о томе било је у први део. Пређимо на сам протокол - да видимо како он функционише.

Пријем сигнала

Прво, сигнал треба снимити. Цео сигнал има трајање од само 120 микросекунди, па је за удобно растављање његових компоненти пожељан СДР пријемник са фреквенцијом узорковања од најмање 5 МХз.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Након снимања добијамо ВАВ датотеку са фреквенцијом узорковања од 5000000 узорака/сек; 30 секунди таквог снимка „теже“ око 500МБ. Слушање преко медија плејера је, наравно, бескорисно – датотека не садржи звук, већ директно дигитализован радио сигнал – управо тако ради софтверски дефинисани радио.

Отворићемо и обрадити датотеку користећи Питхон. Они који желе сами да експериментишу могу преузети пример снимка по ссылке.

Хајде да преузмемо датотеку и видимо шта је унутра.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Резултат: видимо очигледне „импулсе“ наспрам позадинске буке.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Сваки „пулс” је сигнал чија је структура јасно видљива ако повећате резолуцију на графикону.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Као што видите, слика је сасвим у складу са оним што је дато у опису изнад. Можете започети обраду података.

Декодирање

Прво, морате добити бит стреам. Сам сигнал је кодиран коришћењем Манчестерског кодирања:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Из разлике у нивоу у грицкалицама лако је добити праве „0“ и „1“.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Структура самог сигнала је следећа:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Погледајмо поља детаљније.

DF (Довнлинк Формат, 5 бита) - одређује тип поруке. Постоји неколико типова:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол
(извор табеле)

Интересује нас само тип ДФ17, јер... То је оно што садржи координате авиона.

ИЦАО (24 бита) - међународни јединствени код авиона. Авион можете проверити по његовом коду Онлине (нажалост, аутор је престао да ажурира базу података, али је и даље актуелна). На пример, за код 3ц5ее2 имамо следеће информације:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Уреди: у коментари на чланак Детаљније је дат опис ИЦАО кода, препоручујем заинтересованима да га прочитају.

ПОДАЦИ (56 или 112 бита) - стварни подаци које ћемо декодирати. Првих 5 битова података су поље Типе Цоде, који садржи подтип података који се чувају (не мешати са ДФ). Постоји доста ових врста:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол
(извор табеле)

Погледајмо неколико примера пакета.

Идентификација авиона

Пример у бинарном облику:

КСНУМКС КСНУМКС КСНУМКС КСНУМКС КСНУМКС КСНУМКС КСНУМКС КСНУМКС

Поља података:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

ТЦ = 00100б = 4, сваки знак Ц1-Ц8 садржи кодове који одговарају индексима у реду:
#АБЦДЕФГХИЈКЛМНОПКРСТУВВКСИЗ#####_###############0123456789######

Декодирањем низа лако је добити шифру авиона: ЕВГ7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Положај у ваздуху

Ако је име једноставно, онда су координате компликованије. Преносе се у облику 2, парни и непарни оквири. Шифра поља ТЦ = 01011б = 11.

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол

Пример парних и непарних пакета:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Сам прорачун координата се одвија према прилично лукавој формули:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол
(извор)

Нисам стручњак за ГИС, па не знам одакле долази. Ко зна, пишите у коментарима.

Висина се сматра једноставнијом - у зависности од специфичног бита, може се представити као вишекратник од 25 или 100 стопа.

Аирборне Велоцити

Пакет са ТЦ=19. Интересантна ствар овде је да брзина може бити или тачна, у односу на тло (Гроунд Спеед), или у ваздуху, мерена сензором авиона (Аирспеед). Много различитих поља се такође преноси:

Флигхтрадар24 - како то функционише? Део 2, АДС-Б протокол
(извор)

Закључак

Као што видите, технологија АДС-Б постала је занимљива симбиоза, када је стандард користан не само за професионалце, већ и за обичне кориснике. Али, наравно, кључну улогу у томе одиграла је јефтинија технологија дигиталних СДР пријемника, која омогућава уређају да буквално прима сигнале са фреквенцијама изнад гигахерца „за пени“.

У самом стандарду, наравно, има много више. Заинтересовани могу погледати ПДФ на страници ИЦАО или посетите већ поменуту вебсајт.

Мало је вероватно да ће све горе наведено бити корисно многима, али бар општа идеја о томе како то функционише, надам се, остаје.

Иначе, готов декодер у Пајтону већ постоји, можете га проучити овде. А власници СДР пријемника могу саставити и покренути готов АДС-Б декодер са странице, о томе је детаљније било речи у први део.

Изворни код парсера описаног у чланку је дат испод реза. Ово је тестни пример који не претендује да буде продукција, али неке ствари у њему функционишу и може се користити за рашчлањивање датотеке снимљене изнад.
Изворни код (Питхон)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Надам се да је неко био заинтересован, хвала на пажњи.

Извор: ввв.хабр.цом

Додај коментар