مرحبا هبر. ربما استخدم كل شخص التقى مرة واحدة على الأقل الأقارب أو الأصدقاء أو رحلوا عنهم على متن طائرة خدمة Flightradar24 المجانية. هذه طريقة مريحة للغاية لتتبع موقع الطائرة في الوقت الفعلي.
В
قصة
من الواضح أن بيانات الطائرات لا تتم مشاركتها ليراها المستخدمون على هواتفهم الذكية. يُطلق على النظام اسم ADS-B (المراقبة التلقائية التابعة - البث) ، ويتم استخدامه لنقل المعلومات تلقائيًا حول الطائرة إلى مركز التحكم - يتم إرسال معرفها وإحداثياتها واتجاهها وسرعتها وارتفاعها وغيرها من البيانات. في السابق ، قبل ظهور مثل هذه الأنظمة ، كان بإمكان المرسل رؤية نقطة فقط على الرادار. لم يكن هذا كافيًا عندما كان هناك عدد كبير جدًا من الطائرات.
من الناحية الفنية ، يتكون ADS-B من جهاز إرسال للطائرة يرسل بشكل دوري حزمًا تحتوي على معلومات بتردد عالٍ إلى حد ما يبلغ 1090 ميجاهرتز (هناك أوضاع أخرى ، لكنها ليست مثيرة للاهتمام بالنسبة لنا ، لأن الإحداثيات تنتقل هنا فقط). بالطبع ، بالإضافة إلى جهاز الإرسال ، يوجد أيضًا جهاز استقبال في مكان ما في المطار ، ولكن بالنسبة لنا ، كمستخدمين ، فإن جهاز الاستقبال الخاص بنا مهم.
بالمناسبة ، للمقارنة ، ظهر أول نظام من هذا النوع ، Airnav Radarbox ، المصمم للمستخدمين العاديين ، في عام 2007 وتكلف حوالي 900 دولار ، وتكلفة الاشتراك في خدمات الشبكة حوالي 250 دولارًا سنويًا.
يمكن قراءة آراء هؤلاء المالكين الروس الأوائل في المنتدى
استقبال الإشارة
أولاً ، يجب تسجيل الإشارة. تبلغ مدة الإشارة بأكملها 120 ميكروثانية فقط ، لذلك من أجل تفكيك مكوناتها بشكل مريح ، من المستحسن وجود جهاز استقبال SDR بمعدل أخذ عينات لا يقل عن 5 ميجاهرتز.
بعد التسجيل ، نحصل على ملف WAV بمعدل أخذ عينات يبلغ 5000000 عينة / ثانية ، 30 ثانية من هذا التسجيل "يزن" حوالي 500 ميجابايت. بالطبع ، من غير المجدي الاستماع إليه باستخدام مشغل وسائط - لا يحتوي الملف على صوت ، ولكنه يحتوي على إشارة راديو رقمية مباشرة - هذه هي الطريقة التي يعمل بها Software Defined Radio.
سنفتح الملف ومعالجته باستخدام بايثون. يمكن لأولئك الذين يرغبون في التجربة بمفردهم تنزيل مثال على التسجيل
لنقم بتنزيل الملف ونرى ما بداخله.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
النتيجة: نرى "نبضات" واضحة على خلفية الضوضاء.
كل "نبضة" هي إشارة ، يمكن رؤية هيكلها بوضوح إذا قمت بزيادة الدقة على الرسم البياني.
كما ترى ، فإن الصورة متوافقة تمامًا مع ما ورد في الوصف أعلاه. يمكنك البدء في معالجة البيانات.
فك
أولاً ، تحتاج إلى الحصول على تيار بت. الإشارة نفسها مشفرة باستخدام ترميز مانشيستر:
من السهل الحصول على "0" و "1" حقيقيين من فرق المستوى في المقضم.
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
هيكل الإشارة نفسها كما يلي:
لنلق نظرة على الحقول بمزيد من التفصيل.
DF (تنسيق الوصلة الهابطة ، 5 بت) - يحدد نوع الرسالة. هناك عدة أنواع:
نحن مهتمون فقط بنوع DF17 ، لأن أنه يحتوي على إحداثيات الطائرة.
منظمة الطيران المدني الدولي (24 بت) - الرمز الدولي الفريد للطائرة. يمكنك التحقق من الطائرة من خلال رمزها
تحرير: في
بيانات (56 أو 112 بت) - البيانات الفعلية التي سنقوم بفك تشفيرها. أول 5 بتات من البيانات هي الحقل كود نوعيحتوي على النوع الفرعي للبيانات المخزنة (يجب عدم الخلط بينه وبين DF). هناك عدد غير قليل من هذه الأنواع:
لنلقِ نظرة على بعض أمثلة الحزم.
تحديد هوية الطائرات
مثال ثنائي:
00100 011 000101 010111 000111 110111 110001 111000
حقول البيانات:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4 ، يحتوي كل حرف C1-C8 على أكواد تتوافق مع الفهارس في السلسلة:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ############### 0123456789 ######
بعد فك تشفير السلسلة ، من السهل الحصول على رمز الطائرة: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
موقف محمول جوا
إذا كان كل شيء بسيطًا بالاسم ، فسيكون الأمر أكثر تعقيدًا مع الإحداثيات. يتم إرسالها كإطارات 2x ، فردي وحتى زوجي. رمز الحقل TC = 01011b = 11.
مثال على الحزم الفردية والزوجية:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
يتم حساب الإحداثيات نفسها وفقًا لصيغة صعبة نوعًا ما:
(
أنا لست متخصصًا في نظم المعلومات الجغرافية ، لذلك لا أعرف من أين أتى. من يدري ، اكتب في التعليقات.
يعتبر الارتفاع أبسط - اعتمادًا على البت المحدد ، يمكن تمثيله كمضاعفات 25 أو 100 قدم.
السرعة المحمولة جوا
حزمة مع TC = 19. الشيء المثير هنا هو أن السرعة يمكن أن تكون دقيقة ، بالنسبة إلى الأرض (سرعة الأرض) ، والهواء ، تقاس بواسطة مستشعر الطائرة (السرعة الجوية). يتم أيضًا تمرير الكثير من الحقول المختلفة:
(
اختتام
كما ترى ، أصبحت تقنية ADS-B تعايشًا مثيرًا للاهتمام ، عندما يكون المعيار مفيدًا ليس فقط للمحترفين ، ولكن أيضًا للمستخدمين العاديين. لكن الدور الرئيسي في ذلك بالطبع تم لعبه من خلال التكنولوجيا الأرخص لمستقبلات SDR الرقمية ، والتي تسمح للجهاز حرفياً "مقابل فلس واحد" باستقبال الإشارات بتردد أعلى من جيجاهيرتز.
في المعيار نفسه ، بالطبع ، أكثر من ذلك بكثير. يمكن للمهتمين عرض ملف PDF على الصفحة
من غير المحتمل أن يكون كل ما سبق مفيدًا للكثيرين ، ولكن على الأقل الفكرة العامة لكيفية عملها ، كما آمل ، تظل قائمة.
بالمناسبة ، يوجد بالفعل وحدة فك ترميز Python جاهزة ، يمكنك دراستها
الكود المصدري للمحلل اللغوي الموصوف في المقالة مُدرج تحت القطع. هذا مثال اختبار لا يدعي أنه إنتاج ، ولكن هناك شيء يعمل فيه ، ويمكنهم تحليل الملف المسجل أعلاه.
كود المصدر (بايثون)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
أتمنى أن يكون شخص ما مهتمًا ، شكرًا لاهتمامك.
المصدر: www.habr.com