Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Сәлем Хабр. Туыстарын немесе достарын ұшақта қарсы алған немесе шығарып салған кез келген адам тегін Flightradar24 қызметін пайдаланған шығар. Бұл нақты уақыт режимінде ұшақтың орнын бақылаудың өте ыңғайлы тәсілі.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

В Бірінші бөлім Мұндай онлайн қызметтің жұмыс істеу принципі сипатталды. Біз енді әрі қарай жүріп, ұшақтан қабылдау станциясына қандай деректер жіберілетінін және қабылданатынын анықтаймыз және оны Python көмегімен өзіміз декодтаймыз.

История

Пайдаланушылар смартфондарында көру үшін ұшақ деректері берілмейтіні анық. Жүйе ADS-B (Automatic dependent Surveillance — хабар тарату) деп аталады және әуе кемесі туралы ақпаратты басқару орталығына автоматты түрде жіберу үшін пайдаланылады - оның идентификаторы, координаттары, бағыты, жылдамдығы, биіктігі және басқа да мәліметтер беріледі. Бұрын мұндай жүйелер пайда болғанға дейін диспетчер радардағы нүктені ғана көре алатын. Ұшақтар тым көп болған кезде бұл жеткіліксіз болды.

Техникалық тұрғыдан алғанда, ADS-B 1090 МГц жеткілікті жоғары жиілікте ақпарат пакеттерін мерзімді түрде жіберетін ұшақтағы таратқыштан тұрады (басқа режимдер бар, бірақ бізді онша қызықтырмайды, өйткені координаттар тек осында беріледі). Әрине, таратқыштан басқа, әуежайдың бір жерінде қабылдағыш да бар, бірақ біз үшін, пайдаланушылар ретінде, өз қабылдағышымыз қызықты.

Айтпақшы, салыстыру үшін, қарапайым пайдаланушыларға арналған Airnav Radarbox бірінші жүйесі 2007 жылы пайда болды және шамамен $900 тұрады, желілік қызметтерге жазылу жылына тағы $250 тұрады.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Сол алғашқы ресейлік иелерінің пікірлерін форумда оқуға болады радиосканер. Енді RTL-SDR қабылдағыштары кеңінен қол жетімді болғандықтан, ұқсас құрылғыны 30 долларға жинауға болады; бұл туралы толығырақ Бірінші бөлім. Хаттаманың өзіне көшейік - оның қалай жұмыс істейтінін көрейік.

Сигналдарды қабылдау

Алдымен сигналды жазу керек. Бүкіл сигналдың ұзақтығы небәрі 120 микросекундты құрайды, сондықтан оның құрамдастарын ыңғайлы бөлшектеу үшін дискретизация жиілігі кемінде 5 МГц болатын SDR қабылдағыш қажет.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Жазудан кейін біз 5000000 30 500 үлгі/сек таңдау жылдамдығымен WAV файлын аламыз; мұндай жазбаның XNUMX секунды шамамен XNUMX МБ «салмағы». Оны медиа ойнатқышпен тыңдау, әрине, пайдасыз - файлда дыбыс жоқ, бірақ тікелей цифрланған радиосигнал - Software Defined Radio дәл осылай жұмыс істейді.

Біз файлды Python көмегімен ашамыз және өңдейміз. Өз бетінше тәжірибе жасағысы келетіндер жазбаның үлгісін жүктей алады байланыс.

Файлды жүктеп алып, ішінде не бар екенін көрейік.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Нәтиже: фондық шуға қарсы айқын «импульстарды» көреміз.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Әрбір «импульс» - бұл сигнал, оның құрылымы, егер сіз графиктегі ажыратымдылықты арттырсаңыз, анық көрінеді.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Көріп отырғаныңыздай, сурет жоғарыдағы сипаттамада берілген нәрсеге сәйкес келеді. Деректерді өңдеуді бастауға болады.

Декодтау

Алдымен сізге аздап ағын алу керек. Сигналдың өзі Манчестер кодтауы арқылы кодталған:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Нибблдер деңгейінің айырмашылығынан нақты «0» және «1» алу оңай.

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Сигналдың құрылымы келесідей:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Өрістерді толығырақ қарастырайық.

DF (Downlink Format, 5 бит) – хабарлама түрін анықтайды. Бірнеше түрлері бар:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы
(кесте көзі)

Бізді тек DF17 түрі қызықтырады, өйткені... Бұл ұшақтың координаттарын қамтиды.

ИКАО (24 бит) – әуе кемесінің халықаралық бірегей коды. Ұшақты оның коды бойынша тексеруге болады сайтта (өкінішке орай, автор дерекқорды жаңартуды тоқтатты, бірақ ол әлі де өзекті). Мысалы, 3c5ee2 коды үшін бізде келесі ақпарат бар:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Өңдеу: в мақалаға түсініктемелер ИКАО кодының сипаттамасы толығырақ берілген, қызығушылық танытқандарға оны оқуға кеңес беремін.

ДЕРЕКТЕР (56 немесе 112 бит) - біз декодтайтын нақты деректер. Деректердің алғашқы 5 биті өріс болып табылады Кодты теріңіз, сақталатын деректердің ішкі түрін қамтитын (DF-пен шатастырмау керек). Бұл түрлердің бірнешеуі бар:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы
(кесте көзі)

Пакеттердің бірнеше мысалын қарастырайық.

Әуе кемесінің идентификациясы

Екілік пішіндегі мысал:

00100 011 000101 010111 000111 110111 110001 111000

Деректер өрістері:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, әрбір C1-C8 таңбасында жолдағы индекстерге сәйкес кодтар бар:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_##############0123456789######

Жолды декодтау арқылы ұшақ кодын алу оңай: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Әуедегі позиция

Егер атау қарапайым болса, онда координаталар күрделірек. Олар 2, жұп және тақ кадр түрінде беріледі. Өріс коды TC = 01011b = 11.

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы

Жұп және тақ пакеттердің мысалы:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Координаталарды есептеу өте күрделі формула бойынша жүреді:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы
(көзі)

Мен ГАЖ сарапшысы емеспін, сондықтан оның қайдан шыққанын білмеймін. Кім біледі, коментке жазыңыз.

Биіктігі қарапайым болып саналады - нақты битке байланысты оны 25 немесе 100 фут еселік ретінде көрсетуге болады.

Әуедегі жылдамдық

TC=19 бар пакет. Мұндағы қызықты нәрсе, жылдамдық дәл, жерге қатысты (Ground Speed) немесе әуе кемесінің сенсорымен (Airspeed) өлшенетін болуы мүмкін. Көптеген әртүрлі өрістер де жіберіледі:

Flightradar24 - бұл қалай жұмыс істейді? 2-бөлім, ADS-B хаттамасы
(көзі)

қорытынды

Көріп отырғаныңыздай, стандарт тек кәсіпқойларға ғана емес, қарапайым пайдаланушыларға да пайдалы болған кезде ADS-B технологиясы қызықты симбиозға айналды. Бірақ, әрине, бұл жерде негізгі рөлді сандық SDR қабылдағыштарының арзан технологиясы ойнады, бұл құрылғыға гигагерцтен жоғары жиіліктегі сигналдарды «тиынға» қабылдауға мүмкіндік береді.

Стандарттың өзінде, әрине, әлдеқайда көп. Қызығушылық танытқандар бетте PDF файлын көре алады ИКАО немесе жоғарыда айтылғанға кіріңіз веб-сайт.

Жоғарыда айтылғандардың барлығына пайдалы болуы екіталай, бірақ кем дегенде оның қалай жұмыс істейтіні туралы жалпы идея сақталады деп үміттенемін.

Айтпақшы, Python-да дайын декодер бар, оны зерттеуге болады осында. Ал SDR қабылдағыштарының иелері дайын ADS-B декодерін жинап, іске қоса алады бетінен, бұл туралы толығырақ талқыланды Бірінші бөлім.

Мақалада сипатталған талдаушының бастапқы коды кесіндінің астында берілген. Бұл өндіріс болып көрінбейтін сынақ үлгісі, бірақ онда кейбір нәрселер жұмыс істейді және оны жоғарыда жазылған файлды талдау үшін пайдалануға болады.
Бастапқы код (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Біреу қызықтырды деп үміттенемін, назарларыңызға рахмет.

Ақпарат көзі: www.habr.com

пікір қалдыру