Pozdrav Habr. Vjerojatno su svi koji su ikada sreli ili ispratili rođake ili prijatelje u avionu koristili besplatnu uslugu Flightradar24. Ovo je vrlo praktičan način za praćenje položaja zrakoplova u stvarnom vremenu.
В
Priča
Očito se podaci o zrakoplovu ne prenose da bi ih korisnici mogli vidjeti na svojim pametnim telefonima. Sustav se zove ADS-B (Automatic dependent surveillance—broadcast), a koristi se za automatski prijenos informacija o zrakoplovu u kontrolni centar - prenose se njegov identifikator, koordinate, smjer, brzina, visina i drugi podaci. Ranije, prije pojave takvih sustava, dispečer je mogao vidjeti samo točku na radaru. To više nije bilo dovoljno kada je bilo previše aviona.
Tehnički, ADS-B se sastoji od odašiljača u zrakoplovu koji periodički šalje pakete informacija na prilično visokoj frekvenciji od 1090 MHz (postoje i drugi modovi, ali oni nas toliko ne zanimaju, jer se samo ovdje prenose koordinate). Naravno, osim odašiljača, negdje u zračnoj luci postoji i prijamnik, ali nama kao korisnicima zanimljiv je vlastiti prijamnik.
Inače, za usporedbu, prvi takav sustav, Airnav Radarbox, namijenjen običnim korisnicima, pojavio se 2007. godine i koštao je oko 900 dolara, a pretplata na mrežne usluge koštala je još 250 dolara godišnje.
Recenzije tih prvih ruskih vlasnika mogu se pročitati na forumu
Prijem signala
Prvo je potrebno snimiti signal. Cjelokupni signal traje samo 120 mikrosekundi, pa je za udobno rastavljanje njegovih komponenti poželjan SDR prijemnik s frekvencijom uzorkovanja od najmanje 5 MHz.
Nakon snimanja dobivamo WAV datoteku s brzinom uzorkovanja od 5000000 uzoraka/s, a 30 sekundi takve snimke “teži” oko 500 MB. Slušanje s media playerom je, naravno, beskorisno - datoteka ne sadrži zvuk, već izravno digitalizirani radio signal - upravo tako radi softverski definiran radio.
Datoteku ćemo otvoriti i obraditi pomoću Pythona. Oni koji žele sami eksperimentirati mogu preuzeti primjer snimke
Skinimo datoteku i vidimo što je unutra.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Rezultat: vidimo očite "pulsove" naspram pozadinske buke.
Svaki "impuls" je signal, čija je struktura jasno vidljiva ako povećate rezoluciju na grafikonu.
Kao što vidite, slika je sasvim u skladu s onim što je dano u gornjem opisu. Možete početi s obradom podataka.
Dekodiranje
Prvo, morate dobiti malo struje. Sam signal je kodiran Manchester kodiranjem:
Iz razlike u razini grickalica lako je dobiti prave "0" i "1".
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Struktura samog signala je sljedeća:
Pogledajmo polja detaljnije.
DF (Downlink Format, 5 bita) - određuje vrstu poruke. Postoji nekoliko vrsta:
Zanima nas samo tip DF17, jer... To je ono što sadrži koordinate zrakoplova.
ICAO (24 bita) - međunarodni jedinstveni kod zrakoplova. Avion možete provjeriti po šifri
Uredi: u
PODACI (56 ili 112 bita) - stvarni podaci koje ćemo dekodirati. Prvih 5 bitova podataka su polje Upišite kod, koji sadrži podvrstu podataka koji se pohranjuju (ne smije se brkati s DF). Postoji dosta ovih vrsta:
Pogledajmo nekoliko primjera paketa.
Identifikacija zrakoplova
Primjer u binarnom obliku:
00100 011 000101 010111 000111 110111 110001 111000
Podatkovna polja:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, svaki znak C1-C8 sadrži kodove koji odgovaraju indeksima u retku:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_################0123456789######
Dekodiranjem stringa lako je doći do šifre zrakoplova: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Zračni položaj
Ako je naziv jednostavan, onda su koordinate kompliciranije. Prenose se u obliku 2, parnog i neparnog okvira. Oznaka polja TC = 01011b = 11.
Primjer parnih i neparnih paketa:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Sam izračun koordinata odvija se prema prilično lukavoj formuli:
(
Nisam stručnjak za GIS pa ne znam odakle dolazi. Tko zna neka napiše u komentarima.
Visina se smatra jednostavnijom - ovisno o specifičnom bitu, može se predstaviti ili kao višekratnik 25 ili 100 stopa.
Brzina u zraku
Paket sa TC=19. Zanimljivo je da brzina može biti ili točna, u odnosu na tlo (brzina na tlu), ili u zraku, mjerena senzorom zrakoplova (brzina u zraku). Također se prenose mnoga različita polja:
(
Zaključak
Kao što vidite, ADS-B tehnologija postala je zanimljiva simbioza, kada je standard koristan ne samo profesionalcima, već i običnim korisnicima. No, naravno, ključnu ulogu u tome odigrala je jeftinija tehnologija digitalnih SDR prijamnika, koja uređaju omogućuje doslovno primanje signala s frekvencijama iznad gigaherca “za sitne pare”.
U samom standardu, naravno, postoji mnogo više. Zainteresirani mogu pogledati PDF na stranici
Malo je vjerojatno da će sve gore navedeno biti korisno mnogima, ali barem opća ideja o tome kako to funkcionira, nadam se, ostaje.
Usput, već postoji gotov dekoder u Pythonu, možete ga proučiti
Izvorni kod parsera opisanog u članku naveden je ispod isječka. Ovo je testni primjer koji se ne pretvara da je proizvodni, ali neke stvari u njemu rade i može se koristiti za analizu gore snimljene datoteke.
Izvorni kod (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Nadam se da je netko bio zainteresiran, hvala na pažnji.
Izvor: www.habr.com