基于dpdk实现一个存数据的小例子

    科技2026-06-16  2

    学习手撸dpdk,从写最简单的例子开始。

    功能说明

    数据包: ETHER HEADER | TYPE (1字节) | LENGTH (1字节) | VALUE (LENGTH字节)

    TYPE = 0x03   write req

    TYPE = 0x04   write reply

    收到write req的单播请求,保存value, 回复write reply消息。

     

    测试客户端

    利用python3 scapy库来构造符合规则的二层数据包

    [root@localhost ~]# python3
    Python 3.6.8 (default, Apr 2 2020, 13:34:55)
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>
    >>> from scapy.all import *
    >>> pkt1 = Ether(dst='00:0C:29:0D:CD:6C', src='00:0c:29:0d:cd:62')/'\3\5'/"Hello"
    >>> pkt2 = Ether(dst='00:0C:29:0D:CD:6C', src='00:0c:29:0d:cd:62')/'\3\25'/"What is wrong with u"
    >>>
    >>> pkt1.show()
    ###[ Ethernet ]###
      dst = 00:0C:29:0D:CD:6C
      src = 00:0c:29:0d:cd:62
      type = LOOP
    ###[ Raw ]###
      load = '\x03\x05'
    ###[ Raw ]###
      load = 'Hello'
    >>> sendp(pkt1, iface='ens160')
    .
    Sent 1 packets.
    >>> pkt2.show()
    ###[ Ethernet ]###
      dst = 00:0C:29:0D:CD:6C
      src = 00:0c:29:0d:cd:62
      type = LOOP
    ###[ Raw ]###
      load = '\x03\x15'
    ###[ Raw ]###
      load = 'What is wrong with u'
    >>> sendp(pkt2, iface='ens160')
    .
    Sent 1 packets.
    >>>
    >>>

    运行结果

    [root@localhost newproto]# ./build/ckproto -c 2 -n 4
    Hello proto
    EAL: Detected 8 lcore(s)
    EAL: Detected 1 NUMA nodes
    EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
    EAL: Selected IOVA mode 'PA'
    EAL: No available hugepages reported in hugepages-1048576kB
    EAL: Probing VFIO support...
    EAL: Invalid NUMA socket, default to 0
    EAL: Invalid NUMA socket, default to 0
    EAL: Probe PCI driver: net_e1000_em (8086:10d3) device: 0000:0b:00.0 (socket 0)
    EAL: Invalid NUMA socket, default to 0
    EAL: Probe PCI driver: net_e1000_em (8086:10d3) device: 0000:13:00.0 (socket 0)
    EAL: No legacy callbacks, legacy socket not created
    Port 0: 0000:0b:00.0 Mac address: 00:0C:29:0D:CD:6C
    Port 1: 0000:13:00.0 Mac address: 00:0C:29:0D:CD:76
    ==Config done ==
    service 1 running
    Data save Len:5 data:Hello
    Data save Len:21 data:What is wrong with u

    抓包检查

    recv: 03 05 Hello

    send: 04 05 Hello

    recv: 03 15 What is wrong with u  (长度字节 0x15 = 21)

    send: 04 15 What is wrong with u  (长度字节 0x15 = 21)

     

    代码

    /************************************************************************* Author: 小湿哥 Date: Oct 7th, 2020 *************************************************************************/ #include <stdio.h> #include <string.h> #include <stdint.h> #include <stdlib.h> #include <errno.h> #include <sys/queue.h> #include <signal.h> #include <rte_memory.h> #include <rte_memzone.h> #include <rte_malloc.h> #include <rte_launch.h> #include <rte_tailq.h> #include <rte_eal.h> #include <rte_per_lcore.h> #include <rte_lcore.h> #include <rte_debug.h> #include <rte_ether.h> #include <rte_common.h> #include <rte_ethdev.h> #define USE_PORT 0 #define NUM_RX_QUEUES 1 #define NUM_TX_QUEUES 1 #define NUM_MBUFS 8191 #define MBUF_CACHE_SIZE 250 #define NUM_RX_DESC 1024 #define NUM_TX_DESC 1024 #define BURST_SIZE 32 #define MAX_PKT_BURST 32 #define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */ #define MEMPOOL_CACHE_SIZE 256 typedef struct global_config { uint8_t local_mac[6]; FILE* f; } global_config_t; static volatile bool force_quit; static struct rte_eth_conf port_conf_default; static struct rte_eth_dev_tx_buffer *tx_buffer; static global_config_t g_config; static void print_mac(uint8_t* mac) { printf("%02X:%02X:%02X:%02X:%02X:%02X", mac[0],mac[1],mac[2],mac[3],mac[4],mac[5]); } static void print_eth_mac(unsigned int port_id) { struct rte_ether_addr dev_eth_addr; rte_eth_macaddr_get(port_id, &dev_eth_addr); printf("Mac address: "); print_mac(dev_eth_addr.addr_bytes); printf("\n\n"); if (port_id == USE_PORT) { memcpy(g_config.local_mac, dev_eth_addr.addr_bytes, 6); } } static inline int use_port_init(uint8_t port, struct rte_mempool *mbuf_pool) { int ret = 0, i = 0; uint32_t lcore = 0; uint16_t rx_q = NUM_RX_QUEUES; uint16_t tx_q = NUM_TX_QUEUES; uint16_t rx_d = NUM_RX_DESC; uint16_t tx_d = NUM_TX_DESC; struct rte_eth_dev_info dev_info; uint8_t nb_ports = rte_eth_dev_count_avail(); if (port < 0 || port >= nb_ports) { printf("port is not right \n"); return -1; } 
port_conf_default.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN; struct rte_eth_conf port_conf = port_conf_default; rte_eth_dev_info_get(port, &dev_info); if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) { port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; } ret = rte_eth_dev_configure(port, rx_q, tx_q, &port_conf); if (ret != 0) { printf("Port %d rx/tx queues config failed \n", port); return ret; } rte_eth_dev_adjust_nb_rx_tx_desc(port, &rx_d, &tx_d); for (i = 0; i < rx_q; i++) { ret= rte_eth_rx_queue_setup(port, i, rx_d, rte_eth_dev_socket_id(port),NULL, mbuf_pool); if (ret < 0) { printf("rx queue desc congfig failed \n"); return ret; } } for (i = 0; i < tx_q; i++) { ret= rte_eth_tx_queue_setup(port, i, tx_d, rte_eth_dev_socket_id(port), NULL); if (ret < 0) { printf("tx queue desc congfig failed \n"); return ret; } } tx_buffer = rte_zmalloc_socket("tx_buffer", RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0, rte_eth_dev_socket_id(USE_PORT)); if (tx_buffer == NULL) rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n", USE_PORT); rte_eth_tx_buffer_init(tx_buffer, MAX_PKT_BURST); ret = rte_eth_dev_start(port); if (ret < 0) { printf("rte_eth_dev_start failed \n"); return ret; } //rte_eth_promiscuous_enable(port); return 0; } static void csignal_handler(int signum) { if (signum == SIGINT || signum == SIGTERM) { printf("\n\nSignal %d received, preparing to exit...\n", signum); force_quit = true; } } static int data_save(uint8_t* data, uint32_t len) { printf("Data save Len:%u data:%s\n", len, data); /*write to memory or disk, to be done */ return 0; } int data_process(struct rte_mbuf *mbuf) { struct rte_ether_hdr *eth; uint32_t len = 0; eth = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); memcpy(eth->d_addr.addr_bytes, eth->s_addr.addr_bytes, 6); memcpy(eth->s_addr.addr_bytes, g_config.local_mac, 6); uint8_t* data = (uint8_t*)(eth + 1); if (data[0] == 0x03) { // write req len = data[1] & 0xff; data_save(&data[2], len); data[0] = 0x04; // reply 
type } rte_eth_tx_buffer(USE_PORT, 0, tx_buffer, mbuf); return 0; } int service_lcore(void *arg) { int i = 0; struct rte_mbuf *buffs[BURST_SIZE]; uint16_t nb_rx; uint16_t nb_tx; uint16_t buf; uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc; uint32_t lcore_id = rte_lcore_id(); const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US; printf("service %u running\n", lcore_id); while (!force_quit) { cur_tsc = rte_rdtsc(); diff_tsc = cur_tsc - prev_tsc; if (unlikely(diff_tsc > drain_tsc)) { rte_eth_tx_buffer_flush(USE_PORT, 0, tx_buffer); prev_tsc = cur_tsc; } nb_rx = rte_eth_rx_burst(USE_PORT, 0, buffs, BURST_SIZE); if (unlikely(nb_rx == 0)) continue; for (i= 0; i < nb_rx; i++) { struct rte_ether_hdr *eth; void *tmp; eth = rte_pktmbuf_mtod(buffs[i], struct rte_ether_hdr *); #if 0 printf("Recv packet:\nDst Mac: "); print_mac(eth->d_addr.addr_bytes); printf("\nSrc Mac: "); print_mac(eth->s_addr.addr_bytes); printf("\n"); #endif if (memcmp(eth->d_addr.addr_bytes, g_config.local_mac, 6) == 0) { data_process(buffs[i]); } else { rte_pktmbuf_free(buffs[i]); } } } //RTE_LOG(INFO, APP, "lcore %u exiting\n", lcore_id); return 0; } int main(int argc, char **argv) { int ret = 0, i = 0; uint32_t lcore_id; struct rte_mempool *mbuf_pool; printf("Hello proto\n\n"); ret = rte_eal_init(argc, argv); if (ret < 0) rte_exit(EXIT_FAILURE, "Cannot init EAL\n"); argc -= ret; argv += ret; force_quit = false; signal(SIGINT, csignal_handler); signal(SIGTERM, csignal_handler); uint8_t nb_ports = rte_eth_dev_count_avail(); for (i = 0; i < nb_ports; i++) { char dev_name[RTE_DEV_NAME_MAX_LEN]; rte_eth_dev_get_name_by_port(i, dev_name); printf("Port %d: %s ", i, dev_name); print_eth_mac(i); } mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); if (mbuf_pool == NULL) { rte_exit(EXIT_FAILURE, "mbuf_pool create failed\n"); } if (use_port_init(USE_PORT, mbuf_pool) != 0) { 
rte_exit(EXIT_FAILURE, "port init failed\n"); } printf("==Config done ==\n"); /* launch per-lcore init on every lcore */ rte_eal_mp_remote_launch(service_lcore, NULL, CALL_MASTER); RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (rte_eal_wait_lcore(lcore_id) < 0) { ret = -1; break; } } printf("Closing port %d...", USE_PORT); rte_eth_dev_stop(USE_PORT); rte_eth_dev_close(USE_PORT); printf(" Done\n"); printf("Bye...\n"); return 0; }

    小结

    基于dpdk写一个小例子,初始化和接口参数设置好,就可以收发二层包了。二层以上也可以为所欲为了。

    而实际上多核心,多线程,多队列的绑定和数据调度这些要自己开发还得继续学习了,想要做到更好的框架,就不是那么容易了。

    Processed: 0.018, SQL: 9