Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Ahoj Habr. Bezplatnú službu Flightradar24 využil snáď každý, kto niekedy v lietadle stretol alebo vyprevadil príbuzných či priateľov. Ide o veľmi pohodlný spôsob sledovania polohy lietadla v reálnom čase.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

В prvá časť Bol opísaný princíp fungovania takejto online služby. Teraz budeme pokračovať a zistíme, aké údaje sa odosielajú a prijímajú z lietadla do prijímacej stanice a sami ich dekódujeme pomocou Pythonu.

Príbeh

Je zrejmé, že údaje o lietadlách sa neprenášajú, aby ich používatelia videli na svojich smartfónoch. Systém sa nazýva ADS-B (Automatic dependent surveillance—broadcast) a slúži na automatický prenos informácií o lietadle do riadiaceho centra – prenáša sa jeho identifikátor, súradnice, smer, rýchlosť, nadmorská výška a ďalšie údaje. Predtým, pred príchodom takýchto systémov, mohol dispečer vidieť iba bod na radare. To už nestačilo, keď bolo priveľa lietadiel.

Technicky sa ADS-B skladá z vysielača na lietadle, ktorý periodicky posiela pakety informácií na pomerne vysokej frekvencii 1090 MHz (existujú aj iné režimy, ale tie nás až tak nezaujímajú, keďže súradnice sa vysielajú iba tu). Niekde na letisku je samozrejme okrem vysielača aj prijímač, ale pre nás ako používateľov je zaujímavý vlastný prijímač.

Mimochodom, pre porovnanie, prvý takýto systém Airnav Radarbox určený pre bežných používateľov sa objavil v roku 2007 a stál približne 900 dolárov, ďalších 250 dolárov ročne stálo predplatné sieťových služieb.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Recenzie týchto prvých ruských majiteľov si môžete prečítať na fóre rádioskener. Teraz, keď sú prijímače RTL-SDR široko dostupné, podobné zariadenie je možné zostaviť za 30 dolárov; viac o tom bolo v prvá časť. Prejdime k samotnému protokolu – pozrime sa, ako funguje.

Prijímanie signálov

Najprv je potrebné zaznamenať signál. Celý signál má trvanie len 120 mikrosekúnd, takže na pohodlnú demontáž jeho komponentov je žiaduci SDR prijímač so vzorkovacou frekvenciou aspoň 5 MHz.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Po nahratí dostaneme súbor WAV so vzorkovacou rýchlosťou 5000000 30 500 vzoriek/s, XNUMX sekúnd takéhoto záznamu „váži“ asi XNUMX MB. Počúvanie pomocou prehrávača médií je samozrejme zbytočné – súbor neobsahuje zvuk, ale priamo digitalizovaný rádiový signál – presne takto funguje Software Defined Radio.

Súbor otvoríme a spracujeme pomocou Pythonu. Tí, ktorí chcú experimentovať sami, si môžu stiahnuť ukážkovú nahrávku по ссылке.

Poďme si stiahnuť súbor a uvidíme, čo je vnútri.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Výsledok: vidíme zjavné „pulzy“ na pozadí hluku.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Každý „impulz“ je signál, ktorého štruktúra je jasne viditeľná, ak zvýšite rozlíšenie na grafe.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Ako vidíte, obrázok je celkom v súlade s tým, čo je uvedené v popise vyššie. Môžete začať spracovávať údaje.

Dekódovanie

Najprv musíte trochu streamovať. Samotný signál je zakódovaný pomocou kódovania Manchester:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Z rozdielu úrovní v kúskoch je ľahké získať skutočné „0“ a „1“.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Štruktúra samotného signálu je nasledovná:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Pozrime sa na polia podrobnejšie.

DF (Downlink Format, 5 bitov) – určuje typ správy. Existuje niekoľko typov:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B
(tabuľkový zdroj)

Máme záujem len o typ DF17, pretože... Práve tá obsahuje súradnice lietadla.

ICAO (24 bitov) - medzinárodný unikátny kód lietadla. Lietadlo môžete skontrolovať podľa jeho kódu on-line (bohužiaľ, autor prestal aktualizovať databázu, ale stále je aktuálna). Napríklad pre kód 3c5ee2 máme nasledujúce informácie:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Edit: in komentáre k článku Popis ICAO kódu je uvedený podrobnejšie, záujemcom odporúčam prečítať si ho.

ÚDAJE (56 alebo 112 bitov) - skutočné dáta, ktoré budeme dekódovať. Prvých 5 bitov dát je pole Typový kód, ktorý obsahuje podtyp ukladaných údajov (nezamieňať s DF). Existuje pomerne málo týchto typov:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B
(tabuľkový zdroj)

Pozrime sa na niekoľko príkladov balíčkov.

Identifikácia lietadla

Príklad v binárnom tvare:

00100 011 000101 010111 000111 110111 110001 111000

Dátové polia:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, každý znak C1-C8 obsahuje kódy zodpovedajúce indexom v riadku:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######

Dekódovaním reťazca je ľahké získať kód lietadla: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Poloha vo vzduchu

Ak je názov jednoduchý, súradnice sú zložitejšie. Vysielajú sa vo forme 2, párnych a nepárnych rámcov. Kód poľa TC = 01011b = 11.

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B

Príklad párnych a nepárnych paketov:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Samotný výpočet súradníc prebieha podľa pomerne zložitého vzorca:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B
(zdroj)

Nie som odborník na GIS, takže neviem, odkiaľ to pochádza. Kto vie, napíšte do komentárov.

Výška sa považuje za jednoduchšiu - v závislosti od konkrétneho bitu môže byť reprezentovaná buď ako násobok 25 alebo 100 stôp.

Rýchlosť vzduchu

Balík s TC=19. Zaujímavosťou je, že rýchlosť môže byť buď presná, vzhľadom na zem (Ground Speed), alebo vo vzduchu, meraná senzorom lietadla (Airspeed). Prenáša sa aj mnoho rôznych polí:

Flightradar24 - ako to funguje? Časť 2, protokol ADS-B
(zdroj)

Záver

Ako vidíte, technológia ADS-B sa stala zaujímavou symbiózou, kedy je štandard užitočný nielen pre profesionálov, ale aj pre bežných používateľov. Kľúčovú úlohu v tom však samozrejme zohrala lacnejšia technológia digitálnych prijímačov SDR, ktorá zariadeniu umožňuje doslova prijímať signály s frekvenciami nad gigahertz „za centy“.

V samotnom štandarde je toho samozrejme oveľa viac. Záujemcovia si môžu pozrieť PDF na stránke ICAO alebo navštívte ten, ktorý už bol spomenutý vyššie webové stránky.

Je nepravdepodobné, že všetko vyššie uvedené bude pre mnohých užitočné, ale dúfam, že zostane aspoň všeobecná predstava o tom, ako to funguje.

Mimochodom, hotový dekodér v Pythone už existuje, môžete si ho preštudovať tu. A majitelia SDR prijímačov môžu zostaviť a spustiť hotový dekodér ADS-B zo stránky, podrobnejšie sa o tom hovorilo v prvá časť.

Zdrojový kód parseru popísaný v článku je uvedený pod rezom. Toto je testovací príklad, ktorý nepredstiera, že ide o produkciu, ale niektoré veci v ňom fungujú a možno ho použiť na analýzu súboru zaznamenaného vyššie.
Zdrojový kód (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Dúfam, že to niekoho zaujalo, ďakujem za pozornosť.

Zdroj: hab.com

Pridať komentár