Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Sveiki, Habr. Tikriausiai visi, kurie kada nors yra susitikę ar išvydę gimines ar draugus lėktuve, yra pasinaudoję nemokama Flightradar24 paslauga. Tai labai patogus būdas stebėti orlaivio padėtį realiu laiku.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

В pirmoji dalis Buvo aprašytas tokios internetinės paslaugos veikimo principas. Dabar eisime į priekį ir išsiaiškinsime, kokie duomenys siunčiami ir gaunami iš orlaivio į priėmimo stotį, ir patys juos dekoduosime naudodami Python.

Istorija

Akivaizdu, kad orlaivių duomenys neperduodami, kad vartotojai galėtų juos pamatyti savo išmaniuosiuose telefonuose. Sistema vadinama ADS-B (Automatic dependent valve-broadcast), naudojama informacijai apie orlaivį automatiškai perduoti į valdymo centrą – perduodamas jo identifikatorius, koordinatės, kryptis, greitis, aukštis ir kiti duomenys. Anksčiau, prieš atsirandant tokioms sistemoms, dispečeris radare matydavo tik tašką. To nebepakako, kai lėktuvų buvo per daug.

Techniškai ADS-B susideda iš siųstuvo orlaivyje, kuris periodiškai siunčia informacijos paketus gana aukštu 1090 MHz dažniu (yra ir kitų režimų, bet mes jais nelabai domimės, nes koordinatės perduodamos tik čia). Žinoma, be siųstuvo, kur nors oro uoste yra ir imtuvas, bet mums, kaip vartotojams, įdomus nuosavas imtuvas.

Beje, palyginimui, pirmoji tokia sistema „Airnav Radarbox“, skirta paprastiems vartotojams, pasirodė 2007 m., kainavo apie 900 USD, tinklo paslaugų abonementas kainuoja dar 250 USD per metus.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Tų pirmųjų Rusijos savininkų apžvalgas galite perskaityti forume radijo skaitytuvas. Dabar, kai RTL-SDR imtuvai tapo plačiai prieinami, panašų įrenginį galima surinkti už 30 USD; daugiau apie tai buvo pirmoji dalis. Pereikime prie paties protokolo – pažiūrėkime, kaip jis veikia.

Signalų priėmimas

Pirmiausia reikia įrašyti signalą. Visas signalas trunka tik 120 mikrosekundžių, todėl norint patogiai išardyti jo komponentus, pageidautinas SDR imtuvas, kurio diskretizavimo dažnis yra ne mažesnis kaip 5 MHz.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Po įrašymo gauname WAV failą, kurio diskretizavimo dažnis yra 5000000 30 500 mėginių per sek., XNUMX sekundžių tokio įrašo „sveria“ apie XNUMX MB. Klausytis su medijos grotuvu, žinoma, nenaudinga – faile yra ne garsas, o tiesiogiai suskaitmenintas radijo signalas – būtent taip veikia Software Defined Radio.

Mes atidarysime ir apdorosime failą naudodami Python. Norintys eksperimentuoti patys gali parsisiųsti pavyzdinį įrašą по ссылке.

Atsisiųskite failą ir pažiūrėkime, kas yra viduje.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultatas: matome akivaizdžius „pulsus“ foniniame triukšme.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Kiekvienas „impulsas“ yra signalas, kurio struktūra aiškiai matoma padidinus grafiko skiriamąją gebą.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Kaip matote, paveikslėlis visiškai atitinka tai, kas pateikta aukščiau esančiame aprašyme. Galite pradėti apdoroti duomenis.

Dekodavimas

Pirmiausia turite gauti šiek tiek srauto. Pats signalas yra užkoduotas naudojant Mančesterio kodavimą:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Iš niblių lygio skirtumo nesunku gauti tikrus „0“ ir „1“.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Paties signalo struktūra yra tokia:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Pažvelkime į laukus išsamiau.

DF (Downlink Format, 5 bitai) – nustato pranešimo tipą. Yra keletas tipų:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas
(lentelės šaltinis)

Mus domina tik DF17 tipas, nes... Būtent jame yra orlaivio koordinatės.

TCAO (24 bitai) – tarptautinis unikalus orlaivio kodas. Lėktuvą galite patikrinti pagal jo kodą interneto (deja, autorė nustojo atnaujinti duomenų bazę, bet ji vis dar aktuali). Pavyzdžiui, apie kodą 3c5ee2 turime tokią informaciją:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Redaguoti: in komentarai prie straipsnio Išsamesnis ICAO kodekso aprašymas, besidomintiems rekomenduoju jį perskaityti.

DUOMENYS (56 arba 112 bitų) – faktiniai duomenys, kuriuos iššifruosime. Pirmieji 5 duomenų bitai yra laukas Tipo kodas, kuriame yra saugomų duomenų potipis (nepainioti su DF). Yra keletas šių tipų:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas
(lentelės šaltinis)

Pažvelkime į keletą paketų pavyzdžių.

Orlaivio identifikavimas

Pavyzdys dvejetaine forma:

00100 011 000101 010111 000111 110111 110001 111000

Duomenų laukai:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, kiekvienas simbolis C1-C8 yra kodai, atitinkantys indeksus eilutėje:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Iškodavus eilutę nesunku gauti orlaivio kodą: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Oro desanto padėtis

Jei pavadinimas paprastas, tai koordinatės sudėtingesnės. Jie perduodami 2, lyginių ir nelyginių kadrų pavidalu. Lauko kodas TC = 01011b = 11.

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas

Lyginių ir nelyginių paketų pavyzdys:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Pats koordinačių skaičiavimas vyksta pagal gana sudėtingą formulę:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas
(šaltinis)

Nesu GIS ekspertas, todėl nežinau, iš kur tai. Kas žino, rašykite komentaruose.

Aukštis laikomas paprastesniu – priklausomai nuo konkretaus bito, jis gali būti pavaizduotas kaip 25 arba 100 pėdų kartotinis.

Oro greitis

Pakuotė su TC=19. Įdomu tai, kad greitis gali būti tikslus, palyginti su žeme (Ground Speed), arba ore, matuojamas orlaivio jutikliu (Airspeed). Taip pat perduodama daug skirtingų laukų:

Flightradar24 – kaip tai veikia? 2 dalis, ADS-B protokolas
(šaltinis)

išvada

Kaip matote, ADS-B technologija tapo įdomia simbioze, kai standartas naudingas ne tik profesionalams, bet ir paprastiems vartotojams. Tačiau, žinoma, pagrindinį vaidmenį čia suvaidino pigesnė skaitmeninių SDR imtuvų technologija, leidžianti įrenginiui tiesiogine prasme priimti signalus, kurių dažnis viršija gigahercą „už centus“.

Pačiame standarte, žinoma, yra daug daugiau. Norintieji gali peržiūrėti PDF formatą puslapyje TCAO arba apsilankykite jau minėtame aukščiau Interneto svetainė.

Mažai tikėtina, kad visa tai, kas išdėstyta pirmiau, bus naudinga daugeliui, bet bent jau bendra idėja, kaip tai veikia, tikiuosi, išliks.

Beje, paruoštas dekoderis Python jau egzistuoja, galite jį ištirti čia. O SDR imtuvų savininkai gali surinkti ir paleisti jau paruoštą ADS-B dekoderį iš puslapio, tai buvo išsamiau aptarta pirmoji dalis.

Straipsnyje aprašyto analizatoriaus šaltinio kodas pateiktas po pjūviu. Tai yra bandomasis pavyzdys, kuris nepretenduoja į produkciją, tačiau kai kurie dalykai jame veikia, ir jį galima naudoti anksčiau įrašytam failui išanalizuoti.
Šaltinio kodas (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Tikiuosi, kad kam nors buvo įdomu, ačiū už dėmesį.

Šaltinis: www.habr.com

Добавить комментарий