Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Hej Habr. Förmodligen har alla som någonsin träffat eller sett av släktingar eller vänner på ett plan använt den kostnadsfria tjänsten Flightradar24. Detta är ett mycket bekvämt sätt att spåra flygplanets position i realtid.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

В den första delen Funktionsprincipen för en sådan onlinetjänst beskrevs. Vi kommer nu att gå vidare och ta reda på vilken data som skickas och tas emot från flygplanet till mottagningsstationen och avkoda det själva med Python.

Story

Uppenbarligen överförs inte flygplansdata så att användarna kan se dem på sina smartphones. Systemet kallas ADS-B (Automatic dependent surveillance—broadcast), och används för att automatiskt överföra information om flygplanet till kontrollcentralen - dess identifierare, koordinater, riktning, hastighet, höjd och andra data överförs. Tidigare, före tillkomsten av sådana system, kunde avsändaren bara se en punkt på radarn. Detta räckte inte längre när det var för många plan.

Tekniskt sett består ADS-B av en sändare på ett flygplan som med jämna mellanrum skickar informationspaket med en ganska hög frekvens på 1090 MHz (det finns andra lägen, men vi är inte så intresserade av dem, eftersom koordinater bara sänds här). Givetvis finns det förutom sändaren även en mottagare någonstans på flygplatsen, men för oss som användare är vår egen mottagare intressant.

Förresten, för jämförelse, det första sådana systemet, Airnav Radarbox, designat för vanliga användare, dök upp 2007 och kostade cirka 900 USD; ett abonnemang på nätverkstjänster kostade ytterligare 250 USD per år.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Recensioner av de första ryska ägarna kan läsas på forumet radioskanner. Nu när RTL-SDR-mottagare har blivit allmänt tillgängliga kan en liknande enhet sättas ihop för $30; mer om detta fanns i den första delen. Låt oss gå vidare till själva protokollet - låt oss se hur det fungerar.

Tar emot signaler

Först måste signalen spelas in. Hela signalen har en varaktighet på endast 120 mikrosekunder, så för att bekvämt demontera dess komponenter är en SDR-mottagare med en samplingsfrekvens på minst 5 MHz önskvärt.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Efter inspelningen får vi en WAV-fil med en samplingshastighet på 5000000 30 500 samplingar/sekund; XNUMX sekunder av sådan inspelning "väger" cirka XNUMX MB. Att lyssna på den med en mediaspelare är förstås värdelöst – filen innehåller inget ljud, utan en direkt digitaliserad radiosignal – det är precis så Software Defined Radio fungerar.

Vi kommer att öppna och bearbeta filen med Python. De som vill experimentera på egen hand kan ladda ner en exempelinspelning по ссылке.

Låt oss ladda ner filen och se vad som finns inuti.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Resultat: vi ser tydliga "pulser" mot bakgrundsljudet.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Varje "puls" är en signal, vars struktur är tydligt synlig om du ökar upplösningen på grafen.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Som du kan se är bilden ganska överensstämmande med vad som ges i beskrivningen ovan. Du kan börja bearbeta uppgifterna.

Avkodning

Först måste du få lite stream. Signalen i sig är kodad med Manchester-kodning:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Från nivåskillnaden i nibbles är det lätt att få riktiga "0" och "1".

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Strukturen för själva signalen är som följer:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Låt oss titta på fälten mer detaljerat.

DF (Downlink Format, 5 bitar) - bestämmer typen av meddelande. Det finns flera typer:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll
(tabellkälla)

Vi är bara intresserade av typ DF17, eftersom... Det är denna som innehåller flygplanets koordinater.

ICAO (24 bitar) - internationell unik kod för flygplanet. Du kan kontrollera planet genom dess kod nätet (tyvärr har författaren slutat uppdatera databasen, men den är fortfarande aktuell). Till exempel, för kod 3c5ee2 har vi följande information:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Edit: in kommentarer till artikeln Beskrivningen av ICAO-koden ges mer detaljerat, jag rekommenderar att intresserade läser den.

DATA (56 eller 112 bitar) - den faktiska data som vi kommer att avkoda. De första 5 databitarna är fältet kod, som innehåller undertypen av data som lagras (inte att förväxla med DF). Det finns en hel del av dessa typer:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll
(tabellkälla)

Låt oss titta på några exempel på paket.

Flygplansidentifikation

Exempel i binär form:

00100 011 000101 010111 000111 110111 110001 111000

Datafält:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, varje tecken C1-C8 innehåller koder som motsvarar index på raden:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Genom att avkoda strängen är det lätt att få flygplanskoden: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Luftburen position

Om namnet är enkelt är koordinaterna mer komplicerade. De sänds i form av 2, jämna och udda ramar. Fältkod TC = 01011b = 11.

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll

Exempel på jämna och udda paket:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Själva beräkningen av koordinater sker enligt en ganska knepig formel:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll
(källa)

Jag är ingen GIS-expert, så jag vet inte var det kommer ifrån. Vem vet, skriv i kommentarerna.

Höjd anses vara enklare - beroende på den specifika biten kan den representeras som antingen en multipel av 25 eller 100 fot.

Luftburen hastighet

Paket med TC=19. Det intressanta här är att hastigheten kan vara antingen exakt, i förhållande till marken (Ground Speed), eller luftburen, mätt av en flygplanssensor (Airspeed). Många olika fält överförs också:

Flightradar24 - hur fungerar det? Del 2, ADS-B-protokoll
(källa)

Slutsats

Som du kan se har ADS-B-teknik blivit en intressant symbios, när en standard är användbar inte bara för proffs utan också för vanliga användare. Men naturligtvis spelades en nyckelroll i detta av den billigare tekniken för digitala SDR-mottagare, vilket gör att enheten bokstavligen kan ta emot signaler med frekvenser över en gigahertz "för pennies".

I själva standarden finns det förstås mycket mer. Den som är intresserad kan se PDF-filen på sidan ICAO eller besök den som redan nämnts ovan сайт.

Det är osannolikt att allt ovanstående kommer att vara användbart för många, men åtminstone den allmänna idén om hur det fungerar, hoppas jag, finns kvar.

Förresten, en färdig dekoder i Python finns redan, du kan studera den här. Och ägare av SDR-mottagare kan montera och lansera en färdig ADS-B-avkodare från sidan, diskuterades detta mer ingående i den första delen.

Källkoden för parsern som beskrivs i artikeln ges under klippet. Det här är ett testexempel som inte utger sig för att vara produktion, men vissa saker fungerar i det, och det kan användas för att analysera filen som spelats in ovan.
Källkod (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Jag hoppas att någon var intresserad, tack för din uppmärksamhet.

Källa: will.com

Lägg en kommentar