Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

你好,Habr。可能每個在飛機上迎接或送別親戚或朋友的人都使用過免費服務 Flightradar24。這是一種非常方便的即時追蹤飛機位置的方法。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

В 第一部分 描述了這種線上服務的運作原理。現在我們將進一步了解從飛機到接收站傳輸和接收了哪些數據,並使用 Python 自行解碼。

故事

顯然,飛機的數據不會傳輸給用戶在智慧型手機上查看。該系統名為 ADS-B(廣播式自動相關監視),用於自動向控制中心傳輸有關飛機的資訊-其識別碼、座標、方向、速度、高度和其他資料都會被傳輸。此前,在這種系統出現之前,控制員只能看到雷達上的一個點。當飛機數量過多時,這種方法就不夠了。

從技術上講,ADS-B 由飛機上的發射器組成,該發射器以 1090 MHz 的相當高的頻率定期發送訊息包(還有其他模式,但我們對它們不太感興趣,因為僅在此處傳輸座標)。當然,除了發射器之外,機場某處還設有接收器,但對於我們作為用戶來說,我們感興趣的是我們自己的接收器。

順便一提,作為對比,首款此類系統 Airnav Radarbox 是為普通用戶設計的,於 2007 年問世,售價約為 900 美元。 250$ 網路服務訂閱費用按年收取。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

您可以在論壇上閱讀第一批俄羅斯車主的評論 無線電掃描儀。現在 RTL-SDR 接收器已經廣泛普及,類似的設備只需 30 美元即可組裝,有關此設備的更多詳細資訊請參見 第一部分。讓我們繼續討論協議本身——看看它是如何運作的。

接收訊號

首先,需要記錄訊號。整個訊號只有 120 微秒長,因此為了輕鬆解析其組成部分,需要使用取樣頻率至少為 5 MHz 的 SDR 接收器。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

錄音後,我們得到一個取樣頻率為5000000次/秒的WAV檔; 30秒的此類錄音「重量」約為500 MB。當然,用媒體播放器收聽它是沒用的——檔案不包含聲音,而是直接數位化的無線電訊號——這就是軟體定義無線電的工作原理。

我們將使用 Python 打開並處理該檔案。想要自己嘗試的人可以下載範本錄音 鏈接.

讓我們下載該文件並查看裡面有什麼。

from scipy.io import wavfile
import matplotlib.pyplot as plt
import numpy as np

fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
data = data.astype(float)
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

plt.plot(A)
plt.show()

結果:我們在背景噪音中看到了清晰的「脈衝」。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

每個「脈衝」都是一個訊號,如果增加圖表的分辨率,其結構就清晰可見。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

如您所見,圖片與上面的描述非常一致。您可以開始處理資料。

解碼

首先,您需要取得位元流。訊號本身採用曼徹斯特編碼進行編碼:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

從半位元組的電平差異很容易獲得真正的“0”和“1”。

    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

訊號本身的結構如下:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

讓我們更詳細地看一下這些欄位。

DF (下行鏈路格式,5 位元)- 定義訊息類型。有以下幾種類型:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議
(表格來源)

我們只對 DF17 類型感興趣,因為它包含飛機的座標。

國際民航組織 (24 位元)- 國際唯一飛機代碼。您可以透過飛機代碼檢查飛機 在線 (不幸的是,作者停止更新資料庫,但它仍然具有相關性)。例如,對於代碼 3c5ee2,我們有以下資訊:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

編輯:在 文章的評論 對於ICAO代碼的描述比較詳細,建議有興趣的可以看一下。

數據 (56 或 112 位元)- 我們將解碼的實際資料。資料的前5位元是字段 類型代碼,包含儲存資料的子類型(不要與 DF 混淆)。這些類型有很多:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議
(表格來源)

讓我們來看一些包的例子。

飛機識別

二進位形式的範例:

00100 011 000101 010111 000111 110111 110001 111000

資料欄位:

+------+------+------+------+------+------+------+------+------+------+
| TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 |
+------+------+------+------+------+------+------+------+------+------+

TC = 00100b = 4,每個字元 C1-C8 包含與字串中的索引相對應的代碼:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ####_#################0123456789######

透過解碼字串,很容易獲得飛機代碼:EWG7184

symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
code_str = ""
for p in range(8):
     c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
     code_str += symbols[c]
print("Aircraft Identification:", code_str.replace('#', ''))

空中位置

如果名稱簡單,那麼座標就比較複雜。它們以 2x、偶數和奇數幀的形式傳輸。字段代碼 TC = 01011b = 11。

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議

偶數和奇數資料包的範例:

01011 000 000101110110 00 10111000111001000 10000110101111001
01011 000 000110010000 01 10010011110000110 10000011110001000

座標本身的計算是根據一個相當複雜的公式進行的:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議
()

我不是 GIS 專家,所以我不知道它來自哪裡。如果有人知道,請在評論中寫下。

高度計算更簡單 - 根據具體位,它可以表示為 25 或 100 英尺的倍數。

空中速度

TC=19 的軟體包。這裡有趣的是,速度可以是精確的,相對於地面(地速),也可以是空速,由飛機感測器測量(空速)。也傳遞了許多不同的字段:

Flightradar24 - 它是如何工作的? 第 2 部分,ADS-B 協議
()

結論

如您所見,ADS-B 技術已成為一種有趣的共生體,任何標準不僅對專業人士有用,而且對普通用戶也有用。當然,這其中的關鍵作用在於數位 SDR 接收器技術成本的降低,這使得設備能夠以「幾分錢」的價格接收頻率高於千兆赫的訊號。

當然,標準本身還有更多內容。有興趣的可以查看頁面上的 PDF 國際民航組織 或訪問上面已經提到的 網站.

很多人可能不會發現上面寫的所有內容都有用,但我希望至少它的工作原理的整體思路能夠保留下來。

順便說一下,Python 中已經有一個現成的解碼器,你可以研究一下 這裡。 SDR 接收器的擁有者可以組裝並啟動現成的 ADS-B 解碼器 從頁面,有關此內容的更多詳細資訊請參見 第一部分.

文章中描述的解析器的原始程式碼如下。這是一個測試範例,不用於生產,但其中有些功能可以運行,您可以使用它來解析上面寫的檔案。
原始碼(Python)

from __future__ import print_function

from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import math
import sys


def parse_message(data, start, bit_len):
    max_len = bit_len*128
    A = data[start:start + max_len]
    A = signal.resample(A, 10*max_len)
    bits = np.zeros(10*max_len)
    bit_len *= 10
    start_data = bit_len*8

    # Parse first 8 bits
    bits_str = ""
    for p in range(8):
        pos = start_data + bit_len*p
        p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"

    df = int(bits_str[0:5], 2)

    # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c )
    bits_str = ""
    for p in range(8, 32):
        pos = start_data + bit_len * p
        p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len]
        avg1, avg2 = np.average(p1), np.average(p2)
        if avg1 < avg2:
            bits_str += "0"
        elif avg1 > avg2:
            bits_str += "1"
    # print "Aircraft address:", bits_str, hex(int(bits_str, 2))
    address = hex(int(bits_str, 2))

    # Filter specific aircraft (optional)
    # if address != "0x3c5ee2":
    #    return

    if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21:
        # print "Pos:", start, "DF:", msg_type

        # Data (56bit)
        bits_str = ""
        for p in range(32, 88):
            pos = start_data + bit_len*p
            p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len]
            avg1, avg2 = np.average(p1), np.average(p2)
            if avg1 < avg2:
                bits_str += "0"
                # bits[pos + bit_len / 2] = 50
            elif avg1 > avg2:
                bits_str += "1"

        # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html
        # print "Data:"
        # print bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]
        # Type Code:
        tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2)
        # print("DF:", df, "TC:", tc)
        
        # 1 - 4  Aircraft identification
        # 5 - 8  Surface position
        # 9 - 18  Airborne position (w/ Baro Altitude)
        # 19  Airborne velocities

        if tc >= 1 and tc <= 4: # and (df == 17 or df == 18):
            print("Aircraft address:", address)
            print("Data:")
            print(bits_str[:8], bits_str[8:14],  bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44])

            symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
            code_str = ""
            for p in range(8):
                c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2)
                code_str += symbols[c]
            print("Aircraft Identification:", code_str.replace('#', ''))
            print()
        if tc == 11:
            print("Aircraft address:", address)
            print("Data: (11)")
            print(bits_str[:8], bits_str[8:20],  bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17])

            # Bit 22 contains the F flag which indicates which CPR format is used (odd or even)
            # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd
            # f = bits_str[21:22]
            # print("F:", int(f, 2))

            # Altitude
            alt1b = bits_str[8:20]
            if alt1b[-5] == '1':
                bits = alt1b[:-5] + alt1b[-4:]
                n = int(bits, 2)
                alt_ft = n*25 - 1000
                print("Alt (ft)", alt_ft)

            # lat_dec = int(bits_str[22:22+17], 2)
            # lon_dec = int(bits_str[39:39+17], 2)
            # print("Lat/Lon:", lat_dec, lon_dec)

            # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf
            print()
        if tc == 19:
            print("Aircraft address:", address)
            print("Data:")
            # print(bits_str)
            print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65])

            subtype = int(bits_str[5:8], 2)
            # https://mode-s.org/decode/adsb/airborne-velocity.html
            spd, hdg, rocd = -1, -1, -1
            if subtype == 1 or subtype == 2:
                print("Velocity Subtype 1: Ground speed")
            
                v_ew_sign = int(bits_str[13], 2)
                v_ew = int(bits_str[14:24], 2) - 1       # east-west velocity
                
                v_ns_sign = int(bits_str[24], 2)
                v_ns = int(bits_str[25:35], 2) - 1       # north-south velocity
                
                v_we = -1*v_ew if v_ew_sign else v_ew
                v_sn = -1*v_ns if v_ns_sign else v_ns
                
                spd = math.sqrt(v_sn*v_sn + v_we*v_we)  # unit in kts
                
                hdg = math.atan2(v_we, v_sn)
                hdg = math.degrees(hdg)                 # convert to degrees
                hdg = hdg if hdg >= 0 else hdg + 360    # no negative val
            if subtype == 3:
                print("Subtype Subtype 3: Airspeed")
                hdg = int(bits_str[14:24], 2)/1024.0*360.0
                spd = int(bits_str[25:35], 2)
            
            vr_sign = int(bits_str[36], 2)
            vr = int(bits_str[36:45], 2)
            rocd = -1*vr if vr_sign else vr         # rate of climb/descend
            print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg)
            print()

        # print()

def calc_coordinates():
    def _cprN(lat, is_odd):
        nl = _cprNL(lat) - is_odd
        return nl if nl > 1 else 1

    def _cprNL(lat):
        try:
            nz = 15
            a = 1 - math.cos(math.pi / (2 * nz))
            b = math.cos(math.pi / 180.0 * abs(lat)) ** 2
            nl = 2 * math.pi / (math.acos(1 - a/b))
            return int(math.floor(nl))
        except:
            # happens when latitude is +/-90 degree
            return 1
    
    def floor_(x):
        return int(math.floor(x))
  
    lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001"
    lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111"
    lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2)
    lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2)
    
    # 131072 is 2^17, since CPR lat and lon are 17 bits each
    cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0
    cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0
    print(cprlat_even, cprlon_even)

    j = floor_(59*cprlat_even - 60*cprlat_odd)
    print(j)

    air_d_lat_even = 360.0 / 60
    air_d_lat_odd = 360.0 / 59

    # Lat
    lat_even = float(air_d_lat_even * (j % 60 + cprlat_even))
    lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd))
    if lat_even >= 270:
        lat_even = lat_even - 360
    if lat_odd >= 270:
        lat_odd = lat_odd - 360

    # Lon
    ni = _cprN(lat_even, 0)
    m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5)
    lon = (360.0 / ni) * (m % ni + cprlon_even)
    print("Lat", lat_even, "Lon", lon)

    # Altitude
    # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft)
    # The value can represent altitudes from -1000 to +50175 ft.
    if alt1b[-5] == '1':
        bits = alt1b[:-5] + alt1b[-4:]
        n = int(bits, 2)
        alt_ft = n*25 - 1000
        print("Alt (ft)", alt_ft)


fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav")
T = 1/fs

print("Sample rate %f MS/s" % (fs / 1e6))
print("Cnt samples %d" % len(data))
print("Duration: %f s" % (T * len(data)))

data = data.astype(float)

cnt = data.shape[0]
# Processing only part on file (faster):
# cnt = 10000000
# data = data[:cnt]
print("Processing I/Q...")
I, Q = data[:, 0], data[:, 1]
A = np.sqrt(I*I + Q*Q)

bits = np.zeros(cnt)

# To see scope without any processing, uncomment
# plt.plot(A)
# plt.show()
# sys.exit(0)

print("Extracting signals...")

pos = 0
avg = 200
msg_start = 0
# Find beginning of each signal
while pos < cnt - 16*1024:
    # P1 - message start
    while pos < cnt - 16*1024:
        if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000:
            msg_start = pos
            bits[pos] = 100
            pos += 4
            break
        pos += 1

    start1, start2, start3, start4 = msg_start, 0, 0, 0
    # P2
    while pos < cnt - 16*1024:
        if A[pos] < avg and A[pos+1] > avg:
            start2 = pos
            bits[pos] = 90
            pos += 1
            break
        pos += 1
    # P3
    while pos < cnt - 16*1024:
        if A[pos] < avg and A[pos+1] > avg:
            start3 = pos
            bits[pos] = 80
            pos += 1
            break
        pos += 1
    # P4
    while pos < cnt - 16*1024:
        if A[pos] < avg and A[pos+1] > avg:
            start4 = pos
            bits[pos] = 70
            pos += 1
            break
        pos += 1


    sig_diff = start4 - start1
    if 20 < sig_diff < 25:
        bits[msg_start] = 500
        bit_len = int((start4 - start1) / 4.5)
        # print(pos, start1, start4, ' - ', bit_len)
        # start = start1 + 8*bit_len
        parse_message(A, msg_start, bit_len)

        pos += 450

# For debugging: check signal start
# plt.plot(A)
# plt.plot(bits)
# plt.show()

我希望有人覺得它很有趣,感謝您的關注。

來源: www.habr.com

為具有 DDoS 保護、VPS VDS 服務器的站點購買可靠的主機 🔥 購買具備 DDoS 防護的可靠網站寄存服務,包括 VPS 和 VDS 伺服器 | ProHoster