Flightradar24 - як це працює? Частина 2, ADS-B протокол

Привіт Хабр. Напевно, кожен, хто хоч раз зустрічав або проводжав родичів або друзів на літак, користувався безкоштовним сервісом Flightradar24. Це дуже зручний спосіб відстеження положення літака у реальному часі.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

В першій частині було описано принцип роботи такого онлайн-сервісу. Зараз ми підемо далі, і з'ясуємо, які дані передаються та приймаються від повітряного судна до приймальної станції та декодуємо їх самостійно за допомогою Python.

Історія

Очевидно, дані про літаки передаються не для того, щоб користувачі бачили їх на своїх смартфонах. Система називається ADS-B (Automatic dependent surveillance-broadcast), і служить для автоматичної передачі інформації про повітряне судно в диспетчерський центр - передаються його ідентифікатор, координати, напрям, швидкість, висота та інші дані. Раніше до появи таких систем диспетчер міг бачити лише крапку на радарі. Цього стало недостатньо, коли літаків стало надто багато.

Технічно, ADS-B складається з передавача на повітряному судні, який періодично надсилає пакети з інформацією на досить високій частоті 1090 МГц (є й інші режими, але нам вони не такі цікаві, тому що координати передаються тільки тут). Зрозуміло, крім передавача, є і приймач десь в аеропорту, але для нас, як для користувачів, цікавий наш приймач.

До речі, для порівняння, перша така система Airnav Radarbox, розрахована на звичайних користувачів, з'явилася в 2007 році, і коштувала близько 900 $, ще близько 250 $ на рік коштувала підписка на мережеві сервіси.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Відгуки тих перших російських власників можна почитати на форумі radioscanner. Зараз, коли масово стали доступні RTL-SDR приймачі, аналогічний девайс можна зібрати за 30 $, детальніше про це було в першій частині. Ми ж перейдемо власне, до протоколу — побачимо, як це працює.

Прийом сигналів

Для початку сигнал потрібно записати. Весь сигнал має тривалість лише 120 мікросекунд, тому щоб комфортно розібрати його компоненти, бажаний SDR-приймач з частотою дискретизації не менше 5МГц.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Після запису ми отримуємо WAV-файл із частотою дискретизації 5000000 семплів/сек, 30 секунд такого запису «важать» близько 500Мб. Слухати її медіаплеєром зрозуміло, марно - файл містить не звук, а безпосередньо оцифрований радіосигнал - саме так працює Software Defined Radio.

Відкривати та обробляти файл ми будемо за допомогою Python. Бажаючі поекспериментувати самостійно, можуть завантажити приклад запису за посиланням.

Завантажимо файл, і подивимося, що всередині.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Результат: бачимо явні «імпульси» і натомість шуму.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Кожен «імпульс» — це сигнал, структуру якого добре видно, якщо збільшити дозвіл на графіці.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Як можна бачити, картинка цілком відповідає тому, що описано вище. Можна приступати до обробки даних.

Декодування

Для початку потрібно отримати бітовий потік. Сам сигнал закодований за допомогою manchester encoding:

Flightradar24 - як це працює? Частина 2, ADS-B протокол

З різниці рівнів у напівбайтах легко отримати реальні «0» та «1».

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Структура самого сигналу має такий вигляд:

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Розглянемо поля докладніше.

DF (Downlink Format, 5 біт) – визначає тип повідомлення. Їх кілька типів:

Flightradar24 - як це працює? Частина 2, ADS-B протокол
(джерело таблиці)

Нас цікавить лише тип DF17, т.к. саме він містить координати повітряного судна.

ІКАО (24 біти) - міжнародний унікальний код повітряного судна. Перевірити літак за його кодом можна на сайті (На жаль, автор перестав оновлювати базу, але вона ще актуальна). Наприклад, для коду 3c5ee2 маємо таку інформацію:

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Виправлення: в коментарі до статті опис коду ICAO наведено більш докладно, рекомендую ознайомитися.

ДАНІ (56 або 112 біт) - власне дані, які ми і декодуватимемо. Перші 5 біт даних - поле Тип коду, що містить підтип даних, що зберігаються (не плутати з DF). Таких типів досить багато:

Flightradar24 - як це працює? Частина 2, ADS-B протокол
(джерело таблиці)

Розберемо кілька прикладів пакетів.

Aircraft identification

Приклад у бінарному вигляді:

00100 011 000101 010111 000111 110111 110001 111000

Поля даних:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, кожен символ C1-C8 містить коди, що відповідають індексам у рядку:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######

Розкодувавши рядок, нескладно отримати код літака: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Airborne position

Якщо з назвою все просто, то з координатами складніше. Вони передаються у вигляді 2х, парних та непарних фреймів. Код поля TC=01011b=11.

Flightradar24 - як це працює? Частина 2, ADS-B протокол

Приклад парного та непарного пакетів:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Саме обчислення координат відбувається за досить хитрою формулою:

Flightradar24 - як це працює? Частина 2, ADS-B протокол
(джерело)

Я не фахівець із ГІС, тож звідки воно виводиться, не знаю. Хто в курсі, напишіть у коментарях.

Висота вважається простіше — залежно від певного біта, вона може бути або кратною 25, або 100 футам.

Airborne Velocity

Пакет із TC=19. Цікаво тут те, що швидкість може бути як точна, щодо землі (Ground Speed), так і повітряна вимірювана датчиком літака (Airspeed). Ще передається безліч різних полів:

Flightradar24 - як це працює? Частина 2, ADS-B протокол
(джерело)

Висновок

Як можна бачити, технологія ADS-B стала цікавим симбіозом, коли будь-який стандарт знадобиться не лише професіоналам, а й звичайним користувачам. Але зрозуміло, ключову роль у цьому відіграло здешевлення технології цифрових SDR-приймачів, що дозволяє на девайсі буквально «за копійки» приймати сигнали з частотою вище за гігагерця.

У самому стандарті, зрозуміло, набагато більше. Бажаючі можуть переглянути PDF на сторінці ІКАО або відвідати вже згаданий вище сайт.

Навряд чи багатьом стане у нагоді все вищенаписане, але принаймні загальна ідея того, як це працює, сподіваюся, залишилася.

До речі, готовий декодер на Python вже існує, його можна вивчити тут. А власники SDR-приймачів можуть зібрати та запустити готовий ADS-B декодер. зі сторінки, докладніше про це розповідалося в першій частині.

Вихідний код парсера, описаний у статті, наведено під катом. Це тестовий приклад, що не претендує на production, але дещо в ньому працює, і парсувати записаний вище файл, ним можна.
Вихідний код (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Сподіваюся, комусь було цікаво, дякую за увагу.

Джерело: habr.com

Додати коментар або відгук