Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Salut Habr. Probabil că toți cei care s-au întâlnit cel puțin o dată sau și-au văzut rude sau prieteni într-un avion au folosit serviciul gratuit Flightradar24. Aceasta este o modalitate foarte convenabilă de a urmări poziția aeronavei în timp real.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

В prima parte a fost descris principiul de funcționare a unui astfel de serviciu online. Acum vom merge mai departe și vom afla ce date sunt transmise și primite de la aeronavă către stația de recepție și le vom decoda noi înșine folosind Python.

Poveste

În mod clar, datele aeronavei nu sunt partajate pentru ca utilizatorii să le vadă pe smartphone-urile lor. Sistemul se numește ADS-B (Automatic dependent surveillance—broadcast) și este folosit pentru a transmite automat informații despre aeronavă către centrul de control - sunt transmise identificatorul acesteia, coordonatele, direcția, viteza, altitudinea și alte date. Anterior, înainte de apariția unor astfel de sisteme, dispecerul putea vedea doar un punct pe radar. Acest lucru nu era suficient când erau prea multe avioane.

Din punct de vedere tehnic, ADS-B constă dintr-un transmițător de aeronavă care trimite periodic pachete cu informații la o frecvență destul de mare de 1090 MHz (sunt și alte moduri, dar nu sunt atât de interesante pentru noi, deoarece coordonatele se transmit doar aici). Bineînțeles, pe lângă transmițător, există și un receptor undeva în aeroport, dar pentru noi, ca utilizatori, ne interesează propriul receptor.

Apropo, pentru comparație, primul astfel de sistem, Airnav Radarbox, conceput pentru utilizatorii obișnuiți, a apărut în 2007 și a costat aproximativ 900 de dolari, iar un abonament la serviciile de rețea costa aproximativ 250 de dolari pe an.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Recenziile acelor primi proprietari ruși pot fi citite pe forum radioscaner. Acum că receptoarele RTL-SDR au devenit disponibile în mod masiv, un dispozitiv similar poate fi asamblat pentru 30 USD, mai multe despre acest lucru au fost în prima parte. Vom trece la protocolul în sine - să vedem cum funcționează.

Recepția semnalului

În primul rând, semnalul trebuie înregistrat. Întregul semnal are o durată de doar 120 de microsecunde, așa că pentru a-și dezasambla confortabil componentele, este de dorit un receptor SDR cu o rată de eșantionare de cel puțin 5 MHz.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

După înregistrare, obținem un fișier WAV cu o rată de eșantionare de 5000000 de mostre/sec, 30 de secunde dintr-o astfel de înregistrare „cântăresc” aproximativ 500MB. Desigur, este inutil să-l ascultați cu un media player - fișierul nu conține sunet, ci un semnal radio direct digitizat - așa funcționează Software Defined Radio.

Vom deschide și procesa fișierul folosind Python. Cei care doresc să experimenteze singuri pot descărca un exemplu de înregistrare по ссылке.

Să descarcăm fișierul și să vedem ce este înăuntru.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultat: vedem „impulsuri” clare pe fondul zgomotului.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Fiecare „impuls” este un semnal, a cărui structură este clar vizibilă dacă măriți rezoluția pe grafic.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

După cum puteți vedea, imaginea este destul de în concordanță cu ceea ce este dat în descrierea de mai sus. Puteți începe prelucrarea datelor.

Decodare

În primul rând, trebuie să obțineți puțin flux. Semnalul în sine este codificat utilizând codificarea Manchester:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Este ușor să obțineți „0” și „1” real din diferența de nivel în nibbles.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Structura semnalului în sine este următoarea:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Să ne uităm la câmpuri mai detaliat.

DF (Format de legătură descendentă, 5 biți) — definește tipul mesajului. Există mai multe tipuri:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B
(sursa tabelului)

Pe noi ne interesează doar tipul DF17, pentru că conţine coordonatele aeronavei.

OACI (24 de biți) — codul internațional unic al aeronavei. Puteți verifica aeronava după codul său on-line (Din păcate, autorul a oprit actualizarea bazei de date, dar este încă relevantă). De exemplu, pentru codul 3c5ee2 avem următoarele informații:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Editare: în comentarii la articol descrierea codului ICAO este data mai detaliat, recomand celor interesati sa o citeasca.

DATE (56 sau 112 biți) - datele reale pe care le vom decoda. Primii 5 biți de date sunt câmpul Cod de tipA care conține subtipul datelor stocate (a nu se confunda cu DF). Există destul de multe dintre aceste tipuri:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B
(sursa tabelului)

Să ne uităm la câteva exemple de pachete.

Identificarea aeronavei

Exemplu binar:

00100 011 000101 010111 000111 110111 110001 111000

Câmpuri de date:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, fiecare caracter C1-C8 conține coduri corespunzătoare indicilor din șir:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

După decodificarea șirului, este ușor să obțineți codul avionului: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

poziție în aer

Dacă totul este simplu cu numele, atunci cu coordonatele este mai complicat. Ele sunt transmise ca cadre 2x, impare și pare. Cod de câmp TC = 01011b = 11.

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B

Un exemplu de pachete pare și impare:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Calculul în sine al coordonatelor are loc după o formulă destul de complicată:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B
(sursă)

Nu sunt specialist GIS, așa că nu știu de unde vine. Cine știe, scrie în comentarii.

Altitudinea este considerată mai simplă - în funcție de bitul specific, poate fi reprezentată fie ca un multiplu de 25 sau 100 de picioare.

Viteza aeropurtată

Pachet cu TC=19. Lucrul interesant aici este că viteza poate fi atât precisă, raportată la sol (Ground Speed), cât și aer, măsurată de senzorul aeronavei (Airspeed). De asemenea, sunt trecute o mulțime de câmpuri diferite:

Flightradar24 - cum funcționează? Partea 2, Protocolul ADS-B
(sursă)

Concluzie

După cum puteți vedea, tehnologia ADS-B a devenit o simbioză interesantă, când un standard este util nu numai pentru profesioniști, ci și pentru utilizatorii obișnuiți. Dar, desigur, rolul cheie în acest sens a fost jucat de tehnologia mai ieftină a receptoarelor digitale SDR, care permite dispozitivului să primească literalmente „pentru un ban” semnale cu o frecvență mai mare decât un gigahertz.

În standardul în sine, desigur, mult mai mult. Cei interesați pot vizualiza PDF-ul de pe pagină OACI sau vizitați deja menționate mai sus website.

Este puțin probabil ca toate cele de mai sus să fie utile pentru mulți, dar cel puțin ideea generală despre cum funcționează, sper, rămâne.

Apropo, un decodor Python gata făcut există deja, îl puteți studia aici. Și proprietarii de receptoare SDR pot asambla și rula un decodor ADS-B gata făcut din pagina, acest lucru este discutat mai detaliat în prima parte.

Codul sursă al parserului descris în articol este dat sub tăietură. Acesta este un exemplu de testare care nu pretinde a fi producție, dar ceva funcționează în el și pot analiza fișierul înregistrat mai sus.
Cod sursă (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Sper că cineva a fost interesat, mulțumesc pentru atenție.

Sursa: www.habr.com

Adauga un comentariu