socket的sk

    科技2025-11-28  18

        在socket的结构体里有一个sk_wmem_alloc字段,该字段表示已经提交到ip层,但还没有从本机发送出去的skb占用空间大小。

    分配时机

        当tcp层封装好skb数据后,会调用tcp_transmit_skb,在该函数会根据skb的长度相应增加sk_wmem_alloc的值,然后发送给ip层。

    /*
     * Excerpt from tcp_transmit_skb(): after the TCP header has been built,
     * the skb's truesize is charged to sk->sk_wmem_alloc and the skb is
     * handed down to the IP layer.  (Body partially elided with "..." in
     * this excerpt.)
     */
    static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask)
    {
        ...
        /* Install the skb's release callback: __sock_wfree for pure ACKs,
         * tcp_wfree for normal TCP data skbs. */
        skb->destructor = skb_is_tcp_pure_ack(skb) ? __sock_wfree : tcp_wfree;
        skb_set_hash_from_sk(skb, sk);
        /* Charge the skb's true size to sk_wmem_alloc. */
        atomic_add(skb->truesize, &sk->sk_wmem_alloc);
        /* Hand the skb to the IP layer. */
        err = icsk->icsk_af_ops->queue_xmit(sk, skb, &inet->cork.fl);
        return net_xmit_eval(err);
    }

    释放时机

        当驱动发送完skb,收到中断消息后,会进入ixgbe_clean_tx_irq流程,这里会调用napi_consume_skb,最终通过skb_release_head_state调用skb->destructor回收sk_wmem_alloc内存空间,正常非纯ack的tcp数据,destructor执行的是tcp_wfree函数。

    /*
     * Excerpt from napi_consume_skb(): called on the driver's TX-completion
     * (e.g. ixgbe_clean_tx_irq) path to free a transmitted skb.
     */
    void napi_consume_skb(struct sk_buff *skb, int budget)
    {
        if (unlikely(!skb))
            return;

        /* Zero budget indicate non-NAPI context called us, like netpoll */
        if (unlikely(!budget)) {
            dev_consume_skb_any(skb);
            return;
        }

        if (likely(atomic_read(&skb->users) == 1))
            smp_rmb();
        else if (likely(!atomic_dec_and_test(&skb->users)))
            return;
        /* if reaching here SKB is ready to free */
        trace_consume_skb(skb);

        /* if SKB is a clone, don't handle this case */
        /* On TX-completion the skb is a fast clone (fclone !=
         * SKB_FCLONE_UNAVAILABLE), so the driver path runs __kfree_skb(). */
        if (skb->fclone != SKB_FCLONE_UNAVAILABLE) {
            __kfree_skb(skb);
            return;
        }
        /* NOTE(review): upstream spells this __kfree_skb_defer() — likely a
         * transcription typo in this excerpt; verify against the kernel tree. */
        _kfree_skb_defer(skb);
    }

    /* Release the skb's attached state (dst, secpath, conntrack, ...) and run
     * its destructor — tcp_wfree for normal TCP data — which uncharges
     * sk_wmem_alloc.  The skb payload itself is NOT freed here. */
    static void skb_release_head_state(struct sk_buff *skb)
    {
        skb_dst_drop(skb);
    #ifdef CONFIG_XFRM
        secpath_put(skb->sp);
    #endif
        if (skb->destructor) {
            WARN_ON(in_irq());
            skb->destructor(skb);
        }
    #if IS_ENABLED(CONFIG_NF_CONNTRACK)
        nf_conntrack_put(skb_nfct(skb));
    #endif
    #if IS_ENABLED(CONFIG_BRIDGE_NETFILTER)
        nf_bridge_put(skb->nf_bridge);
    #endif
    }

    /* skb->destructor for TCP data skbs: uncharges sk_wmem_alloc and may
     * queue the socket on the TSQ tasklet to resume deferred transmission. */
    void tcp_wfree(struct sk_buff *skb)
    {
        struct sock *sk = skb->sk;
        struct tcp_sock *tp = tcp_sk(sk);
        unsigned long flags, nval, oval;
        int wmem;

        /* Keep one reference on sk_wmem_alloc.
         * Will be released by sk_free() from here or tcp_tasklet_func()
         */
        /* Uncharge this skb from sk_wmem_alloc (minus the one kept reference). */
        wmem = atomic_sub_return(skb->truesize - 1, &sk->sk_wmem_alloc);

        /* If this softirq is serviced by ksoftirqd, we are likely under stress.
         * Wait until our queues (qdisc + devices) are drained.
         * This gives :
         * - less callbacks to tcp_write_xmit(), reducing stress (batches)
         * - chance for incoming ACK (processed by another cpu maybe)
         *   to migrate this flow (skb->ooo_okay will be eventually set)
         */
        if (wmem >= SKB_TRUESIZE(1) && this_cpu_ksoftirqd() == current)
            goto out;

        for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
            struct tsq_tasklet *tsq;
            bool empty;

            if (!(oval & TSQF_THROTTLED) || (oval & TSQF_QUEUED))
                goto out;

            nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
            nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
            if (nval != oval)
                continue;

            /* queue this socket to tasklet queue */
            local_irq_save(flags);
            tsq = this_cpu_ptr(&tsq_tasklet);
            empty = list_empty(&tsq->head);
            list_add(&tp->tsq_node, &tsq->head);
            if (empty)
                tasklet_schedule(&tsq->tasklet);
            local_irq_restore(flags);
            return;
        }
    out:
        sk_free(sk);
    }

    skb数据销毁时机

    在tcp_wfree函数里并没有真正的释放发送的skb数据,仅仅只是回收sk_wmem_alloc空间,因为tcp为了保证可靠性,skb的数据需要等到ack流程里才能释放,如果超时丢包等还需要再次用到skb重传。在tcp_ack流程里,会通过tcp_clean_rtx_queue将skb从发送队列里移除,并调用sk_wmem_free_skb真正释放skb的数据内容。

    /*
     * Excerpt from tcp_clean_rtx_queue(): on the ACK path, fully-acked skbs
     * are unlinked from the write queue and their memory is released for real
     * via sk_wmem_free_skb().  (Loop body elided with "..." in this excerpt.)
     */
    static int tcp_clean_rtx_queue(struct sock *sk, int prior_fackets, u32 prior_snd_una, int *acked, struct tcp_sacktag_state *sack)
    {
        while ((skb = tcp_write_queue_head(sk)) && skb != tcp_send_head(sk)) {
            ...
            /* Remove the acked skb from the write queue ... */
            tcp_unlink_write_queue(skb, sk);
            /* ... and free its data. */
            sk_wmem_free_skb(sk, skb);
        }
    }

    /* Free an skb: release all attached state, then free the head itself. */
    void __kfree_skb(struct sk_buff *skb)
    {
        skb_release_all(skb);
        kfree_skbmem(skb);
    }

    sk_wmem_free_skb最终也会通过__kfree_skb来释放skb数据内容,之前在驱动收到tx中断处理流程里,回收sk_wmem_alloc空间的时候也是通过这个函数来完成的,两个流程调用同一个处理函数,怎么做到驱动只是回收sk_wmem_alloc,而ack流程才去真正释放skb数据内容的呢?要回答这个问题就得先了解清楚skb的分配过程了。

    sk_stream_alloc_skb

        在tcp_sendmsg里,当发现发送队列的最后skb空间不足时,内核会调用sk_stream_alloc_skb分配一个新的skb,分配完成后通过skb_entail将skb插入write_queue队列里。在看sk_stream_alloc_skb实现细节前,先看下它的参数size,这个size表示要分配的skb的线性区域空间大小,它是通过select_size函数来计算出来,当不支持sg的时候分配线性区域空间大小为mss的大小,当支持sg并且可以gso的时候,会通过linear_payload_sz来计算,这里的first_skb表示是否是write_queue的第一个skb,如果是,则申请(2048-tcp消息头)长度,否则不申请线性区域,这里主要是为了优化sack的处理流程,在sack里需要通过tcp_shift_skb_data对skb做一些迁移操作,skb只有非线性数据时可以提高处理效率。

    /*
     * Compute the linear-area size for a new TX skb.  Without scatter-gather
     * the linear area holds a full MSS; with SG+GSO only the first skb of the
     * write queue gets a linear area (see linear_payload_sz below).
     */
    static int select_size(const struct sock *sk, bool sg, bool first_skb)
    {
        const struct tcp_sock *tp = tcp_sk(sk);
        int tmp = tp->mss_cache;

        if (sg) {
            if (sk_can_gso(sk)) {
                tmp = linear_payload_sz(first_skb);
            } else {
                int pgbreak = SKB_MAX_HEAD(MAX_TCP_HEADER);

                if (tmp >= pgbreak && tmp <= pgbreak + (MAX_SKB_FRAGS - 1) * PAGE_SIZE)
                    tmp = pgbreak;
            }
        }
        return tmp;
    }

    /* Only the first skb of the write queue gets a (2048 - MAX_TCP_HEADER)
     * linear area; later skbs carry purely paged data, which keeps SACK's
     * tcp_shift_skb_data() migrations cheap. */
    static int linear_payload_sz(bool first_skb)
    {
        if (first_skb)
            return SKB_WITH_OVERHEAD(2048 - MAX_TCP_HEADER);
        return 0;
    }

    /* Allocate a fast-clone TX skb with `size` bytes of linear payload room,
     * charging it against the socket's send-buffer memory accounting; returns
     * NULL (and moderates sndbuf) under memory pressure. */
    struct sk_buff *sk_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp, bool force_schedule)
    {
        struct sk_buff *skb;

        /* The TCP header must be at least 32-bit aligned. */
        size = ALIGN(size, 4);

        if (unlikely(tcp_under_memory_pressure(sk)))
            sk_mem_reclaim_partial(sk);

        skb = alloc_skb_fclone(size + sk->sk_prot->max_header, gfp);
        if (likely(skb)) {
            bool mem_scheduled;

            if (force_schedule) {
                mem_scheduled = true;
                sk_forced_mem_schedule(sk, skb->truesize);
            } else {
                mem_scheduled = sk_wmem_schedule(sk, skb->truesize);
            }
            if (likely(mem_scheduled)) {
                skb_reserve(skb, sk->sk_prot->max_header);
                /*
                 * Make sure that we have exactly size bytes
                 * available to the caller, no more, no less.
                 */
                skb->reserved_tailroom = skb->end - skb->tail - size;
                return skb;
            }
            __kfree_skb(skb);
        } else {
            sk->sk_prot->enter_memory_pressure(sk);
            sk_stream_moderate_sndbuf(sk);
        }
        return NULL;
    }

        sk_stream_alloc_skb调用alloc_skb_fclone分配skb,最终调用__alloc_skb时会传递一个SKB_ALLOC_FCLONE参数,这个参数表示fast clone的意思,因为tcp在分配完skb并填充完用户数据,通过tcp_transmit_skb进一步封装tcp头然后发送给ip层,在tcp_transmit_skb里会先把skb clone一份,这样传递给ip层的skb就是clone出来的skb,而原来的skb则还是保存在write_queue里。

    /* Allocate an skb from the fclone cache: the head carries a hidden second
     * sk_buff that skb_clone() can later hand out without a new allocation. */
    static inline struct sk_buff *alloc_skb_fclone(unsigned int size, gfp_t priority)
    {
        return __alloc_skb(size, priority, SKB_ALLOC_FCLONE, NUMA_NO_NODE);
    }

        通过fclone分配的skb,会同时分配两个skb,每个skb都有一个fclone标志位,用来表示是orig的skb(存放在write_queue)还是clone的skb(真正给驱动发送使用的),它的空间布局如下所示,第一个skb的fclone=1(SKB_FCLONE_ORIG)表示skb为源skb,第二个skb的fclone设置为2(SKB_FCLONE_CLONE),表示是clone出来的skb。这个是在__alloc_skb里初始化设置的。

    /*
     * Excerpt from __alloc_skb(): with SKB_ALLOC_FCLONE the head comes from a
     * cache that holds a sk_buff_fclones pair.  (The allocation of `skb`
     * itself is elided in this excerpt, so `skb` appears without its
     * assignment here.)
     */
    struct sk_buff *__alloc_skb(unsigned int size, gfp_t gfp_mask, int flags, int node)
    {
        if (flags & SKB_ALLOC_FCLONE) {
            struct sk_buff_fclones *fclones;

            fclones = container_of(skb, struct sk_buff_fclones, skb1);
            kmemcheck_annotate_bitfield(&fclones->skb2, flags1);
            /* Mark the first skb as the original (kept on the write queue). */
            skb->fclone = SKB_FCLONE_ORIG;
            /* Initialize the fclone_ref refcount to 1. */
            atomic_set(&fclones->fclone_ref, 1);
            /* Mark the second skb as the clone (handed to the driver later). */
            fclones->skb2.fclone = SKB_FCLONE_CLONE;
        }
    out:
        return skb;
    nodata:
        kmem_cache_free(cache, skb);
        skb = NULL;
        goto out;
    }

     

    tcp_transmit_skb

        在封装好用户数据后,调用tcp_transmit_skb准备发送skb时,这里的clone_it被置位为1,因此进入skb_clone,skb_clone返回的skb即为__alloc_skb里分配的第二个skb,当然clone的过程会把第二个skb的内容指向第一个skb。

    /*
     * Excerpt from tcp_transmit_skb(): with clone_it set, the skb kept on the
     * write queue is cloned and the clone is what travels down to the IP
     * layer, while the original stays queued for possible retransmission.
     */
    static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it, gfp_t gfp_mask)
    {
        if (clone_it) {
            skb_mstamp_get(&skb->skb_mstamp);
            TCP_SKB_CB(skb)->tx.in_flight = TCP_SKB_CB(skb)->end_seq - tp->snd_una;
            tcp_rate_skb_sent(sk, skb);

            if (unlikely(skb_cloned(skb)))
                skb = pskb_copy(skb, gfp_mask);
            else
                /* Returns the second skb of the sk_buff_fclones pair and
                 * bumps fclones->fclone_ref to 2. */
                skb = skb_clone(skb, gfp_mask);
            if (unlikely(!skb))
                return -ENOBUFS;
        }
    }

    __kfree_skb

        现在再来看下__kfree_skb的调用流程,会有两个调用时机:

        1)、驱动收到tx中断时,通过napi_consume_skb调用(这里使用的是第二个skb,即clone出来的skb);

        2)、tcp_ack流程通过tcp_clean_rtx_queue调用(这里使用的是write_queue队列的skb,即orig skb);

      __kfree_skb里会调用两个函数,skb_release_all和kfree_skbmem,在skb_release_all会通过skb->destructor回收skb分配的内存空间(不释放skb),在kfree_skbmem里会释放skb的内存数据。其中skb->destructor会在tcp_transmit_skb时赋值,并且只会针对clone的skb赋值,这样就能保证第一次驱动调用__kfree_skb时,能正确回收skb的内存空间,而在tcp_ack流程里不会重复回收(orig skb的destructor为null);同时在kfree_skbmem流程里,会根据skb是orig还是clone的来决定是否需要真正释放skb的空间。

    /* Free an skb: release its state (which runs skb->destructor) and then,
     * subject to fclone refcounting, free the skb memory itself. */
    void __kfree_skb(struct sk_buff *skb)
    {
        skb_release_all(skb);
        kfree_skbmem(skb);
    }

    /* Drop the skb's attached state.  Only the clone has a destructor (set in
     * tcp_transmit_skb), so sk_wmem_alloc is uncharged exactly once — when
     * the driver frees the clone on TX completion. */
    static void skb_release_head_state(struct sk_buff *skb)
    {
        skb_dst_drop(skb);
    #ifdef CONFIG_XFRM
        secpath_put(skb->sp);
    #endif
        if (skb->destructor) {
            WARN_ON(in_irq());
            skb->destructor(skb);
        }
    #if IS_ENABLED(CONFIG_NF_CONNTRACK)
        nf_conntrack_put(skb_nfct(skb));
    #endif
    #if IS_ENABLED(CONFIG_BRIDGE_NETFILTER)
        nf_bridge_put(skb->nf_bridge);
    #endif
    }

    /* Free the sk_buff head itself, honouring the fclone pair's refcount. */
    static void kfree_skbmem(struct sk_buff *skb)
    {
        struct sk_buff_fclones *fclones;

        switch (skb->fclone) {
        case SKB_FCLONE_UNAVAILABLE:
            kmem_cache_free(skbuff_head_cache, skb);
            return;

        case SKB_FCLONE_ORIG:
            fclones = container_of(skb, struct sk_buff_fclones, skb1);

            /* We usually free the clone (TX completion) before original skb
             * This test would have no chance to be true for the clone,
             * while here, branch prediction will be good.
             */
            /* Original skb (freed from the tcp_ack path): if the clone is
             * already gone (fclone_ref == 1), take the fastpath and free the
             * skb data. */
            if (atomic_read(&fclones->fclone_ref) == 1)
                goto fastpath;
            break;

        default: /* SKB_FCLONE_CLONE */
            fclones = container_of(skb, struct sk_buff_fclones, skb2);
            break;
        }
        /* Clone skb (driver TX-completion path): fclone_ref is normally 2 on
         * entry to napi_consume_skb, so the decrement leaves it non-zero and
         * we return without freeing the skb data. */
        if (!atomic_dec_and_test(&fclones->fclone_ref))
            return;
    fastpath:
        kmem_cache_free(skbuff_fclone_cache, fclones);
    }

     

    Processed: 0.023, SQL: 10