Skip to content

ConnectX

简介

ConnectX 是一个提供零成本抽象(zero-cost abstraction)的统一 pub/sub 接口。

开发指南

头文件

在开发一个最简单的 ConnectX 程序的时候,我们可以引入如下的 4 个头文件

cpp
// std::string 类消息的序列化类
#include "connectx/message/string.hpp"

// publisher: 消息发布者封装
#include "connectx/pubsub/publisher.h"

// subscriber:消息订阅者封装
#include "connectx/pubsub/subscriber.h"

// types: 通信协议类型
#include "connectx/types.h"

动态库

// TODO

使用 ConnectX 的时候需要导入动态库

命名空间

为了减少代码的长度,本文档默认使用了如下的命名空间

cpp
using minieye::connectx::ProtocolType;
using minieye::connectx::Publisher;
using minieye::connectx::Subscriber;
using minieye::connectx::message::StringSerializer;

序列化

为了让 ConnectX 服务端和客户端可以编码数据并且解码接收到的数据,ConnectX 提供了一个接口类 SerializerInterface,你需要手动实现这个类下面的两个方法:

cpp
/**
* @brief 序列化方法(接口函数,需要派生类实现)。
*
* @param data 用户指定的 DataType 类型的消息数据。
* @return std::shared_ptr<SerializedDataInterface> 序列化结果数据的智能指针
*/
virtual std::shared_ptr<SerializedDataRefInterface> Serialize(
    const DataType &data
) = 0;


/**
* @brief 反序列化方法(接口函数,需要派生类实现)。
*
* @param data 从下层通信协议接收到的消息数据。
* @param size 接收到的消息数据大小。
* @return std::shared_ptr<DataType> 反序列化后的用户消息数据。用智能指针封装,
                                    方便后续用户使用时数据所有权的转移。
*/
virtual std::shared_ptr<DataType> Deserialize(
    const void *data, 
    size_t size
) = 0;

ConnectX 内置实现了部分常用的数据类型的序列化类:

cpp
// std::string 类消息的序列化类
class StringSerializer : public SerializerInterface<std::string> {
    // ...
}

// ProtoBuffer 消息的序列化类。
template <typename ProtoClass>
class ProtoSerializer : public SerializerInterface<ProtoClass> {
    // ...
}

// FastddsSerializer 类,协助封装 Fastdds 协议
template <typename FastddsClass>
class FastddsSerializer : public SerializerInterface
<typename FastddsClass::type> {
    // ...
}

我们会在之后的章节详细介绍这些类

服务端

ConnectX 的服务端和客户端都由一个 Context 管理,每个 Context 都需要传入一个配置,我们假设有如下的配置文件:

json
// /path/to/config.json
{
    "participants": [{
        "id": 1,
        "protocol": "nanomsg",
        "link_info": "tcp://127.0.0.1:12345"
    }, {
        "id": 2,
        "protocol": "nanomsg",
        "link_info": "ipc:///tmp/pubsub.ipc"
    }]
}

我们可以用以下的方式获取一个服务端程序:

cpp
// 配置文件所在路径
const std::string configPath = "/path/to/config.json";
// 需要发布的话题
const std::string topic = "foo.test";
// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;

// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
//         额外的初始化会返回错误
auto errCode = minieye::connectx::Init(configPath);
if (errCode != ErrorCode::kCodeOk) {
    std::cout << "failed to init context, error code is: " 
              << errCode << std::endl;
    return 0;
}

// 参数说明:
//      第一个参数:topic 发布消息的话题
//      第二个参数:通信域 id,虚拟 namespace
//                只有在同一个通信域中才可以通信
//      第三个参数:通信实体 id 
//                对应 Init 时所传的配置文件中的其中一个 participant id。
//                如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
//                与通信协议匹配的第一个节点
//      第四个参数:通信协议类型
//                如果传了 participant_id > 0,
//                则忽略 protocol 参数,使用配置文件中对应节点的通信类型
//      第五个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
//      模板参数:  StringSerializer 提供了对 std::string 的序列化接口实现
Publisher<StringSerializer> publisher(topic, 0, 1, protoType, nullptr);

同样的,如果你不希望额外使用配置文件,也可以在代码中直接写配置:

cpp
// 配置文件所在路径
const std::string config = R"({
    "participants": [{
        "id": 1,
        "protocol": "nanomsg",
        "link_info": "tcp://127.0.0.1:12345"
    }, {
        "id": 2,
        "protocol": "nanomsg",
        "link_info": "ipc:///tmp/pubsub.ipc"
    }]
})";

// 需要发布的主题
const std::string topic = "foo.test";

// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;

// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
//         额外的初始化会返回错误
auto errCode = minieye::connectx::Init(config);
if (errCode != ErrorCode::kCodeOk) {
    std::cout << "failed to init context, error code is: " 
              << errCode << std::endl;
    return 0;
}

// 参数说明:
//      第一个参数:topic 发布消息的话题
//      第二个参数:通信域 id,虚拟 namespace
//                只有在同一个通信域中才可以通信
//      第三个参数:通信实体 id 
//                对应 Init 时所传的配置文件中的其中一个 participant id。
//                如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
//                与通信协议匹配的第一个节点
//      第四个参数:通信协议类型
//                如果传了 participant_id > 0,
//                则忽略 protocol 参数,使用配置文件中对应节点的通信类型
//      第五个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
//      模板参数:  StringSerializer 提供了对 std::string 的序列化接口实现
Publisher<StringSerializer> publisher(topic, 0, 1, protoType, nullptr);

客户端

与服务端相同,客户端也需要使用一个 Context 来管理

cpp
/** /path/to/config.json
{
    "participants": [{
        "id": 1,
        "protocol": "nanomsg",
        "link_info": "tcp://127.0.0.1:12345"
    }, {
        "id": 2,
        "protocol": "nanomsg",
        "link_info": "ipc:///tmp/pubsub.ipc"
    }]
}
*/

// 配置文件路径
const std::string configPath = "/path/to/config.json";

// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
//         额外的初始化会返回错误
auto errCode = minieye::connectx::Init(configPath);
if (errCode != ErrorCode::kCodeOk) {
    std::cout << "failed to init context, error code is: " 
              << errCode << std::endl;
    return 0;
}

// ...
cpp
// 配置文件所在路径
const std::string config = R"({
    "participants": [{
        "id": 1,
        "protocol": "nanomsg",
        "link_info": "tcp://127.0.0.1:12345"
    }, {
        "id": 2,
        "protocol": "nanomsg",
        "link_info": "ipc:///tmp/pubsub.ipc"
    }]
})";

// 初始化 Context
// !! 注意:在任何时候都不应该在同一个进程中多次初始化
//         额外的初始化会返回错误
auto errCode = minieye::connectx::Init(config);
if (errCode != ErrorCode::kCodeOk) {
    std::cout << "failed to init context, error code is: " 
              << errCode << std::endl;
    return 0;
}

// ...

为了能对服务端发送过来的数据做出相应的反馈,客户端使用了回调机制:

cpp
void callback(
    const std::shared_ptr<std::string>& data, 
    const std::string& topic, 
    void* /*user_data*/
) {
  std::cout << "Received data (topic: " << topic << "): " 
            << *data 
            << std::endl;
}

接下来我们就可以实例化一个客户端了:

cpp
// 需要发布的主题
const std::string topic = "test.foo";
// 通信协议类型
const ProtocolType protoType = ProtocolType::kProtocolLibflow;

// 参数说明:
//      第一个参数:topic 发布消息的话题
//      第二个参数:通信域 id,虚拟 namespace
//                只有在同一个通信域中才可以通信
//      第三个参数:用户定义的订阅回调函数,用户在回调函数中处理接收到的消息
//      第四个参数:通信实体 id 
//                对应 Init 时所传的配置文件中的其中一个 participant id。
//                如果不传或者 id <= 0,则系统从配置文件的节点列表中选择
//                与通信协议匹配的第一个节点
//      第五个参数:通信协议类型
//                如果传了 participant_id > 0,
//                则忽略 protocol 参数,使用配置文件中对应节点的通信类型
//      第六个参数:接收错误码的指针。如果为 nullptr,则不设置错误码。
//      第七个参数:用户自定义数据。该数据会在回调函数中作为最后一个参数传递给用户
//      模板参数:  StringSerializer 提供了对 std::string 的序列化接口实现
Subscriber<StringSerializer> subscriber(
    topic, 0, callback, 1, protoType, nullptr, nullptr
);

客户端启动以后,会自动创建一个线程负责接收客户端传来的消息,并且触发回调函数

配置文件

ConnectX 的完整配置如下:

cpp
// 根据各种协议要求的配置格式(待补充)
{
    // minieyedds 配置(可选)
    "minieyedds_options": {
        "topics": [
            {
                "topic": "adas",
                "dds_mode": "shm",
                "buff_num": 100,
                "elem_max_size": 1000000
            }
        ]
    },
    // fastdds 配置(可选)
    "fastdds_option": {
        "dds_qos_profiles_file": "./hb_dds_qos_profiles.xml"
    },
    "debug": 0, // debug模式开关 0-关闭 1-打开 默认为关闭状态(可选)
    // 通信节点列表(必填)
    "participants": [
        {
            "id": 1, // 节点ID(要求大于0)
            "protocol": "nanomsg", // 通信协议:libflow, minieyedds, fastdds, nanomsg
            "link_info": "tcp://127.0.0.1:12345" // 通信连接地址,nanomsg tcp 通信
        },
        {
            "id": 2,
            "protocol": "nanomsg",
            "link_info": "ipc:///tmp/pubsub.ipc" // 通信连接地址,nanomsg ipc 通信
        },
        {
            "id": 3,
            "protocol": "libflow",
            "link_info": "127.0.0.1:24012", // 通信连接地址,libflow 通信
            // 可能在节点定义中支持协议相关配置(待定)
            "protocol_options": {
                "servers": "none"
            }
        },
        {
            "id": 4,
            "protocol": "minieyedds",
            "link_info": "", // 不需要填写,空字符串即可
            "protocol_options": {
                "log_level": 1, // log_level: 1: Error, 2: Warn, 3: Info, 4: Debug, 5: Trace
                "topics": [
                    {
                        "topic": "adas",
                        "dds_mode": "shm", // dds_mode: "shm" or "redis"
                        "url": "tcp://127.0.0.1:8088", // 通信连接地址nanomsg url(非必须)
                        "buff_num": 100, // node numbers in fixed-size array
                        "elem_max_size": 1000000 // every node's max size (bytes)
                    },
                    {
                        "topic": "dms",
                        "dds_mode": "shm",
                        "url": "ipc:///tmp/demo_dds.ipc",
                        "buff_num": 100,
                        "elem_max_size": 1000000
                    }
                ]
            }
        }
    ]
}

智能汽车赋能者