Netty PooledByteBufAllocator

    Technology, 2024-11-29

    Pooled memory allocation

    The two main methods are newDirectBuffer() and newHeapBuffer().

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // Fetch this thread's local cache (PoolThreadCache).
        PoolThreadCache cache = threadCache.get();
        // Allocate from the arena held by the thread-local cache.
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer(buf);
    }

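    PooledByteBufAllocator memory allocation

    The thread-local cache, PoolThreadCache, is obtained from PoolThreadLocalCache.

    Constructor

    The constructor allocates heapArenas and directArenas. The default number of arenas is given by DEFAULT_NUM_HEAP_ARENA (and DEFAULT_NUM_DIRECT_ARENA for direct arenas).

    Static initializer block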

    final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
    // Initialize the default chunk size.
    final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
    DEFAULT_NUM_HEAP_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numHeapArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            runtime.maxMemory() / defaultChunkSize / 2 / 3)));
    DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty.allocator.numDirectArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

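    By default, DEFAULT_NUM_HEAP_ARENA is the number of available CPU cores * 2, unless the memory-based bound in the min() above is smaller. This matches the default number of NioEventLoop threads, so each event-loop thread can be bound to its own arena and threads do not contend with one another for an arena's lock when allocating.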
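    The arithmetic in the static block is easier to follow with concrete numbers. The following is a minimal JDK-only sketch, not Netty code: the class ArenaDefaults and its method are hypothetical, the page size 8192 and max order 11 are assumed defaults that may differ in your Netty version, and the system-property override is left out.

```java
// Sketch only: reproduces the min(cores * 2, memory / chunkSize / 2 / 3) rule with plain JDK calls.
final class ArenaDefaults {
    // Assumed defaults; verify against the Netty version you use.
    static final int PAGE_SIZE = 8192;
    static final int MAX_ORDER = 11;

    static int defaultNumArenas(int cores, long maxMemory, int pageSize, int maxOrder) {
        final int defaultMinNumArena = cores * 2;
        // chunk size = page size << max order (8192 << 11 = 16 MiB under the assumed defaults)
        final int chunkSize = pageSize << maxOrder;
        // Use at most half of the memory for arenas, and keep at least 3 chunks per arena.
        return (int) Math.max(0, Math.min(defaultMinNumArena, maxMemory / chunkSize / 2 / 3));
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        int arenas = defaultNumArenas(rt.availableProcessors(), rt.maxMemory(), PAGE_SIZE, MAX_ORDER);
        System.out.println("default heap arenas on this JVM (sketch): " + arenas);
    }
}
```

    With 8 cores and a 1 GiB heap the memory bound wins (1 GiB / 16 MiB / 2 / 3 = 10), while with 4 GiB the core-based bound of 16 applies.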

    PoolThreadLocalCache

    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> stores the PoolThreadCache that belongs to each thread. Every thread has exactly one PoolThreadCache.

    The per-thread PoolThreadCache is initialized with a heapArena and a directArena.

    @Override
    protected synchronized PoolThreadCache initialValue() {
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        Thread current = Thread.currentThread();
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
            // Create the per-thread PoolThreadCache. It holds one heapArena and one directArena.
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }
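    The leastUsedArena() call in initialValue() is what spreads threads across arenas. A minimal sketch of that idea follows, assuming each arena counts how many thread caches are bound to it. The Arena class, the threadCaches counter and the bind() helper are hypothetical stand-ins for illustration, not Netty's types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: pick the arena with the fewest bound thread caches.
final class LeastUsedArenaSketch {
    static final class Arena {
        final String name;
        // Number of thread caches currently bound to this arena (hypothetical counter).
        final AtomicInteger threadCaches = new AtomicInteger();
        Arena(String name) { this.name = name; }
    }

    static Arena leastUsedArena(List<Arena> arenas) {
        if (arenas == null || arenas.isEmpty()) {
            return null; // no arenas configured
        }
        Arena min = arenas.get(0);
        for (int i = 1; i < arenas.size(); i++) {
            Arena candidate = arenas.get(i);
            if (candidate.threadCaches.get() < min.threadCaches.get()) {
                min = candidate;
            }
        }
        return min;
    }

    // Binds a new thread cache to the least-used arena and records the binding.
    static synchronized Arena bind(List<Arena> arenas) {
        Arena arena = leastUsedArena(arenas);
        if (arena != null) {
            arena.threadCaches.incrementAndGet();
        }
        return arena;
    }

    public static void main(String[] args) {
        List<Arena> arenas = Arrays.asList(new Arena("a0"), new Arena("a1"));
        for (int i = 0; i < 3; i++) {
            System.out.println("thread " + i + " -> " + bind(arenas).name);
        }
    }
}
```

    When there are at least as many arenas as threads, each thread ends up with an arena of its own. With fewer arenas, threads share them as evenly as possible.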

    When a thread allocates memory, it obtains its PoolThreadCache through the thread-local and allocates through PoolThreadCache.allocate(). PoolThreadCache offers two allocation paths: from its caches, or from its arena.

    PoolThreadCache

    Member variables

    // An arena carves out a region of memory.
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // A cache hands out cached memory regions.
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    Constructor

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        // ...
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);
        // ...
    }

    The constructor uses tinyCacheSize, smallCacheSize and normalCacheSize to create the subpage caches and the normal caches.

    PoolArena (arena)

    A PoolArena contains several PoolChunkLists, and each PoolChunkList in turn contains several chunks.

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    final int directMemoryCacheAlignment;
    final int directMemoryCacheAlignmentMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    +-----------+        +-----------+        +-----------+
    |           |        |           |        |           |
    | +-------+ |        | +-------+ |        | +-------+ |
    | | Chunk | +------->+ | Chunk | +------->+ | Chunk | |
    | +-+---+-+ |        | +-+---+-+ |        | +-+---+-+ |
    |   ^   |   |        |   ^   |   |        |   ^   |   |
    |   |   v   |        |   |   v   |        |   |   v   |
    | +-+---+-+ |        | +-+---+-+ |        | +-+---+-+ |
    | | Chunk | |        | | Chunk | |        | | Chunk | |
    | +-+---+-+ |        | +-+---+-+ |        | +-+---+-+ |
    |   ^   |   |        |   ^   |   |        |   ^   |   |
    |   |   v   |        |   |   v   |        |   |   v   |
    | +-+---+-+ |        | +-+---+-+ |        | +-+---+-+ |
    | | Chunk | +<-------+ | Chunk | +<-------+ | Chunk | |
    | +-------+ |        | +-------+ |        | +-------+ |
    |           |        |           |        |           |
    +-----------+        +-----------+        +-----------+
      ChunkList            ChunkList            ChunkList

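    Each ChunkList is the collection of chunks that fall within one memory-usage range, so the different lists (qInit, q000 through q100) group chunks by how full they are.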

    +------------------------+      +-------------------+
    |                        |      |                   |
    |  +------+   +------+   |      | +------+ +------+ |
    |  |  8K  |   |  8K  |   |      | |  2K  | |  2K  | |
    |  +------+   +------+   |      | +------+ +------+ |
    |                        |      |                   |
    |  +------+   +------+   |      | +------+ +------+ |
    |  |  8K  |   |  8K  |   |      | |  2K  | |  2K  | |
    |  +------+   +------+   |      | +------+ +------+ |
    |                        |      |                   |
    +------------------------+      +-------------------+
              chunk                       subpage[]

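    A chunk is divided into pages of 8k each. A 16k allocation, for example, takes two pages. For smaller requests, a single page (8k) is split into subpages, for example four 2k subpages, or it may be split by 1k or another element size.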
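    The page and subpage numbers above are plain division. A minimal sketch of that arithmetic follows, assuming an 8k page and ignoring any size rounding the allocator applies before it picks pages. The PageMath class and its methods are hypothetical names, not part of Netty.

```java
// Sketch only: page and subpage counts for an assumed 8 KiB page.
final class PageMath {
    static final int PAGE_SIZE = 8 * 1024;

    // Number of whole pages needed to hold reqCapacity bytes (ceiling division).
    static int pagesFor(int reqCapacity) {
        return (reqCapacity + PAGE_SIZE - 1) / PAGE_SIZE;
    }

    // Number of equally sized subpage elements that fit in one page.
    static int subpagesPerPage(int elemSize) {
        return PAGE_SIZE / elemSize;
    }

    public static void main(String[] args) {
        System.out.println(pagesFor(16 * 1024));   // prints 2
        System.out.println(subpagesPerPage(2048)); // prints 4
        System.out.println(subpagesPerPage(1024)); // prints 8
    }
}
```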

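    PoolArena.PoolSubpage: subpage

    How directArena allocates direct memory

    1. Take a PooledByteBuf from the object pool so that it can be reused.
    2. Try to allocate the memory from the cache.
    3. If the cache cannot satisfy the request, perform a real allocation from the memory pool.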

    io.netty.buffer.PoolArena#allocate()

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }
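    The cache-first, arena-second part of this flow can be illustrated with a small model. This is a sketch under stated assumptions rather than Netty's implementation: CacheThenArenaSketch and its fields are hypothetical, a plain byte[] stands in for a pooled memory region, new byte[size] stands in for the arena, and one instance per thread is assumed (Netty reaches its cache through a thread-local). Reuse of the PooledByteBuf object itself is not modeled.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Sketch only: try the per-thread cache first, fall back to the "arena" on a miss.
final class CacheThenArenaSketch {
    // Hypothetical cache: region size -> regions released earlier by this thread.
    private final Map<Integer, ArrayDeque<byte[]>> cache = new HashMap<>();
    int cacheHits;
    int arenaAllocations;

    byte[] allocate(int size) {
        ArrayDeque<byte[]> regions = cache.get(size);
        byte[] region = (regions == null) ? null : regions.poll();
        if (region != null) {
            cacheHits++;          // served from the cache, no arena work needed
            return region;
        }
        arenaAllocations++;       // cache miss: a real allocation (stand-in for the arena)
        return new byte[size];
    }

    // Releasing a region puts it back into the cache for later reuse.
    void release(byte[] region) {
        cache.computeIfAbsent(region.length, k -> new ArrayDeque<>()).push(region);
    }

    public static void main(String[] args) {
        CacheThenArenaSketch alloc = new CacheThenArenaSketch();
        byte[] first = alloc.allocate(1024);
        alloc.release(first);
        byte[] second = alloc.allocate(1024);
        System.out.println("reused: " + (first == second));
        System.out.println("cache hits: " + alloc.cacheHits + ", arena allocations: " + alloc.arenaAllocations);
    }
}
```

    In this model a released region of a given size is handed back on the next request of the same size, which is the behaviour the cache path is meant to provide.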