Flightradar24 – jak to działa? Część 2, protokół ADS-B

Witaj Habr. Prawdopodobnie każdy, kto kiedykolwiek spotkał lub odprawił krewnych lub znajomych w samolocie, skorzystał z bezpłatnej usługi Flightradar24. Jest to bardzo wygodny sposób śledzenia pozycji statku powietrznego w czasie rzeczywistym.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

В część pierwsza Opisano zasadę działania takiego serwisu internetowego. Teraz przejdziemy dalej i ustalimy, jakie dane są wysyłane i odbierane z samolotu do stacji odbiorczej i sami je dekodujemy za pomocą Pythona.

Historia

Oczywiście dane samolotu nie są przesyłane, aby użytkownicy mogli je zobaczyć na swoich smartfonach. System nosi nazwę ADS-B (Automatyczny zależny nadzór – rozgłoszenie) i służy do automatycznego przesyłania informacji o samolocie do centrum kontroli – przesyłany jest jego identyfikator, współrzędne, kierunek, prędkość, wysokość i inne dane. Wcześniej, przed pojawieniem się takich systemów, dyspozytor mógł widzieć tylko punkt na radarze. To już nie wystarczało, gdy samolotów było za dużo.

Technicznie rzecz biorąc, ADS-B składa się z nadajnika na samolocie, który okresowo wysyła pakiety informacji na dość wysokiej częstotliwości 1090 MHz (istnieją inne tryby, ale nie jesteśmy nimi tak zainteresowani, ponieważ współrzędne są przesyłane tylko tutaj). Oczywiście oprócz nadajnika gdzieś na lotnisku znajduje się także odbiornik, ale dla nas, jako użytkowników, interesujący jest nasz własny odbiornik.

Swoją drogą, dla porównania, pierwszy taki system, Airnav Radarbox, przeznaczony dla zwykłych użytkowników, pojawił się w 2007 roku i kosztował około 900 dolarów, abonament na usługi sieciowe kosztuje kolejne 250 dolarów rocznie.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Recenzje tych pierwszych rosyjskich właścicieli można przeczytać na forum radioskaner. Teraz, gdy odbiorniki RTL-SDR stały się powszechnie dostępne, podobne urządzenie można złożyć już za 30 dolarów; więcej na ten temat w artykule część pierwsza. Przejdźmy do samego protokołu – zobaczmy jak to działa.

Odbiór sygnałów

Najpierw należy zarejestrować sygnał. Cały sygnał trwa zaledwie 120 mikrosekund, dlatego do wygodnego demontażu jego elementów pożądany jest odbiornik SDR o częstotliwości próbkowania co najmniej 5 MHz.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Po nagraniu otrzymujemy plik WAV o częstotliwości próbkowania 5000000 30 500 próbek/s, a XNUMX sekund takiego nagrania „waży” około XNUMX MB. Słuchanie tego za pomocą odtwarzacza multimedialnego jest oczywiście bezużyteczne – plik nie zawiera dźwięku, ale bezpośrednio zdigitalizowany sygnał radiowy – dokładnie tak działa Radio Defined Radio.

Otworzymy i przetworzymy plik za pomocą Pythona. Osoby chcące poeksperymentować samodzielnie mogą pobrać przykładowe nagranie по ссылке.

Pobierzmy plik i zobaczmy, co jest w środku.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Wynik: widzimy wyraźne „impulsy” na tle szumu tła.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Każdy „impuls” jest sygnałem, którego struktura jest wyraźnie widoczna, jeśli zwiększysz rozdzielczość wykresu.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Jak widać, obraz jest całkiem zgodny z tym, co podano w powyższym opisie. Możesz rozpocząć przetwarzanie danych.

Rozszyfrowanie

Najpierw musisz uzyskać strumień bitów. Sam sygnał jest kodowany przy użyciu kodowania Manchester:

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Z różnicy poziomów w przekąskach łatwo jest uzyskać prawdziwe „0” i „1”.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Struktura samego sygnału jest następująca:

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Przyjrzyjmy się polom bardziej szczegółowo.

DF (Format łącza w dół, 5 bitów) – określa typ wiadomości. Istnieje kilka typów:

Flightradar24 – jak to działa? Część 2, protokół ADS-B
(źródło tabeli)

Nas interesuje tylko typ DF17, ponieważ... Zawiera współrzędne samolotu.

ICAO (24 bity) - międzynarodowy unikalny kod statku powietrznego. Możesz sprawdzić samolot po jego kodzie on-line (niestety autor zaprzestał aktualizacji bazy, ale jest ona nadal aktualna). Przykładowo dla kodu 3c5ee2 mamy następujące informacje:

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Edycja: w komentarze do artykułu Bardziej szczegółowo podano opis kodu ICAO, zainteresowanym polecam się z nim zapoznać.

DATA (56 lub 112 bitów) - rzeczywiste dane, które będziemy dekodować. Pierwsze 5 bitów danych to pole Kod Typ, zawierający podtyp przechowywanych danych (nie mylić z DF). Jest sporo tego typu:

Flightradar24 – jak to działa? Część 2, protokół ADS-B
(źródło tabeli)

Spójrzmy na kilka przykładów pakietów.

Identyfikacja statku powietrznego

Przykład w formie binarnej:

00100 011 000101 010111 000111 110111 110001 111000

Pola danych:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, każdy znak C1-C8 zawiera kody odpowiadające indeksom w linii:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_################0123456789######

Dekodując ciąg znaków, łatwo uzyskać kod samolotu: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Pozycja w powietrzu

Jeśli nazwa jest prosta, współrzędne są bardziej skomplikowane. Są one przesyłane w postaci 2 klatek parzystych i nieparzystych. Kod pola TC = 01011b = 11.

Flightradar24 – jak to działa? Część 2, protokół ADS-B

Przykład pakietów parzystych i nieparzystych:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Samo obliczenie współrzędnych odbywa się według dość trudnego wzoru:

Flightradar24 – jak to działa? Część 2, protokół ADS-B
(źródło)

Nie jestem ekspertem GIS, więc nie wiem, skąd to się bierze. Kto wie, napiszcie w komentarzach.

Wysokość uważa się za prostszą – w zależności od konkretnego bitu można ją przedstawić jako wielokrotność 25 lub 100 stóp.

Prędkość w powietrzu

Pakiet z TC=19. Interesującą rzeczą jest to, że prędkość może być dokładna w stosunku do ziemi (prędkość naziemna) lub w powietrzu, mierzona przez czujnik samolotu (prędkość powietrza). Przesyłanych jest również wiele różnych pól:

Flightradar24 – jak to działa? Część 2, protokół ADS-B
(źródło)

wniosek

Jak widać technologia ADS-B stała się ciekawą symbiozą, gdy standard jest przydatny nie tylko profesjonalistom, ale także zwykłym użytkownikom. Ale oczywiście kluczową rolę odegrała w tym tańsza technologia cyfrowych odbiorników SDR, która pozwala urządzeniu dosłownie odbierać sygnały o częstotliwościach powyżej gigaherca „za grosze”.

W samym standardzie jest oczywiście znacznie więcej. Zainteresowani mogą obejrzeć plik PDF na stronie ICAO lub odwiedź ten, o którym mowa powyżej broker.

Jest mało prawdopodobne, aby wszystkie powyższe były przydatne dla wielu, ale mam nadzieję, że przynajmniej ogólne pojęcie o tym, jak to działa, pozostaje.

Nawiasem mówiąc, gotowy dekoder w Pythonie już istnieje, możesz go przestudiować tutaj. Natomiast właściciele odbiorników SDR mogą złożyć i uruchomić gotowy dekoder ADS-B ze strony, zostało to szerzej omówione w część pierwsza.

Poniżej fragmentu podano kod źródłowy parsera opisanego w artykule. To jest przykład testowy, który nie udaje produkcji, ale pewne rzeczy w nim działają i można go wykorzystać do analizy pliku zapisanego powyżej.
Kod źródłowy (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Mam nadzieję, że ktoś był zainteresowany, dziękuję za uwagę.

Źródło: www.habr.com

Dodaj komentarz