说到Ceph的通讯一定绕不开Messenger,无论是客户端到OSD,还是OSD到MON,或者OSD到OSD,都需要Messenger来协助完成各个模块间消息的发送、接收。Messenger有三种实现,分别是SimpleMessenger、AsyncMessenger、XioMessenger,本文以AsyncMessenger为例简单介绍一下其工作原理。
原理
processors
线程数量由cct->_conf->ms_async_op_threads
决定NetworkStack
中workers
与processors
一一对应。processors
收到请求会调创建AsyncConnection
,并存入调用AsyncConnection
实例的accept
方法,accept
通过EventCenter
将由NetworkStack
的workers
调用AsyncConnection
实例的process
方法。(哈哈哈,绕吧,有点儿晕了吧~~~)processors
处理完accept请求后,将AsyncConnection
实例存入accepting_conns
,等待NetworkStack
处理完成。AsyncConnection
的process
被NetworkStack
的workers
线程调用,并构建Message
消息通过dispatch_queue
或ms_fast_dispatch
发送到fast_dispatchers
。
Message 格式
CEPH_MSGR_TAG_MSG
tag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16我觉得注释处的说明写的很清楚了,此处不做过多说明了。
header
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload */
__le32 middle_len;/* bytes in middle payload */
__le32 data_len; /* bytes of data payload */
__le16 data_off; /* sender: include full offset;
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
/* oldest code we think can decode this. unknown if zero. */
__le16 compat_version;
__le16 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));payload
未知- middle
未知 - data
具体传递的数据内容,数据大小由header
中的data_len
决定。 footer/old_footer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/*
* follows data payload
* ceph_msg_footer_old does not support digital signatures on messages PLR
*/
struct ceph_msg_footer_old {
__le32 front_crc, middle_crc, data_crc;
__u8 flags;
} __attribute__ ((packed));
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
// sig holds the 64 bits of the digital signature for the message PLR
__le64 sig;
__u8 flags;
} __attribute__ ((packed));footer
与old_footer
之差一个签名sig