说到Ceph的通讯一定绕不开Messenger,无论是客户端到OSD,还是OSD到MON,或者OSD到OSD,都需要Messenger来协助完成各个模块间消息的发送、接收。Messenger有三种实现,分别是SimpleMessenger、AsyncMessenger、XioMessenger,本文以AsyncMessenger为例简单介绍一下其工作原理。
原理

processors线程数量由cct->_conf->ms_async_op_threads决定NetworkStack中workers与processors一一对应。processors收到请求会调创建AsyncConnection,并存入调用AsyncConnection实例的accept方法,accept通过EventCenter将由NetworkStack的workers调用AsyncConnection实例的process方法。(哈哈哈,绕吧,有点儿晕了吧~~~)processors处理完accept请求后,将AsyncConnection实例存入accepting_conns,等待NetworkStack处理完成。AsyncConnection的process被NetworkStack的workers线程调用,并构建Message消息通过dispatch_queue或ms_fast_dispatch发送到fast_dispatchers。
Message 格式
CEPH_MSGR_TAG_MSG

- tag ** 我觉得注释处的说明写的很清楚了,此处不做过多说明了。 **
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 - header
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload */
__le32 middle_len;/* bytes in middle payload */
__le32 data_len; /* bytes of data payload */
__le16 data_off; /* sender: include full offset;
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
/* oldest code we think can decode this. unknown if zero. */
__le16 compat_version;
__le16 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed)); - payload
*** 未知 *** - middle
*** 未知 *** - data
具体传递的数据内容,数据大小由header中的data_len决定。 - footer/old_footer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/*
* follows data payload
* ceph_msg_footer_old does not support digital signatures on messages PLR
*/
struct ceph_msg_footer_old {
__le32 front_crc, middle_crc, data_crc;
__u8 flags;
} __attribute__ ((packed));
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
// sig holds the 64 bits of the digital signature for the message PLR
__le64 sig;
__u8 flags;
} __attribute__ ((packed));footer与old_footer之差一个签名sig