Introduction to AsyncMessenger

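Any discussion of Ceph networking inevitably comes back to the Messenger. Whether a client is talking to an OSD, an OSD to a MON, or one OSD to another, the Messenger is what sends and receives the messages exchanged between these modules. There are three Messenger implementations: SimpleMessenger, AsyncMessenger and XioMessenger. This article uses AsyncMessenger as an example to briefly explain how it works.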

How it works

[Figure: asyncmessenger]

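  • The number of processor threads is determined by cct->_conf->ms_async_op_threads.
  • NetworkStack's workers correspond one-to-one with the processors.
  • When a processor receives a connection request, it creates an AsyncConnection and calls that instance's accept method. accept goes through EventCenter, so that the AsyncConnection instance's process method is then called by one of NetworkStack's workers. (Ha, convoluted, right? Feeling a bit dizzy yet?)
  • After a processor finishes handling the accept request, it stores the AsyncConnection instance in accepting_conns, where it waits for NetworkStack to finish processing it.
  • AsyncConnection's process method is called on a NetworkStack worker thread; it builds Message objects and hands them to the fast_dispatchers through dispatch_queue's ms_fast_dispatch.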
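To make the hand-off between processor, EventCenter and worker more concrete, here is a toy, single-threaded C model of the steps above. It is a sketch under stated assumptions, not Ceph code: worker_t, event_center_t, conn_t, messenger_t, processor_accept and the other helpers are hypothetical names; the event center is reduced to a FIFO of callbacks; and the caller picks the worker, because how Ceph actually chooses a worker for a new connection is not modelled here.

```c
/* Toy model of the AsyncMessenger accept path (NOT Ceph code).
 * All type and function names here are hypothetical. */
#include <stdlib.h>

#define MAX_EVENTS 64
#define MAX_CONNS  64

typedef struct conn conn_t;
typedef void (*event_cb)(conn_t *);

typedef struct { event_cb cb; conn_t *conn; } event_t;

/* Stand-in for a worker's EventCenter: a bounded FIFO of callbacks. */
typedef struct {
    event_t q[MAX_EVENTS];
    int head, tail;
} event_center_t;

/* Stand-in for one NetworkStack worker thread. */
typedef struct {
    int id;
    event_center_t center;
} worker_t;

/* Stand-in for AsyncConnection. */
struct conn {
    int fd;
    worker_t *worker;   /* worker whose event loop owns this connection */
    int processed;      /* how many times process() has run */
};

/* Stand-in for the messenger's accepting_conns set. */
typedef struct {
    conn_t *accepting_conns[MAX_CONNS];
    int n_accepting;
} messenger_t;

/* Queue a callback to run later on the worker that owns this center. */
static int center_dispatch(event_center_t *c, event_cb cb, conn_t *conn) {
    if (c->tail - c->head >= MAX_EVENTS)
        return -1;                               /* queue full */
    c->q[c->tail % MAX_EVENTS] = (event_t){ cb, conn };
    c->tail++;
    return 0;
}

/* One pass of a worker's event loop: run every queued callback. */
static int worker_run_once(worker_t *w) {
    int ran = 0;
    while (w->center.head < w->center.tail) {
        event_t ev = w->center.q[w->center.head % MAX_EVENTS];
        w->center.head++;
        ev.cb(ev.conn);
        ran++;
    }
    return ran;
}

/* Plays the role of AsyncConnection::process (runs on the worker). */
static void conn_process(conn_t *conn) {
    conn->processed++;  /* real code would read the socket and build Messages */
}

/* Plays the role of AsyncConnection::accept: it does no I/O itself,
 * it only schedules process() on the owning worker's event center. */
static int conn_accept(conn_t *conn) {
    return center_dispatch(&conn->worker->center, conn_process, conn);
}

/* Plays the role of a processor handling one incoming socket. */
static conn_t *processor_accept(messenger_t *m, worker_t *w, int fd) {
    if (m->n_accepting >= MAX_CONNS)
        return NULL;
    conn_t *conn = calloc(1, sizeof *conn);
    if (!conn)
        return NULL;
    conn->fd = fd;
    conn->worker = w;
    if (conn_accept(conn) != 0) {
        free(conn);
        return NULL;
    }
    m->accepting_conns[m->n_accepting++] = conn;  /* wait for the worker */
    return conn;
}
```

In this model, processor_accept returns before process has run; process only runs once the owning worker drains its queue, e.g. worker_run_once(&workers[1]) after processor_accept(&m, &workers[1], fd). This deferral is presumably why accept goes through EventCenter: the connection's I/O then always runs on the worker thread that owns it.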

Message format

CEPH_MSGR_TAG_MSG

[Figure: message_format]

  • tag

    #define CEPH_MSGR_TAG_READY                1  /* server->client: ready for messages */
    #define CEPH_MSGR_TAG_RESETSESSION         2  /* server->client: reset, try again */
    #define CEPH_MSGR_TAG_WAIT                 3  /* server->client: wait for racing incoming connection */
    #define CEPH_MSGR_TAG_RETRY_SESSION        4  /* server->client + cseq: try again with higher cseq */
    #define CEPH_MSGR_TAG_RETRY_GLOBAL         5  /* server->client + gseq: try again with higher gseq */
    #define CEPH_MSGR_TAG_CLOSE                6  /* closing pipe */
    #define CEPH_MSGR_TAG_MSG                  7  /* message */
    #define CEPH_MSGR_TAG_ACK                  8  /* message ack */
    #define CEPH_MSGR_TAG_KEEPALIVE            9  /* just a keepalive byte! */
    #define CEPH_MSGR_TAG_BADPROTOVER          10 /* bad protocol version */
    #define CEPH_MSGR_TAG_BADAUTHORIZER        11 /* bad authorizer */
    #define CEPH_MSGR_TAG_FEATURES             12 /* insufficient features */
    #define CEPH_MSGR_TAG_SEQ                  13 /* 64-bit int follows with seen seq number */
    #define CEPH_MSGR_TAG_KEEPALIVE2           14
    #define CEPH_MSGR_TAG_KEEPALIVE2_ACK       15 /* keepalive reply */
    #define CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER 16 /* ceph v2 doing server challenge */

    I think the comments explain these clearly enough, so I won't go into more detail here.

  • header

    struct ceph_msg_header {
        __le64 seq;          /* message seq# for this session */
        __le64 tid;          /* transaction id */
        __le16 type;         /* message type */
        __le16 priority;     /* priority. higher value == higher priority */
        __le16 version;      /* version of message encoding */

        __le32 front_len;    /* bytes in main payload */
        __le32 middle_len;   /* bytes in middle payload */
        __le32 data_len;     /* bytes of data payload */
        __le16 data_off;     /* sender: include full offset;
                                receiver: mask against ~PAGE_MASK */

        struct ceph_entity_name src;

        /* oldest code we think can decode this. unknown if zero. */
        __le16 compat_version;
        __le16 reserved;
        __le32 crc;          /* header crc32c */
    } __attribute__ ((packed));
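
  • payload
    Not covered here (unknown); its length is given by front_len in the header ("bytes in main payload").

  • middle
    Not covered here (unknown); its length is given by middle_len in the header ("bytes in middle payload").
  • data
    The actual data being transferred; its size is given by data_len in the header.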
  • footer/old_footer

    /*
     * follows data payload
     * ceph_msg_footer_old does not support digital signatures on messages PLR
     */
    struct ceph_msg_footer_old {
        __le32 front_crc, middle_crc, data_crc;
        __u8 flags;
    } __attribute__ ((packed));

    struct ceph_msg_footer {
        __le32 front_crc, middle_crc, data_crc;
        // sig holds the 64 bits of the digital signature for the message PLR
        __le64 sig;
        __u8 flags;
    } __attribute__ ((packed));

    footer and old_footer differ only by the signature field sig (the 64-bit digital signature of the message).
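Putting the pieces together in the order listed above, a CEPH_MSGR_TAG_MSG frame is one tag byte, the header, then front_len bytes of payload, middle_len bytes of middle, data_len bytes of data, and finally the footer. The C sketch below mirrors the structs above with <stdint.h> types and computes a frame's total length from its header. It is an illustration under stated assumptions, not Ceph's own parser: it assumes a GCC/Clang-compatible compiler (for __attribute__((packed))), a little-endian host (so the __le* fields can be read directly), and that the caller already knows which footer the two sides negotiated. msg_header, msg_frame_len and parse_msg_prefix are hypothetical names, and no CRCs are checked.

```c
/* Mirror of the message structs above (illustrative, not Ceph's header).
 * Assumes GCC/Clang (__attribute__((packed))) and a little-endian host. */
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define CEPH_MSGR_TAG_MSG 7  /* value from the tag list above */

struct entity_name {             /* mirrors ceph_entity_name */
    uint8_t  type;
    uint64_t num;
} __attribute__((packed));

struct msg_header {              /* mirrors ceph_msg_header */
    uint64_t seq, tid;
    uint16_t type, priority, version;
    uint32_t front_len, middle_len, data_len;
    uint16_t data_off;
    struct entity_name src;
    uint16_t compat_version, reserved;
    uint32_t crc;
} __attribute__((packed));

struct msg_footer_old {          /* mirrors ceph_msg_footer_old */
    uint32_t front_crc, middle_crc, data_crc;
    uint8_t  flags;
} __attribute__((packed));

struct msg_footer {              /* mirrors ceph_msg_footer */
    uint32_t front_crc, middle_crc, data_crc;
    uint64_t sig;
    uint8_t  flags;
} __attribute__((packed));

_Static_assert(sizeof(struct msg_header) == 53, "packed header is 53 bytes");
_Static_assert(sizeof(struct msg_footer_old) == 13, "old footer is 13 bytes");
_Static_assert(sizeof(struct msg_footer) == 21, "footer adds the 8-byte sig");

/* Total bytes of one TAG_MSG frame:
 * tag + header + front + middle + data + footer (new or old). */
static size_t msg_frame_len(const struct msg_header *h, int use_old_footer) {
    size_t footer = use_old_footer ? sizeof(struct msg_footer_old)
                                   : sizeof(struct msg_footer);
    return 1 + sizeof *h
         + (size_t)h->front_len + h->middle_len + h->data_len
         + footer;
}

/* Decode the tag byte and header at the start of buf. Returns the total
 * frame length, or 0 if buf is too short or does not start with TAG_MSG.
 * Header and payload CRCs are not verified in this sketch. */
static size_t parse_msg_prefix(const uint8_t *buf, size_t len,
                               struct msg_header *out, int use_old_footer) {
    if (len < 1 + sizeof *out || buf[0] != CEPH_MSGR_TAG_MSG)
        return 0;
    memcpy(out, buf + 1, sizeof *out);  /* header follows the tag byte */
    return msg_frame_len(out, use_old_footer);
}
```

The static assertions spell out the packed sizes (53-byte header, 13-byte old footer, 21-byte footer); the 8-byte gap between the two footers is exactly the sig field. Once the fixed-size prefix is parsed, a reader knows how many more bytes to wait for before the message is complete.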