internal class ReliableUdpConnectionControlBlock : IDisposable
{
// массив байт для указанного ключа. Используется для сборки входящих сообщений
public ConcurrentDictionary<Tuple<EndPoint, Int32>, byte[]> IncomingStreams { get; private set;}
// массив байт для указанного ключа. Используется для отправки исходящих сообщений.
public ConcurrentDictionary<Tuple<EndPoint, Int32>, byte[]> OutcomingStreams { get; private set; }
// connection record для указанного ключа.
private readonly ConcurrentDictionary<Tuple<EndPoint, Int32>, ReliableUdpConnectionRecord> m_listOfHandlers;
// список подписчиков на сообщения.
private readonly List<ReliableUdpSubscribeObject> m_subscribers;
// локальный сокет
private Socket m_socketIn;
// порт для входящих сообщений
private int m_port;
// локальный IP адрес
private IPAddress m_ipAddress;
// локальная конечная точка
public IPEndPoint LocalEndpoint { get; private set; }
// коллекция предварительно инициализированных
// состояний конечного автомата
public StatesCollection States { get; private set; }
// генератор случайных чисел. Используется для создания TransmissionId
private readonly RNGCryptoServiceProvider m_randomCrypto;
//...
}
Асинхрон UDP серверийг хэрэгжүүлэх:
private void Receive()
{
EndPoint connectedClient = new IPEndPoint(IPAddress.Any, 0);
// создаем новый буфер, для каждого socket.BeginReceiveFrom
byte[] buffer = new byte[DefaultMaxPacketSize + ReliableUdpHeader.Length];
// передаем буфер в качестве параметра для асинхронного метода
this.m_socketIn.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref connectedClient, EndReceive, buffer);
}
private void EndReceive(IAsyncResult ar)
{
EndPoint connectedClient = new IPEndPoint(IPAddress.Any, 0);
int bytesRead = this.m_socketIn.EndReceiveFrom(ar, ref connectedClient);
//пакет получен, готовы принимать следующий
Receive();
// т.к. простейший способ решить вопрос с буфером - получить ссылку на него
// из IAsyncResult.AsyncState
byte[] bytes = ((byte[]) ar.AsyncState).Slice(0, bytesRead);
// получаем заголовок пакета
ReliableUdpHeader header;
if (!ReliableUdpStateTools.ReadReliableUdpHeader(bytes, out header))
{
// пришел некорректный пакет - отбрасываем его
return;
}
// конструируем ключ для определения connection record’а для пакета
Tuple<EndPoint, Int32> key = new Tuple<EndPoint, Int32>(connectedClient, header.TransmissionId);
// получаем существующую connection record или создаем новую
ReliableUdpConnectionRecord record = m_listOfHandlers.GetOrAdd(key, new ReliableUdpConnectionRecord(key, this, header.ReliableUdpMessageType));
// запускаем пакет в обработку в конечный автомат
record.State.ReceivePacket(record, header, bytes);
}
internal class ReliableUdpConnectionRecord : IDisposable
{
// массив байт с сообщением
public byte[] IncomingStream { get; set; }
// ссылка на состояние конечного автомата
public ReliableUdpState State { get; set; }
// пара, однозначно определяющая connection record
// в блоке управления передачей
public Tuple<EndPoint, Int32> Key { get; private set;}
// нижняя граница приемного окна
public int WindowLowerBound;
// размер окна передачи
public readonly int WindowSize;
// номер пакета для отправки
public int SndNext;
// количество пакетов для отправки
public int NumberOfPackets;
// номер передачи (именно он и есть вторая часть Tuple)
// для каждого сообщения свой
public readonly Int32 TransmissionId;
// удаленный IP endpoint – собственно получатель сообщения
public readonly IPEndPoint RemoteClient;
// размер пакета, во избежание фрагментации на IP уровне
// не должен превышать MTU – (IP.Header + UDP.Header + RelaibleUDP.Header)
public readonly int BufferSize;
// блок управления передачей
public readonly ReliableUdpConnectionControlBlock Tcb;
// инкапсулирует результаты асинхронной операции для BeginSendMessage/EndSendMessage
public readonly AsyncResultSendMessage AsyncResult;
// не отправлять пакеты подтверждения
public bool IsNoAnswerNeeded;
// последний корректно полученный пакет (всегда устанавливается в наибольший номер)
public int RcvCurrent;
// массив с номерами потерянных пакетов
public int[] LostPackets { get; private set; }
// пришел ли последний пакет. Используется как bool.
public int IsLastPacketReceived = 0;
//...
}
public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord)
{
if (connectionRecord.IsDone != 0)
return;
if (!ReliableUdpStateTools.CheckForNoPacketLoss(connectionRecord, connectionRecord.IsLastPacketReceived != 0))
{
// есть потерянные пакеты, отсылаем запросы на них
foreach (int seqNum in connectionRecord.LostPackets)
{
if (seqNum != 0)
{
ReliableUdpStateTools.SendAskForLostPacket(connectionRecord, seqNum);
}
}
// устанавливаем таймер во второй раз, для повторной попытки передачи
if (!connectionRecord.TimerSecondTry)
{
connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1);
connectionRecord.TimerSecondTry = true;
return;
}
// если после двух попыток срабатываний WaitForPacketTimer
// не удалось получить пакеты - запускаем таймер завершения соединения
StartCloseWaitTimer(connectionRecord);
}
else if (connectionRecord.IsLastPacketReceived != 0)
// успешная проверка
{
// высылаем подтверждение о получении блока данных
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
connectionRecord.State = connectionRecord.Tcb.States.Completed;
connectionRecord.State.ProcessPackets(connectionRecord);
// вместо моментальной реализации ресурсов
// запускаем таймер, на случай, если
// если последний ack не дойдет до отправителя и он запросит его снова.
// по срабатыванию таймера - реализуем ресурсы
// в состоянии Completed метод таймера переопределен
StartCloseWaitTimer(connectionRecord);
}
// это случай, когда ack на блок пакетов был потерян
else
{
if (!connectionRecord.TimerSecondTry)
{
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1);
connectionRecord.TimerSecondTry = true;
return;
}
// запускаем таймер завершения соединения
StartCloseWaitTimer(connectionRecord);
}
}
public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord)
{
if (connectionRecord.IsDone != 0)
return;
// отправляем повторно последний пакет
// ( в случае восстановления соединения узел-приемник заново отправит запросы, которые до него не дошли)
ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.RetransmissionCreateUdpPayload(connectionRecord, connectionRecord.SndNext - 1));
// включаем таймер CloseWait – для ожидания восстановления соединения или его завершения
StartCloseWaitTimer(connectionRecord);
}
public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord)
{
if (connectionRecord.WaitForPacketsTimer != null)
connectionRecord.WaitForPacketsTimer.Dispose();
// собираем сообщение и передаем его подписчикам
ReliableUdpStateTools.CreateMessageFromMemoryStream(connectionRecord);
}
public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload)
{
if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket))
// отбрасываем пакет
return;
// комбинация двух флагов - FirstPacket и LastPacket - говорит что у нас единственное сообщение
if (header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket) &
header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket))
{
ReliableUdpStateTools.CreateMessageFromSinglePacket(connectionRecord, header, payload.Slice(ReliableUdpHeader.Length, payload.Length));
if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.NoAsk))
{
// отправляем пакет подтверждение
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
}
SetAsCompleted(connectionRecord);
return;
}
// by design все packet numbers начинаются с 0;
if (header.PacketNumber != 0)
return;
ReliableUdpStateTools.InitIncomingBytesStorage(connectionRecord, header);
ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload);
// считаем кол-во пакетов, которые должны прийти
connectionRecord.NumberOfPackets = (int)Math.Ceiling((double) ((double) connectionRecord.IncomingStream.Length/(double) connectionRecord.BufferSize));
// записываем номер последнего полученного пакета (0)
connectionRecord.RcvCurrent = header.PacketNumber;
// после сдвинули окно приема на 1
connectionRecord.WindowLowerBound++;
// переключаем состояние
connectionRecord.State = connectionRecord.Tcb.States.Assembling;
// если не требуется механизм подтверждение
// запускаем таймер который высвободит все структуры
if (header.Flags.HasFlag(ReliableUdpHeaderFlags.NoAsk))
{
connectionRecord.CloseWaitTimer = new Timer(DisposeByTimeout, connectionRecord, connectionRecord.ShortTimerPeriod, -1);
}
else
{
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1);
}
}
public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload)
{
if (connectionRecord.IsDone != 0)
return;
if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.RequestForPacket))
return;
// расчет конечной границы окна
// берется граница окна + 1, для получения подтверждений доставки
int windowHighestBound = Math.Min((connectionRecord.WindowLowerBound + connectionRecord.WindowSize), (connectionRecord.NumberOfPackets));
// проверка на попадание в окно
if (header.PacketNumber < connectionRecord.WindowLowerBound || header.PacketNumber > windowHighestBound)
return;
connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1);
if (connectionRecord.CloseWaitTimer != null)
connectionRecord.CloseWaitTimer.Change(-1, -1);
// проверить на последний пакет:
if (header.PacketNumber == connectionRecord.NumberOfPackets)
{
// передача завершена
Interlocked.Increment(ref connectionRecord.IsDone);
SetAsCompleted(connectionRecord);
return;
}
// это ответ на первый пакет c подтверждением
if ((header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket) && header.PacketNumber == 1))
{
// без сдвига окна
SendPacket(connectionRecord);
}
// пришло подтверждение о получении блока данных
else if (header.PacketNumber == windowHighestBound)
{
// сдвигаем окно прием/передачи
connectionRecord.WindowLowerBound += connectionRecord.WindowSize;
// обнуляем массив контроля передачи
connectionRecord.WindowControlArray.Nullify();
// отправляем блок пакетов
SendPacket(connectionRecord);
}
// это запрос на повторную передачу – отправляем требуемый пакет
else
ReliableUdpStateTools.SendPacket(connectionRecord, ReliableUdpStateTools.RetransmissionCreateUdpPayload(connectionRecord, header.PacketNumber));
}
public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload)
{
// повторная отправка последнего пакета в связи с тем,
// что последний ack не дошел до отправителя
if (header.Flags.HasFlag(ReliableUdpHeaderFlags.LastPacket))
{
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
}
}
private void StartTransmission(ReliableUdpMessage reliableUdpMessage, EndPoint endPoint, AsyncResultSendMessage asyncResult)
{
if (m_isListenerStarted == 0)
{
if (this.LocalEndpoint == null)
{
throw new ArgumentNullException( "", "You must use constructor with parameters or start listener before sending message" );
}
// запускаем обработку входящих пакетов
StartListener(LocalEndpoint);
}
// создаем ключ для словаря, на основе EndPoint и ReliableUdpHeader.TransmissionId
byte[] transmissionId = new byte[4];
// создаем случайный номер transmissionId
m_randomCrypto.GetBytes(transmissionId);
Tuple<EndPoint, Int32> key = new Tuple<EndPoint, Int32>(endPoint, BitConverter.ToInt32(transmissionId, 0));
// создаем новую запись для соединения и проверяем,
// существует ли уже такой номер в наших словарях
if (!m_listOfHandlers.TryAdd(key, new ReliableUdpConnectionRecord(key, this, reliableUdpMessage, asyncResult)))
{
// если существует – то повторно генерируем случайный номер
m_randomCrypto.GetBytes(transmissionId);
key = new Tuple<EndPoint, Int32>(endPoint, BitConverter.ToInt32(transmissionId, 0));
if (!m_listOfHandlers.TryAdd(key, new ReliableUdpConnectionRecord(key, this, reliableUdpMessage, asyncResult)))
// если снова не удалось – генерируем исключение
throw new ArgumentException("Pair TransmissionId & EndPoint is already exists in the dictionary");
}
// запустили состояние в обработку
m_listOfHandlers[key].State.SendPacket(m_listOfHandlers[key]);
}
public override void ReceivePacket(ReliableUdpConnectionRecord connectionRecord, ReliableUdpHeader header, byte[] payload)
{
if (!header.Flags.HasFlag(ReliableUdpHeaderFlags.FirstPacket))
// отбрасываем пакет
return;
// ...
// by design все packet numbers начинаются с 0;
if (header.PacketNumber != 0)
return;
// инициализируем массив для хранения частей сообщения
ReliableUdpStateTools.InitIncomingBytesStorage(connectionRecord, header);
// записываем данные пакет в массив
ReliableUdpStateTools.WritePacketData(connectionRecord, header, payload);
// считаем кол-во пакетов, которые должны прийти
connectionRecord.NumberOfPackets = (int)Math.Ceiling((double) ((double) connectionRecord.IncomingStream.Length/(double) connectionRecord.BufferSize));
// записываем номер последнего полученного пакета (0)
connectionRecord.RcvCurrent = header.PacketNumber;
// после сдвинули окно приема на 1
connectionRecord.WindowLowerBound++;
// переключаем состояние
connectionRecord.State = connectionRecord.Tcb.States.Assembling;
if (/*если не требуется механизм подтверждение*/)
// ...
else
{
// отправляем подтверждение
ReliableUdpStateTools.SendAcknowledgePacket(connectionRecord);
connectionRecord.WaitForPacketsTimer = new Timer(CheckByTimer, connectionRecord, connectionRecord.ShortTimerPeriod, -1);
}
}
public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord)
{
// ...
if (/*проверка на потерянные пакеты */)
{
// отправляем запросы на повторную доставку
// устанавливаем таймер во второй раз, для повторной попытки передачи
if (!connectionRecord.TimerSecondTry)
{
connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1);
connectionRecord.TimerSecondTry = true;
return;
}
// если после двух попыток срабатываний WaitForPacketTimer
// не удалось получить пакеты - запускаем таймер завершения соединения
StartCloseWaitTimer(connectionRecord);
}
else if (/*пришел последний пакет и успешная проверка */)
{
// ...
StartCloseWaitTimer(connectionRecord);
}
// если ack на блок пакетов был потерян
else
{
if (!connectionRecord.TimerSecondTry)
{
// повторно отсылаем ack
connectionRecord.WaitForPacketsTimer.Change(connectionRecord.ShortTimerPeriod, -1);
connectionRecord.TimerSecondTry = true;
return;
}
// запускаем таймер завершения соединения
StartCloseWaitTimer(connectionRecord);
}
}
public override void ProcessPackets(ReliableUdpConnectionRecord connectionRecord)
{
// ...
// отправляем повторно последний пакет
// ...
// включаем таймер CloseWait – для ожидания восстановления соединения или его завершения
StartCloseWaitTimer(connectionRecord);
}
public sealed class ReliableUdp : IDisposable
{
// получает локальную конечную точку
public IPEndPoint LocalEndpoint
// создает экземпляр ReliableUdp и запускает
// прослушивание входящих пакетов на указанном IP адресе
// и порту. Значение 0 для порта означает использование
// динамически выделенного порта
public ReliableUdp(IPAddress localAddress, int port = 0)
// подписка на получение входящих сообщений
public ReliableUdpSubscribeObject SubscribeOnMessages(ReliableUdpMessageCallback callback, ReliableUdpMessageTypes messageType = ReliableUdpMessageTypes.Any, IPEndPoint ipEndPoint = null)
// отписка от получения сообщений
public void Unsubscribe(ReliableUdpSubscribeObject subscribeObject)
// асинхронно отправить сообщение
// Примечание: совместимость с XP и Server 2003 не теряется, т.к. используется .NET Framework 4.0
public Task<bool> SendMessageAsync(ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteEndPoint, CancellationToken cToken)
// начать асинхронную отправку сообщения
public IAsyncResult BeginSendMessage(ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteEndPoint, AsyncCallback asyncCallback, Object state)
// получить результат асинхронной отправки
public bool EndSendMessage(IAsyncResult asyncResult)
// очистить ресурсы
public void Dispose()
}
public delegate void ReliableUdpMessageCallback( ReliableUdpMessage reliableUdpMessage, IPEndPoint remoteClient );
Зурвас:
public class ReliableUdpMessage
{
// тип сообщения, простое перечисление
public ReliableUdpMessageTypes Type { get; private set; }
// данные сообщения
public byte[] Body { get; private set; }
// если установлено в true – механизм подтверждения доставки будет отключен
// для передачи конкретного сообщения
public bool NoAsk { get; private set; }
}
public enum ReliableUdpMessageTypes : short
{
// Любое
Any = 0,
// Запрос к STUN server
StunRequest = 1,
// Ответ от STUN server
StunResponse = 2,
// Передача файла
FileTransfer =3,
// ...
}