Hei Habr. Sannsynligvis har alle som noen gang har møtt eller sett av slektninger eller venner på et fly brukt den gratis tjenesten Flightradar24. Dette er en veldig praktisk måte å spore flyets posisjon i sanntid.
В
Story
Det er åpenbart at flydata ikke overføres slik at brukerne kan se dem på smarttelefonene sine. Systemet kalles ADS-B (Automatic dependent surveillance—broadcast), og brukes til å automatisk overføre informasjon om flyet til kontrollsenteret - identifikator, koordinater, retning, hastighet, høyde og andre data overføres. Tidligere, før bruken av slike systemer, kunne avsenderen bare se et punkt på radaren. Dette var ikke lenger nok når det var for mange fly.
Teknisk sett består ADS-B av en sender på et fly som med jevne mellomrom sender pakker med informasjon med en ganske høy frekvens på 1090 MHz (det finnes andre moduser, men vi er ikke så interessert i dem, siden koordinater overføres bare her). I tillegg til senderen er det selvfølgelig også en mottaker et sted på flyplassen, men for oss som brukere er vår egen mottaker interessant.
Forresten, til sammenligning, det første slike system, Airnav Radarbox, designet for vanlige brukere, dukket opp i 2007, og kostet rundt $900; et abonnement på nettverkstjenester kostet ytterligere $250 i året.
Anmeldelser av de første russiske eierne kan leses på forumet
Mottar signaler
Først må signalet registreres. Hele signalet har en varighet på bare 120 mikrosekunder, så for å komfortabelt demontere komponentene er det ønskelig med en SDR-mottaker med en samplingsfrekvens på minst 5 MHz.
Etter opptak mottar vi en WAV-fil med en samplingshastighet på 5000000 30 500 samples/sek; XNUMX sekunder av et slikt opptak "veier" omtrent XNUMX MB. Å lytte til den med en mediespiller er selvfølgelig ubrukelig – filen inneholder ikke lyd, men et direkte digitalisert radiosignal – det er akkurat slik Software Defined Radio fungerer.
Vi vil åpne og behandle filen med Python. De som vil eksperimentere på egenhånd kan laste ned et eksempelopptak
La oss laste ned filen og se hva som er inni.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Resultat: vi ser tydelige "pulser" mot bakgrunnsstøyen.
Hver "puls" er et signal, hvis struktur er tydelig synlig hvis du øker oppløsningen på grafen.
Som du kan se, er bildet ganske konsistent med det som er gitt i beskrivelsen ovenfor. Du kan begynne å behandle dataene.
Dekoding
Først må du få litt strøm. Selve signalet er kodet med Manchester-koding:
Fra nivåforskjellen i nibbles er det lett å få ekte "0" og "1".
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Strukturen til selve signalet er som følger:
La oss se på feltene mer detaljert.
DF (Downlink Format, 5 bits) - bestemmer typen melding. Det finnes flere typer:
Vi er kun interessert i type DF17, fordi... Det er denne som inneholder koordinatene til flyet.
ICAO (24 bits) - internasjonal unik kode for flyet. Du kan sjekke flyet med koden
Edit: i
DATA (56 eller 112 biter) - de faktiske dataene som vi skal dekode. De første 5 databitene er feltet Type kode, som inneholder undertypen til dataene som lagres (må ikke forveksles med DF). Det er ganske mange av disse typene:
La oss se på noen eksempler på pakker.
Flyidentifikasjon
Eksempel i binær form:
00100 011 000101 010111 000111 110111 110001 111000
Datafelt:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, hvert tegn C1-C8 inneholder koder som tilsvarer indekser på linjen:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######
Ved å dekode strengen er det enkelt å få flykoden: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Luftbåren posisjon
Hvis navnet er enkelt, er koordinatene mer kompliserte. De overføres i form av 2, jevne og odde rammer. Feltkode TC = 01011b = 11.
Eksempel på partalls- og oddetallspakker:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Selve beregningen av koordinater skjer i henhold til en ganske vanskelig formel:
(
Jeg er ingen GIS-ekspert, så jeg vet ikke hvor det kommer fra. Hvem vet, skriv i kommentarfeltet.
Høyde anses som enklere - avhengig av den spesifikke biten, kan den representeres som enten et multiplum av 25 eller 100 fot.
Luftbåren hastighet
Pakke med TC=19. Det interessante her er at hastigheten enten kan være nøyaktig, i forhold til bakken (Ground Speed), eller luftbåren, målt av en flysensor (Airspeed). Mange forskjellige felt overføres også:
(
Konklusjon
Som du kan se, har ADS-B-teknologi blitt en interessant symbiose, når en standard er nyttig ikke bare for fagfolk, men også for vanlige brukere. Men selvfølgelig ble en nøkkelrolle i dette spilt av den billigere teknologien til digitale SDR-mottakere, som lar enheten bokstavelig talt motta signaler med frekvenser over en gigahertz "for pennies."
I selve standarden er det selvfølgelig mye mer. Interesserte kan se PDF-en på siden
Det er usannsynlig at alt det ovennevnte vil være nyttig for mange, men i det minste forblir den generelle ideen om hvordan det fungerer, håper jeg.
Forresten, en ferdig dekoder i Python eksisterer allerede, du kan studere den
Kildekoden til parseren beskrevet i artikkelen er gitt under kuttet. Dette er et testeksempel som ikke gir seg ut for å være produksjon, men noen ting fungerer i det, og det kan brukes til å analysere filen registrert ovenfor.
Kildekode (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Jeg håper noen var interessert, takk for oppmerksomheten.
Kilde: www.habr.com