Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Hei Habr. Sannsynligvis har alle som noen gang har møtt eller sett av slektninger eller venner på et fly brukt den gratis tjenesten Flightradar24. Dette er en veldig praktisk måte å spore flyets posisjon i sanntid.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

В første del Driftsprinsippet for en slik nettjeneste ble beskrevet. Vi vil nå gå videre og finne ut hvilke data som sendes og mottas fra flyet til mottaksstasjonen og dekode det selv ved hjelp av Python.

Story

Det er åpenbart at flydata ikke overføres slik at brukerne kan se dem på smarttelefonene sine. Systemet kalles ADS-B (Automatic dependent surveillance—broadcast), og brukes til å automatisk overføre informasjon om flyet til kontrollsenteret - identifikator, koordinater, retning, hastighet, høyde og andre data overføres. Tidligere, før bruken av slike systemer, kunne avsenderen bare se et punkt på radaren. Dette var ikke lenger nok når det var for mange fly.

Teknisk sett består ADS-B av en sender på et fly som med jevne mellomrom sender pakker med informasjon med en ganske høy frekvens på 1090 MHz (det finnes andre moduser, men vi er ikke så interessert i dem, siden koordinater overføres bare her). I tillegg til senderen er det selvfølgelig også en mottaker et sted på flyplassen, men for oss som brukere er vår egen mottaker interessant.

Forresten, til sammenligning, det første slike system, Airnav Radarbox, designet for vanlige brukere, dukket opp i 2007, og kostet rundt $900; et abonnement på nettverkstjenester kostet ytterligere $250 i året.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Anmeldelser av de første russiske eierne kan leses på forumet radioskanner. Nå som RTL-SDR-mottakere har blitt allment tilgjengelig, kan en lignende enhet settes sammen for $30; mer om dette var i første del. La oss gå videre til selve protokollen - la oss se hvordan den fungerer.

Mottar signaler

Først må signalet registreres. Hele signalet har en varighet på bare 120 mikrosekunder, så for å komfortabelt demontere komponentene er det ønskelig med en SDR-mottaker med en samplingsfrekvens på minst 5 MHz.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Etter opptak mottar vi en WAV-fil med en samplingshastighet på 5000000 30 500 samples/sek; XNUMX sekunder av et slikt opptak "veier" omtrent XNUMX MB. Å lytte til den med en mediespiller er selvfølgelig ubrukelig – filen inneholder ikke lyd, men et direkte digitalisert radiosignal – det er akkurat slik Software Defined Radio fungerer.

Vi vil åpne og behandle filen med Python. De som vil eksperimentere på egenhånd kan laste ned et eksempelopptak по ссылке.

La oss laste ned filen og se hva som er inni.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Resultat: vi ser tydelige "pulser" mot bakgrunnsstøyen.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Hver "puls" er et signal, hvis struktur er tydelig synlig hvis du øker oppløsningen på grafen.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Som du kan se, er bildet ganske konsistent med det som er gitt i beskrivelsen ovenfor. Du kan begynne å behandle dataene.

Dekoding

Først må du få litt strøm. Selve signalet er kodet med Manchester-koding:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Fra nivåforskjellen i nibbles er det lett å få ekte "0" og "1".

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Strukturen til selve signalet er som følger:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

La oss se på feltene mer detaljert.

DF (Downlink Format, 5 bits) - bestemmer typen melding. Det finnes flere typer:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll
(tabellkilde)

Vi er kun interessert i type DF17, fordi... Det er denne som inneholder koordinatene til flyet.

ICAO (24 bits) - internasjonal unik kode for flyet. Du kan sjekke flyet med koden online (forfatteren har dessverre sluttet å oppdatere databasen, men den er fortsatt relevant). For eksempel, for kode 3c5ee2 har vi følgende informasjon:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Edit: i kommentarer til artikkelen Beskrivelsen av ICAO-koden er gitt mer detaljert, jeg anbefaler at interesserte leser den.

DATA (56 eller 112 biter) - de faktiske dataene som vi skal dekode. De første 5 databitene er feltet Type kode, som inneholder undertypen til dataene som lagres (må ikke forveksles med DF). Det er ganske mange av disse typene:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll
(tabellkilde)

La oss se på noen eksempler på pakker.

Flyidentifikasjon

Eksempel i binær form:

00100 011 000101 010111 000111 110111 110001 111000

Datafelt:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, hvert tegn C1-C8 inneholder koder som tilsvarer indekser på linjen:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Ved å dekode strengen er det enkelt å få flykoden: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Luftbåren posisjon

Hvis navnet er enkelt, er koordinatene mer kompliserte. De overføres i form av 2, jevne og odde rammer. Feltkode TC = 01011b = 11.

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll

Eksempel på partalls- og oddetallspakker:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Selve beregningen av koordinater skjer i henhold til en ganske vanskelig formel:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll
(kilde)

Jeg er ingen GIS-ekspert, så jeg vet ikke hvor det kommer fra. Hvem vet, skriv i kommentarfeltet.

Høyde anses som enklere - avhengig av den spesifikke biten, kan den representeres som enten et multiplum av 25 eller 100 fot.

Luftbåren hastighet

Pakke med TC=19. Det interessante her er at hastigheten enten kan være nøyaktig, i forhold til bakken (Ground Speed), eller luftbåren, målt av en flysensor (Airspeed). Mange forskjellige felt overføres også:

Flightradar24 - hvordan fungerer det? Del 2, ADS-B-protokoll
(kilde)

Konklusjon

Som du kan se, har ADS-B-teknologi blitt en interessant symbiose, når en standard er nyttig ikke bare for fagfolk, men også for vanlige brukere. Men selvfølgelig ble en nøkkelrolle i dette spilt av den billigere teknologien til digitale SDR-mottakere, som lar enheten bokstavelig talt motta signaler med frekvenser over en gigahertz "for pennies."

I selve standarden er det selvfølgelig mye mer. Interesserte kan se PDF-en på siden ICAO eller besøk den som allerede er nevnt ovenfor сайт.

Det er usannsynlig at alt det ovennevnte vil være nyttig for mange, men i det minste forblir den generelle ideen om hvordan det fungerer, håper jeg.

Forresten, en ferdig dekoder i Python eksisterer allerede, du kan studere den her. Og eiere av SDR-mottakere kan sette sammen og lansere en ferdiglaget ADS-B-dekoder fra siden, ble dette diskutert mer detaljert i første del.

Kildekoden til parseren beskrevet i artikkelen er gitt under kuttet. Dette er et testeksempel som ikke gir seg ut for å være produksjon, men noen ting fungerer i det, og det kan brukes til å analysere filen registrert ovenfor.
Kildekode (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Jeg håper noen var interessert, takk for oppmerksomheten.

Kilde: www.habr.com

Legg til en kommentar