Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Sveiki, Habr. DroÅ”i vien ikviens, kurÅ” kādreiz lidmaŔīnā ir saticis radus vai draugus, ir izmantojis bezmaksas pakalpojumu Flightradar24. Tas ir ļoti ērts veids, kā reāllaikā izsekot lidmaŔīnas atraÅ”anās vietai.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Š’ pirmā daļa Tika aprakstÄ«ts Ŕāda tieÅ”saistes pakalpojuma darbÄ«bas princips. Tagad mēs turpināsim un izdomāsim, kādi dati tiek sÅ«tÄ«ti un saņemti no lidmaŔīnas uz uztverÅ”anas staciju, un paÅ”i tos atÅ”ifrēsim, izmantojot Python.

Stāsts

AcÄ«mredzot gaisa kuÄ£u dati netiek pārsÅ«tÄ«ti, lai lietotāji to varētu redzēt savos viedtālruņos. Sistēma tiek saukta par ADS-B (Automatic dependent valve-broadcast), un tā tiek izmantota, lai automātiski pārsÅ«tÄ«tu informāciju par lidmaŔīnu uz vadÄ«bas centru - tiek pārraidÄ«ts tā identifikators, koordinātas, virziens, ātrums, augstums un citi dati. IepriekÅ”, pirms Ŕādu sistēmu parādÄ«Å”anās, dispečers varēja redzēt tikai punktu uz radara. Ar to vairs nepietika, kad lidmaŔīnu bija pārāk daudz.

Tehniski ADS-B sastāv no gaisa kuÄ£a raidÄ«tāja, kas periodiski sÅ«ta paketes ar informāciju diezgan augstā frekvencē 1090 MHz (ir arÄ« citi režīmi, taču tie mÅ«s tik ļoti neinteresē, jo koordinātas tiek pārraidÄ«tas tikai Å”eit). Protams, papildus raidÄ«tājam kaut kur lidostā ir arÄ« uztvērējs, bet mums kā lietotājiem interesants ir savs uztvērējs.

Starp citu, salÄ«dzinājumam, pirmā Ŕāda sistēma Airnav Radarbox, kas paredzēta parastajiem lietotājiem, parādÄ«jās 2007. gadā un maksāja aptuveni 900 USD, tÄ«kla pakalpojumu abonÄ“Å”ana maksāja vēl 250 USD gadā.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Atsauksmes par Å”iem pirmajiem krievu Ä«paÅ”niekiem var lasÄ«t forumā radioskeneris. Tagad, kad RTL-SDR uztvērēji ir kļuvuÅ”i plaÅ”i pieejami, lÄ«dzÄ«gu ierÄ«ci var salikt par USD 30; vairāk par to bija pirmā daļa. Pāriesim pie paÅ”a protokola ā€“ redzēsim, kā tas darbojas.

Signālu uztverŔana

Pirmkārt, signāls ir jāreÄ£istrē. Visa signāla ilgums ir tikai 120 mikrosekundes, tāpēc, lai ērti izjauktu tā sastāvdaļas, ir vēlams SDR uztvērējs ar iztverÅ”anas frekvenci vismaz 5 MHz.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Pēc ierakstÄ«Å”anas mēs saņemam WAV failu ar iztverÅ”anas ātrumu 5000000 30 500 paraugu/sek; XNUMX sekundes Ŕāda ieraksta ā€œsverā€ aptuveni XNUMX MB. KlausÄ«ties to ar multivides atskaņotāju, protams, ir bezjēdzÄ«gi ā€“ failā ir nevis skaņa, bet gan tieÅ”i digitalizēts radiosignāls ā€“ tieÅ”i tā darbojas Software Defined Radio.

Mēs atvērsim un apstrādāsim failu, izmantojot Python. Tie, kas vēlas eksperimentēt paÅ”i, var lejupielādēt ieraksta piemēru ŠæŠ¾ ссыŠ»ŠŗŠµ.

Lejupielādēsim failu un redzēsim, kas ir iekŔā.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultāts: mēs redzam acÄ«mredzamus ā€œimpulsusā€ pret fona troksni.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Katrs ā€œimpulssā€ ir signāls, kura struktÅ«ra ir skaidri redzama, palielinot izŔķirtspēju grafikā.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Kā redzat, attēls diezgan atbilst tam, kas norādÄ«ts iepriekÅ” minētajā aprakstā. JÅ«s varat sākt apstrādāt datus.

DekodēŔana

Pirmkārt, jums ir jāiegūst mazliet plūsma. Pats signāls tiek kodēts, izmantojot Mančestras kodējumu:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

No nibbles lÄ«meņu starpÄ«bas ir viegli iegÅ«t reālus ā€œ0ā€ un ā€œ1ā€.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Pati signāla struktūra ir Ŕāda:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Apskatīsim laukus sīkāk.

DF (Downlink Format, 5 bits) - nosaka ziņojuma veidu. Ir vairāki veidi:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols
(tabulas avots)

MÅ«s interesē tikai tips DF17, jo... Tas satur lidmaŔīnas koordinātas.

ICAO (24 biti) - starptautisks unikālais lidmaŔīnas kods. JÅ«s varat pārbaudÄ«t lidmaŔīnu pēc tās koda tieÅ”saistē (diemžēl autors ir pārtraucis datu bāzes atjaunoÅ”anu, bet tas joprojām ir aktuāls). Piemēram, kodam 3c5ee2 mums ir Ŕāda informācija:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Rediģēt: iekŔā komentāri pie raksta SÄ«kāk ir sniegts ICAO koda apraksts, interesentiem iesaku to izlasÄ«t.

DATU (56 vai 112 biti) - faktiskie dati, kurus mēs atÅ”ifrēsim. Pirmie 5 datu biti ir lauks Tips kods, kas satur glabājamo datu apakÅ”tipu (nejaukt ar DF). Ir diezgan daudz Ŕādu veidu:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols
(tabulas avots)

ApskatÄ«sim dažus pakeÅ”u piemērus.

Gaisa kuģa identifikācija

Piemērs binārā formā:

00100 011 000101 010111 000111 110111 110001 111000

Datu lauki:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, katra rakstzīme C1-C8 satur kodus, kas atbilst indeksiem rindā:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

AtÅ”ifrējot virkni, ir viegli iegÅ«t lidmaŔīnas kodu: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Gaisa desanta pozīcija

Ja nosaukums ir vienkārŔs, tad koordinātas ir sarežģītākas. Tie tiek pārraidīti 2, pāra un nepāra kadru veidā. Lauka kods TC = 01011b = 11.

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols

Pāra un nepāra pakeÅ”u piemērs:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Pats koordinātu aprēķins notiek pēc diezgan sarežģītas formulas:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols
(avots)

Es neesmu ĢIS eksperts, tāpēc nezinu, no kurienes tas nāk. Kas zina, rakstiet komentāros.

Augstums tiek uzskatÄ«ts par vienkārŔāku ā€” atkarÄ«bā no konkrētā bita to var attēlot kā 25 vai 100 pēdu reizinājumu.

Gaisa lidojuma ātrums

Komplekts ar TC=19. Interesanti Å”eit ir tas, ka ātrums var bÅ«t precÄ«zs attiecÄ«bā pret zemi (Ground Speed), vai arÄ« gaisā, ko mēra ar gaisa kuÄ£a sensoru (Airspeed). Tiek pārraidÄ«ti arÄ« daudzi dažādi lauki:

Flightradar24 ā€” kā tas darbojas? 2. daļa, ADS-B protokols
(avots)

Secinājums

Kā redzat, ADS-B tehnoloÄ£ija ir kļuvusi par interesantu simbiozi, kad standarts ir noderÄ«gs ne tikai profesionāļiem, bet arÄ« parastajiem lietotājiem. Bet, protams, galveno lomu tajā spēlēja lētākā digitālo SDR uztvērēju tehnoloÄ£ija, kas ļauj ierÄ«cei burtiski uztvert signālus ar frekvencēm virs gigaherciem ā€œpar santÄ«miemā€.

PaŔā standartā, protams, ir daudz vairāk. Interesenti var apskatÄ«ties PDF formātā lapā ICAO vai apmeklējiet jau iepriekÅ” minēto mājas lapa.

Maz ticams, ka viss iepriekÅ” minētais daudziem noderēs, bet vismaz vispārējais priekÅ”stats par to, kā tas darbojas, es ceru, paliek.

Starp citu, gatavs dekoderis Python jau pastāv, jÅ«s varat to izpētÄ«t Å”eit. Un SDR uztvērēju Ä«paÅ”nieki var salikt un palaist gatavu ADS-B dekodētāju no lapas, tas tika sÄ«kāk apspriests pirmā daļa.

Rakstā aprakstÄ«tā parsētāja pirmkods ir norādÄ«ts zem griezuma. Å is ir testa piemērs, kas nepretendē uz ražoÅ”anu, taču dažas lietas tajā darbojas, un to var izmantot iepriekÅ” ierakstÄ«tā faila parsÄ“Å”anai.
Avota kods (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Ceru, ka kādu ieinteresēja, paldies par uzmanību.

Avots: www.habr.com

Pievieno komentāru