第3章
Ceph网络通信
本章介绍Ceph网络通信模块,这是客户端和服务器通信的底层模块,用来在客户端和服务器之间接收和发送请求。其实现功能比较清晰,是一个相对较独立的模块,理解起来比较容易,所以首先介绍它。
3.1 Ceph网络通信框架
一个分布式存储系统需要一个稳定的底层网络通信模块,用于各节点之间的互联互通。对于一个网络通信系统,要求如下:
高性能。性能评价的两个指标:带宽和延迟。
稳定可靠。数据不丢包,在网络中断时,实现重连等异常处理。
网络通信模块的实现在源代码src/msg的目录下,其首先定义了一个网络通信的框架,三个子目录里分别对应:Simple、Async、 XIO 三种不同的实现方式。
Simple是比较简单,目前比较稳定的实现,系统默认的用于生产环境的方式。 它最大的特点是:每一个网络链接,都会创建两个的线程,一个专门用于接收,一个专门用于发送。这种模式实现比较简单,但是对于大规模的集群部署,大量的链接会产生大量的线程,会消耗CPU资源,影响性能。
Async模式使用了基于事件的I/O多路复用模式。这是目前网络通信中广泛采用的方式,但是在Ceph中,官方宣称这种方式还处于试验阶段,不够稳定,还不能用于生产环境。
XIO方式使用了开源的网络通信库accelio来实现。这种方式需要依赖第三方的库accelio稳定性,需要对accelio的使用方式以及代码实现都比较熟悉。目前也处于试验阶段。特别注意的是,前两种方式只支持TCP/IP协议,XIO可以支持Infiniband网络。
在msg目录下定义了网络通信的抽象框架,它完成了通信接口和具体实现的分离。在其下分别有msg/simple子目录、msg/Async子目录、msg/xio子目录,分别对应三种不同的实现。
3.1.1 Message
类Message 是所有消息的基类,任何要发送的消息,都要继承该类,格式如图3-1所示。
`
`
图3-1 消息发送格式
消息的结构如下:header是消息头,类似一个消息的信封(envelope),user_data是用于要发送的实际数据,footer是一个消息的结束标记,如下所示:
class Message : public RefCountedObject {
ceph_msg_header header; // 消息头
ceph_msg_footer footer; // 消息尾
//用户数据
bufferlist payload; // "front" unaligned blob
bufferlist middle; // "middle" unaligned blob
bufferlist data; // data payload (page-alignment will be preserved where possible)
//消息相关的时间戳
utime_t recv_stamp; //开始接收数据的时间戳
utime_t dispatch_stamp; // dispatch的时间戳
utime_t throttle_stamp; //获取throttle的slot的时间戳
utime_t recv_complete_stamp; //接收完成的时间戳
ConnectionRef connection; //网络连接类
uint32_t magic; //消息的魔术字
bi::list_member_hook<> dispatch_q; //boost::intrusive需要的字段
}
下面分别介绍其中的重要参数。
ceph_msg_header为消息头,它定义了消息传输相关的元数据:
struct ceph_msg_header {
__le64 seq; //当前session内消息的唯一序号
__le64 tid; //消息的全局唯一的id
__le16 type; //消息类型
__le16 priority; //优先级
__le16 version; //消息编码的版本
__le32 front_len; // payload的长度
__le32 middle_len; // middle的长度
__le32 data_len; // data的长度
__le16 data_off; //对象的数据偏移量
struct ceph_entity_name src;//消息源
//一些旧的代码,用于兼容,如果为零就忽略
__le16 compat_version;
__le16 reserved;
__le32 crc; //消息头的crc32c校验信息
} attribute ((packed));
ceph_msg_footer为消息的尾部,附加了一些crc校验数据和消息结束标志:
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
//分别对应crc效验码
__le64 sig; //消息的64位signature
__u8 flags; //结束标志
} attribute ((packed));
消息带的数据分别保存在payload、middle、data这三个bufferlist中。payload一般保存Ceph操作相关的元数据,middle目前没有使用到,data一般为读写的数据。
在源代码src/messages下定义了系统需要的相关消息,其都是Message类的子类。
3.1.2 Connection
类Connection对应端(port)对端的socket链接的封装。其最重要的接口是可以发送消息:
struct Connection : public RefCountedObject {
mutable Mutex lock; //锁保护Connection的所有字段
Messenger *msgr;
RefCountedObject *priv; //链接的私有数据
int peer_type; //链接的peer类型
entity_addr_t peer_addr; //peer的地址
utime_t last_keepalive, last_keepalive_ack;
//最后一次发送keeplive的时间和最后一次接收keepalive的ACK的时间
private:
uint64_t features; //一些feature的标志位
public:
bool failed; //当值为true时,该链接为lossy链接已经失效了
int rx_buffers_version; //接收缓存区的版本
map<ceph_tid_t,pair<bufferlist,int> > rx_buffers; //接收缓冲区
消息的标识ceph_tid --> (buffer, rx_buffers_version)的映射
}
其最重要的功能就是发送消息的接口:
virtual int send_message(Message *m) = 0;
3.1.3 Dispatcher
类Dispatcher是消息分发的接口,其分发消息的接口为:
`virtual bool ms_dispatch(Message *m) = 0;
virtual void ms_fast_dispatch(Message *m);`
Server端注册该Dispatcher类用于把接收到的Message请求分发给具体处理的应用层。Client端需要实现一个Dispatcher函数,用于处理收到的ACK应对消息。
3.1.4 Messenger
Messenger是整个网络抽象模块,定义了网络模块的基本API接口。网络模块对外提供的基本功能,就是能在节点之间发送和接受消息。
向一个节点发送消息的命令如下:virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
注册一个Dispatcher用来分发消息的命令如下:void add_dispatcher_head(Dispatcher *d)
3.1.5 网络连接的策略
Policy定义了Messenger处理Connection的一些策略:
struct Policy {
bool lossy; //如果为true,该当该连接出现错误时就删除
bool server; //如果为true,为服务端,都是被动连接
bool standby; //如果为true,该连接处于等待状态
bool resetcheck; //如果为true,该连接出错后重连
//该connection相关的流控操作
Throttle *throttler_bytes;
Throttle *throttler_messages;
//本地端的一些feature标志
uint64_t features_supported;
//远程端需要的一些feature标志
uint64_t features_required;
}
3.1.6 网络模块的使用
通过下面最基本的服务器和客户端的实例程序,了解如何调用网络通信模块提供的接口来完成收发请求消息的功能。
- Server程序分析
Server程序源代码在test/simple_server.cc里,这里只展示有关网络部分的核心流程。
1)调用Messenger的函数create 创建一个Messenger的实例,配置选项g_conf->ms_type为配置的实现类型,目前有三种方式:
simple、async、xio:
messenger = Messenger::create(g_ceph_context,
g_conf->ms_type, entity_name_t::MON(-1),
"simple_server",
0 /* nonce */);
2)设置Messenger的属性:
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger->set_default_policy(
Messenger::Policy::stateless_server(CEPH_FEATURES_ALL, 0));
3)对于Server,需要bind服务端地址:
r = messenger->bind(bind_addr);
if (r < 0)
goto out;
common_init_finish(g_ceph_context);
4)创建一个Dispatcher,并添加到Messenger:
dispatcher = new SimpleDispatcher(messenger);
messenger->add_dispatcher_head(dispatcher);
5)启动Messenger:
messenger->start();
messenger->wait(); //本函数必须等start完成才能调用
SimpleDispatcher函数里实现了ms_dispatch,用于把接收到的各种请求消息分发给相关的处理函数。
- Client程序分析
源代码在test/simple_client.cc里,这里只展示有关网络部分的核心流程。
1)调用Messenger的函数create创建一个Messenger的实例:
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"client",
getpid());
2)设置相关的策略:
`messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));`
3)创建Dispatcher类并添加,用于接收消息:
`dispatcher = new SimpleDispatcher(messenger);
messenger->add_dispatcher_head(dispatcher);
dispatcher->set_active(); `
4)启动消息:
`r = messenger->start();
if (r < 0)
goto out;`
5)下面开始发送请求,先获取目标Server的链接:conn = messenger->get_connection(dest_server);
6)通过Connection来发送请求消息。这里的消息发送方式都是异步发送,接收到请求消息的ACK应答消息后,将在Dispatcher的 ms_dispatch或者ms_fast_dispatch处理函数里做相关的处理:
`Message *m;
for (msg_ix = 0; msg_ix < n_msgs; ++msg_ix) {
//如果需要,这里要添加实际的数据
if (! n_dsize) {
m = new MPing();
} else {
m = new_simple_ping_with_data("simple_client", n_dsize);
}
conn->send_message(m);
}`
综上所述,通过Ceph的网络框架发送消息比较简单。在Server端,只需要创建一个Messenger实例,设置相应的策略并绑定服务端口,然后就设置一个Dispatcher来处理接收到的请求。在Client端,只需要创建一个Messenger实例,设置相关的策略和Dispatcher用于处理返回的应答消息。通过获取对应Server的connection来发送消息即可。