在過去
SOA、MSA 和消息傳遞
SOA、MSA 是定義構建系統規則的系統架構,而消息傳遞為其實現提供原語。
我不想傳播這個或那個系統架構。 我支持為特定項目和業務應用最有效和有用的實踐。 無論我們選擇什麼範例,最好是著眼於 Unix 方式來創建系統塊:具有最小連接性的組件,負責各個實體。 API 方法對實體執行最簡單的操作。
消息——顧名思義——一個消息代理。 它的主要目的是接收和發送消息。 它負責發送信息的接口,系統內部傳輸信息的邏輯通道的形成,路由和均衡,以及系統級的故障處理。
開發的消息傳遞不會試圖與 rabbitmq 競爭或取代它。 其主要特點:
- 分配。
可以在集群的所有節點上創建交換點,盡可能靠近使用它們的代碼。 - 簡單。
專注於最小化樣板代碼和易用性。 - 更好的性能。
我們不打算重複 rabbitmq 的功能,但我們只選擇架構和傳輸層,我們盡可能簡單地將其融入 OTP,從而最大限度地降低成本。 - 靈活性。
每個服務可以組合許多交換模板。 - 彈性設計。
- 可擴展性。
消息傳遞隨著應用程序的發展而增長。 隨著負載的增加,您可以將交換點移動到不同的機器上。
評論。 在代碼組織方面,元項目非常適合複雜的 Erlang/Elixir 系統。 所有項目代碼都在一個存儲庫中——一個傘式項目。 同時,微服務盡可能隔離,執行簡單的操作,由單獨的實體負責。 採用這種方式,很容易維護整個系統的API,很容易進行修改,方便編寫單元測試和集成測試。
系統組件直接交互或通過代理交互。 從消息傳遞的角度來看,每個服務都有幾個生命階段:
- 服務初始化。
在此階段,執行服務和依賴項的進程的配置和啟動發生。 - 創建交換點。
該服務可以使用主機配置中指定的靜態交換點,或動態創建交換點。 - 服務註冊。
為了服務請求,它必須在交換點上註冊。 - 普通手術。
該服務做有用的工作。 - 關閉。
有兩種類型的關機:常規和緊急。 對於常規服務,它會與交換點斷開連接並停止。 在緊急情況下,消息傳遞會執行其中一種故障轉移方案。
看起來挺複雜的,但是代碼沒那麼嚇人。 稍後將在模板分析中給出帶註釋的代碼示例。
換貨
交換點是一個消息傳遞過程,它實現了與消息傳遞模板中的組件交互的邏輯。 在下面的所有示例中,組件通過交換點進行交互,這些交換點的組合形成了消息傳遞。
消息交換模式 (MEP)
在全球範圍內,交換模式可分為雙邊和單邊。 前者意味著對傳入消息的響應,後者則不然。 客戶端-服務器體系結構中雙向模式的一個典型示例是請求-響應模式。 考慮模板及其修改。
請求-響應或 RPC
當我們需要從另一個進程獲得響應時使用 RPC。 這個進程可以運行在同一台主機上,也可以運行在不同的大陸上。 下圖是客戶端和服務器之間通過消息傳遞進行交互的示意圖。
由於消息傳遞是完全異步的,因此客戶端的交換分為 2 個階段:
-
發送請求
messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).
允許 ‒ 獨特的兌換點名稱
響應匹配標籤 ‒ 用於處理響應的本地標籤。 例如,在發送屬於不同用戶的多個相同請求的情況下。
請求定義 ‒ 請求正文
處理程序 ‒ 處理程序的 PID。 此過程將收到來自服務器的響應。 -
響應處理
handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)
響應載荷 - 服務器響應。
對於服務器,該過程也包括 2 個階段:
- 兌換點初始化
- 處理傳入請求
讓我們用代碼來說明這個模板。 假設我們需要實現一個提供單一精確時間方法的簡單服務。
服務器代碼
讓我們將服務 API 定義移動到 api.hrl:
%% =====================================================
%% entities
%% =====================================================
-record(time, {
unixtime :: non_neg_integer(),
datetime :: binary()
}).
-record(time_error, {
code :: non_neg_integer(),
error :: term()
}).
%% =====================================================
%% methods
%% =====================================================
-record(time_req, {
opts :: term()
}).
-record(time_resp, {
result :: #time{} | #time_error{}
}).
在time_controller.erl中定義服務控制器
%% В примере показан только значимый код. Вставив его в шаблон gen_server можно получить рабочий сервис.
%% инициализация gen_server
init(Args) ->
%% подключение к точке обмена
messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self())
{ok, #{}}.
%% обработка события потери связи с точкой обмена. Это же событие приходит, если точка обмена еще не запустилась.
handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
erlang:send(self(), monitor_exchange),
{noreply, State};
%% обработка API
handle_info(#time_req{opts = _Opts}, State) ->
messaging:response_once(Client, #time_resp{
result = #time{ unixtime = time_utils:unixtime(now()), datetime = time_utils:iso8601_fmt(now())}
});
{noreply, State};
%% завершение работы gen_server
terminate(_Reason, _State) ->
messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
ok.
客戶端程式碼
要向服務發送請求,您可以在客戶端的任何位置調用消息傳遞請求 API:
case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
ok -> ok;
_ -> %% repeat or fail logic
end
在分佈式系統中,組件的配置可能非常不同,並且在請求的時候,消息傳遞可能還沒有開始,或者服務控制器還沒有準備好為請求服務。 因此,我們需要檢查消息響應並處理失敗情況。
成功發送到客戶端后,服務將收到響應或錯誤。
讓我們在 handle_info 中處理這兩種情況:
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
?debugVal(Utime),
{noreply, State};
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
?debugVal({error, ErrorCode}),
{noreply, State};
請求分塊響應
最好避免發送大量消息。 整個系統的響應速度和穩定運行都取決於此。 如果對查詢的響應佔用大量內存,則必須進行拆分。
以下是此類情況的幾個示例:
- 組件交換二進制數據,例如文件。 將答案分成小部分有助於高效地處理任何大小的文件,並且不會出現內存溢出。
- 清單。 例如,我們需要從數據庫中的一個巨大的表中選擇所有記錄並將其傳遞給另一個組件。
我把這樣的反應稱為火車頭。 在任何情況下,1024 條 1MB 的消息都比一條 1GB 的消息好。
在 Erlang 集群中,我們得到了一個額外的好處——減少交換點和網絡的負載,因為答案會繞過交換點立即發送給接收者。
響應請求
這是用於構建會話系統的 RPC 模式的一個相當罕見的修改。
發布-訂閱(數據分佈樹)
事件驅動系統在數據準備就緒後立即將數據交付給消費者。 因此,系統更傾向於推模型而不是拉模型或輪詢模型。 這個特性讓你不會因為不斷地請求和等待數據而浪費資源。
該圖顯示了向訂閱特定主題的消費者分發消息的過程。
使用這種模式的典型例子是狀態分佈:電腦遊戲中的遊戲世界、交易所的市場數據、數據饋送中的有用信息。
考慮訂閱者代碼:
init(_Args) ->
%% подписываемся на обменник, ключ = key
messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
{ok, #{}}.
handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
%% если точка обмена недоступна, то пытаемся переподключиться
messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
{noreply, State};
%% обрабатываем пришедшие сообщения
handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
?debugVal(Msg),
{noreply, State};
%% при остановке потребителя - отключаемся от точки обмена
terminate(_Reason, _State) ->
messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
ok.
源端可以在任何方便的地方調用消息的發布函數:
messaging:publish_message(Exchange, Key, Message).
允許 - 兌換點的名稱,
關鍵 ‒ 路由鍵
您的留言 - 有效載荷
反向發布訂閱
通過部署 pub-sub,您可以獲得一種便於日誌記錄的模式。 來源和消費者的集合可以完全不同。 該圖顯示了一個具有一個消費者和多個來源的案例。
任務分配模式
在幾乎每個項目中,都有延遲處理的任務,例如生成報告、傳遞通知以及從第三方系統接收數據。 執行這些任務的系統的吞吐量可以通過添加處理器輕鬆擴展。 剩下的就是形成一個處理器集群,並在它們之間平均分配任務。
考慮使用 3 個處理程序的示例出現的情況。 即使在任務分配階段,也會出現handler分配的公平性和溢出的問題。 round-robin分配會負責公平性,為了避免handler溢出的情況,我們會引入一個限制 預取限制. 在過渡模式中 預取限制 不允許一個處理程序接收所有任務。
消息管理隊列和處理優先級。 處理器在任務到達時接收任務。 任務可以成功完成或失敗:
messaging:ack(Tack)
‒ 成功處理消息時調用messaging:nack(Tack)
‒ 在所有緊急情況下都被呼叫。 任務返回後,消息傳遞會將其傳遞給另一個處理程序。
假設在處理三個任務時,發生了一個複雜的故障:處理程序 1 在收到任務後,沒有來得及向交換點報告任何事情就崩潰了。 在這種情況下,交換點將在 ack 超時到期後將作業轉移到另一個處理程序。 Handler 3 出於某種原因放棄了任務並發送了一個 nack,結果任務也傳遞給了另一個成功完成它的 handler。
初步總結
我們分解了分佈式系統的基本構建塊,並對它們在 Erlang/Elixir 中的使用有了基本的了解。
通過組合基本模板,可以構建複雜的範例來解決新出現的問題。
在本週期的最後一部分,我們將考慮組織服務、路由和平衡的一般問題,並討論系統的可擴展性和容錯性的實際方面。
第二部分結束。
照片
插圖由 websequencediagrams.com 提供
來源: www.habr.com