1.7 实例
在对DPDK的原理和代码展开进一步解析之前,先看一些小而简单的例子,建立一个形象上的认知。
1)helloworld,启动基础运行环境,DPDK构建了一个基于操作系统的,但适合包处理的软件运行环境,你可以认为这是个mini-OS。最早期DPDK,可以完全运行在没有操作系统的物理核(bare-metal)上,这部分代码现在不在主流的开源包中。
2)skeleton,最精简的单核报文收发骨架,也许这是当前世界上运行最快的报文进出测试程序。
3)l3fwd,三层转发是DPDK用于发布性能测试指标的主要应用。
1.7.1 HelloWorld
DPDK里的HelloWorld是最基础的入门程序,代码简短,功能也不复杂。它建立了一个多核(线程)运行的基础环境,每个线程会打印“hello from core #”,core #是由操作系统管理的。如无特别说明,本文里的DPDK线程与硬件线程是一一对应的关系。从代码角度,rte是指runtime environment,eal是指environment abstraction layer。DPDK的主要对外函数接口都以rte_作为前缀,抽象化函数接口是典型软件设计思路,可以帮助DPDK运行在多个操作系统上,DPDK官方支持Linux与FreeBSD。和多数并行处理系统类似,DPDK也有主线程、从线程的差异。
int
main(int argc, char **argv)
{
int ret;
unsigned lcore_id;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_panic(“Cannot init EAL\n”);
/* call lcore_hello() on every slave lcore */
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
rte_eal_remote_launch(lcore_hello, NULL, lcore_id);
}
/* call it on master lcore too */
lcore_hello(NULL);
rte_eal_mp_wait_lcore();
return 0;
}
1.初始化基础运行环境
主线程运行入口是main函数,调用了rte_eal_init入口函数,启动基础运行环境。
int rte_eal_init(int argc, char **argv);
入口参数是启动DPDK的命令行,可以是长长的一串很复杂的设置,需要深入了解的读者可以查看DPDK相关的文档与源代码liblibrte_ealcommoneal_common_options.c。对于HelloWorld这个实例,最需要的参数是“-c ”,线程掩码(core mask)指定了需要参与运行的线程(核)集合。rte_eal_init本身所完成的工作很复杂,它读取入口参数,解析并保存作为DPDK运行的系统信息,依赖这些信息,构建一个针对包处理设计的运行环境。主要动作分解如下
- 配置初始化
- 内存初始化
- 内存池初始化
- 队列初始化
- 告警初始化
- 中断初始化
- PCI初始化
- 定时器初始化
- 检测内存本地化(NUMA)
- 插件初始化
- 主线程初始化
- 轮询设备初始化
- 建立主从线程通道
- 将从线程设置在等待模式
- PCI设备的探测与初始化
对于DPDK库的使用者,这些操作已经被EAL封装起来,接口清晰。如果需要对DPDK进行深度定制,二次开发,需要仔细研究内部操作,这里不做详解。
2.多核运行初始化
DPDK面向多核设计,程序会试图独占运行在逻辑核(lcore)上。main函数里重要的是启动多核运行环境,RTE_LCORE_FOREACH_SLAVE(lcore_id)如名所示,遍历所有EAL指定可以使用的lcore,然后通过rte_eal_remote_launch在每个lcore上,启动被指定的线程。
int rte_eal_remote_launch(int (f)(void ),
void *arg, unsigned slave_id);
第一个参数是从线程,是被征召的线程;
第二个参数是传给从线程的参数;
第三个参数是指定的逻辑核,从线程会执行在这个core上。
具体来说,int rte_eal_remote_launch(lcore_hello, NULL, lcore_id);
参数lcore_id指定了从线程ID,运行入口函数lcore_hello。
运行函数lcore_hello,它读取自己的逻辑核编号(lcore_id),打印出“hello from core #”
static int
lcore_hello(__attribute__((unused)) void *arg)
{
unsigned lcore_id;
lcore_id = rte_lcore_id();
printf("hello from core %u\n", lcore_id);
return 0;
}
这是个简单示例,从线程很快就完成了指定工作,在更真实的场景里,这个从线程会是一个循环运行的处理过程。
1.7.2 Skeleton
DPDK为多核设计,但这是单核实例,设计初衷是实现一个最简单的报文收发示例,对收入报文不做任何处理直接发送。整个代码非常精简,可以用于平台的单核报文出入性能测试。
主要处理函数main的处理逻辑如下(伪码),调用rte_eal_init初始化运行环境,检查网络接口数,据此分配内存池rte_pktmbuf_pool_create,入口参数是指定rte_socket_id(),考虑了本地内存使用的范例。调用port_init(portid, mbuf_pool)初始化网口的配置,最后调用lcore_main()进行主处理流程。
int main(int argc, char *argv[])
{
struct rte_mempool *mbuf_pool;
unsigned nb_ports;
uint8_t portid;
/* Initialize the Environment Abstraction Layer (EAL). */
int ret = rte_eal_init(argc, argv);
/* Check that there is an even number of ports t send/receive on. */
nb_ports = rte_eth_dev_count();
if (nb_ports < 2 || (nb_ports & 1))
rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");
/* Creates a new mempool in memory to hold the mbufs. */
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
/* Initialize all ports. */
for (portid = 0; portid < nb_ports; portid++)
if (port_init(portid, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
portid);
/* Call lcore_main on the master core only. */
lcore_main();
return 0;
}
网口初始化流程:
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
首先对指定端口设置队列数,基于简单原则,本例只指定单队列。在收发两个方向上,基于端口与队列进行配置设置,缓冲区进行关联设置。如不指定配置信息,则使用默认配置。
网口设置:对指定端口设置接收、发送方向的队列数目,依据配置信息来指定端口功能
int rte_eth_dev_configure(uint8_t port_id, uint16_t nb_rx_q,
uint16_t nb_tx_q, const struct rte_eth_conf *dev_conf)
队列初始化:对指定端口的某个队列,指定内存、描述符数量、报文缓冲区,并且对队列进行配置
int rte_eth_rx_queue_setup(uint8_t port_id, uint16_t rx_queue_id,
uint16_t nb_rx_desc, unsigned int socket_id,
const struct rte_eth_rxconf *rx_conf,
struct rte_mempool *mp)
int rte_eth_tx_queue_setup(uint8_t port_id, uint16_t tx_queue_id,
uint16_t nb_tx_desc, unsigned int socket_id,
const struct rte_eth_txconf *tx_conf)
网口设置:初始化配置结束后,启动端口int rte_eth_dev_start(uint8_t port_id);
完成后,读取MAC地址,打开网卡的混杂模式设置,允许所有报文进入。
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1;
/* Configure the Ethernet device. */
retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
rte_eth_dev_socket_id(port), NULL, mbuf_pool);
}
/* Allocate and set up 1 TX queue per Ethernet port. */
for (q = 0; q < tx_rings; q++) {
retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
rte_eth_dev_socket_id(port), NULL);
}
/* Start the Ethernet port. */
retval = rte_eth_dev_start(port);
/* Display the port MAC address. */
struct ether_addr addr;
rte_eth_macaddr_get(port, &addr);
/* Enable RX in promiscuous mode for the Ethernet device. */
rte_eth_promiscuous_enable(port);
return 0;
}
网口收发报文循环收发在lcore_main中有个简单实现,因为是示例,为保证性能,首先检测CPU与网卡的Socket是否最优适配,建议使用本地CPU就近操作网卡,后续章节有详细说明。数据收发循环非常简单,为高速报文进出定义了burst的收发函数如下,4个参数意义非常直观:端口,队列,报文缓冲区以及收发包数。
基于端口队列的报文收发函数:
static inline uint16_t rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
static inline uint16_t rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
这就构成了最基本的DPDK报文收发展示。可以看到,此处不涉及任何具体网卡形态,软件接口对硬件没有依赖。
static __attribute__((noreturn)) void lcore_main(void)
{
const uint8_t nb_ports = rte_eth_dev_count();
uint8_t port;
for (port = 0; port < nb_ports; port++)
if (rte_eth_dev_socket_id(port) > 0 &&
rte_eth_dev_socket_id(port) !=
(int)rte_socket_id())
printf("WARNING, port %u is on remote NUMA node to "
"polling thread.\n\tPerformance will "
"not be optimal.\n", port);
/* Run until the application is quit or killed. */
for (;;) {
/*
* Receive packets on a port and forward them on the paired
* port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc.
*/
for (port = 0; port < nb_ports; port++) {
/* Get burst of RX packes, from first port of pair. */
struct rte_mbuf *bufs[BURST_SIZE];
const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
continue;
/* Send burst of TX packets, to second port of pair. */
const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
bufs, nb_rx);
/* Free any unsent packets. */
if (unlikely(nb_tx < nb_rx)) {
uint16_t buf;
for (buf = nb_tx; buf < nb_rx; buf++)
rte_pktmbuf_free(bufs[buf]);
}
}
}
}
}
1.7.3 L3fwd
这是DPDK中最流行的例子,也是发布DPDK性能测试的例子。如果将PCIE插槽上填满高速网卡,将网口与大流量测试仪表连接,它能展示在双路服务器平台具备200Gbit/s的转发能力。数据包被收入系统后,会查询IP报文头部,依据目标地址进行路由查找,发现目的端口,修改IP头部后,将报文从目的端口送出。路由查找有两种方式,一种方式是基于目标IP地址的完全匹配(exact match),另一种方式是基于路由表的最长掩码匹配(Longest Prefix Match,LPM)。三层转发的实例代码文件有2700多行(含空行与注释行),整体逻辑其实很简单,是前续HelloWorld与Skeleton的结合体。
启动这个例子,指定命令参数格式如下:
./build/l3fwd [EAL options] -- -p PORTMASK [-P]
--config(port,queue,lcore)[,(port,queue,lcore)]
命令参数分为两个部分,以“--”为分界线,分界线右边的参数是三层转发的私有命令选项。左边则是DPDK的EAL Options。
- [EAL Options]是DPDK运行环境的输入配置选项,输入命令会交给rte_eal_init处理;
- PORTMASK依据掩码选择端口,DPDK启动时会搜索系统认识的PCIe设备,依据黑白名单原则来决定是否接管,早期版本可能会接管所有端口,断开网络连接。
- config选项指定(port,queue,lcore),用指定线程处理对应的端口的队列。要实现200Gbit/s的转发,需要大量线程(核)参与,并行转发。
先来看主线程流程main的处理流程,因为和HelloWorld与Skeleton类似,不详细叙述。
初始化运行环境: rte_eal_init(argc, argv);
分析入参: parse_args(argc, argv)
初始化lcore与port配置
端口与队列初始化,类似Skeleton处理
端口启动,使能混杂模式
启动从线程,令其运行main_loop()
从线程执行main_loop()的主要步骤如下:
读取自己的lcore信息完成配置;
读取关联的接收与发送队列信息;
进入循环处理:
{
向指定队列批量发送报文;
从指定队列批量接收报文;
批量转发接收到报文;
}
向指定队列批量发送报文,从指定队列批量接收报文,此前已经介绍了DPDK的收发函数。批量转发接收到的报文是处理的主体,提供了基于Hash的完全匹配转发,也可以基于最长匹配原则(LPM)进行转发。转发路由查找方式可以由编译配置选择。除了路由转发算法的差异,下面的例子还包括基于multi buffer原理的代码实现。在#if (ENABLE_MULTI_BUFFER_OPTIMIZE == 1)的路径下,一次处理8个报文。和普通的软件编程不同,初次见到的程序员会觉得奇怪。它的实现有效利用了处理器内部的乱序执行和并行处理能力,能显著提高转发性能。for (j = 0; j < n; j += 8) { uint32_t pkt_type = pkts_burst[j]->packet_type & pkts_burst[j+1]->packet_type & pkts_burst[j+2]->packet_type & pkts_burst[j+3]->packet_type & pkts_burst[j+4]->packet_type & pkts_burst[j+5]->packet_type & pkts_burst[j+6]->packet_type & pkts_burst[j+7]->packet_type; if (pkt_type & RTE_PTYPE_L3_IPV4) { simple_ipv4_fwd_8pkts(&pkts_burst[j], portid, qconf); } else if (pkt_type & RTE_PTYPE_L3_IPV6) { simple_ipv6_fwd_8pkts(&pkts_burst[j], portid, qconf); } else { l3fwd_simple_forward(pkts_burst[j],portid, qconf); l3fwd_simple_forward(pkts_burst[j+1],portid, qconf); l3fwd_simple_forward(pkts_burst[j+2],portid, qconf); l3fwd_simple_forward(pkts_burst[j+3],portid, qconf); l3fwd_simple_forward(pkts_burst[j+4],portid, qconf); l3fwd_simple_forward(pkts_burst[j+5],portid, qconf); l3fwd_simple_forward(pkts_burst[j+6],portid, qconf); l3fwd_simple_forward(pkts_burst[j+7],portid, qconf); } } for (; j < nb_rx ; j++) { l3fwd_simple_forward(pkts_burst[j],portid, qconf); }
依据IP头部的五元组信息,利用rte_hash_lookup来查询目标端口。
mask0 = _mm_set_epi32(ALL_32_BITS, ALL_32_BITS, ALL_32_BITS, BIT_8_TO_15); ipv4_hdr = (uint8_t *)ipv4_hdr + offsetof(struct ipv4_hdr, time_to_live); __m128i data = _mm_loadu_si128((__m128i*)(ipv4_hdr)); /* Get 5 tuple: dst port, src port, dst IP address, src IP address and protocol */ key.xmm = _mm_and_si128(data, mask0); /* Find destination port */ ret = rte_hash_lookup(ipv4_l3fwd_lookup_struct, (const void *)&key); return (uint8_t)((ret < 0)? portid : ipv4_l3fwd_out_if[ret]);
这段代码在读取报文头部信息时,将整个头部导入了基于SSE的矢量寄存器(128位宽),并对内部进行了掩码mask0运算,得到key,然后把key作为入口参数送入rte_hash_lookup运算。同样的操作运算还展示在对IPv6的处理上,可以在代码中参考。
我们并不计划在本节将读者带入代码陷阱中,实际上本书总体上也没有偏重代码讲解,而是在原理上进行解析。如果读者希望了解详细完整的编程指南,可以参考DPDK的网站。