Γεια σου Χαμπρ. Πιθανώς όλοι όσοι έχουν συναντήσει ή έχουν αποχωρήσει συγγενείς ή φίλους σε ένα αεροπλάνο έχουν χρησιμοποιήσει τη δωρεάν υπηρεσία Flightradar24. Αυτός είναι ένας πολύ βολικός τρόπος για να παρακολουθείτε τη θέση του αεροσκάφους σε πραγματικό χρόνο.
В
Ιστορία
Προφανώς, τα δεδομένα του αεροσκάφους δεν μεταδίδονται για να τα δουν οι χρήστες στα smartphone τους. Το σύστημα ονομάζεται ADS-B (Automatic dependent surveillance-broadcast) και χρησιμοποιείται για την αυτόματη μετάδοση πληροφοριών σχετικά με το αεροσκάφος στο κέντρο ελέγχου - το αναγνωριστικό, οι συντεταγμένες, η κατεύθυνση, η ταχύτητα, το υψόμετρο και άλλα δεδομένα μεταδίδονται. Προηγουμένως, πριν από την εμφάνιση τέτοιων συστημάτων, ο αποστολέας μπορούσε να δει μόνο ένα σημείο στο ραντάρ. Αυτό δεν ήταν πλέον αρκετό όταν υπήρχαν πάρα πολλά αεροπλάνα.
Τεχνικά, το ADS-B αποτελείται από έναν πομπό σε ένα αεροσκάφος που στέλνει περιοδικά πακέτα πληροφοριών σε αρκετά υψηλή συχνότητα 1090 MHz (υπάρχουν και άλλοι τρόποι, αλλά δεν μας ενδιαφέρουν τόσο πολύ, αφού οι συντεταγμένες μεταδίδονται μόνο εδώ). Βέβαια, εκτός από τον πομπό, υπάρχει και δέκτης κάπου στο αεροδρόμιο, αλλά για εμάς τους χρήστες έχει ενδιαφέρον ο δικός μας δέκτης.
Παρεμπιπτόντως, για σύγκριση, το πρώτο τέτοιο σύστημα, το Airnav Radarbox, σχεδιασμένο για απλούς χρήστες, εμφανίστηκε το 2007 και κόστιζε περίπου 900 $· μια συνδρομή σε υπηρεσίες δικτύου κόστιζε άλλα 250 $ το χρόνο.
Οι κριτικές αυτών των πρώτων Ρώσων ιδιοκτητών μπορούν να διαβαστούν στο φόρουμ
Λήψη σημάτων
Πρώτα, το σήμα πρέπει να καταγραφεί. Ολόκληρο το σήμα έχει διάρκεια μόνο 120 μικροδευτερόλεπτα, επομένως για να αποσυναρμολογήσετε άνετα τα εξαρτήματά του, είναι επιθυμητός ένας δέκτης SDR με συχνότητα δειγματοληψίας τουλάχιστον 5 MHz.
Μετά την εγγραφή, λαμβάνουμε ένα αρχείο WAV με ρυθμό δειγματοληψίας 5000000 δείγματα/δευτερόλεπτο· 30 δευτερόλεπτα τέτοιας εγγραφής «ζυγίζουν» περίπου 500 MB. Το να το ακούτε με media player, φυσικά, είναι άχρηστο - το αρχείο δεν περιέχει ήχο, αλλά ένα απευθείας ψηφιοποιημένο ραδιοφωνικό σήμα - έτσι ακριβώς λειτουργεί το Software Defined Radio.
Θα ανοίξουμε και θα επεξεργαστούμε το αρχείο χρησιμοποιώντας Python. Όσοι θέλουν να πειραματιστούν μόνοι τους μπορούν να κατεβάσουν ένα παράδειγμα εγγραφής
Ας κατεβάσουμε το αρχείο και ας δούμε τι υπάρχει μέσα.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Αποτέλεσμα: βλέπουμε προφανείς «παλμούς» στον θόρυβο του περιβάλλοντος.
Κάθε "παλμός" είναι ένα σήμα, η δομή του οποίου είναι σαφώς ορατή εάν αυξήσετε την ανάλυση στο γράφημα.
Όπως μπορείτε να δείτε, η εικόνα είναι αρκετά συνεπής με αυτό που δίνεται στην παραπάνω περιγραφή. Μπορείτε να ξεκινήσετε την επεξεργασία των δεδομένων.
Αποκρυπτογράφηση
Πρώτα, πρέπει να κάνετε λίγη ροή. Το ίδιο το σήμα κωδικοποιείται χρησιμοποιώντας την κωδικοποίηση Manchester:
Από τη διαφορά επιπέδου στα τσιμπήματα είναι εύκολο να λάβετε πραγματικό "0" και "1".
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
Η δομή του ίδιου του σήματος είναι η εξής:
Ας δούμε τα πεδία με περισσότερες λεπτομέρειες.
DF (Μορφή Downlink, 5 bit) - καθορίζει τον τύπο του μηνύματος. Υπάρχουν διάφοροι τύποι:
Μας ενδιαφέρει μόνο ο τύπος DF17, γιατί... Είναι αυτό που περιέχει τις συντεταγμένες του αεροσκάφους.
ICAO (24 bit) - διεθνής μοναδικός κωδικός του αεροσκάφους. Μπορείτε να ελέγξετε το αεροπλάνο με τον κωδικό του
Επεξεργασία: σε
ΣΤΟΙΧΕΙΑ (56 ή 112 bit) - τα πραγματικά δεδομένα που θα αποκωδικοποιήσουμε. Τα πρώτα 5 bit δεδομένων είναι το πεδίο Πληκτρολογήστε τον κωδικό, που περιέχει τον υποτύπο των δεδομένων που αποθηκεύονται (δεν πρέπει να συγχέεται με το DF). Υπάρχουν αρκετοί από αυτούς τους τύπους:
Ας δούμε μερικά παραδείγματα πακέτων.
Αναγνώριση αεροσκάφους
Παράδειγμα σε δυαδική μορφή:
00100 011 000101 010111 000111 110111 110001 111000
Πεδία δεδομένων:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, κάθε χαρακτήρας C1-C8 περιέχει κωδικούς που αντιστοιχούν σε δείκτες στη γραμμή:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789#################XNUMX######
Αποκωδικοποιώντας τη συμβολοσειρά, είναι εύκολο να λάβετε τον κωδικό του αεροσκάφους: EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Αερομεταφερόμενη θέση
Εάν το όνομα είναι απλό, τότε οι συντεταγμένες είναι πιο περίπλοκες. Μεταδίδονται με τη μορφή 2, ζυγών και περιττών πλαισίων. Κωδικός πεδίου TC = 01011b = 11.
Παράδειγμα ζυγών και περιττών πακέτων:
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Ο ίδιος ο υπολογισμός των συντεταγμένων γίνεται σύμφωνα με έναν μάλλον δύσκολο τύπο:
(
Δεν είμαι ειδικός στο GIS, οπότε δεν ξέρω από πού προέρχεται. Ποιος ξέρει, γράψτε στα σχόλια.
Το ύψος θεωρείται απλούστερο - ανάλογα με το συγκεκριμένο bit, μπορεί να αναπαρασταθεί είτε ως πολλαπλάσιο των 25 ή 100 ποδιών.
Αερομεταφερόμενη ταχύτητα
Πακέτο με TC=19. Το ενδιαφέρον εδώ είναι ότι η ταχύτητα μπορεί να είναι είτε ακριβής, σε σχέση με το έδαφος (Ταχύτητα εδάφους), είτε αερομεταφερόμενη, μετρούμενη από έναν αισθητήρα αεροσκάφους (Airspeed). Μεταδίδονται επίσης πολλά διαφορετικά πεδία:
(
Συμπέρασμα
Όπως μπορείτε να δείτε, η τεχνολογία ADS-B έχει γίνει μια ενδιαφέρουσα συμβίωση, όταν ένα πρότυπο είναι χρήσιμο όχι μόνο για επαγγελματίες, αλλά και για απλούς χρήστες. Αλλά φυσικά, βασικό ρόλο σε αυτό έπαιξε η φθηνότερη τεχνολογία των ψηφιακών δεκτών SDR, η οποία επιτρέπει στη συσκευή να λαμβάνει κυριολεκτικά σήματα με συχνότητες πάνω από ένα gigahertz «για τις πένες».
Στο ίδιο το πρότυπο, φυσικά, υπάρχουν πολλά περισσότερα. Οι ενδιαφερόμενοι μπορούν να δουν το PDF στη σελίδα
Είναι απίθανο όλα τα παραπάνω να είναι χρήσιμα σε πολλούς, αλλά τουλάχιστον η γενική ιδέα για το πώς λειτουργεί, ελπίζω, παραμένει.
Παρεμπιπτόντως, ένας έτοιμος αποκωδικοποιητής στην Python υπάρχει ήδη, μπορείτε να τον μελετήσετε
Ο πηγαίος κώδικας του αναλυτή που περιγράφεται στο άρθρο δίνεται κάτω από την περικοπή. Αυτό είναι ένα παράδειγμα δοκιμής που δεν προσποιείται ότι είναι παραγωγή, αλλά κάποια πράγματα λειτουργούν σε αυτό και μπορεί να χρησιμοποιηθεί για την ανάλυση του αρχείου που καταγράφηκε παραπάνω.
Πηγαίος κώδικας (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
Ελπίζω να ενδιαφερόταν κάποιος, ευχαριστώ για την προσοχή σας.
Πηγή: www.habr.com