Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Tere Habr. Tõenäoliselt on tasuta Flightradar24 teenust kasutanud kõik, kes on kunagi lennukis sugulaste või sõpradega kohtunud või kohtunud. See on väga mugav viis lennuki asukoha jälgimiseks reaalajas.

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

В Esimene osa Kirjeldati sellise võrguteenuse tööpõhimõtet. Nüüd läheme edasi ja selgitame välja, milliseid andmeid lennukist vastuvõtujaama saadetakse ja vastu võetakse, ning dekodeerime need Pythoni abil ise.

Lugu

Ilmselgelt ei edastata lennukiandmeid, et kasutajad saaksid neid nutitelefonides näha. Süsteem kannab nime ADS-B (Automatic dependent valve-broadcast) ning seda kasutatakse lennumasina kohta käiva info automaatseks edastamiseks juhtimiskeskusesse – edastatakse selle identifikaator, koordinaadid, suund, kiirus, kõrgus ja muud andmed. Varem, enne selliste süsteemide tulekut, nägi dispetšer radaril ainult punkti. Sellest ei piisanud enam, kui lennukeid oli liiga palju.

Tehniliselt koosneb ADS-B õhusõiduki saatjast, mis saadab perioodiliselt teabepakette üsna kõrgel sagedusel 1090 MHz (on ka teisi režiime, kuid me pole neist nii huvitatud, kuna koordinaate edastatakse ainult siin). Muidugi on kuskil lennujaamas lisaks saatjale ka vastuvõtja, aga meile kui kasutajatele on huvitav meie enda vastuvõtja.

Muide, võrdluseks, esimene selline tavakasutajatele mõeldud süsteem Airnav Radarbox ilmus 2007. aastal ja maksis umbes 900 dollarit, võrguteenuste tellimus maksis veel 250 dollarit aastas.

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Nende esimeste vene omanike arvustusi saab lugeda foorumist raadioskanner. Nüüd, kui RTL-SDR-vastuvõtjad on muutunud laialdaselt kättesaadavaks, saab sarnase seadme kokku panna 30 dollari eest; selle kohta oli rohkem teavet Esimene osa. Liigume edasi protokolli enda juurde – vaatame, kuidas see töötab.

Signaalide vastuvõtmine

Esiteks tuleb signaal salvestada. Kogu signaali kestus on vaid 120 mikrosekundit, nii et selle komponentide mugavaks lahtivõtmiseks on soovitav SDR-vastuvõtja, mille diskreetimissagedus on vähemalt 5 MHz.

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Pärast salvestamist saame WAV-faili, mille diskreetimissagedus on 5000000 30 500 sämplit sekundis, XNUMX sekundit sellist salvestust “kaaluvad” umbes XNUMX MB. Meediapleieriga kuulamine on muidugi kasutu - fail ei sisalda heli, vaid otse digiteeritud raadiosignaali - just nii töötab Software Defined Radio.

Avame ja töötleme faili Pythoni abil. Need, kes soovivad omal käel katsetada, saavad alla laadida näidissalvestuse по ссылке.

Laadime faili alla ja vaatame, mis seal sees on.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Tulemus: näeme taustamüra taustal ilmseid "impulsse".

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Iga "impulss" on signaal, mille struktuur on selgelt nähtav, kui suurendate graafikul eraldusvõimet.

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Nagu näete, on pilt üsna kooskõlas ülaltoodud kirjeldusega. Saate alustada andmete töötlemist.

Dekodeerimine

Esiteks peate saama natuke voogu. Signaal ise on kodeeritud Manchesteri kodeeringuga:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Niblite tasemeerinevuse põhjal on lihtne saada tõelised “0” ja “1”.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Signaali enda struktuur on järgmine:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Vaatame põlde lähemalt.

DF (Downlink Format, 5 bits) – määrab sõnumi tüübi. Neid on mitut tüüpi:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll
(tabeli allikas)

Meid huvitab ainult tüüp DF17, sest... Just see sisaldab lennuki koordinaate.

ICAO (24 bitti) - õhusõiduki rahvusvaheline unikaalne kood. Lennukit saate kontrollida selle koodi järgi Internetis (kahjuks on autor andmebaasi uuendamise lõpetanud, kuid see on endiselt asjakohane). Näiteks koodi 3c5ee2 jaoks on meil järgmine teave:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Redigeeri: sisse kommentaarid artiklile ICAO koodi kirjeldus on toodud täpsemalt, huvilistel soovitan tutvuda.

DATA (56 või 112 bitti) - tegelikud andmed, mida me dekodeerime. Esimesed 5 bitti andmeid on väli Tüüpkood, mis sisaldab salvestatavate andmete alamtüüpi (mitte segi ajada DF-iga). Neid tüüpe on üsna palju:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll
(tabeli allikas)

Vaatame mõnda näidet pakenditest.

Lennuki identifitseerimine

Näide binaarsel kujul:

00100 011 000101 010111 000111 110111 110001 111000

Andmeväljad:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, iga märk C1-C8 sisaldab koode, mis vastavad rea indeksitele:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Stringi dekodeerides on lihtne saada lennuki kood: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Asend õhus

Kui nimi on lihtne, siis koordinaadid on keerulisemad. Neid edastatakse kahe, paaris- ja paaritu kaadri kujul. Väljakood TC = 2b = 01011.

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll

Paaris- ja paaritupakettide näide:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Koordinaatide arvutamine ise toimub üsna keerulise valemi järgi:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll
(allikas)

Ma ei ole GIS-ekspert, nii et ma ei tea, kust see tuleb. Kes teab, kirjutage kommentaaridesse.

Kõrgust peetakse lihtsamaks – olenevalt konkreetsest bitist võib seda esitada kas 25 või 100 jala kordsena.

Õhukiirus

Pakett TC=19-ga. Huvitav on siin see, et kiirus võib olla kas täpne maapinna suhtes (Ground Speed) või õhus, mõõdetuna õhusõiduki anduriga (Airspeed). Samuti edastatakse palju erinevaid välju:

Flightradar24 – kuidas see töötab? 2. osa, ADS-B protokoll
(allikas)

Järeldus

Nagu näete, on ADS-B tehnoloogia muutunud huvitavaks sümbioosiks, kui standard on kasulik mitte ainult professionaalidele, vaid ka tavakasutajatele. Kuid loomulikult mängis selles võtmerolli digitaalsete SDR-vastuvõtjate odavam tehnoloogia, mis võimaldab seadmel sõna otseses mõttes "sentide eest" vastu võtta signaale, mille sagedus ületab gigahertsi.

Standardis endas on muidugi palju rohkem. Huvilised saavad lehelt vaadata PDF-i ICAO või külastage juba eespool mainitud veebisait.

On ebatõenäoline, et kõik ülaltoodud on paljudele kasulikud, kuid loodan, et vähemalt üldine ettekujutus selle toimimisest jääb alles.

Muide, Pythonis on valmis dekooder juba olemas, saate seda uurida siin. Ja SDR-vastuvõtjate omanikud saavad valmis ADS-B dekoodri kokku panna ja käivitada lehelt, arutati seda üksikasjalikumalt aastal Esimene osa.

Artiklis kirjeldatud parseri lähtekood on toodud lõike all. See on testnäide, mis ei pretendeeri tootmisele, kuid selles töötavad mõned asjad ja seda saab kasutada ülal salvestatud faili sõelumiseks.
Lähtekood (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Loodan, et keegi oli huvitatud, tänan tähelepanu eest.

Allikas: www.habr.com

Lisa kommentaar