
讓我們以哲學家就餐問題為例,看看並發和並行編程在 .Net 中是如何工作的。 計劃是這樣的,從線程/進程的同步,到actor模型(在後面的部分)。 這篇文章可能對初次相識或為了刷新您的知識有用。
為什麼要這樣做? 晶體管達到其最小尺寸,摩爾定律依賴於光速的限制,因此觀察到數量增加,可以製造更多晶體管。 與此同時,數據量不斷增長,用戶期望系統能夠立即做出響應。 在這種情況下,當我們有一個執行線程時,“正常”編程不再有效。 您需要以某種方式解決同時或併發執行的問題。 而且,這個問題存在於不同的層面:線程層面,進程層面,網絡機器層面(分佈式系統)。 .NET 擁有高質量、經過時間考驗的技術,可以快速有效地解決此類問題。
任務
Edsger Dijkstra 早在 1965 年就向他的學生提出了這個問題。確定的表述如下。 有一定數量(通常是五個)的哲學家和相同數量的叉子。 他們坐在一張圓桌旁,中間擺著叉子。 哲學家可以從他們無盡的食物盤子裡吃,思考或等待。 要吃哲學家,您需要拿兩把叉子(最後一把與第一把共用叉子)。 拿起和放下叉子是兩個獨立的動作。 所有的哲學家都保持沉默。 任務是找到這樣一種算法,即使在 54 年後,他們所有人都會思考並充滿。
首先,讓我們嘗試通過使用共享空間來解決這個問題。 叉子放在公用桌子上,哲學家們只是在它們在的時候拿走,然後放回去。 這裡有同步問題,到底什麼時候採取surebets? 如果沒有叉子怎麼辦? 等等 但首先,讓我們開始哲學家。
為了啟動線程,我們通過以下方式使用線程池 Task.Run 方法:
var cancelTokenSource = new CancellationTokenSource();
Action<int> create = (i) => RunPhilosopher(i, cancelTokenSource.Token);
for (int i = 0; i < philosophersAmount; i++)
{
int icopy = i;
// Поместить задачу в очередь пула потоков. Метод RunDeadlock не запускаеться
// сразу, а ждет своего потока. Асинхронный запуск.
philosophers[i] = Task.Run(() => create(icopy), cancelTokenSource.Token);
}線程池旨在優化線程的創建和刪除。 這個池有一個任務隊列,CLR 根據這些任務的數量創建或刪除線程。 所有 AppDomains 的一個池。 這個池應該幾乎總是被使用,因為。 無需費心創建、刪除線程、它們的隊列等。沒有池是可能的,但是你必須直接使用它 Thread,這對於需要更改線程的優先級,當我們有一個長時間的操作,對於前台線程等情況很有用。
換言之, System.Threading.Tasks.Task 班級是一樣的 Thread,但具有各種便利:能夠在其他任務塊之後運行任務,從函數返回它們,方便地中斷它們等等。 等。它們需要支持異步/等待結構(基於任務的異步模式,用於等待 IO 操作的語法糖)。 這個我們稍後再說。
CancelationTokenSource 這裡需要它,以便線程可以在調用線程發出信號時自行終止。
同步問題
被封鎖的哲學家
好的,我們知道如何創建線程,讓我們嘗試吃午飯:
// Кто какие вилки взял. К примеру: 1 1 3 3 - 1й и 3й взяли первые две пары.
private int[] forks = Enumerable.Repeat(0, philosophersAmount).ToArray();
// То же, что RunPhilosopher()
private void RunDeadlock(int i, CancellationToken token)
{
// Ждать вилку, взять её. Эквивалентно:
// while(true)
// if forks[fork] == 0
// forks[fork] = i+1
// break
// Thread.Sleep() или Yield() или SpinWait()
void TakeFork(int fork) =>
SpinWait.SpinUntil(() =>
Interlocked.CompareExchange(ref forks[fork], i+1, 0) == 0);
// Для простоты, но можно с Interlocked.Exchange:
void PutFork(int fork) => forks[fork] = 0;
while (true)
{
TakeFork(Left(i));
TakeFork(Right(i));
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
PutFork(Left(i));
PutFork(Right(i));
Think(i);
// Завершить работу по-хорошему.
token.ThrowIfCancellationRequested();
}
}這裡我們先嘗試拿左邊的叉子,再拿右邊的叉子,如果成功了,我們就吃,然後放回去。 拿一個叉子是原子的,即兩個線程不能同時佔用一個(不正確:第一個讀取叉子是免費的,第二個 - 同樣,第一個佔用,第二個佔用)。 為了這 Interlocked.CompareExchange,必須用處理器指令(TSL, XCHG), 鎖定一塊內存進行原子順序讀寫。 而 SpinWait 等同於構造 while(true) 只有一點點“魔法” - 線程佔用處理器(Thread.SpinWait), 但有時會將控制轉移到另一個線程 (Thread.Yeild) 或睡著了 (Thread.Sleep).
但是這個解決方案不起作用,因為流量很快(對我來說是一秒鐘之內)被阻塞:所有哲學家都拿左叉,但不是右叉。 forks 數組的值是:1 2 3 4 5。

圖中,阻塞線程(deadlock)。 綠色 - 執行,紅色 - 同步,灰色 - 線程正在休眠。 菱形表示任務的開始時間。
哲學家的飢餓
雖然沒有必要想特別多的食物,但飢餓會使任何人放棄哲學。 讓我們嘗試模擬我們問題中線程飢餓的情況。 飢餓是線程正在運行,但沒有做任何有意義的工作,換句話說,這是同樣的死鎖,只是現在線程沒有休眠,而是在積極尋找食物,但沒有食物。 為了避免頻繁阻塞,如果我們不能再拿一個叉子,我們會把叉子放回去。
// То же что и в RunDeadlock, но теперь кладем вилку назад и добавляем плохих философов.
private void RunStarvation(int i, CancellationToken token)
{
while (true)
{
bool hasTwoForks = false;
var waitTime = TimeSpan.FromMilliseconds(50);
// Плохой философов может уже иметь вилку:
bool hasLeft = forks[Left(i)] == i + 1;
if (hasLeft || TakeFork(Left(i), i + 1, waitTime))
{
if (TakeFork(Right(i), i + 1, TimeSpan.Zero))
hasTwoForks = true;
else
PutFork(Left(i)); // Иногда плохой философ отдает вилку назад.
}
if (!hasTwoForks)
{
if (token.IsCancellationRequested) break;
continue;
}
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
bool goodPhilosopher = i % 2 == 0;
// А плохой философ забывает положить свою вилку обратно:
if (goodPhilosopher)
PutFork(Left(i));
// А если и правую не положит, то хорошие будут вообще без еды.
PutFork(Right(i));
Think(i);
if (token.IsCancellationRequested)
break;
}
}
// Теперь можно ждать определенное время.
bool TakeFork(int fork, int philosopher, TimeSpan? waitTime = null)
{
return SpinWait.SpinUntil(
() => Interlocked.CompareExchange(ref forks[fork], philosopher, 0) == 0,
waitTime ?? TimeSpan.FromMilliseconds(-1)
);
}這段代碼的重要之處在於四分之二的哲學家忘記放下他們的左手叉子。 事實證明,他們吃得更多,而其他人則開始挨餓,儘管線程具有相同的優先級。 在這裡,他們並沒有完全餓死,因為。 糟糕的哲學家有時會把叉子放回去。 事實證明,好人比壞人少吃大約 5 倍。 所以代碼中的一個小錯誤會導致性能下降。 這裡還值得注意的是,當所有哲學家都拿左邊的叉子時,可能會出現一種罕見的情況,沒有右邊的,他們放左邊的,等待,再拿左邊的,等等。 這種情況也是一種飢餓,更像是一種僵局。 我沒有重複它。 下圖是兩個糟糕的哲學家拿走了兩把叉子,而兩個好哲學家正在挨餓的情況。

在這裡你可以看到線程有時會被喚醒並嘗試獲取資源。 四個核心中的兩個什麼都不做(上面的綠色圖表)。
哲學家之死
好吧,另一個可能打斷哲學家盛宴的問題是,如果他們中的一個人突然死去,手裡拿著叉子(他們會像那樣埋葬他)。 然後鄰居們將沒有午餐。 這個case你可以自己出一個示例代碼,比如拋出 NullReferenceException 在哲學家拿起叉子之後。 而且,順便說一句,異常不會被處理,調用代碼也不會只是捕獲它(為此 AppDomain.CurrentDomain.UnhandledException 等等。)。 因此,線程本身需要錯誤處理程序並正常終止。
服務員
好的,我們如何解決這個僵局、飢餓和死亡問題? 我們將只允許一個哲學家到達分叉處,為這個地方添加線程互斥。 怎麼做? 假設哲學家旁邊有一位服務員允許任何一位哲學家拿叉子。 我們如何讓這個服務員以及哲學家將如何問他,這些問題很有趣。
最簡單的方法是哲學家會不斷地詢問服務員是否可以使用叉子。 那些。 現在哲學家不會在附近等待叉子,而是等待或詢問服務員。 起初,我們為此只使用用戶空間,在其中我們不使用中斷從內核調用任何過程(關於它們在下面)。
用戶空間的解決方案
在這裡,我們將像以前對一個叉子和兩個哲學家所做的一樣,我們將在一個循環中旋轉並等待。 但是現在它將是所有哲學家,並且可以說只有一個叉子,即。 可以說,只有從服務員手中接過這把“金叉子”的哲學家才會吃飯。 為此,我們使用自旋鎖。
private static SpinLock spinLock = new SpinLock(); // Наш "официант"
private void RunSpinLock(int i, CancellationToken token)
{
while (true)
{
// Взаимная блокировка через busy waiting. Вызываем до try, чтобы
// выбрасить исключение в случае ошибки в самом SpinLock.
bool hasLock = false;
spinLock.Enter(ref hasLock);
try
{
// Здесь может быть только один поток (mutual exclusion).
forks[Left(i)] = i + 1; // Берем вилку сразу, без ожидания.
forks[Right(i)] = i + 1;
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
forks[Left(i)] = 0;
forks[Right(i)] = 0;
}
finally
{
if(hasLock) spinLock.Exit(); // Избегаем проблемы со смертью философа.
}
Think(i);
if (token.IsCancellationRequested)
break;
}
}SpinLock 這是一個阻滯劑,粗略地說,是一樣的 while(true) { if (!lock) break; },但比在 SpinWait (在那裡使用)。 現在他知道如何計算等待的人數,讓他們睡一會兒,等等。 等總的來說,千方百計優化。 但我們必須記住,這仍然是消耗處理器資源並保持流量的同一個活動週期,如果其中一位哲學家變得比其他哲學家更優先,但沒有金叉(優先級倒置問題),這可能導致飢餓. 因此,我們只將它用於共享內存中非常非常短的更改,沒有任何第三方調用、嵌套鎖和其他意外情況。

繪圖為 SpinLock. 溪流不斷“爭搶”金叉。 有失敗 - 在圖中,選定區域。 內核未得到充分利用:這四個線程僅使用了大約 2/3。
這裡的另一個解決方案是只使用 Interlocked.CompareExchange 與上面代碼中所示的相同的主動等待(在飢餓的哲學家中),但是正如已經說過的,這在理論上可能會導致阻塞。
Про Interlocked 需要注意的是,不僅有 CompareExchange,還有其他用於原子讀寫的方法。 並通過重複更改,以防另一個線程有時間進行更改(讀取 1、讀取 2、寫入 2、寫入 1 是錯誤的),它可用於對單個值進行複雜更改(Interlocked Anything 模式) .
內核模式解決方案
為了避免在循環中浪費資源,讓我們看看如何阻塞線程。 換句話說,繼續我們的例子,讓我們看看服務員是如何讓哲學家入睡並只在必要時叫醒他的。 首先,讓我們看一下如何通過操作系統的內核模式來實現這一點。 那裡的所有結構通常都比用戶空間中的結構慢。 例如慢幾倍 AutoResetEvent 可能慢 53 倍 SpinLock [里希特]。 但是在他們的幫助下,您可以在整個系統中同步流程,無論是否受管。
這裡的基本構造是 Dijkstra 半個多世紀前提出的信號量。 信號量簡單來說就是一個由系統管理的正整數,以及對它的兩個操作,遞增和遞減。 如果它不能減少,為零,則調用線程被阻塞。 當其他一些活動線程/進程增加該數字時,將跳過線程並且信號量再次減少傳遞的數字。 人們可以想像火車在一個信號量的瓶頸中。 .NET 提供了幾種具有類似功能的結構: AutoResetEvent, ManualResetEvent, Mutex 和我自己 Semaphore. 我們將使用 AutoResetEvent,這是這些構造中最簡單的:只有兩個值 0 和 1(假,真)。 她的方法 WaitOne() 如果值為 0,則阻塞調用線程;如果值為 1,則將其降低為 0 並跳過它。 一個方法 Set() 升到 1 並讓一名服務員通過,服務員再次降低到 0。就像地鐵十字轉門一樣。
讓我們將解決方案複雜化,並為每個哲學家使用鎖,而不是一次為所有哲學家使用鎖。 那些。 現在可以同時有幾個哲學家,而不是一個。 但是我們再次阻止對錶的訪問,以便正確地避免比賽(比賽條件),採取 surebets。
// Для блокирования отдельного философа.
// Инициализируется: new AutoResetEvent(true) для каждого.
private AutoResetEvent[] philosopherEvents;
// Для доступа к вилкам / доступ к столу.
private AutoResetEvent tableEvent = new AutoResetEvent(true);
// Рождение философа.
public void Run(int i, CancellationToken token)
{
while (true)
{
TakeForks(i); // Ждет вилки.
// Обед. Может быть и дольше.
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
PutForks(i); // Отдать вилки и разблокировать соседей.
Think(i);
if (token.IsCancellationRequested) break;
}
}
// Ожидать вилки в блокировке.
void TakeForks(int i)
{
bool hasForks = false;
while (!hasForks) // Попробовать еще раз (блокировка не здесь).
{
// Исключающий доступ к столу, без гонок за вилками.
tableEvent.WaitOne();
if (forks[Left(i)] == 0 && forks[Right(i)] == 0)
forks[Left(i)] = forks[Right(i)] = i + 1;
hasForks = forks[Left(i)] == i + 1 && forks[Right(i)] == i + 1;
if (hasForks)
// Теперь философ поест, выйдет из цикла. Если Set
// вызван дважды, то значение true.
philosopherEvents[i].Set();
// Разблокировать одного ожидающего. После него значение tableEvent в false.
tableEvent.Set();
// Если имеет true, не блокируется, а если false, то будет ждать Set от соседа.
philosopherEvents[i].WaitOne();
}
}
// Отдать вилки и разблокировать соседей.
void PutForks(int i)
{
tableEvent.WaitOne(); // Без гонок за вилками.
forks[Left(i)] = 0;
// Пробудить левого, а потом и правого соседа, либо AutoResetEvent в true.
philosopherEvents[LeftPhilosopher(i)].Set();
forks[Right(i)] = 0;
philosopherEvents[RightPhilosopher(i)].Set();
tableEvent.Set();
}要理解這裡發生的事情,請考慮哲學家沒有拿叉子的情況,那麼他的行為如下。 他正在等待訪問表。 收到它後,他試圖拿起叉子。 沒有成功。 它提供對錶的訪問(互斥)。 並通過他的“旋轉門”(AutoResetEvent)(它們最初是開放的)。 又進入循環了,因為他沒有叉子。 他試圖拿走它們,並在他的“十字轉門”前停了下來。 右邊或左邊的一些更幸運的鄰居吃完飯後,打開我們的哲學家的鎖,“打開他的十字轉門”。 我們的哲學家第二次經過它(它在它後面關閉)。 他第三次嘗試拿起叉子。 祝你好運。 然後他經過十字轉門去吃飯。
當此類代碼中存在隨機錯誤時(它們始終存在),例如,錯誤指定了鄰居或創建了相同的對象 AutoResetEvent 對全部 (Enumerable.Repeat),那麼哲學家們將等待開發者,因為在這樣的代碼中查找錯誤是一項相當困難的任務。 這個解決方案的另一個問題是它不能保證某些哲學家不會挨餓。
混合解決方案
我們已經研究了兩種計時方法,一種是停留在用戶模式並循環,另一種是通過內核阻塞線程。 第一種方法適用於短鎖,第二種適用於長鎖。 通常需要先在循環中短暫地等待一個變量的變化,等待時間長了再阻塞線程。 這種方法是在所謂的實現。 混合結構。 以下是與內核模式相同的構造,但現在帶有用戶模式循環: SemaphorSlim, ManualResetEventSlim 等等 這裡最受歡迎的設計是 Monitor, 因為在 C# 中有一個眾所周知的 lock 句法。 Monitor 這是同一個信號量,最大值為 1(互斥量),但支持循環等待、遞歸、條件變量模式(更多內容見下文)等。讓我們看看它的解決方案。
// Спрячем объект для Монитора от всех, чтобы без дедлоков.
private readonly object _lock = new object();
// Время ожидания потока.
private DateTime?[] _waitTimes = new DateTime?[philosophersAmount];
public void Run(int i, CancellationToken token)
{
while (true)
{
TakeForks(i);
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
PutForks(i);
Think(i);
if (token.IsCancellationRequested) break;
}
}
// Наше сложное условие для Condition Variable паттерна.
bool CanIEat(int i)
{
// Если есть вилки:
if (forks[Left(i)] != 0 && forks[Right(i)] != 0)
return false;
var now = DateTime.Now;
// Может, если соседи не более голодные, чем текущий.
foreach(var p in new int[] {LeftPhilosopher(i), RightPhilosopher(i)})
if (_waitTimes[p] != null && now - _waitTimes[p] > now - _waitTimes[i])
return false;
return true;
}
void TakeForks(int i)
{
// Зайти в Монитор. То же самое: lock(_lock) {..}.
// Вызываем вне try, чтобы возможное исключение выбрасывалось выше.
bool lockTaken = false;
Monitor.Enter(_lock, ref lockTaken);
try
{
_waitTimes[i] = DateTime.Now;
// Condition Variable паттерн. Освобождаем лок, если не выполненно
// сложное условие. И ждем пока кто-нибудь сделает Pulse / PulseAll.
while (!CanIEat(i))
Monitor.Wait(_lock);
forks[Left(i)] = i + 1;
forks[Right(i)] = i + 1;
_waitTimes[i] = null;
}
finally
{
if (lockTaken) Monitor.Exit(_lock);
}
}
void PutForks(int i)
{
// То же самое: lock (_lock) {..}.
bool lockTaken = false;
Monitor.Enter(_lock, ref lockTaken);
try
{
forks[Left(i)] = 0;
forks[Right(i)] = 0;
// Освободить все потоки в очереди ПОСЛЕ вызова Monitor.Exit.
Monitor.PulseAll(_lock);
}
finally
{
if (lockTaken) Monitor.Exit(_lock);
}
}在這裡,我們再次阻塞了整個表以訪問叉子,但現在我們一次解除了所有線程的阻塞,而不是當有人吃完時的鄰居。 那些。 首先,有人吃東西並擋住了鄰居,當這個人吃完了,但又想馬上再次吃東西時,他進入阻塞狀態並喚醒了他的鄰居,因為。 它的等待時間更少。
這就是我們如何避免僵局和某些哲學家的飢餓。 我們使用循環進行短暫的等待,並阻塞線程進行長時間的等待。 一次解鎖所有人比只解鎖鄰居要慢,就像在解決方案中一樣 AutoResetEvent,但差異應該不會很大,因為線程必須首先處於用戶模式。
У lock 語法有令人討厭的驚喜。 推薦使用 Monitor 直接 [Richter] [Eric Lippert]。 其中之一是 lock 總是出 Monitor,即使出現異常,在這種情況下另一個線程也可以更改共享內存狀態。 在這種情況下,進入死鎖或以某種方式安全地終止程序通常會更好。 另一個驚喜是 Monitor 使用同步塊(SyncBlock),它們存在於所有對像中。 因此,如果選擇了一個不合適的對象,您很容易陷入死鎖(例如,如果您鎖定一個駐留字符串)。 我們為此使用始終隱藏的對象。
條件變量模式允許您更簡潔地實現對某些複雜條件的期望。 在我看來,在 .NET 中它是不完整的,因為理論上,應該有多個隊列在多個變量上(如在 Posix 線程中),而不是在一個 lok 上。 然後可以為所有哲學家製作它們。 但即使是這種形式,它也可以讓您減少代碼。
許多哲學家或 async / await
好的,現在我們可以有效地阻塞線程了。 但是如果我們有很多哲學家呢? 100? 10000? 例如,我們收到了 100000 個對 Web 服務器的請求。 為每個請求創建一個線程會產生開銷,因為這麼多線程不會並行運行。 只會運行與邏輯核心一樣多的數量(我有 4 個)。 而其他人只會拿走資源。 這個問題的一個解決方案是異步/等待模式。 它的想法是,如果函數需要等待某些事情繼續進行,則該函數不會保留線程。 當它做某事時,它會恢復執行(但不一定在同一個線程上!)。 在我們的例子中,我們將等待分叉。
SemaphoreSlim 為此 WaitAsync() 方法。 這是使用此模式的實現。
// Запуск такой же, как раньше. Где-нибудь в программе:
Task.Run(() => Run(i, cancelTokenSource.Token));
// Запуск философа.
// Ключевое слово async -- компилятор транслирует этот метот в асинхронный.
public async Task Run(int i, CancellationToken token)
{
while (true)
{
// await -- будем ожидать какого-то события.
await TakeForks(i);
// После await, продолжение возможно в другом потоке.
eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
// Может быть несколько событий для ожидания.
await PutForks(i);
Think(i);
if (token.IsCancellationRequested) break;
}
}
async Task TakeForks(int i)
{
bool hasForks = false;
while (!hasForks)
{
// Взаимоисключающий доступ к столу:
await _tableSemaphore.WaitAsync();
if (forks[Left(i)] == 0 && forks[Right(i)] == 0)
{
forks[Left(i)] = i+1;
forks[Right(i)] = i+1;
hasForks = true;
}
_tableSemaphore.Release();
// Будем ожидать, чтобы сосед положил вилки:
if (!hasForks)
await _philosopherSemaphores[i].WaitAsync();
}
}
// Ждем доступа к столу и кладем вилки.
async Task PutForks(int i)
{
await _tableSemaphore.WaitAsync();
forks[Left(i)] = 0;
// "Пробудить" соседей, если они "спали".
_philosopherSemaphores[LeftPhilosopher(i)].Release();
forks[Right(i)] = 0;
_philosopherSemaphores[RightPhilosopher(i)].Release();
_tableSemaphore.Release();
}方法與 async / await 被翻譯成一個棘手的狀態機,立即返回其內部 Task. 通過它,您可以等待方法完成、取消它以及您可以使用 Task 執行的所有其他操作。 在方法內部,狀態機控制執行。 最重要的是,如果沒有延遲,則執行是同步的,如果有,則釋放線程。 為了更好地理解這一點,最好看一下這個狀態機。 您可以從這些創建鏈 async / await 方法。
讓我們測試一下。 100 位哲學家在具有 4 個邏輯核心的機器上的工作,8 秒。 之前使用 Monitor 的解決方案只運行了前 4 個線程,其餘的根本沒有運行。 這 4 個線程中的每一個都空閒了大約 2ms。 async / await 解決方案運行了全部 100 個,平均每個等待 6.8 秒。 當然,在真實的系統中,空閒6秒是不能接受的,最好不要處理這麼多這樣的請求。 事實證明,帶有 Monitor 的解決方案根本無法擴展。
結論
從這些小示例中可以看出,.NET 支持許多同步結構。 然而,如何使用它們並不總是顯而易見的。 我希望這篇文章對您有所幫助。 現在,到此為止,但還有很多有趣的東西,例如,線程安全集合、TPL 數據流、響應式編程、軟件事務模型等。
來源
- 流可視化:
- MSDN: , 還有很多 其他
- [Richter] - 通過 C# 實現 CLR,Jeffrey Richter
- [埃里克·利珀特] -
- 圖片 - “劍中之舞”,G. Semiradsky
來源: www.habr.com
