Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Прывітанне Хабр. Мусіць кожны, хто хоць раз сустракаў ці праводзіў сваякоў ці сяброў на самалёт, карыстаўся бясплатным сэрвісам Flightradar24. Гэта вельмі зручны спосаб адсочвання становішча самалёта ў рэальным часе.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

В першай частцы быў апісаны прынцып працы такога анлайн-сэрвісу. Цяпер мы пойдзем далей, і высветлім, якія дадзеныя перадаюцца і прымаюцца ад паветранага судна да прыёмнай станцыі, і дэкадуем іх самастойна з дапамогай Python.

Гісторыя

Відавочна, што дадзеныя аб самалётах перадаюцца не для таго, каб карыстачы бачылі іх на сваіх смартфонах. Сістэма называецца ADS-B (Automatic dependent surveillance-broadcast), і служыць для аўтаматычнай перадачы інфармацыі аб паветраным судне ў дыспетчарскі цэнтр – перадаюцца яго ідэнтыфікатар, каардынаты, кірунак, хуткасць, вышыня і іншыя дадзеныя. Раней, да з'яўлення такіх сістэм, дыспетчар мог бачыць толькі кропку на радары. Гэтага стала недастаткова, калі самалётаў стала зашмат.

Тэхнічна, ADS-B складаецца з перадатчыка на паветраным судне, які перыядычна пасылае пакеты з інфармацыяй на дастаткова высокай частаце 1090 Мгц (ёсць і іншыя рэжымы, але нам яны не так цікавыя, бо каардынаты перадаюцца толькі тут). Зразумела, акрамя перадатчыка, ёсць і прымач дзесьці ў аэрапорце, але для нас, як для карыстачоў, цікавы прымач наш уласны.

Дарэчы, для параўнання, першая такая сістэма, Airnav Radarbox, разлічаная на звычайных карыстачоў, з'явілася ў 2007 году, і каштавала каля 900$, яшчэ каля 250$ у год каштавала падпіска на сеткавыя сэрвісы.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Водгукі тых першых расійскіх уладальнікаў можна пачытаць на форуме radioscanner. Цяпер, калі масава сталі даступныя RTL-SDR прымачы, аналагічны девайс можна сабраць за 30 $, падрабязней пра гэта было ў першай частцы. Мы ж пяройдзем уласна, да пратакола - паглядзім як гэта працуе.

Прыём сігналаў

Для пачатку, сігнал трэба запісаць. Увесь сігнал мае працягласць усяго толькі 120 мікрасекунд, таму каб камфортна разабраць яго кампаненты, пажаданы SDR-прымач з частатой дыскрэтызацыі не менш за 5Мгц.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Пасля запісу мы атрымліваем WAV-файл з частатой дыскрэтызацыі 5000000 сэмплаў/сек, 30 секунд такога запісу "важаць" каля 500Мб. Слухаць яе медыяплэерам зразумела, бескарысна – файл утрымоўвае не гук, а непасрэдна аблічбаваны радыёсігнал – менавіта так працуе Software Defined Radio.

Адчыняць і апрацоўваць файл мы будзем з дапамогай Python. Жадаючыя паэксперыментаваць самастойна, могуць запампаваць прыклад запісу па спасылцы.

Загрузім файл, і паглядзім што ўсярэдзіне.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Вынік: мы бачым відавочныя "імпульсы" на фоне шуму.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Кожны «імпульс» - гэта і ёсць сігнал, структуру якога добра відаць, калі павялічыць дазвол на графіцы.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Як можна бачыць, карцінка цалкам адпавядае таму, што прыведзена ў апісанні вышэй. Можна прыступаць да апрацоўкі дадзеных.

Дэкадаванне

Для пачатку, трэба атрымаць бітавы паток. Сам сігнал закадаваны з дапамогай manchester encoding:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

З розніцы ўзроўняў у паўбайтах лёгка атрымаць рэальныя "0" і "1".

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Структура самога сігналу мае наступны выгляд:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Разгледзім палі больш падрабязна.

DF (Downlink Format, 5 біт) - вызначае тып паведамлення. Іх некалькі тыпаў:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол
(крыніца табліцы)

Нас цікавіць толькі тып DF17, т.я. менавіта ён змяшчае каардынаты паветранага судна.

ІКАО (24 біта) - міжнародны унікальны код паветранага судна. Праверыць самалёт па ім коду можна на сайце (нажаль, аўтар перастаў абнаўляць базу, але яна яшчэ актуальная). Напрыклад, для кода 3c5ee2 маем наступную інфармацыю:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Праўка: у каментары да артыкула апісанне кода ICAO прыведзена больш падрабязна, якія цікавяцца рэкамендую азнаёміцца.

ДАДЗЕНЫЯ (56 або 112 біт) - уласна дадзеныя, якія мы і будзем дэкадаваць. Першыя 5 біт дадзеных - поле Код тыпу, якое змяшчае падтып якія захоўваюцца дадзеных (не блытаць з DF). Такіх тыпаў даволі шмат:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол
(крыніца табліцы)

Разбяром некалькі прыкладаў пакетаў.

Aircraft identification

Прыклад у бінарным выглядзе:

00100 011 000101 010111 000111 110111 110001 111000

Палі даных:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, кожны сімвал C1-C8 змяшчае коды, якія адпавядаюць індэксам у радку:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######

Раскадаваўшы радок, нескладана атрымаць код самалёта: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Airborne position

Калі з назвай усё проста, то з каардынатамі больш складана. Яны перадаюцца ў выглядзе 2х, цотных і няцотных фрэймаў. Код поля TC=01011b=11.

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол

Прыклад цотнага і няцотнага пакетаў:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Само вылічэнне каардынат адбываецца па дастаткова хітрай формуле:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол
(крыніца)

Я не спецыяліст па ГІС, так што адкуль яно выводзіцца, не ведаю. Хто ў курсе, напішыце ў каментарах.

Вышыня лічыцца прасцей у залежнасці ад вызначанага біта, яна можа ўяўляцца або кратнай 25, або 100 футам.

Airborne Velocity

Пакет з TC=19. Цікава тут тое, што хуткасць можа быць як дакладная, адносна землі (Ground Speed), так і паветраная, якая вымяраецца датчыкам самалёта (Airspeed). Яшчэ перадаецца мноства розных палёў:

Flightradar24 – як гэта працуе? Частка 2, ADS-B пратакол
(крыніца)

Заключэнне

Як можна бачыць, тэхналогія ADS-B стала цікавым сімбіёзам, калі які-небудзь стандарт спатрэбіцца не толькі прафесіяналам, але і звычайным карыстачам. Але зразумела, ключавую ролю ў гэтым адыграла патанненне тэхналогіі лічбавых SDR-прымачоў, якія дазваляюць на дэвайсе літаральна "за капейкі" прымаць сігналы з частатой вышэй гігагерца.

У самім стандарце зразумела, значна больш за ўсё. Жадаючыя могуць паглядзець PDF на старонцы ІКАО ці наведаць ужо згаданы вышэй сайт.

Ці наўрад шматлікім спатрэбіцца ўсё вышэйнапісанае, але прынамсі агульная ідэя таго, як гэта працуе, спадзяюся, засталася.

Дарэчы, гатовы дэкодэр на Python ужо існуе, яго можна вывучыць тут. А ўладальнікі SDR-прымачоў могуць сабраць і запусціць гатовы ADS-B дэкодэр са старонкі, падрабязней пра гэта распавядалася ў першай частцы.

Зыходны код парсера, апісаны ў артыкуле, прыведзены пад катом. Гэта тэставы прыклад, які не прэтэндуе на production, але сёе-тое ў ім працуе, і парсіць запісаны вышэй файл, ім можна.
Зыходны код (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Спадзяюся, камусьці было цікава, дзякуй за ўвагу.

Крыніца: habr.com

Дадаць каментар