Unity tcp多线程网络层框架(一)数据结构

    科技2022-08-19  116

    Unity tcp多线程网络层框架(一)数据结构

    概述

    和大多数游戏相同,之前我们游戏的客户端采用lua的csocket作为网络底层,由于该库是采用selec和iocp作为底层库,是同步非阻塞的的io方式,所以客户端的处理是在每个update中,受限于客户端性能,仅仅处理30个包,如果服务器优化堪忧,发送大量小包,就会出现数据堆积,直到服务器踢掉客户端。 为了处理该问题,我们设计了新的网络层。

    使用多线程,将协议解析,反序列化,解密的工作放到其他线程。对协议进行整理,同帧内的部分协议被优化。使用多线程,减少对内核缓冲的占用,改为对mono动态内存的引用。

    基于上述设计需要,产生了一些新的数据结构满足高性能和高复用的要求。

    数据结构类型

    可复用缓冲区对象 BlockPool

    定义一个字节对象 public class DataBlock { public Byte[] data; public int length; }

    该对象表示一块空闲的mono堆和有效的字节长度。 网络流中的字节块可能有多种长度,根据其大小,我们分为:

    const int K_MIN_BLOCK_SIZE = 8; const int K_LARGE_BLOCK_SIZE = 1024; const int K_MAX_BLOCK_SIZE = 8 * 1024;

    最大长度不超过8 * 1024。 依据tcmalloc的分配策略,我们将内存池分为依次递增的字节块:

    // tc malloc strategy // Examples: // Size Expression Index // ----------------------------------------------------- // 0 (0 + 7) / 8 0 // 1 (1 + 7) / 8 1 // ... // 1024 (1024 + 7) / 8 128 // 1025 (1025 + 127 + (120<<7)) / 128 129 // ... // 8192 (8192 + 127 + (120<<7)) / 128 184

    简单来说,小于LARGE的BLOCK其池子大小步长较小,而大于该size的将以更大的步长来增长,其计算公式如下:

    bool sizePoolIndexMaybe(int size, ref int idx) { if (size <= K_LARGE_BLOCK_SIZE) { idx = (size + 7) >> 3; return true; } else if (size <= K_MAX_BLOCK_SIZE) { idx = (size + 127 + (120 << 7)) >> 7; return true; } else { return false; } } 自旋锁对象 由于从内存池中拿出一个块是很快的操作,往往只有几个到十数个指令,然而切换上下文的开销远远大于该操作,所以内存池中用自旋锁来做线程安全保证。 c#中由于没有显式的析构函数,所以自旋锁设计为lambda调用的形式: public class SpinLock<T> { public SpinLock(object mut, T data) { _mut = mut; _data = data; } public delegate void SpinLockAction(T data); private object _mut; private int _flag; private T _data; public void SafeAction(SpinLockAction act) { while (_flag == 1) ; Interlocked.Exchange(ref _flag, 1); lock (_mut) { try { act(_data); } catch (Exception e) { UnityEngine.Debug.LogError(e.ToString()); } } Interlocked.Exchange(ref _flag, 0); } } ... // 定义一个包含容器对象的自旋对象 _cacheSpinlock = new SpinLock<Stack<Byte[]>[]>(_cacheMutex, _chacheMap); 内存池实现 void initSizeArray() { for(int i = 0; i <= K_MAX_BLOCK_SIZE; ++i) { if (i <= K_LARGE_BLOCK_SIZE) { _sizeArray[(i + 7) >> 3] = i; } else { _sizeArray[(i + 127 + (120 << 7)) >> 7] = i; } } } ... public Byte[] Alloc(int size) { int idx = 0; if (sizePoolIndexMaybe(size, ref idx)) { Byte[] ret = null; _cacheSpinlock.SafeAction((Stack<Byte[]>[] container) => { Stack<Byte[]> pool = container[idx]; int alignSize = _sizeArray[idx]; if (pool.Count != 0) { ret = new Byte[alignSize]; } else { ret = pool.Pop(); } }); return ret; } else { return new Byte[size]; } } public void Free(Byte[] data) { int idx = 0; if (sizePoolIndexMaybe(data.Length, ref idx)) { _cacheSpinlock.SafeAction((Stack<Byte[]>[] container) => { Stack<Byte[]> pool = container[idx]; pool.Push(data); }); } }

    这个地方使用的是数组而不是hash结构,当然也是为了更快的速度,然后需要将块大小与数组index的映射关系先进行初始化。(后来想了想这样和重写hash函数的dictionary没什么区别) 在tcmalloc中,会重新去映射内存页,达到优化内存碎片的目的,但是在c#中很难做到这一点,除非提前new一个大的堆内存,但是客户端内存有限,不能这样做,所以此处没有这样设计。 这个BlockPool的设计有以下几点优点:

    高速,在运行一小段时间后,每次Alloc的速度远远大于new的速度。无GC低内存占用,抛除mono底层的对齐,最大占用在0.3M左右,当然,如果提高 K_MAX_BLOCK_SIZE 的大小,占用会有一点上升。低内存冗余。Alloc中对一个block是进行了内存对齐的,但是由于使用tcmalloc的策略,该对齐只会有很低的冗余值,依据测量数据,在5%左右。

    计算最大内存占用:

    int cur = -1; int total = 0; for (int i = 0; i <= K_MAX_BLOCK_SIZE; ++i) { int size = 0; if (i <= K_LARGE_BLOCK_SIZE) { size = (i + 7) >> 3; } else { size = (i + 127 + (120 << 7)) >> 7; } if (size != cur) { total += i; cur = size; } }

    无锁队列RingBuffer

    /// <summary> /// 无锁环形队列,请保证消费者单一和生产者单一 /// 为了高效,未对同时读或同时写进行加锁 /// </summary> internal class RingBuffer { Byte[] _buffer = null; int _readIndex = 0; int _writeIndex = 0; public int Count { get { return (_writeIndex - _readIndex) % _capacity; } } int _capacity = 0; public RingBuffer(int capacity) { _capacity = capacity; _buffer = new Byte[_capacity]; } public bool Full(int add = 0) { return (Count + add) >= _capacity; } public bool Empty() { return Count <= 0; } /// <param name="size"></param> /// <param name="ret">if return false, plz wait a while</param> /// <param name="withoutShift">data will be checking only, wont be pop out</param> /// <returns></returns> public bool TryRead(int size, ref Byte[] ret, bool withoutShift = false) { if (Empty()) return false; else { // there will be only one consumer, so it's naturally thread-safe if (Count - size <= 0) return false; for (int i = 0; i < size; ++i) { if (withoutShift) { ret[i] = _buffer[_readIndex + i]; } else { _readIndex += i; ret[i] = _buffer[_readIndex]; } } } return false; } /// <summary> /// using with TryRead(withoutShift) /// </summary> /// <returns></returns> public bool TryReadShift(int size) { if (Count - size <= 0) return false; _readIndex += size; return true; } /// <param name="ret">if return false, plz wait a while</param> /// <returns></returns> public bool TryWrite(Byte[] data, int length = 0) { if (Full()) return false; else { int size = data.Length; if (length != 0) size = length; if (size + Count >= _capacity) return false; for (int i = 0; i < size; ++i) { _writeIndex += i; _buffer[_writeIndex] = data[i]; } } return false; } public void Clear() { _readIndex = 0; _writeIndex = 0; } }

    为了避免 Unity 主线程产生中断,引起其他问题,此处使用无锁队列的形式来访问网络包数据。 网络线程将数据处理完毕后交付到队列,主线程直接访问队列数据。

    Processed: 0.010, SQL: 9