mosquitto是一款实现了消息推送协议MQTT v3.1的开源消息代理软件,提供轻量级的,可支持发布/可订阅的消息推送模式。
出版/订阅模式
出版/订阅模式定义了如何向一 个节点发布和订阅消息,这些节点被称作主题(topic)。主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者 (subscriber) 从主题订阅消息。这种模式使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。
该模式使用场景
TCP协议提供一对一的可靠传输,每个TCP连接由五个元素唯一确定:源IP地址、源端口号、目的IP地址、目的端口、通信协议。但在实际过程中,通信的终端数目是多个,需要维持的通信关系是多对多的。此时,每个参与通信的客户端所需维持的链接数将很庞大,而出版/订阅模式就是一种解决方法。它通过增加一个中间件的方式简化问题,让中间件维护这种多对多的关系。
mosquitto源码中重要的数据结构
1)struct mosquitto用来保存一个客户端连接的所有信息,如用户名、密码、用户ID、向该客户端发送的消息等。(在mosquitto_internal.h定义)
struct mosquitto { #ifndef WIN32 int sock;
# ifndef WITH_BROKER int sockpairR, sockpairW;
# endif #else
SOCKET sock;
# ifndef WITH_BROKER
SOCKET sockpairR, sockpairW; //sockpairR用来在超市前跳出select()
# endif #endif
enum _mosquitto_protocol protocol; char *address; char *id; char *username; char *password; uint16_t keepalive; bool clean_session;
enum mosquitto_client_state state; time_t last_msg_in; time_t last_msg_out; time_t ping_t; uint16_t last_mid;
struct _mosquitto_packet in_packet;
struct _mosquitto_packet *current_out_packet; //当前处理(发送)的节点
struct _mosquitto_packet *out_packet; // 待发送的packet队列的头结点
struct mosquitto_message *will; #ifdef WITH_TLS SSL *ssl;
SSL_CTX *ssl_ctx; char *tls_cafile;
char *tls_capath; char *tls_certfile; char *tls_keyfile;
int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
int tls_cert_reqs; char *tls_version; char *tls_ciphers; char *tls_psk;
char *tls_psk_identity; bool tls_insecure; #endif
bool want_write;
#if defined(WITH_THREADING) && !defined(WITH_BROKER) pthread_mutex_t callback_mutex; pthread_mutex_t log_callback_mutex; pthread_mutex_t msgtime_mutex;
pthread_mutex_t out_packet_mutex; //标记是否存在待发送的packet队列
pthread_mutex_t current_out_packet_mutex;//标记当前是否有packet要发送
pthread_mutex_t state_mutex; pthread_mutex_t in_message_mutex; pthread_mutex_t out_message_mutex; pthread_t thread_id; #endif
#ifdef WITH_BROKER bool is_bridge;
struct _mqtt3_bridge *bridge; struct mosquitto_client_msg *msgs;
struct mosquitto_client_msg *last_msg; int msg_count; int msg_count12;
struct _mosquitto_acl_user *acl_list; struct _mqtt3_listener *listener; time_t disconnect_t; int pollfd_index; int db_index;
struct _mosquitto_packet *out_packet_last; bool is_dropping; #else
void *userdata; bool in_callback;
unsigned int message_retry; time_t last_retry_check;
struct mosquitto_message_all *in_messages; struct mosquitto_message_all *in_messages_last; struct mosquitto_message_all *out_messages; struct mosquitto_message_all *out_messages_last;
void (*on_connect)(struct mosquitto *, void *userdata, int rc); void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
//void (*on_error)(); char *host; int port;
int in_queue_len; int out_queue_len; char *bind_address;
unsigned int reconnect_delay; unsigned int reconnect_delay_max; bool reconnect_exponential_backoff; bool threaded;
struct _mosquitto_packet *out_packet_last; //标记待发送packet队列的末尾一个元素
int inflight_messages; int max_inflight_messages; # ifdef WITH_SRV
ares_channel achan; // ares_init函数中初始化,返回一个交流通道
# endif #endif };
2)struct mosquitto_db在mosquitto_broker.h中定义,是mosquitto对所有内部数据的统一管理结构,可以认为是其内部的一个内存数据库,它保存了所有客户端,所有客户端的订阅关系等。
struct mosquitto_db{
dbid_t last_db_id;
struct _mosquitto_subhier subs; struct _mosquitto_unpwd *unpwd; struct _mosquitto_acl_user *acl_list; struct _mosquitto_acl *acl_patterns; struct _mosquitto_unpwd *psk_id;
struct mosquitto **contexts; //各个客户的情况,相当于数组,每创建一个客户就增加空间
struct _clientid_index_hash *clientid_index_hash; int context_count; //初始值为1,与客户端的数量相关 struct mosquitto_msg_store *msg_store; int msg_store_count; struct mqtt3_config *config; int persistence_changes;
struct _mosquitto_auth_plugin auth_plugin; int subscription_count;
int retained_count; //当前系统中retain标记的消息的条数 };
3)struct _mosquitto_subhier在mosquitto_broker.h中定义, 用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto对订阅树采用孩子-兄弟链表法的方式进行存储,该存储方式主要借助于数据结构struct _mosquitto_subhier来完成。
struct _mosquitto_subhier {
struct _mosquitto_subhier *children; //第一个孩子节点 struct _mosquitto_subhier *next; //下一个兄弟节点 struct _mosquitto_subleaf *subs; //订阅列表 char *topic; //该节点对应的Topic片段 struct mosquitto_msg_store *retained;//该topic下被retain标
记的消息
};
4)struct _mosquitto_subleaf 在mosquitto_broker.h中定义,程序中,对某一个Topic的所有订阅者被组织成一个订阅列表,该订阅列表是一个双向链表,链表的每一个节点都保存有一个订阅者,该链表的节点即是:struct _mosquitto_subleaf
struct _mosquitto_subleaf { struct _mosquitto_subleaf *prev; struct _mosquitto_subleaf *next; struct mosquitto *context; int qos; };
5)struct mqtt3_config在mosquitto_broker.h中定义, 用于保存mosquitto 的所有配置信息, mosquitto程序在启动时将初始化该结构体并从配置文件中读取配置信息保存于该结构体变量内。
struct mqtt3_config { char *config_file; char *acl_file;
bool allow_anonymous; bool allow_duplicate_messages; bool allow_zero_length_clientid; char *auto_id_prefix; int auto_id_prefix_len; int autosave_interval; bool autosave_on_changes; char *clientid_prefixes;
bool connection_messages; bool daemon;
struct _mqtt3_listener default_listener; struct _mqtt3_listener *listeners; int listener_count; int log_dest; int log_type; bool log_timestamp; char *log_file; FILE *log_fptr;
int message_size_limit; char *password_file; bool persistence;
char *persistence_location; char *persistence_file; char *persistence_filepath; time_t persistent_client_expiration; char *pid_file; char *psk_file;
bool queue_qos0_messages; int retry_interval; int store_clean_interval; int sys_interval;
bool upgrade_outgoing_qos; char *user; bool verbose; #ifdef WITH_BRIDGE
struct _mqtt3_bridge *bridges; int bridge_count; #endif
char *auth_plugin;
struct mosquitto_auth_opt *auth_options; int auth_option_count; };
6)struct _sub_token 在src/subs.c文件中定义,是将Topic分成Topic片段组成链表节点的基本结构。
struct _sub_token { struct _sub_token *next; char *topic; };
因篇幅问题不能全部显示,请点此查看更多更全内容