Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Zdravo Habr. Vjerovatno je svako ko je ikada sreo ili ispratio rodbinu ili prijatelje u avionu koristio besplatnu uslugu Flightradar24. Ovo je veoma zgodan način praćenja položaja aviona u realnom vremenu.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

В prvi deo Opisan je princip rada ovakve online usluge. Sada ćemo ići naprijed i shvatiti koji se podaci šalju i primaju od aviona do prijemne stanice i sami ih dekodirati koristeći Python.

История

Očigledno, podaci o avionima se ne prenose kako bi ih korisnici mogli vidjeti na svojim pametnim telefonima. Sistem se zove ADS-B (Automatic dependent nadzor—emitovanje), a koristi se za automatski prenos informacija o avionu do kontrolnog centra – prenose se njegov identifikator, koordinate, pravac, brzina, visina i drugi podaci. Ranije, prije pojave takvih sistema, dispečer je mogao vidjeti samo tačku na radaru. Ovo više nije bilo dovoljno kada je bilo previše aviona.

Tehnički, ADS-B se sastoji od predajnika na avionu koji periodično šalje pakete informacija na prilično visokoj frekvenciji od 1090 MHz (postoje i drugi načini rada, ali oni nas ne zanimaju toliko, jer se koordinate prenose samo ovdje). Naravno, pored odašiljača negdje na aerodromu postoji i prijemnik, ali nama kao korisnicima je interesantan vlastiti prijemnik.

Inače, za poređenje, prvi takav sistem, Airnav Radarbox, dizajniran za obične korisnike, pojavio se 2007. godine i koštao je oko 900 dolara, a pretplata na mrežne usluge koštala je još 250 dolara godišnje.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Recenzije tih prvih ruskih vlasnika možete pročitati na forumu radioscanner. Sada kada su RTL-SDR prijemnici postali široko dostupni, sličan uređaj se može sastaviti za 30 dolara; više o tome bilo je u prvi deo. Pređimo na sam protokol - da vidimo kako on funkcionira.

Prijem signala

Prvo, signal treba snimiti. Cijeli signal traje samo 120 mikrosekundi, pa je za udobno rastavljanje njegovih komponenti poželjan SDR prijemnik sa frekvencijom uzorkovanja od najmanje 5 MHz.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Nakon snimanja dobijamo WAV fajl sa frekvencijom uzorkovanja od 5000000 semplova u sekundi; 30 sekundi takvog snimanja „teže“ oko 500MB. Slušanje putem media playera je, naravno, beskorisno - datoteka ne sadrži zvuk, već direktno digitalizirani radio signal - upravo tako radi softverski definirani radio.

Otvorit ćemo i obraditi datoteku koristeći Python. Oni koji žele sami eksperimentirati mogu preuzeti primjer snimka link.

Hajde da preuzmemo fajl i vidimo šta je unutra.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultat: vidimo očigledne „impulse“ naspram pozadinske buke.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Svaki "puls" je signal čija je struktura jasno vidljiva ako povećate rezoluciju na grafikonu.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Kao što vidite, slika je sasvim u skladu sa onim što je dato u opisu iznad. Možete započeti obradu podataka.

Dekodiranje

Prvo, morate dobiti bit stream. Sam signal je kodiran korištenjem Manchesterskog kodiranja:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Iz razlike u nivou u grickalicama lako je dobiti prave “0” i “1”.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Struktura samog signala je sljedeća:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Pogledajmo polja detaljnije.

DF (Downlink Format, 5 bita) - određuje tip poruke. Postoji nekoliko vrsta:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol
(izvor tabele)

Interesuje nas samo tip DF17, jer... To je ono što sadrži koordinate aviona.

ICAO (24 bita) - međunarodni jedinstveni kod aviona. Avion možete provjeriti po njegovom kodu online (nažalost, autor je prestao da ažurira bazu podataka, ali je i dalje relevantna). Na primjer, za kod 3c5ee2 imamo sljedeće informacije:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Uredi: u komentari na članak Detaljnije je dat opis ICAO koda, preporučujem da ga zainteresovani pročitaju.

PODACI (56 ili 112 bita) - stvarni podaci koje ćemo dekodirati. Prvih 5 bitova podataka je polje Tip kod, koji sadrži podtip podataka koji se pohranjuju (ne treba se brkati sa DF). Postoji dosta ovih vrsta:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol
(izvor tabele)

Pogledajmo nekoliko primjera paketa.

Identifikacija aviona

Primjer u binarnom obliku:

00100 011 000101 010111 000111 110111 110001 111000

Podatkovna polja:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, svaki znak C1-C8 sadrži kodove koji odgovaraju indeksima u retku:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######

Dekodiranjem niza lako je dobiti šifru aviona: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Položaj u vazduhu

Ako je ime jednostavno, onda su koordinate složenije. Prenose se u obliku 2, parni i neparni okviri. Šifra polja TC = 01011b = 11.

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol

Primjer parnih i neparnih paketa:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Sam proračun koordinata odvija se prema prilično lukavoj formuli:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol
(izvor)

Nisam stručnjak za GIS, pa ne znam odakle dolazi. Ko zna neka piše u komentarima.

Visina se smatra jednostavnijom - ovisno o određenom bitu, može se predstaviti kao višekratnik od 25 ili 100 stopa.

Airborne Velocity

Paket sa TC=19. Interesantna stvar ovdje je da brzina može biti ili tačna, u odnosu na tlo (Ground Speed), ili u zraku, mjerena senzorom aviona (Airspeed). Mnogo različitih polja se takođe prenosi:

Flightradar24 - kako funkcionira? Dio 2, ADS-B protokol
(izvor)

zaključak

Kao što vidite, ADS-B tehnologija je postala zanimljiva simbioza, kada je standard koristan ne samo profesionalcima, već i običnim korisnicima. No, naravno, ključnu ulogu u tome odigrala je jeftinija tehnologija digitalnih SDR prijemnika, koja omogućava uređaju da doslovno prima signale s frekvencijama iznad gigaherca "za peni".

U samom standardu, naravno, ima mnogo više. Zainteresovani mogu pogledati PDF na stranici ICAO ili posjetite onaj koji je već spomenut web stranica.

Malo je vjerovatno da će sve gore navedeno mnogima biti korisno, ali barem opća ideja o tome kako funkcionira, nadam se, ostaje.

Inače, gotov dekoder u Pythonu već postoji, možete ga proučiti ovdje. A vlasnici SDR prijemnika mogu sastaviti i pokrenuti gotov ADS-B dekoder sa stranice, o tome se detaljnije govorilo u prvi deo.

Izvorni kod parsera opisanog u članku je dat ispod reza. Ovo je probni primjer koji ne pretenduje da bude produkcija, ali neke stvari u njemu rade i može se koristiti za raščlanjivanje datoteke snimljene iznad.
Izvorni kod (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Nadam se da je neko bio zainteresovan, hvala na pažnji.

izvor: www.habr.com

Dodajte komentar