ConnectX
简介
ConnectX 是一个提供零成本抽象(zero-cost abstraction)的统一 pub/sub 接口。
开发指南
头文件
在开发一个最简单的 ConnectX 程序的时候,我们可以引入如下的 4 个头文件
cpp
// std::string 类消息的序列化类
#include "connectx/message/string.hpp"
// publisher: 消息发布者封装
#include "connectx/pubsub/publisher.h"
// subscriber:消息订阅者封装
#include "connectx/pubsub/subscriber.h"
// types: 通信协议类型
#include "connectx/types.h"
动态库
// TODO
使用 ConnectX 的时候需要导入动态库
命名空间
为了减少代码的长度,本文档默认使用了如下的命名空间
cpp
using minieye::connectx::ProtocolType;
using minieye::connectx::Publisher;
using minieye::connectx::Subscriber;
using minieye::connectx::message::StringSerializer;
序列化
为了让 ConnectX 服务端和客户端可以编码数据并且解码接收到的数据,ConnectX 提供了一个接口类 SerializerInterface
,你需要手动实现这个类下面的两个方法:
cpp
/**
* @brief 序列化方法(接口函数,需要派生类实现)。
*
* @param data 用户指定的 DataType 类型的消息数据。
* @return std::shared_ptr<SerializedDataInterface> 序列化结果数据的智能指针
*/
virtual std::shared_ptr<SerializedDataRefInterface> Serialize(
const DataType &data
) = 0;
/**
* @brief 反序列化方法(接口函数,需要派生类实现)。
*
* @param data 从下层通信协议接收到的消息数据。
* @param size 接收到的消息数据大小。
* @return std::shared_ptr<DataType> 反序列化后的用户消息数据。用智能指针封装,
方便后续用户使用时数据所有权的转移。
*/
virtual std::shared_ptr<DataType> Deserialize(
const void *data,
size_t size
) = 0;
ConnectX 内置实现了部分常用的数据类型的序列化类:
cpp
// std::string 类消息的序列化类
class StringSerializer : public SerializerInterface<std::string> {
// ...
}
// ProtoBuffer 消息的序列化类。
template <typename ProtoClass>
class ProtoSerializer : public SerializerInterface<ProtoClass> {
// ...
}
// FastddsSerializer 类,协助封装 Fastdds 协议
template <typename FastddsClass>
class FastddsSerializer : public SerializerInterface
<typename FastddsClass::type> {
// ...
}
我们会在之后的章节详细介绍这些类
服务端
ConnectX 的服务端和客户端都由一个 Context 管理,每个 Context 都需要传入一个配置,我们假设有如下的配置文件:
json
// /path/to/config.json
{
"participants": [{
"id": 1,
"protocol": "nanomsg",
"link_info": "tcp://127.0.0.1:12345"
}, {
"id": 2,
"protocol": "nanomsg",
"link_info": "ipc:///tmp/pubsub.ipc"
}]
}
我们可以用以下的方式获取一个服务端程序:
cpp
// 配置文件所在路径
const std::string configPath = "/path/to/config.json";
// 需要发布的话题
const std::string topic = "foo.test";
// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;
// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
// 额外的初始化会返回错误
auto errCode = minieye::connectx::Init(configPath);
if (errCode != ErrorCode::kCodeOk) {
std::cout << "failed to init context, error code is: "
<< errCode << std::endl;
return 0;
}
// 参数说明:
// 第一个参数:topic 发布消息的话题
// 第二个参数:通信域 id,虚拟 namespace
// 只有在同一个通信域中才可以通信
// 第三个参数:通信实体 id
// 对应 Init 时所传的配置文件中的其中一个 participant id。
// 如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
// 与通信协议匹配的第一个节点
// 第四个参数:通信协议类型
// 如果传了 participant_id > 0,
// 则忽略 protocol 参数,使用配置文件中对应节点的通信类型
// 第五个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
// 模板参数: StringSerializer 提供了对 std::string 的序列化接口实现
Publisher<StringSerializer> publisher(topic, 0, 1, protoType, nullptr);
同样的,如果你不希望额外使用配置文件,也可以在代码中直接写配置:
cpp
// 配置文件所在路径
const std::string config = R"({
"participants": [{
"id": 1,
"protocol": "nanomsg",
"link_info": "tcp://127.0.0.1:12345"
}, {
"id": 2,
"protocol": "nanomsg",
"link_info": "ipc:///tmp/pubsub.ipc"
}]
})";
// 需要发布的主题
const std::string topic = "foo.test";
// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;
// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
// 额外的初始化会返回错误
auto errCode = minieye::connectx::Init(config);
if (errCode != ErrorCode::kCodeOk) {
std::cout << "failed to init context, error code is: "
<< errCode << std::endl;
return 0;
}
// 参数说明:
// 第一个参数:topic 发布消息的话题
// 第二个参数:通信域 id,虚拟 namespace
// 只有在同一个通信域中才可以通信
// 第三个参数:通信实体 id
// 对应 Init 时所传的配置文件中的其中一个 participant id。
// 如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
// 与通信协议匹配的第一个节点
// 第四个参数:通信协议类型
// 如果传了 participant_id > 0,
// 则忽略 protocol 参数,使用配置文件中对应节点的通信类型
// 第五个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
// 模板参数: StringSerializer 提供了对 std::string 的序列化接口实现
Publisher<StringSerializer> publisher(topic, 0, 1, protoType, nullptr);
客户端
与服务端相同,客户端也需要使用一个 Context 来管理
cpp
/** /path/to/config.json
{
"participants": [{
"id": 1,
"protocol": "nanomsg",
"link_info": "tcp://127.0.0.1:12345"
}, {
"id": 2,
"protocol": "nanomsg",
"link_info": "ipc:///tmp/pubsub.ipc"
}]
}
*/
// 配置文件路径
const std::string configPath = "/path/to/config.json";
// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
// 额外的初始化会返回错误
auto errCode = minieye::connectx::Init(configPath);
if (errCode != ErrorCode::kCodeOk) {
std::cout << "failed to init context, error code is: "
<< errCode << std::endl;
return 0;
}
// ...
cpp
// 配置文件所在路径
const std::string config = R"({
"participants": [{
"id": 1,
"protocol": "nanomsg",
"link_info": "tcp://127.0.0.1:12345"
}, {
"id": 2,
"protocol": "nanomsg",
"link_info": "ipc:///tmp/pubsub.ipc"
}]
})";
// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
// 额外的初始化会返回错误
auto errCode = minieye::connectx::Init(config);
if (errCode != ErrorCode::kCodeOk) {
std::cout << "failed to init context, error code is: "
<< errCode << std::endl;
return 0;
}
// ...
为了能对服务端发送过来的数据做出相应的反馈,客户端使用了回调机制:
cpp
void callback(
const std::shared_ptr<std::string>& data,
const std::string& topic,
void* /*user_data*/
) {
std::cout << "Received data (topic: " << topic << "): "
<< *data
<< std::endl;
}
接下来我们就可以实例化一个客户端了:
cpp
// 需要发布的主题
const std::string topic = "test.foo";
// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;
// 参数说明:
// 第一个参数:topic 发布消息的话题
// 第二个参数:通信域 id,虚拟 namespace
// 只有在同一个通信域中才可以通信
// 第三个参数:用户定义的订阅回调函数,用户在回调函数中处理接收到的消息
// 第四个参数:通信实体 id
// 对应 Init 时所传的配置文件中的其中一个 participant id。
// 如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
// 与通信协议匹配的第一个节点
// 第五个参数:通信协议类型
// 如果传了 participant_id > 0,
// 则忽略 protocol 参数,使用配置文件中对应节点的通信类型
// 第六个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
// 第七个参数:用户自定义数据。该数据会在回调函数中作为最后一个参数传递给用户
// 模板参数: StringSerializer 提供了对 std::string 的序列化接口实现
Subscriber<StringSerializer> subscriber(
topic, 0, callback, 1, protoType, nullptr, nullptr
);
客户端启动以后,会自动创建一个线程负责接收客户端传来的消息,并且触发回调函数
配置文件
ConnectX 的完整配置如下:
cpp
// 根据各种协议要求的配置格式(待补充)
{
// minieyedds 配置(可选)
"minieyedds_options": {
"topics": [
{
"topic": "adas",
"dds_mode": "shm",
"buff_num": 100,
"elem_max_size": 1000000
}
]
},
// fastdds 配置(可选)
"fastdds_option": {
"dds_qos_profiles_file": "./hb_dds_qos_profiles.xml"
},
"debug": 0, // debug模式开关 0-关闭 1-打开 默认为关闭状态(可选)
// 通信节点列表(必填)
"participants": [
{
"id": 1, // 节点ID(要求大于0)
"protocol": "nanomsg", // 通信协议:libflow, minieyedds, fastdds, nanomsg
"link_info": "tcp://127.0.0.1:12345" // 通信连接地址,nanomsg tcp 通信
},
{
"id": 2,
"protocol": "nanomsg",
"link_info": "ipc:///tmp/pubsub.ipc" // 通信连接地址,nanomsg ipc 通信
},
{
"id": 3,
"protocol": "libflow",
"link_info": "127.0.0.1:24012", // 通信连接地址,libflow 通信
// 可能在节点定义中支持协议相关配置(待定)
"protocol_options": {
"servers": "none"
}
},
{
"id": 4,
"protocol": "minieyedds",
"link_info": "", // 不需要填写,空字符串即可
"protocol_options": {
"log_level": 1, // log_level: 1: Error, 2: Warn, 3: Info, 4: Debug, 5: Trace
"topics": [
{
"topic": "adas",
"dds_mode": "shm", // dds_mode: "shm" or "redis"
"url": "tcp://127.0.0.1:8088", // 通信连接地址nanomsg url(非必须)
"buff_num": 100, // node numbers in fixed-size array
"elem_max_size": 1000000 // every node's max size (bytes)
},
{
"topic": "dms",
"dds_mode": "shm",
"url": "ipc:///tmp/demo_dds.ipc",
"buff_num": 100,
"elem_max_size": 1000000
}
]
}
}
]
}