Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Sveiki, Habr. Droši vien ikviens, kurš kādreiz lidmašīnā ir saticis radus vai draugus, ir izmantojis bezmaksas pakalpojumu Flightradar24. Tas ir ļoti ērts veids, kā reāllaikā izsekot lidmašīnas atrašanās vietai.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

В pirmā daļa Tika aprakstīts šāda tiešsaistes pakalpojuma darbības princips. Tagad mēs turpināsim un izdomāsim, kādi dati tiek sūtīti un saņemti no lidmašīnas uz uztveršanas staciju, un paši tos atšifrēsim, izmantojot Python.

Stāsts

Acīmredzot gaisa kuģu dati netiek pārsūtīti, lai lietotāji to varētu redzēt savos viedtālruņos. Sistēma tiek saukta par ADS-B (Automatic dependent valve-broadcast), un tā tiek izmantota, lai automātiski pārsūtītu informāciju par lidmašīnu uz vadības centru - tiek pārraidīts tā identifikators, koordinātas, virziens, ātrums, augstums un citi dati. Iepriekš, pirms šādu sistēmu parādīšanās, dispečers varēja redzēt tikai punktu uz radara. Ar to vairs nepietika, kad lidmašīnu bija pārāk daudz.

Tehniski ADS-B sastāv no gaisa kuģa raidītāja, kas periodiski sūta paketes ar informāciju diezgan augstā frekvencē 1090 MHz (ir arī citi režīmi, taču tie mūs tik ļoti neinteresē, jo koordinātas tiek pārraidītas tikai šeit). Protams, papildus raidītājam kaut kur lidostā ir arī uztvērējs, bet mums kā lietotājiem interesants ir savs uztvērējs.

Starp citu, salīdzinājumam, pirmā šāda sistēma Airnav Radarbox, kas paredzēta parastajiem lietotājiem, parādījās 2007. gadā un maksāja aptuveni 900 USD, tīkla pakalpojumu abonēšana maksāja vēl 250 USD gadā.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Atsauksmes par šiem pirmajiem krievu īpašniekiem var lasīt forumā radioskeneris. Tagad, kad RTL-SDR uztvērēji ir kļuvuši plaši pieejami, līdzīgu ierīci var salikt par USD 30; vairāk par to bija pirmā daļa. Pāriesim pie paša protokola – redzēsim, kā tas darbojas.

Signālu uztveršana

Pirmkārt, signāls ir jāreģistrē. Visa signāla ilgums ir tikai 120 mikrosekundes, tāpēc, lai ērti izjauktu tā sastāvdaļas, ir vēlams SDR uztvērējs ar iztveršanas frekvenci vismaz 5 MHz.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Pēc ierakstīšanas mēs saņemam WAV failu ar iztveršanas ātrumu 5000000 30 500 paraugu/sek; XNUMX sekundes šāda ieraksta “sver” aptuveni XNUMX MB. Klausīties to ar multivides atskaņotāju, protams, ir bezjēdzīgi – failā ir nevis skaņa, bet gan tieši digitalizēts radiosignāls – tieši tā darbojas Software Defined Radio.

Mēs atvērsim un apstrādāsim failu, izmantojot Python. Tie, kas vēlas eksperimentēt paši, var lejupielādēt ieraksta piemēru по ссылке.

Lejupielādēsim failu un redzēsim, kas ir iekšā.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Rezultāts: mēs redzam acīmredzamus “impulsus” pret fona troksni.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Katrs “impulss” ir signāls, kura struktūra ir skaidri redzama, palielinot izšķirtspēju grafikā.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Kā redzat, attēls diezgan atbilst tam, kas norādīts iepriekš minētajā aprakstā. Jūs varat sākt apstrādāt datus.

Dekodēšana

Pirmkārt, jums ir jāiegūst mazliet plūsma. Pats signāls tiek kodēts, izmantojot Mančestras kodējumu:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

No nibbles līmeņu starpības ir viegli iegūt reālus “0” un “1”.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Pati signāla struktūra ir šāda:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Apskatīsim laukus sīkāk.

DF (Downlink Format, 5 bits) - nosaka ziņojuma veidu. Ir vairāki veidi:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols
(tabulas avots)

Mūs interesē tikai tips DF17, jo... Tas satur lidmašīnas koordinātas.

ICAO (24 biti) - starptautisks unikālais lidmašīnas kods. Jūs varat pārbaudīt lidmašīnu pēc tās koda tiešsaistē (diemžēl autors ir pārtraucis datu bāzes atjaunošanu, bet tas joprojām ir aktuāls). Piemēram, kodam 3c5ee2 mums ir šāda informācija:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Rediģēt: iekšā komentāri pie raksta Sīkāk ir sniegts ICAO koda apraksts, interesentiem iesaku to izlasīt.

DATU (56 vai 112 biti) - faktiskie dati, kurus mēs atšifrēsim. Pirmie 5 datu biti ir lauks Tips kods, kas satur glabājamo datu apakštipu (nejaukt ar DF). Ir diezgan daudz šādu veidu:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols
(tabulas avots)

Apskatīsim dažus pakešu piemērus.

Gaisa kuģa identifikācija

Piemērs binārā formā:

00100 011 000101 010111 000111 110111 110001 111000

Datu lauki:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, katra rakstzīme C1-C8 satur kodus, kas atbilst indeksiem rindā:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Atšifrējot virkni, ir viegli iegūt lidmašīnas kodu: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Gaisa desanta pozīcija

Ja nosaukums ir vienkāršs, tad koordinātas ir sarežģītākas. Tie tiek pārraidīti 2, pāra un nepāra kadru veidā. Lauka kods TC = 01011b = 11.

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols

Pāra un nepāra pakešu piemērs:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Pats koordinātu aprēķins notiek pēc diezgan sarežģītas formulas:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols
(avots)

Es neesmu ĢIS eksperts, tāpēc nezinu, no kurienes tas nāk. Kas zina, rakstiet komentāros.

Augstums tiek uzskatīts par vienkāršāku — atkarībā no konkrētā bita to var attēlot kā 25 vai 100 pēdu reizinājumu.

Gaisa lidojuma ātrums

Komplekts ar TC=19. Interesanti šeit ir tas, ka ātrums var būt precīzs attiecībā pret zemi (Ground Speed), vai arī gaisā, ko mēra ar gaisa kuģa sensoru (Airspeed). Tiek pārraidīti arī daudzi dažādi lauki:

Flightradar24 — kā tas darbojas? 2. daļa, ADS-B protokols
(avots)

Secinājums

Kā redzat, ADS-B tehnoloģija ir kļuvusi par interesantu simbiozi, kad standarts ir noderīgs ne tikai profesionāļiem, bet arī parastajiem lietotājiem. Bet, protams, galveno lomu tajā spēlēja lētākā digitālo SDR uztvērēju tehnoloģija, kas ļauj ierīcei burtiski uztvert signālus ar frekvencēm virs gigaherciem “par santīmiem”.

Pašā standartā, protams, ir daudz vairāk. Interesenti var apskatīties PDF formātā lapā ICAO vai apmeklējiet jau iepriekš minēto mājas lapa.

Maz ticams, ka viss iepriekš minētais daudziem noderēs, bet vismaz vispārējais priekšstats par to, kā tas darbojas, es ceru, paliek.

Starp citu, gatavs dekoderis Python jau pastāv, jūs varat to izpētīt šeit. Un SDR uztvērēju īpašnieki var salikt un palaist gatavu ADS-B dekodētāju no lapas, tas tika sīkāk apspriests pirmā daļa.

Rakstā aprakstītā parsētāja pirmkods ir norādīts zem griezuma. Šis ir testa piemērs, kas nepretendē uz ražošanu, taču dažas lietas tajā darbojas, un to var izmantot iepriekš ierakstītā faila parsēšanai.
Avota kods (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Ceru, ka kādu ieinteresēja, paldies par uzmanību.

Avots: www.habr.com

Pievieno komentāru