Flightradar24 - kako deluje? 2. del, protokol ADS-B

Pozdravljeni Habr. Verjetno so vsi, ki so vsaj enkrat srečali ali pospremili sorodnike ali prijatelje na letalu, uporabili brezplačno storitev Flightradar24. To je zelo priročen način za spremljanje položaja letala v realnem času.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

В 1. del opisan je bil princip delovanja tovrstne spletne storitve. Zdaj bomo šli dlje in ugotovili, kateri podatki se prenašajo in sprejemajo iz letala do sprejemne postaje, in jih sami dekodiramo s pomočjo Pythona.

Zgodba

Jasno je, da se podatki o letalih ne delijo, da bi jih uporabniki lahko videli na svojih pametnih telefonih. Sistem se imenuje ADS-B (Automatic dependent surveillance—broadcast) in se uporablja za avtomatsko posredovanje informacij o letalu v nadzorni center - prenašajo se njegov identifikator, koordinate, smer, hitrost, nadmorska višina in drugi podatki. Prej, pred pojavom tovrstnih sistemov, je lahko dispečer videl le piko na radarju. To ni bilo dovolj, ko je bilo letal preveč.

Tehnično je ADS-B sestavljen iz letalskega oddajnika, ki občasno pošilja pakete z informacijami na dokaj visoki frekvenci 1090 MHz (obstajajo tudi drugi načini, vendar za nas niso tako zanimivi, ker se koordinate prenašajo samo tukaj). Seveda je poleg oddajnika nekje na letališču tudi sprejemnik, a za nas kot uporabnike je zanimiv lasten sprejemnik.

Mimogrede, za primerjavo, prvi tak sistem Airnav Radarbox, zasnovan za običajne uporabnike, se je pojavil leta 2007 in je stal približno 900 dolarjev, naročnina na omrežne storitve pa je stala približno 250 dolarjev na leto.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Ocene teh prvih ruskih lastnikov lahko preberete na forumu radijski skener. Zdaj, ko so sprejemniki RTL-SDR postali množično dostopni, je podobno napravo mogoče sestaviti že za 30 dolarjev, več o tem je bilo v 1. del. Prešli bomo na sam protokol - poglejmo, kako deluje.

Sprejem signala

Najprej je treba posneti signal. Celoten signal traja le 120 mikrosekund, zato je za udobno razstavljanje njegovih komponent zaželen SDR sprejemnik s frekvenco vzorčenja vsaj 5 MHz.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Po snemanju dobimo datoteko WAV s hitrostjo vzorčenja 5000000 vzorcev / sek, 30 sekund takega posnetka "tehta" približno 500 MB. Seveda ga je neuporabno poslušati z multimedijskim predvajalnikom - datoteka ne vsebuje zvoka, ampak neposredno digitaliziran radijski signal - tako deluje programsko definiran radio.

Datoteko bomo odprli in obdelali s programom Python. Tisti, ki želite eksperimentirati sami, lahko prenesete primer posnetka по ссылке.

Prenesimo datoteko in poglejmo, kaj je notri.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultat: vidimo jasne "impulze" na ozadju hrupa.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Vsak "impulz" je signal, katerega struktura je jasno vidna, če povečate ločljivost na grafu.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Kot lahko vidite, je slika povsem skladna s tem, kar je navedeno v zgornjem opisu. Lahko začnete z obdelavo podatkov.

Dekodiranje

Najprej morate dobiti malo toka. Sam signal je kodiran z manchesterskim kodiranjem:

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Z lahkoto je dobiti prave "0" in "1" iz razlike v stopnjah v grižljajih.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Struktura samega signala je naslednja:

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Oglejmo si polja podrobneje.

DF (Format povezave navzdol, 5 bitov) — določa vrsto sporočila. Obstaja več vrst:

Flightradar24 - kako deluje? 2. del, protokol ADS-B
(vir tabele)

Zanima nas samo tip DF17, ker vsebuje koordinate letala.

ICAO (24 bitov) — mednarodna edinstvena koda letala. Letalo lahko preverite po njegovi kodi dosegljiv (Avtor je na žalost prenehal posodabljati bazo, vendar je še vedno aktualna). Na primer, za kodo 3c5ee2 imamo naslednje podatke:

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Uredi: v komentarji na članek podrobneje je podan opis kode ICAO, priporočam, da si ga zainteresirani preberejo.

PODATKI (56 ali 112 bitov) - dejanski podatki, ki jih bomo dekodirali. Prvih 5 bitov podatkov je polje Vnesite kodoA, ki vsebuje podvrsto shranjenih podatkov (ne zamenjujte z DF). Teh vrst je kar nekaj:

Flightradar24 - kako deluje? 2. del, protokol ADS-B
(vir tabele)

Oglejmo si nekaj primerov paketov.

identifikacijo letala

Binarni primer:

00100 011 000101 010111 000111 110111 110001 111000

Podatkovna polja:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, vsak znak C1-C8 vsebuje kode, ki ustrezajo indeksom v nizu:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_################0123456789######

Po dekodiranju niza je enostavno dobiti kodo letala: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

položaj v zraku

Če je z imenom vse preprosto, potem je s koordinatami bolj zapleteno. Prenašajo se kot 2x, lihi in sodi okvirji. Koda polja TC = 01011b = 11.

Flightradar24 - kako deluje? 2. del, protokol ADS-B

Primer sodih in lihih paketov:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Sam izračun koordinat poteka po precej zapleteni formuli:

Flightradar24 - kako deluje? 2. del, protokol ADS-B
(Vir)

Nisem strokovnjak za GIS, zato ne vem, od kod prihaja. Kdo ve, napišite v komentarje.

Nadmorska višina velja za enostavnejšo - odvisno od specifičnega bita jo je mogoče predstaviti kot večkratnik 25 ali 100 čevljev.

Hitrost v zraku

Paket s TC=19. Zanimivo pri tem je, da je lahko hitrost natančna glede na tla (Hitrost na tleh) in zrak, izmerjena s senzorjem letala (Zračna hitrost). Opravljenih je tudi veliko različnih področij:

Flightradar24 - kako deluje? 2. del, protokol ADS-B
(Vir)

Zaključek

Kot lahko vidite, je tehnologija ADS-B postala zanimiva simbioza, ko je standard uporaben ne le za profesionalce, ampak tudi za navadne uporabnike. Seveda pa je ključno vlogo pri tem odigrala cenejša tehnologija digitalnih SDR sprejemnikov, ki omogoča, da naprava dobesedno »za drobiž« sprejema signale s frekvenco, višjo od gigaherca.

V samem standardu pa seveda veliko več. Zainteresirani si lahko ogledate PDF na strani ICAO ali obiščite že zgoraj omenjeno Spletna stran.

Malo verjetno je, da bo vse zgoraj našteto koristno za mnoge, vendar vsaj splošna predstava o tem, kako deluje, upam, ostaja.

Mimogrede, že pripravljen dekoder Python že obstaja, lahko ga preučite tukaj. In lastniki sprejemnikov SDR lahko sestavijo in zaženejo že pripravljen dekoder ADS-B s strani, to je podrobneje obravnavano v 1. del.

Pod rezom je navedena izvorna koda razčlenjevalnika, opisanega v članku. To je testni primer, ki se ne pretvarja, da je proizvodnja, vendar nekaj v njem deluje in lahko razčlenijo zgoraj posneto datoteko.
Izvorna koda (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Upam, da je koga zanimalo, hvala za pozornost.

Vir: www.habr.com

Dodaj komentar