Bonjour Habr. Probablement tous ceux qui ont déjà rencontré ou accompagné des parents ou des amis dans un avion ont utilisé le service gratuit Flightradar24. C'est un moyen très pratique de suivre la position de l'avion en temps réel.
В
histoire
Évidemment, les données de l’avion ne sont pas transmises pour que les utilisateurs puissent les voir sur leur smartphone. Le système s'appelle ADS-B (Surveillance dépendante automatique – diffusion) et est utilisé pour transmettre automatiquement des informations sur l'avion au centre de contrôle - son identifiant, ses coordonnées, sa direction, sa vitesse, son altitude et d'autres données sont transmises. Auparavant, avant l'avènement de tels systèmes, le répartiteur ne pouvait voir qu'un point sur le radar. Cela ne suffisait plus lorsqu’il y avait trop d’avions.
Techniquement, l'ADS-B consiste en un émetteur sur un avion qui envoie périodiquement des paquets d'informations à une fréquence assez élevée de 1090 MHz (il existe d'autres modes, mais ils ne nous intéressent pas tellement, puisque les coordonnées ne sont transmises qu'ici). Bien sûr, en plus de l'émetteur, il y a aussi un récepteur quelque part à l'aéroport, mais pour nous, en tant qu'utilisateurs, notre propre récepteur est intéressant.
À propos, à titre de comparaison, le premier système de ce type, Airnav Radarbox, conçu pour les utilisateurs ordinaires, est apparu en 2007 et coûtait environ 900 dollars ; un abonnement aux services réseau coûte 250 dollars supplémentaires par an.
Les avis de ces premiers propriétaires russes peuvent être lus sur le forum
Réception de signaux
Tout d’abord, le signal doit être enregistré. L'ensemble du signal a une durée de seulement 120 microsecondes, donc pour démonter confortablement ses composants, un récepteur SDR avec une fréquence d'échantillonnage d'au moins 5 MHz est souhaitable.
Après l'enregistrement, nous recevons un fichier WAV avec un taux d'échantillonnage de 5000000 30 500 d'échantillons/sec ; XNUMX secondes d'un tel enregistrement « pèsent » environ XNUMX Mo. L'écouter avec un lecteur multimédia, bien sûr, est inutile - le fichier ne contient pas de son, mais un signal radio directement numérisé - c'est exactement ainsi que fonctionne Software Defined Radio.
Nous ouvrirons et traiterons le fichier en utilisant Python. Ceux qui souhaitent expérimenter par eux-mêmes peuvent télécharger un exemple d'enregistrement
Téléchargeons le fichier et voyons ce qu'il contient.
from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
plt.plot(A)
plt.show()
Résultat : on voit des « impulsions » évidentes sur le bruit de fond.
Chaque « impulsion » est un signal dont la structure est clairement visible si vous augmentez la résolution sur le graphique.
Comme vous pouvez le constater, l’image est tout à fait conforme à ce qui est donné dans la description ci-dessus. Vous pouvez commencer à traiter les données.
Décodage
Tout d’abord, vous devez obtenir un peu de flux. Le signal lui-même est codé à l'aide du codage Manchester :
A partir de la différence de niveau entre les quartets, il est facile d'obtenir les vrais « 0 » et « 1 ».
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
La structure du signal lui-même est la suivante :
Examinons les champs plus en détail.
DF (Format de liaison descendante, 5 bits) - détermine le type de message. Il en existe plusieurs types :
Nous ne nous intéressons qu'au type DF17, car... C'est celui-ci qui contient les coordonnées de l'avion.
OACI (24 bits) - code international unique de l'avion. Vous pouvez vérifier l'avion par son code
Modifier : dans
DONNEES (56 ou 112 bits) - les données réelles que nous allons décoder. Les 5 premiers bits de données constituent le champ code de type, contenant le sous-type des données stockées (à ne pas confondre avec DF). Il en existe plusieurs de ces types :
Regardons quelques exemples de packages.
Identification de l'aéronef
Exemple sous forme binaire :
00100 011 000101 010111 000111 110111 110001 111000
Champs de données:
+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+
TC = 00100b = 4, chaque caractère C1-C8 contient des codes correspondant aux indices dans la ligne :
#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######
En décodant la chaîne, il est facile d'obtenir le code de l'avion : EWG7184
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
Position aérienne
Si le nom est simple, les coordonnées sont plus compliquées. Ils sont transmis sous forme de 2 trames paires et impaires. Code de champ TC = 01011b = 11.
Exemple de paquets pairs et impairs :
01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000
Le calcul des coordonnées lui-même s'effectue selon une formule assez délicate :
(
Je ne suis pas un expert en SIG, donc je ne sais pas d'où cela vient. Qui sait, écrivez dans les commentaires.
La hauteur est considérée comme plus simple - selon le bit spécifique, elle peut être représentée comme un multiple de 25 ou 100 pieds.
Vitesse aéroportée
Forfait avec TC=19. Ce qui est intéressant ici, c'est que la vitesse peut être soit précise, par rapport au sol (Ground Speed), soit aérienne, mesurée par un capteur d'avion (Airspeed). De nombreux domaines différents sont également transmis :
(
Conclusion
Comme vous pouvez le constater, la technologie ADS-B est devenue une symbiose intéressante, lorsqu'une norme est utile non seulement aux professionnels, mais aussi aux utilisateurs ordinaires. Mais bien sûr, un rôle clé a été joué par la technologie moins chère des récepteurs numériques SDR, qui permet à l'appareil de recevoir littéralement des signaux avec des fréquences supérieures au gigahertz « pour quelques centimes ».
Bien entendu, la norme elle-même contient bien plus encore. Les personnes intéressées peuvent consulter le PDF sur la page
Il est peu probable que tout ce qui précède soit utile à beaucoup, mais au moins l'idée générale de son fonctionnement, je l'espère, demeure.
D'ailleurs, un décodeur prêt à l'emploi en Python existe déjà, vous pouvez l'étudier
Le code source de l'analyseur décrit dans l'article est indiqué sous la coupe. Il s'agit d'un exemple de test qui ne prétend pas être une production, mais certaines choses y fonctionnent et il peut être utilisé pour analyser le fichier enregistré ci-dessus.
Code source (Python)
from __future__ import print_function
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys
def parse_message(data, start, bit_len):
max_len = bit_len*128
A = data[start:start + max_len]
A = signal.resample(A, 10*max_len)
bits = np.zeros(10*max_len)
bit_len *= 10
start_data = bit_len*8
# Parse first 8 bits
bits_str = ""
for p in range(8):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
df = int(bits_str[0:5], 2)
# Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
bits_str = ""
for p in range(8, 32):
pos = start_data + bit_len * p
p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
elif avg1 > avg2:
bits_str += "1"
# print "Aircraft address:", bits_str, hex(int(bits_str, 2))
address = hex(int(bits_str, 2))
# Filter specific aircraft (optional)
# if address != "0x3c5ee2":
# return
if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
# print "Pos:", start, "DF:", msg_type
# Data (56bit)
bits_str = ""
for p in range(32, 88):
pos = start_data + bit_len*p
p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
avg1, avg2 = np.average(p1), np.average(p2)
if avg1 < avg2:
bits_str += "0"
# bits[pos + bit_len / 2] = 50
elif avg1 > avg2:
bits_str += "1"
# http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
# print "Data:"
# print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
# Type Code:
tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
# print("DF:", df, "TC:", tc)
# 1 - 4 Aircraft identification
# 5 - 8 Surface position
# 9 - 18 Airborne position (w/ Baro Altitude)
# 19 Airborne velocities
if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
print("Aircraft address:", address)
print("Data:")
print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])
symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))
print()
if tc == 11:
print("Aircraft address:", address)
print("Data: (11)")
print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])
# Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
# First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
# f = bits_str[21:22]
# print("F:", int(f, 2))
# Altitude
alt1b = bits_str[8:20]
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
# lat_dec = int(bits_str[22:22+17], 2)
# lon_dec = int(bits_str[39:39+17], 2)
# print("Lat/Lon:", lat_dec, lon_dec)
# http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
print()
if tc == 19:
print("Aircraft address:", address)
print("Data:")
# print(bits_str)
print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])
subtype = int(bits_str[5:8], 2)
# https://mode-s.org/decode/adsb/airborne-velocity.html
spd, hdg, rocd = -1, -1, -1
if subtype == 1 or subtype == 2:
print("Velocity Subtype 1: Ground speed")
v_ew_sign = int(bits_str[13], 2)
v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity
v_ns_sign = int(bits_str[24], 2)
v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity
v_we = -1*v_ew if v_ew_sign else v_ew
v_sn = -1*v_ns if v_ns_sign else v_ns
spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts
hdg = math.atan2(v_we, v_sn)
hdg = math.degrees(hdg) # convert to degrees
hdg = hdg if hdg >= 0 else hdg + 360 # no negative val
if subtype == 3:
print("Subtype Subtype 3: Airspeed")
hdg = int(bits_str[14:24], 2)/1024.0*360.0
spd = int(bits_str[25:35], 2)
vr_sign = int(bits_str[36], 2)
vr = int(bits_str[36:45], 2)
rocd = -1*vr if vr_sign else vr # rate of climb/descend
print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
print()
# print()
def calc_coordinates():
def _cprN(lat, is_odd):
nl = _cprNL(lat) - is_odd
return nl if nl > 1 else 1
def _cprNL(lat):
try:
nz = 15
a = 1 - math.cos(math.pi / (2 * nz))
b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
nl = 2 * math.pi / (math.acos(1 - a/b))
return int(math.floor(nl))
except:
# happens when latitude is +/-90 degree
return 1
def floor_(x):
return int(math.floor(x))
lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
# 131072 is 2^17, since CPR lat and lon are 17 bits each
cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
print(cprlat_even, cprlon_even)
j = floor_(59*cprlat_even - 60*cprlat_odd)
print(j)
air_d_lat_even = 360.0 / 60
air_d_lat_odd = 360.0 / 59
# Lat
lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# Lon
ni = _cprN(lat_even, 0)
m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
print("Lat", lat_even, "Lon", lon)
# Altitude
# Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
# The value can represent altitudes from -1000 to +50175 ft.
if alt1b[-5] == '1':
bits = alt1b[:-5] + alt1b[-4:]
n = int(bits, 2)
alt_ft = n*25 - 1000
print("Alt (ft)", alt_ft)
fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs
print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))
data = data.astype(float)
cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)
bits = np.zeros(cnt)
# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)
print("Extracting signals...")
pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
# P1 - message start
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
msg_start = pos
bits[pos] = 100
pos += 4
break
pos += 1
start1, start2, start3, start4 = msg_start, 0, 0, 0
# P2
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start2 = pos
bits[pos] = 90
pos += 1
break
pos += 1
# P3
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start3 = pos
bits[pos] = 80
pos += 1
break
pos += 1
# P4
while pos < cnt - 16*1024:
if A[pos] < avg and A[pos+1] > avg:
start4 = pos
bits[pos] = 70
pos += 1
break
pos += 1
sig_diff = start4 - start1
if 20 < sig_diff < 25:
bits[msg_start] = 500
bit_len = int((start4 - start1) / 4.5)
# print(pos, start1, start4, ' - ', bit_len)
# start = start1 + 8*bit_len
parse_message(A, msg_start, bit_len)
pos += 450
# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()
J'espère que quelqu'un était intéressé, merci pour votre attention.
Source: habr.com