Прывітанне Хабр. Мусіць кожны, хто хоць раз сустракаў ці праводзіў сваякоў ці сяброў на самалёт, карыстаўся бясплатным сэрвісам Flightradar24. Гэта вельмі зручны спосаб адсочвання становішча самалёта ў рэальным часе.
В
Гісторыя
Відавочна, што дадзеныя аб самалётах перадаюцца не для таго, каб карыстачы бачылі іх на сваіх смартфонах. Сістэма называецца ADS-B (Automatic dependent surveillance-broadcast), і служыць для аўтаматычнай перадачы інфармацыі аб паветраным судне ў дыспетчарскі цэнтр – перадаюцца яго ідэнтыфікатар, каардынаты, кірунак, хуткасць, вышыня і іншыя дадзеныя. Раней, да з'яўлення такіх сістэм, дыспетчар мог бачыць толькі кропку на радары. Гэтага стала недастаткова, калі самалётаў стала зашмат.
Тэхнічна, ADS-B складаецца з перадатчыка на паветраным судне, які перыядычна пасылае пакеты з інфармацыяй на дастаткова высокай частаце 1090 Мгц (ёсць і іншыя рэжымы, але нам яны не так цікавыя, бо каардынаты перадаюцца толькі тут). Зразумела, акрамя перадатчыка, ёсць і прымач дзесьці ў аэрапорце, але для нас, як для карыстачоў, цікавы прымач наш уласны.
Дарэчы, для параўнання, першая такая сістэма, Airnav Radarbox, разлічаная на звычайных карыстачоў, з'явілася ў 2007 году, і каштавала каля 900$, яшчэ каля 250$ у год каштавала падпіска на сеткавыя сэрвісы.
Водгукі тых першых расійскіх уладальнікаў можна пачытаць на форуме
Прыём сігналаў
Для пачатку, сігнал трэба запісаць. Увесь сігнал мае працягласць усяго толькі 120 мікрасекунд, таму каб камфортна разабраць яго кампаненты, пажаданы SDR-прымач з частатой дыскрэтызацыі не менш за 5Мгц.
Пасля запісу мы атрымліваем WAV-файл з частатой дыскрэтызацыі 5000000 сэмплаў/сек, 30 секунд такога запісу "важаць" каля 500Мб. Слухаць яе медыяплэерам зразумела, бескарысна – файл утрымоўвае не гук, а непасрэдна аблічбаваны радыёсігнал – менавіта так працуе Software Defined Radio.
Адчыняць і апрацоўваць файл мы будзем з дапамогай Python. Жадаючыя паэксперыментаваць самастойна, могуць запампаваць прыклад запісу
Загрузім файл, і паглядзім што ўсярэдзіне.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Вынік: мы бачым відавочныя "імпульсы" на фоне шуму.
Кожны «імпульс» - гэта і ёсць сігнал, структуру якога добра відаць, калі павялічыць дазвол на графіцы.
Як можна бачыць, карцінка цалкам адпавядае таму, што прыведзена ў апісанні вышэй. Можна прыступаць да апрацоўкі дадзеных.
Дэкадаванне
Для пачатку, трэба атрымаць бітавы паток. Сам сігнал закадаваны з дапамогай manchester encoding:
З розніцы ўзроўняў у паўбайтах лёгка атрымаць рэальныя "0" і "1".
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Структура самога сігналу мае наступны выгляд:
Разгледзім палі больш падрабязна.
DF (Downlink Format, 5 біт) - вызначае тып паведамлення. Іх некалькі тыпаў:
Нас цікавіць толькі тып DF17, т.я. менавіта ён змяшчае каардынаты паветранага судна.
ІКАО (24 біта) - міжнародны унікальны код паветранага судна. Праверыць самалёт па ім коду можна
Праўка: у
ДАДЗЕНЫЯ (56 або 112 біт) - уласна дадзеныя, якія мы і будзем дэкадаваць. Першыя 5 біт дадзеных - поле Код тыпу, якое змяшчае падтып якія захоўваюцца дадзеных (не блытаць з DF). Такіх тыпаў даволі шмат:
Разбяром некалькі прыкладаў пакетаў.
Aircraft identification
Прыклад у бінарным выглядзе:
00100 011 000101 010111 000111 110111 110001 111000
Палі даных:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, кожны сімвал C1-C8 змяшчае коды, якія адпавядаюць індэксам у радку:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######
Раскадаваўшы радок, нескладана атрымаць код самалёта: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Airborne position
Калі з назвай усё проста, то з каардынатамі больш складана. Яны перадаюцца ў выглядзе 2х, цотных і няцотных фрэймаў. Код поля TC=01011b=11.
Прыклад цотнага і няцотнага пакетаў:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Само вылічэнне каардынат адбываецца па дастаткова хітрай формуле:
(
Я не спецыяліст па ГІС, так што адкуль яно выводзіцца, не ведаю. Хто ў курсе, напішыце ў каментарах.
Вышыня лічыцца прасцей у залежнасці ад вызначанага біта, яна можа ўяўляцца або кратнай 25, або 100 футам.
Airborne Velocity
Пакет з TC=19. Цікава тут тое, што хуткасць можа быць як дакладная, адносна землі (Ground Speed), так і паветраная, якая вымяраецца датчыкам самалёта (Airspeed). Яшчэ перадаецца мноства розных палёў:
(
Заключэнне
Як можна бачыць, тэхналогія ADS-B стала цікавым сімбіёзам, калі які-небудзь стандарт спатрэбіцца не толькі прафесіяналам, але і звычайным карыстачам. Але зразумела, ключавую ролю ў гэтым адыграла патанненне тэхналогіі лічбавых SDR-прымачоў, якія дазваляюць на дэвайсе літаральна "за капейкі" прымаць сігналы з частатой вышэй гігагерца.
У самім стандарце зразумела, значна больш за ўсё. Жадаючыя могуць паглядзець PDF на старонцы
Ці наўрад шматлікім спатрэбіцца ўсё вышэйнапісанае, але прынамсі агульная ідэя таго, як гэта працуе, спадзяюся, засталася.
Дарэчы, гатовы дэкодэр на Python ужо існуе, яго можна вывучыць
Зыходны код парсера, апісаны ў артыкуле, прыведзены пад катом. Гэта тэставы прыклад, які не прэтэндуе на production, але сёе-тое ў ім працуе, і парсіць запісаны вышэй файл, ім можна.
Зыходны код (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Спадзяюся, камусьці было цікава, дзякуй за ўвагу.
Крыніца: habr.com