Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Γεια σου Χαμπρ. Πιθανώς όλοι όσοι έχουν συναντήσει ή έχουν αποχωρήσει συγγενείς ή φίλους σε ένα αεροπλάνο έχουν χρησιμοποιήσει τη δωρεάν υπηρεσία Flightradar24. Αυτός είναι ένας πολύ βολικός τρόπος για να παρακολουθείτε τη θέση του αεροσκάφους σε πραγματικό χρόνο.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

В το πρώτο μέρος Περιγράφηκε η αρχή λειτουργίας μιας τέτοιας διαδικτυακής υπηρεσίας. Θα προχωρήσουμε τώρα και θα καταλάβουμε ποια δεδομένα αποστέλλονται και λαμβάνονται από το αεροσκάφος στον σταθμό λήψης και θα τα αποκωδικοποιούμε μόνοι μας χρησιμοποιώντας Python.

Ιστορία

Προφανώς, τα δεδομένα του αεροσκάφους δεν μεταδίδονται για να τα δουν οι χρήστες στα smartphone τους. Το σύστημα ονομάζεται ADS-B (Automatic dependent surveillance-broadcast) και χρησιμοποιείται για την αυτόματη μετάδοση πληροφοριών σχετικά με το αεροσκάφος στο κέντρο ελέγχου - το αναγνωριστικό, οι συντεταγμένες, η κατεύθυνση, η ταχύτητα, το υψόμετρο και άλλα δεδομένα μεταδίδονται. Προηγουμένως, πριν από την εμφάνιση τέτοιων συστημάτων, ο αποστολέας μπορούσε να δει μόνο ένα σημείο στο ραντάρ. Αυτό δεν ήταν πλέον αρκετό όταν υπήρχαν πάρα πολλά αεροπλάνα.

Τεχνικά, το ADS-B αποτελείται από έναν πομπό σε ένα αεροσκάφος που στέλνει περιοδικά πακέτα πληροφοριών σε αρκετά υψηλή συχνότητα 1090 MHz (υπάρχουν και άλλοι τρόποι, αλλά δεν μας ενδιαφέρουν τόσο πολύ, αφού οι συντεταγμένες μεταδίδονται μόνο εδώ). Βέβαια, εκτός από τον πομπό, υπάρχει και δέκτης κάπου στο αεροδρόμιο, αλλά για εμάς τους χρήστες έχει ενδιαφέρον ο δικός μας δέκτης.

Παρεμπιπτόντως, για σύγκριση, το πρώτο τέτοιο σύστημα, το Airnav Radarbox, σχεδιασμένο για απλούς χρήστες, εμφανίστηκε το 2007 και κόστιζε περίπου 900 $· μια συνδρομή σε υπηρεσίες δικτύου κόστιζε άλλα 250 $ το χρόνο.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Οι κριτικές αυτών των πρώτων Ρώσων ιδιοκτητών μπορούν να διαβαστούν στο φόρουμ ραδιοσαρωτής. Τώρα που οι δέκτες RTL-SDR έχουν γίνει ευρέως διαθέσιμοι, μια παρόμοια συσκευή μπορεί να συναρμολογηθεί με 30 $· περισσότερα για αυτό ήταν στο το πρώτο μέρος. Ας προχωρήσουμε στο ίδιο το πρωτόκολλο - ας δούμε πώς λειτουργεί.

Λήψη σημάτων

Πρώτα, το σήμα πρέπει να καταγραφεί. Ολόκληρο το σήμα έχει διάρκεια μόνο 120 μικροδευτερόλεπτα, επομένως για να αποσυναρμολογήσετε άνετα τα εξαρτήματά του, είναι επιθυμητός ένας δέκτης SDR με συχνότητα δειγματοληψίας τουλάχιστον 5 MHz.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Μετά την εγγραφή, λαμβάνουμε ένα αρχείο WAV με ρυθμό δειγματοληψίας 5000000 δείγματα/δευτερόλεπτο· 30 δευτερόλεπτα τέτοιας εγγραφής «ζυγίζουν» περίπου 500 MB. Το να το ακούτε με media player, φυσικά, είναι άχρηστο - το αρχείο δεν περιέχει ήχο, αλλά ένα απευθείας ψηφιοποιημένο ραδιοφωνικό σήμα - έτσι ακριβώς λειτουργεί το Software Defined Radio.

Θα ανοίξουμε και θα επεξεργαστούμε το αρχείο χρησιμοποιώντας Python. Όσοι θέλουν να πειραματιστούν μόνοι τους μπορούν να κατεβάσουν ένα παράδειγμα εγγραφής по ссылке.

Ας κατεβάσουμε το αρχείο και ας δούμε τι υπάρχει μέσα.

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

Αποτέλεσμα: βλέπουμε προφανείς «παλμούς» στον θόρυβο του περιβάλλοντος.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Κάθε "παλμός" είναι ένα σήμα, η δομή του οποίου είναι σαφώς ορατή εάν αυξήσετε την ανάλυση στο γράφημα.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Όπως μπορείτε να δείτε, η εικόνα είναι αρκετά συνεπής με αυτό που δίνεται στην παραπάνω περιγραφή. Μπορείτε να ξεκινήσετε την επεξεργασία των δεδομένων.

Αποκρυπτογράφηση

Πρώτα, πρέπει να κάνετε λίγη ροή. Το ίδιο το σήμα κωδικοποιείται χρησιμοποιώντας την κωδικοποίηση Manchester:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Από τη διαφορά επιπέδου στα τσιμπήματα είναι εύκολο να λάβετε πραγματικό "0" και "1".

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

Η δομή του ίδιου του σήματος είναι η εξής:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Ας δούμε τα πεδία με περισσότερες λεπτομέρειες.

DF (Μορφή Downlink, 5 bit) - καθορίζει τον τύπο του μηνύματος. Υπάρχουν διάφοροι τύποι:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B
(πηγή πίνακα)

Μας ενδιαφέρει μόνο ο τύπος DF17, γιατί... Είναι αυτό που περιέχει τις συντεταγμένες του αεροσκάφους.

ICAO (24 bit) - διεθνής μοναδικός κωδικός του αεροσκάφους. Μπορείτε να ελέγξετε το αεροπλάνο με τον κωδικό του σε απευθείας σύνδεση (δυστυχώς, ο συγγραφέας έχει σταματήσει να ενημερώνει τη βάση δεδομένων, αλλά εξακολουθεί να είναι σχετικός). Για παράδειγμα, για τον κωδικό 3c5ee2 έχουμε τις ακόλουθες πληροφορίες:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Επεξεργασία: σε σχόλια για το άρθρο Η περιγραφή του κωδικού ICAO δίνεται αναλυτικότερα· προτείνω στους ενδιαφερόμενους να τον διαβάσουν.

ΣΤΟΙΧΕΙΑ (56 ή 112 bit) - τα πραγματικά δεδομένα που θα αποκωδικοποιήσουμε. Τα πρώτα 5 bit δεδομένων είναι το πεδίο Πληκτρολογήστε τον κωδικό, που περιέχει τον υποτύπο των δεδομένων που αποθηκεύονται (δεν πρέπει να συγχέεται με το DF). Υπάρχουν αρκετοί από αυτούς τους τύπους:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B
(πηγή πίνακα)

Ας δούμε μερικά παραδείγματα πακέτων.

Αναγνώριση αεροσκάφους

Παράδειγμα σε δυαδική μορφή:

00100 011 000101 010111 000111 110111 110001 111000

Πεδία δεδομένων:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4, κάθε χαρακτήρας C1-C8 περιέχει κωδικούς που αντιστοιχούν σε δείκτες στη γραμμή:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789#################XNUMX######

Αποκωδικοποιώντας τη συμβολοσειρά, είναι εύκολο να λάβετε τον κωδικό του αεροσκάφους: EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

Αερομεταφερόμενη θέση

Εάν το όνομα είναι απλό, τότε οι συντεταγμένες είναι πιο περίπλοκες. Μεταδίδονται με τη μορφή 2, ζυγών και περιττών πλαισίων. Κωδικός πεδίου TC = 01011b = 11.

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B

Παράδειγμα ζυγών και περιττών πακέτων:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

Ο ίδιος ο υπολογισμός των συντεταγμένων γίνεται σύμφωνα με έναν μάλλον δύσκολο τύπο:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B
(πηγή)

Δεν είμαι ειδικός στο GIS, οπότε δεν ξέρω από πού προέρχεται. Ποιος ξέρει, γράψτε στα σχόλια.

Το ύψος θεωρείται απλούστερο - ανάλογα με το συγκεκριμένο bit, μπορεί να αναπαρασταθεί είτε ως πολλαπλάσιο των 25 ή 100 ποδιών.

Αερομεταφερόμενη ταχύτητα

Πακέτο με TC=19. Το ενδιαφέρον εδώ είναι ότι η ταχύτητα μπορεί να είναι είτε ακριβής, σε σχέση με το έδαφος (Ταχύτητα εδάφους), είτε αερομεταφερόμενη, μετρούμενη από έναν αισθητήρα αεροσκάφους (Airspeed). Μεταδίδονται επίσης πολλά διαφορετικά πεδία:

Flightradar24 - πώς λειτουργεί; Μέρος 2, Πρωτόκολλο ADS-B
(πηγή)

Συμπέρασμα

Όπως μπορείτε να δείτε, η τεχνολογία ADS-B έχει γίνει μια ενδιαφέρουσα συμβίωση, όταν ένα πρότυπο είναι χρήσιμο όχι μόνο για επαγγελματίες, αλλά και για απλούς χρήστες. Αλλά φυσικά, βασικό ρόλο σε αυτό έπαιξε η φθηνότερη τεχνολογία των ψηφιακών δεκτών SDR, η οποία επιτρέπει στη συσκευή να λαμβάνει κυριολεκτικά σήματα με συχνότητες πάνω από ένα gigahertz «για τις πένες».

Στο ίδιο το πρότυπο, φυσικά, υπάρχουν πολλά περισσότερα. Οι ενδιαφερόμενοι μπορούν να δουν το PDF στη σελίδα ICAO ή επισκεφθείτε αυτό που ήδη αναφέρθηκε παραπάνω δικτυακός τόπος.

Είναι απίθανο όλα τα παραπάνω να είναι χρήσιμα σε πολλούς, αλλά τουλάχιστον η γενική ιδέα για το πώς λειτουργεί, ελπίζω, παραμένει.

Παρεμπιπτόντως, ένας έτοιμος αποκωδικοποιητής στην Python υπάρχει ήδη, μπορείτε να τον μελετήσετε εδώ. Και οι κάτοχοι δεκτών SDR μπορούν να συναρμολογήσουν και να ξεκινήσουν έναν έτοιμο αποκωδικοποιητή ADS-B από τη σελίδα, αυτό συζητήθηκε λεπτομερέστερα στο το πρώτο μέρος.

Ο πηγαίος κώδικας του αναλυτή που περιγράφεται στο άρθρο δίνεται κάτω από την περικοπή. Αυτό είναι ένα παράδειγμα δοκιμής που δεν προσποιείται ότι είναι παραγωγή, αλλά κάποια πράγματα λειτουργούν σε αυτό και μπορεί να χρησιμοποιηθεί για την ανάλυση του αρχείου που καταγράφηκε παραπάνω.
Πηγαίος κώδικας (Python)

from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
#    return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4  Aircraft identification
# 5 - 8  Surface position
# 9 - 18  Airborne position (w/ Baro Altitude)
# 19  Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg)                 # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr         # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

Ελπίζω να ενδιαφερόταν κάποιος, ευχαριστώ για την προσοχή σας.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο