Привіт Хабр. Напевно, кожен, хто хоч раз зустрічав або проводжав родичів або друзів на літак, користувався безкоштовним сервісом Flightradar24. Це дуже зручний спосіб відстеження положення літака у реальному часі.
В
Історія
Очевидно, дані про літаки передаються не для того, щоб користувачі бачили їх на своїх смартфонах. Система називається ADS-B (Automatic dependent surveillance-broadcast), і служить для автоматичної передачі інформації про повітряне судно в диспетчерський центр - передаються його ідентифікатор, координати, напрям, швидкість, висота та інші дані. Раніше до появи таких систем диспетчер міг бачити лише крапку на радарі. Цього стало недостатньо, коли літаків стало надто багато.
Технічно, ADS-B складається з передавача на повітряному судні, який періодично надсилає пакети з інформацією на досить високій частоті 1090 МГц (є й інші режими, але нам вони не такі цікаві, тому що координати передаються тільки тут). Зрозуміло, крім передавача, є і приймач десь в аеропорту, але для нас, як для користувачів, цікавий наш приймач.
До речі, для порівняння, перша така система Airnav Radarbox, розрахована на звичайних користувачів, з'явилася в 2007 році, і коштувала близько 900 $, ще близько 250 $ на рік коштувала підписка на мережеві сервіси.
Відгуки тих перших російських власників можна почитати на форумі
Прийом сигналів
Для початку сигнал потрібно записати. Весь сигнал має тривалість лише 120 мікросекунд, тому щоб комфортно розібрати його компоненти, бажаний SDR-приймач з частотою дискретизації не менше 5МГц.
Після запису ми отримуємо WAV-файл із частотою дискретизації 5000000 семплів/сек, 30 секунд такого запису «важать» близько 500Мб. Слухати її медіаплеєром зрозуміло, марно - файл містить не звук, а безпосередньо оцифрований радіосигнал - саме так працює Software Defined Radio.
Відкривати та обробляти файл ми будемо за допомогою Python. Бажаючі поекспериментувати самостійно, можуть завантажити приклад запису
Завантажимо файл, і подивимося, що всередині.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Результат: бачимо явні «імпульси» і натомість шуму.
Кожен «імпульс» — це сигнал, структуру якого добре видно, якщо збільшити дозвіл на графіці.
Як можна бачити, картинка цілком відповідає тому, що описано вище. Можна приступати до обробки даних.
Декодування
Для початку потрібно отримати бітовий потік. Сам сигнал закодований за допомогою manchester encoding:
З різниці рівнів у напівбайтах легко отримати реальні «0» та «1».
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Структура самого сигналу має такий вигляд:
Розглянемо поля докладніше.
DF (Downlink Format, 5 біт) – визначає тип повідомлення. Їх кілька типів:
Нас цікавить лише тип DF17, т.к. саме він містить координати повітряного судна.
ІКАО (24 біти) - міжнародний унікальний код повітряного судна. Перевірити літак за його кодом можна
Виправлення: в
ДАНІ (56 або 112 біт) - власне дані, які ми і декодуватимемо. Перші 5 біт даних - поле Тип коду, що містить підтип даних, що зберігаються (не плутати з DF). Таких типів досить багато:
Розберемо кілька прикладів пакетів.
Aircraft identification
Приклад у бінарному вигляді:
00100 011 000101 010111 000111 110111 110001 111000
Поля даних:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, кожен символ C1-C8 містить коди, що відповідають індексам у рядку:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######
Розкодувавши рядок, нескладно отримати код літака: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Airborne position
Якщо з назвою все просто, то з координатами складніше. Вони передаються у вигляді 2х, парних та непарних фреймів. Код поля TC=01011b=11.
Приклад парного та непарного пакетів:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Саме обчислення координат відбувається за досить хитрою формулою:
(
Я не фахівець із ГІС, тож звідки воно виводиться, не знаю. Хто в курсі, напишіть у коментарях.
Висота вважається простіше — залежно від певного біта, вона може бути або кратною 25, або 100 футам.
Airborne Velocity
Пакет із TC=19. Цікаво тут те, що швидкість може бути як точна, щодо землі (Ground Speed), так і повітряна вимірювана датчиком літака (Airspeed). Ще передається безліч різних полів:
(
Висновок
Як можна бачити, технологія ADS-B стала цікавим симбіозом, коли будь-який стандарт знадобиться не лише професіоналам, а й звичайним користувачам. Але зрозуміло, ключову роль у цьому відіграло здешевлення технології цифрових SDR-приймачів, що дозволяє на девайсі буквально «за копійки» приймати сигнали з частотою вище за гігагерця.
У самому стандарті, зрозуміло, набагато більше. Бажаючі можуть переглянути PDF на сторінці
Навряд чи багатьом стане у нагоді все вищенаписане, але принаймні загальна ідея того, як це працює, сподіваюся, залишилася.
До речі, готовий декодер на Python вже існує, його можна вивчити
Вихідний код парсера, описаний у статті, наведено під катом. Це тестовий приклад, що не претендує на production, але дещо в ньому працює, і парсувати записаний вище файл, ним можна.
Вихідний код (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Сподіваюся, комусь було цікаво, дякую за увагу.
Джерело: habr.com