Здравей Хабр. Вероятно всеки, който някога е срещал или изпращал роднини или приятели в самолет, е използвал безплатната услуга Flightradar24. Това е много удобен начин за проследяване на позицията на самолета в реално време.
В
История
Очевидно данните за самолетите не се предават, за да могат потребителите да ги виждат на своите смартфони. Системата се нарича ADS-B (Automatic dependent surveillance—broadcast) и се използва за автоматично предаване на информация за самолета към контролния център - предават се неговия идентификатор, координати, посока, скорост, надморска височина и други данни. Преди, преди появата на такива системи, диспечерът можеше да види само точка на радара. Това вече не беше достатъчно, когато имаше твърде много самолети.
Технически ADS-B се състои от предавател на самолет, който периодично изпраща пакети информация на доста висока честота от 1090 MHz (има и други режими, но ние не се интересуваме толкова от тях, тъй като координатите се предават само тук). Разбира се, освен предавателя, има и приемник някъде на летището, но за нас, като потребители, нашият собствен приемник е интересен.
Между другото, за сравнение, първата такава система Airnav Radarbox, предназначена за обикновени потребители, се появи през 2007 г. и струваше около $900; абонаментът за мрежови услуги струва още $250 на година.
Отзивите на тези първи руски собственици могат да бъдат прочетени във форума
Приемане на сигнали
Първо, сигналът трябва да бъде записан. Целият сигнал има продължителност от само 120 микросекунди, така че за удобно разглобяване на неговите компоненти е желателен SDR приемник с честота на семплиране най-малко 5 MHz.
След записа получаваме WAV файл с честота на семплиране от 5000000 30 500 проби в секунда; XNUMX секунди от такъв запис „тежат“ около XNUMX MB. Слушането му с медиен плейър, разбира се, е безполезно - файлът не съдържа звук, а директно цифровизиран радиосигнал - точно така работи софтуерно дефинираното радио.
Ще отворим и обработим файла с помощта на Python. Тези, които искат да експериментират сами, могат да изтеглят примерен запис
Нека изтеглим файла и да видим какво има вътре.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Резултат: виждаме очевидни „пулсации“ на фоновия шум.
Всеки „импулс“ е сигнал, чиято структура е ясно видима, ако увеличите разделителната способност на графиката.
Както можете да видите, картината е доста съвместима с това, което е дадено в описанието по-горе. Можете да започнете обработката на данните.
Декодиране
Първо, трябва да получите битов поток. Самият сигнал е кодиран с помощта на Манчестър кодиране:
От разликата в нивата на хапките е лесно да получите истински „0“ и „1“.
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Структурата на самия сигнал е следната:
Нека разгледаме полетата по-подробно.
DF (Downlink Format, 5 бита) – определя вида на съобщението. Има няколко вида:
Интересуваме се само от тип DF17, защото... Това е, което съдържа координатите на самолета.
ICAO (24 бита) - международен уникален код на самолета. Можете да проверите самолета по неговия код
Редактиране: в
ДАННИ (56 или 112 бита) - действителните данни, които ще декодираме. Първите 5 бита данни са полето Type Code, съдържащ подтипа на данните, които се съхраняват (да не се бърка с DF). Има доста от тези видове:
Нека да разгледаме няколко примера за пакети.
Идентификация на самолета
Пример в двоична форма:
00100 011 000101 010111 000111 110111 110001 111000
Полета за данни:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, всеки знак C1-C8 съдържа кодове, съответстващи на индексите в реда:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_################0123456789######
Чрез декодиране на низа е лесно да получите кода на самолета: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Позиция във въздуха
Ако името е просто, тогава координатите са по-сложни. Те се предават под формата на 2 четни и нечетни кадъра. Код на полето TC = 01011b = 11.
Пример за четни и нечетни пакети:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Самото изчисляване на координатите се извършва по доста сложна формула:
(
Не съм ГИС експерт, така че не знам откъде идва. Кой знае, пишете в коментарите.
Височината се счита за по-проста - в зависимост от конкретния бит, тя може да бъде представена като кратно на 25 или 100 фута.
Скорост във въздуха
Пакет с TC=19. Интересното тук е, че скоростта може да бъде или точна, спрямо земята (наземна скорост), или във въздуха, измерена от самолетен сензор (въздушна скорост). Много различни полета също се предават:
(
Заключение
Както можете да видите, технологията ADS-B се превърна в интересна симбиоза, когато стандартът е полезен не само за професионалисти, но и за обикновени потребители. Но разбира се, ключова роля в това изигра по-евтината технология на цифровите SDR приемници, която позволява на устройството буквално да приема сигнали с честоти над гигахерца „за стотинки“.
В самия стандарт, разбира се, има много повече. Желаещите могат да видят PDF на страницата
Малко вероятно е всичко по-горе да бъде полезно за мнозина, но поне общата представа за това как работи, надявам се, остава.
Между другото, вече съществува готов декодер в Python, можете да го изучавате
Изходният код на анализатора, описан в статията, е даден под изрезката. Това е тестов пример, който не претендира за производство, но някои неща работят в него и може да се използва за анализиране на файла, записан по-горе.
Изходен код (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Надявам се някой да е проявил интерес, благодаря за вниманието.
Източник: www.habr.com