饱腹的哲学家或 .NET 中的竞争性编程

饱腹的哲学家或 .NET 中的竞争性编程

让我们以哲学家午餐问题为例,看看并发和并行编程在 .Net 中是如何工作的。 计划如下,从线程/进程同步到参与者模型(在以下部分)。 这篇文章对于初次相识或刷新您的知识可能有用。

为什么甚至知道如何做到这一点? 晶体管的尺寸正在达到最小,摩尔定律正在达到光速的极限,因此可以观察到数量的增长;可以制造更多的晶体管。 与此同时,数据量不断增长,用户期望系统立即做出响应。 在这种情况下,当我们有一个执行线程时,“正常”编程就不再有效。 我们需要以某种方式解决同时或并发执行的问题。 而且,这个问题存在于不同的层面:线程层面、进程层面、网络上的机器层面(分布式系统)。 .NET 拥有高质量、经过时间考验的技术,可以快速有效地解决此类问题。

任务

Edsger Dijkstra 早在 1965 年就向他的学生提出了这个问题。既定的公式如下。 有一定数量(通常是五个)的哲学家和相同数量的叉子。 他们坐在圆桌旁,叉子放在中间。 哲学家可以从盘子里吃不完的食物,思考或等待。 为了吃饭,哲学家需要拿两把叉子(后者与前者共用一把叉子)。 拿起和放下叉子是两个独立的动作。 所有的哲学家都沉默了。 我们的任务是找到这样一种算法,让他们即使在 54 年后仍然能够思考并吃得饱。

首先,我们尝试使用共享空间来解决这个问题。 叉子放在公共桌子上,哲学家们只需在叉子在的时候把它们拿走然后放回去即可。 这就是同步问题出现的地方,到底什么时候进行分叉? 如果没有插头怎么办? 等等。但首先,让我们从哲学家开始。

为了启动线程,我们使用线程池 Task.Run 方法:

var cancelTokenSource = new CancellationTokenSource();
Action<int> create = (i) => RunPhilosopher(i, cancelTokenSource.Token);
for (int i = 0; i < philosophersAmount; i++) 
{
    int icopy = i;
    // Поместить задачу в очередь пула потоков. Метод RunDeadlock не запускаеться 
    // сразу, а ждет своего потока. Асинхронный запуск.
    philosophers[i] = Task.Run(() => create(icopy), cancelTokenSource.Token);
}

线程池旨在优化线程的创建和删除。 该池有一个任务队列,CLR 根据这些任务的数量创建或删除线程。 所有 AppDomain 的一个池。 这个池几乎应该一直使用,因为...... 无需费心创建和删除线程、线程队列等。您可以在没有池的情况下完成此操作,但随后您必须直接使用它 Thread,这对于当我们需要更改线程的优先级、当我们有一个很长的操作、对于前台线程等情况时很有用。

换言之, System.Threading.Tasks.Task 班级是一样的 Thread,但具有各种便利:能够在其他任务块之后启动任务、从函数返回它们、方便地中断它们等等。 等等。它们需要支持 async/await 结构(基于任务的异步模式,等待 IO 操作的语法糖)。 我们稍后再讨论这个问题。

CancelationTokenSource 这里,线程必须能够根据来自调用线程的信号自行终止。

同步问题

被封锁的哲学家

好的,我们知道如何创建线程,让我们尝试一下午餐:

// Кто какие вилки взял. К примеру: 1 1 3 3 - 1й и 3й взяли первые две пары.
private int[] forks = Enumerable.Repeat(0, philosophersAmount).ToArray();

// То же, что RunPhilosopher()
private void RunDeadlock(int i, CancellationToken token) 
{
    // Ждать вилку, взять её. Эквивалентно: 
    // while(true) 
    //     if forks[fork] == 0 
    //          forks[fork] = i+1
    //          break
    //     Thread.Sleep() или Yield() или SpinWait()
    void TakeFork(int fork) =>
        SpinWait.SpinUntil(() => 
            Interlocked.CompareExchange(ref forks[fork], i+1, 0) == 0);

    // Для простоты, но можно с Interlocked.Exchange:
    void PutFork(int fork) => forks[fork] = 0;

    while (true)
    {
        TakeFork(Left(i));
        TakeFork(Right(i));
        eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
        PutFork(Left(i));
        PutFork(Right(i));
        Think(i);

        // Завершить работу по-хорошему.
        token.ThrowIfCancellationRequested();
    }
}

在这里,我们首先尝试拿左边的叉子,然后再拿右边的叉子,如果有效,我们就吃东西并将它们放回去。 拿一根叉子是原子的,即两个线程不能同时获取一个(错误:第一个读取分叉是空闲的,第二个执行相同的操作,第一个获取,第二个获取)。 为了这 Interlocked.CompareExchange,必须使用处理器指令来实现(TSL, XCHG),它锁定一块内存以进行原子顺序读写。 而SpinWait则相当于构造 while(true) 只需一点“魔法” - 线程占用处理器(Thread.SpinWait),但有时会将控制权传递给另一个线程(Thread.Yeild)或睡着了(Thread.Sleep).

但这个解决方案不起作用,因为...... 线程很快(对我来说在一秒钟之内)就被阻塞了:所有哲学家都拿走了他们左边的叉子,但没有右边的叉子。 forks 数组的值为:1 2 3 4 5。

饱腹的哲学家或 .NET 中的竞争性编程

图中,线程阻塞(死锁)。 绿色表示正在执行,红色表示同步,灰色表示线程正在休眠。 菱形表示任务的启动时间。

哲学家的饥饿

虽然你不需要很多食物来思考,但饥饿会迫使任何人放弃哲学。 让我们尝试模拟我们的问题中线程饥饿的情况。 饥饿是指一个线程在工作,但没有做有意义的工作,换句话说,它是同样的死锁,只是现在线程没有休眠,而是在积极寻找吃的东西,但没有食物。 为了避免经常阻塞,如果拿不到另一把叉子,我们就把叉子放回去。

// То же что и в RunDeadlock, но теперь кладем вилку назад и добавляем плохих философов.
private void RunStarvation(int i, CancellationToken token)
{
    while (true)
    {
        bool hasTwoForks = false;
        var waitTime = TimeSpan.FromMilliseconds(50);
        // Плохой философов может уже иметь вилку:
        bool hasLeft = forks[Left(i)] == i + 1;
        if (hasLeft || TakeFork(Left(i), i + 1, waitTime))
        {
            if (TakeFork(Right(i), i + 1, TimeSpan.Zero))
                hasTwoForks = true;
            else
                PutFork(Left(i)); // Иногда плохой философ отдает вилку назад.
        } 
        if (!hasTwoForks)
        {
            if (token.IsCancellationRequested) break;
            continue;
        }
        eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
        bool goodPhilosopher = i % 2 == 0;
        // А плохой философ забывает положить свою вилку обратно:
        if (goodPhilosopher)
            PutFork(Left(i));
        // А если и правую не положит, то хорошие будут вообще без еды.
        PutFork(Right(i));

        Think(i);

        if (token.IsCancellationRequested)
            break;
    }
}

// Теперь можно ждать определенное время.
bool TakeFork(int fork, int philosopher, TimeSpan? waitTime = null)
{
    return SpinWait.SpinUntil(
        () => Interlocked.CompareExchange(ref forks[fork], philosopher, 0) == 0,
              waitTime ?? TimeSpan.FromMilliseconds(-1)
    );
}

这段代码的重要之处在于,四分之二的哲学家忘记放下左边的叉子。 事实证明,尽管线程具有相同的优先级,但它们吃的食物更多,而其他线程则开始挨饿。 在这里他们并没有完全挨饿,因为…… 糟糕的哲学家有时会把叉子放回去。 事实证明,好人吃的东西比坏人少5倍左右。 因此,代码中的一个小错误会导致性能下降。 这里还值得注意的是,可能出现一种罕见的情况,即所有哲学家都拿左边的叉子,没有右边的叉子,他们把左边的叉子放下,等待,再拿左边的叉子,等等。 这种情况也是饥饿,更像是互相堵塞。 我无法重复它。 下面的图片描述了这样一种情况:两个坏哲学家拿走了两把叉子,而两个好哲学家正在挨饿。

饱腹的哲学家或 .NET 中的竞争性编程

在这里您可以看到线程有时会醒来并尝试获取资源。 四分之二的核心不执行任何操作(上面的绿色图)。

哲学家之死

好吧,还有一个问题可能会打断哲学家们的光荣晚宴,那就是如果其中一位哲学家突然死时手里拿着叉子(他将以这种方式被埋葬)。 那么邻居们将没有午餐。 你可以自己想出一个针对这种情况的示例代码,例如它被扔掉 NullReferenceException 当哲学家拿起叉子之后。 而且,顺便说一下,异常不会被处理,调用代码也不会简单地捕获它(对于这个 AppDomain.CurrentDomain.UnhandledException 等等。)。 因此,线程本身需要错误处理程序并能够正常终止。

服务员

好吧,我们如何解决这个僵局、饥饿和死亡的问题呢? 我们将只允许一位哲学家进入叉子,并且我们将为这个地方添加线程的互斥。 怎么做? 假设哲学家旁边有一位服务员,他允许一位哲学家拿叉子。 我们应该如何塑造这个侍者以及哲学家将如何向他提问,这些都是有趣的问题。

对于哲学家来说,最简单的方法就是不断地询问服务员是否可以拿叉子。 那些。 现在哲学家不会在附近等待叉子,而是等待或询问服务员。 起初,我们仅使用用户空间来实现此目的;在其中,我们不使用中断来调用内核中的任何过程(下面将详细介绍)。

用户空间解决方案

在这里,我们将做与之前用一把叉子和两个哲学家所做的相同的事情,我们将循环旋转并等待。 但现在全都是哲学家,而且可以说只有一把叉子,即叉子。 可以说,只有从服务员手中接过这把“金叉子”的哲学家才会吃饭。 为此,我们使用 SpinLock。

private static SpinLock spinLock = new SpinLock();  // Наш "официант"
private void RunSpinLock(int i, CancellationToken token)
{
    while (true)
    {
        // Взаимная блокировка через busy waiting. Вызываем до try, чтобы
        // выбрасить исключение в случае ошибки в самом SpinLock.
        bool hasLock = false;
        spinLock.Enter(ref hasLock);
        try
        {
            // Здесь может быть только один поток (mutual exclusion).
            forks[Left(i)] = i + 1;  // Берем вилку сразу, без ожидания.
            forks[Right(i)] = i + 1;
            eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
            forks[Left(i)] = 0;
            forks[Right(i)] = 0;
        }
        finally
        {
            if(hasLock) spinLock.Exit();  // Избегаем проблемы со смертью философа.
        }

        Think(i);

        if (token.IsCancellationRequested)
            break;
    }
}

SpinLock 这是一个拦截器,粗略地说,是相同的 while(true) { if (!lock) break; },但比其中更“神奇” SpinWait (在那里使用)。 现在他知道如何计算等待的人数,让他们睡一会儿,等等。 等等。总的来说,它会尽一切可能进行优化。 但我们必须记住,这仍然是相同的活动循环,它会消耗处理器资源并占用一个线程,如果其中一个哲学家变得比其他哲学家更优先,但没有金叉(优先级反转问题),这可能会导致饥饿)。 因此,我们仅将它用于共享内存中非常非常短的更改,没有任何第三方调用、嵌套锁或其他意外。

饱腹的哲学家或 .NET 中的竞争性编程

绘图为 SpinLock。 流为了金叉不断地“争夺”。 发生故障 - 图中突出显示的区域。 核心未完全使用:这四个线程仅使用了大约 2/3。

这里的另一个解决方案是仅使用 Interlocked.CompareExchange 与上面代码中所示的相同的主动等待(在饥饿的哲学家中),但是,正如已经说过的,这在理论上可能会导致阻塞。

Про Interlocked 值得一提的是,不仅有 CompareExchange,还有其他原子读取和写入的方法。 通过重复更改,如果另一个线程设法进行更改(读 1、读 2、写 2、写 1 不好),则它可用于对一个值进行复杂的更改(互锁任何模式)。

内核态解决方案

为了避免在循环中浪费资源,让我们看看如何阻塞线程。 换句话说,继续我们的例子,让我们看看服务员如何让哲学家入睡并仅在必要时叫醒他。 首先,我们来看看如何通过操作系统的内核模式来做到这一点。 那里的所有结构通常都会比用户空间中的结构慢。 慢几倍,例如 AutoResetEvent 可能慢 53 倍 SpinLock [里克特]。 但在他们的帮助下,您可以同步整个系统(无论是否托管)的流程。

这里的基本设计是信号量,由 Dijkstra 在半个多世纪前提出。 简单来说,信号量就是一个由系统控制的正整数,以及对其进行的两个操作——增加和减少。 如果不可能减零,则调用线程将被阻塞。 当某个其他活动线程/进程增加该数量时,线程将被传递,并且信号量再次减少所传递的数量。 您可以想象带有信号量的瓶颈列车。 .NET 提供了几种具有类似功能的构造: AutoResetEvent, ManualResetEvent, Mutex 和我自己 Semaphore。 我们将使用 AutoResetEvent,这是这些构造中最简单的:只有两个值 0 和 1(false、true)。 她的方法 WaitOne() 如果值为 0,则阻塞调用线程;如果为 1,则将其降级为 0 并跳过它。 一个方法 Set() 增加到 1 并让一个人通过,该人再次减少到 0。就像地铁中的十字转门。

让我们将解决方案复杂化,并对每个哲学家使用阻塞,而不是同时对所有人使用阻塞。 那些。 现在,几位哲学家可以同时吃饭,而不仅仅是一个。 但我们再次阻止对表的访问,以便正确进行分叉,避免竞争条件。

// Для блокирования отдельного философа.
// Инициализируется: new AutoResetEvent(true) для каждого.
private AutoResetEvent[] philosopherEvents;

// Для доступа к вилкам / доступ к столу.
private AutoResetEvent tableEvent = new AutoResetEvent(true);

// Рождение философа.
public void Run(int i, CancellationToken token)
{
    while (true)
    {
        TakeForks(i); // Ждет вилки.
        // Обед. Может быть и дольше.
        eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
        PutForks(i); // Отдать вилки и разблокировать соседей.
        Think(i);
        if (token.IsCancellationRequested) break;
    }
}

// Ожидать вилки в блокировке.
void TakeForks(int i)
{
    bool hasForks = false;
    while (!hasForks) // Попробовать еще раз (блокировка не здесь).
    {
        // Исключающий доступ к столу, без гонок за вилками.
        tableEvent.WaitOne();
        if (forks[Left(i)] == 0 && forks[Right(i)] == 0)
            forks[Left(i)] = forks[Right(i)] = i + 1;
        hasForks = forks[Left(i)] == i + 1 && forks[Right(i)] == i + 1;
        if (hasForks)
            // Теперь философ поест, выйдет из цикла. Если Set 
            // вызван дважды, то значение true.
            philosopherEvents[i].Set();
        // Разблокировать одного ожидающего. После него значение tableEvent в false.
        tableEvent.Set(); 
        // Если имеет true, не блокируется, а если false, то будет ждать Set от соседа.
        philosopherEvents[i].WaitOne();
    }
}

// Отдать вилки и разблокировать соседей.
void PutForks(int i)
{
    tableEvent.WaitOne(); // Без гонок за вилками.
    forks[Left(i)] = 0;
    // Пробудить левого, а потом и правого соседа, либо AutoResetEvent в true.
    philosopherEvents[LeftPhilosopher(i)].Set();
    forks[Right(i)] = 0;
    philosopherEvents[RightPhilosopher(i)].Set();
    tableEvent.Set();
}

要理解这里发生的情况,请考虑当哲学家未能拿走叉子时的情况,那么他的行为将如下。 他正在等待进入桌子。 收到后,他试图拿起叉子。 没有成功。 它放弃了对表的访问(互斥)。 他经过他的“十字转门”(AutoResetEvent)(一开始它们是开放的)。 又陷入循环了,因为他没有叉子。 他试图拿走它们,但在他的“十字转门”前停了下来。 左边或右边的一些幸运的邻居吃完饭后,会通过“打开他的十字转门”来解锁我们的哲学家。 我们的哲学家第二次经历了它(它在他身后关闭了)。 第三次尝试拿叉子。 成功的。 他穿过十字转门去吃午饭。

当此类代码中存在随机错误时(它们始终存在),例如,将错误地指定邻居或创建相同的对象 AutoResetEvent 对全部 (Enumerable.Repeat),那么哲学家就会等待开发者,因为在这样的代码中查找错误是一项相当困难的任务。 这个解决方案的另一个问题是它不能保证某些哲学家不会挨饿。

混合解决方案

我们研究了两种同步方法:当我们处于用户模式并在循环中旋转时,以及当我们通过内核阻塞线程时。 第一种方法适用于短块,第二种方法适用于长块。 通常,您需要首先在循环中短暂等待变量发生变化,然后在等待时间较长时阻塞线程。 这种方法是在所谓的中实现的。 混合设计。 它具有与内核模式相同的构造,但现在具有用户模式循环: SemaphorSlim, ManualResetEventSlim 等等。这里最流行的设计是 Monitor, 因为在C#中有一个众所周知的 lock 句法。 Monitor 这是相同的信号量,最大值为 1(互斥体),但支持循环等待、递归、条件变量模式(更多内容见下文)等。让我们看一下它的解决方案。

// Спрячем объект для Монитора от всех, чтобы без дедлоков.
private readonly object _lock = new object();
// Время ожидания потока.
private DateTime?[] _waitTimes = new DateTime?[philosophersAmount];

public void Run(int i, CancellationToken token)
{
    while (true)
    {
        TakeForks(i);
        eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
        PutForks(i);
        Think(i);
        if (token.IsCancellationRequested) break;
    }
}

// Наше сложное условие для Condition Variable паттерна.
bool CanIEat(int i)
{
    // Если есть вилки:
    if (forks[Left(i)] != 0 && forks[Right(i)] != 0)
        return false;
    var now = DateTime.Now;
    // Может, если соседи не более голодные, чем текущий.
    foreach(var p in new int[] {LeftPhilosopher(i), RightPhilosopher(i)})
        if (_waitTimes[p] != null && now - _waitTimes[p] > now - _waitTimes[i])
            return false;
    return true;
}

void TakeForks(int i)
{
    // Зайти в Монитор. То же самое: lock(_lock) {..}.
    // Вызываем вне try, чтобы возможное исключение выбрасывалось выше.
    bool lockTaken = false;
    Monitor.Enter(_lock, ref lockTaken);
    try
    {
        _waitTimes[i] = DateTime.Now;
        // Condition Variable паттерн. Освобождаем лок, если не выполненно 
        // сложное условие. И ждем пока кто-нибудь сделает Pulse / PulseAll.
        while (!CanIEat(i))
            Monitor.Wait(_lock); 
        forks[Left(i)] = i + 1;
        forks[Right(i)] = i + 1;
        _waitTimes[i] = null;
    }
    finally
    {
        if (lockTaken) Monitor.Exit(_lock);
    }
}

void PutForks(int i)
{
    // То же самое: lock (_lock) {..}.
    bool lockTaken = false;
    Monitor.Enter(_lock, ref lockTaken);
    try
    {
        forks[Left(i)] = 0;
        forks[Right(i)] = 0;
        // Освободить все потоки в очереди ПОСЛЕ вызова Monitor.Exit.
        Monitor.PulseAll(_lock); 
    }
    finally
    {
        if (lockTaken) Monitor.Exit(_lock);
    }
}

在这里,我们再次阻止整个表访问叉子,但现在我们立即解除对所有线程的阻止,而不是当有人吃完时阻止邻居。 那些。 首先,有人吃东西并挡住了邻居,当有人吃完后,但想立即再吃东西时,他会进入街区并唤醒他的邻居,因为它的等待时间更短。

这样我们就可以避免僵局和某些哲学家的饥饿。 我们用循环来短时间等待,长时间阻塞线程。 一次解除所有人的封锁比仅解除邻居的封锁要慢,如以下解决方案所示 AutoResetEvent,但差别应该不会很大,因为线程必须首先保持在用户模式。

У lock 语法有一些令人不愉快的意外。 推荐使用 Monitor 直接[里希特][埃里克·利珀特]。 其中之一是 lock 总是出来 Monitor,即使出现异常,然后另一个线程也可以更改共享内存的状态。 在这种情况下,通常最好陷入死锁或以某种方式安全地终止程序。 另一个惊喜是 Monitor 使用时钟块(SyncBlock),存在于所有对象中。 因此,如果选择了不合适的对象,则很容易出现死锁(例如,如果锁定了实习字符串)。 为此,我们总是使用隐藏对象。

条件变量模式允许您更简洁地实现某些复杂条件的期望。 在我看来,在 .NET 中它是不完整的,因为...... 理论上,多个变量上应该有多个队列(如 Posix 线程中),而不是一个锁上。 那么就有可能为所有哲学家制作它们。 但即使采用这种形式,它也允许您缩短代码。

许多哲学家或 async / await

好的,现在我们可以有效地阻塞线程了。 但如果我们有很多哲学家怎么办? 100? 10000? 例如,我们收到了 100000 个对 Web 服务器的请求。 为每个请求创建一个线程将是昂贵的,因为这么多线程不会并行执行。 仅执行尽可能多的逻辑核心(我有 4 个)。 其他人只会夺走资源。 此问题的一种解决方案是异步/等待模式。 它的想法是,如果函数需要等待某些事情继续,则它不会保留线程。 当发生某些事情时,它会恢复执行(但不一定在同一个线程中!)。 在我们的例子中,我们将等待分叉。

SemaphoreSlim 为此 WaitAsync() 方法。 这是使用此模式的实现。

// Запуск такой же, как раньше. Где-нибудь в программе:
Task.Run(() => Run(i, cancelTokenSource.Token));

// Запуск философа.
// Ключевое слово async -- компилятор транслирует этот метот в асинхронный.
public async Task Run(int i, CancellationToken token)
{
    while (true)
    {
        // await -- будем ожидать какого-то события.
        await TakeForks(i);
        // После await, продолжение возможно в другом потоке.
        eatenFood[i] = (eatenFood[i] + 1) % (int.MaxValue - 1);
        // Может быть несколько событий для ожидания.
        await PutForks(i);

        Think(i);

        if (token.IsCancellationRequested) break;
    }
}

async Task TakeForks(int i)
{
    bool hasForks = false;
    while (!hasForks)
    {
        // Взаимоисключающий доступ к столу:
        await _tableSemaphore.WaitAsync();
        if (forks[Left(i)] == 0 && forks[Right(i)] == 0)
        {
            forks[Left(i)] = i+1;
            forks[Right(i)] = i+1;
            hasForks = true;
        }
        _tableSemaphore.Release();
        // Будем ожидать, чтобы сосед положил вилки:
        if (!hasForks)
            await _philosopherSemaphores[i].WaitAsync();
    }
}

// Ждем доступа к столу и кладем вилки.
async Task PutForks(int i)
{
    await _tableSemaphore.WaitAsync();
    forks[Left(i)] = 0;
    // "Пробудить" соседей, если они "спали".
    _philosopherSemaphores[LeftPhilosopher(i)].Release();
    forks[Right(i)] = 0;
    _philosopherSemaphores[RightPhilosopher(i)].Release();
    _tableSemaphore.Release();
}

方法与 async / await 被翻译成一个狡猾的有限状态机,它立即返回其内部 Task。 通过它,您可以等待该方法完成、取消它以及您可以使用 Task 执行的所有其他操作。 在该方法内部,状态机控制执行。 底线是,如果没有延迟,则执行是同步的,如果有,则释放线程。 为了更好地理解这一点,最好看看这个状态机。 您可以从这些创建链 async / await 方法。

我们来测试一下。 100 位哲学家在 4 个逻辑核心的机器上工作 8 秒。 之前使用 Monitor 的解决方案只执行了前 4 个线程,根本不执行其余的线程。 这 4 个线程中的每一个都有大约 2 毫秒的空闲时间。 而 async/await 解决方案则完成了全部 100 次,平均每次等待时间为 6.8 秒。 当然,在实际系统中,空闲6秒是不可接受的,最好不要这样处理这么多请求。 事实证明,使用 Monitor 的解决方案根本不可扩展。

结论

正如您从这些小示例中看到的,.NET 支持许多同步结构。 然而,如何使用它们并不总是显而易见的。 我希望这篇文章对您有所帮助。 我们现在就结束了,但仍然有很多有趣的东西,例如,线程安全集合、TPL 数据流、反应式编程、软件事务模型等。

来源

来源: habr.com

添加评论