MinieyeDDS
注意
本文档仅适用于 v1.18.1
及后续版本,之前版本请阅读: MinieyeDDS 归档文档
📦 发布文件
- libdds.so
- dds_api.h
- dds_capi.h
- dds_common.h
- config.json
原理介绍
MinieyeDDS 是一个基于 Rust 开发的消息中间件组件,它提供了包括 C、C++、Python、Rust 等接口,拥有良好的跨平台特性。
通信协议
MinieyeDDS 底层是通过共享内存进行通信的,同时引入了 Nanomsg 作为实现数据的发布订阅功能的组件。
消息传输
一个比较典型的传输场景是这样的:
- 启动生产者,初始化 MinieyeDDS,创建共享内存和 nanomsg 发布者
- 启动消费者,初始化 MinieyeDDS,尝试连接到共享内存、创建 nanomsg 订阅者并且订阅相关 topic
- 生产者发送数据,将数据写入共享内存,然后通过 nanomsg 发布消息
- 消费者通过 nanomsg 收到消息,访问共享内存以获取相关数据
架构设计
如下图所示,MinieyeDDS 在底层提供对不同的共享内存类型的封装,对上层提供统一的接口。上层可以基于共享内存的统一接口实现各种不同的数据结构。目前常见的数据结构有队列,数组,原子变量等等。再向上一层,是 MinieyeDDS 为了方便使用者快速入门而提供的一套接口,也可以称为 DDS API。DDS API 提供不同语言的封装。DDS API 目前是基于 nanomsg queue 这个数据结构实现的。
如下图所示,生产者会创建一个共享内存,同时会创建一个 nanomsg 的发布者。消费者会连接到共享内存上,同时创建一个 nanomsg 的订阅者。
MinieyeDDS 的共享内存分为两个部分:
- 索引数组:存储每个数据的唯一标识、数据的下标和数据的长度等信息
- 数据数组:存储用户写入的具体数据
开发指南
引入依赖
使用 C++ 开发时,需要引入以下头文件:
#include "dds_api.h"
同理,使用 C 开发时,需要引入以下头文件:
#include "dds_capi.h"
除了引入头文件,还需要链接动态库 libdds.so
服务端
与 Libflow 类似,DDS 的 Reader 和 Writer 都是由 Context 统一管理,当 Context 析构时,所有的 Reader 和 Writer 都将失效。
启动一个 MinieyeDDS 服务端之前,我们需要为其添加一个配置文件,配置文件应为 JSON 格式:
// config.json
{
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
}
有了配置文件以后,我们可以利用它实例化一个 MinieyeDDS 服务端程序:
using namespace minieye::DDS;
// 配置文件的路径
const std::string configPath = "/path/to/config.json";
// 通过配置文件实例化一个 Context 对象
Context ctx(configPath);
// 通过 Context 对象实例化一个 Writer 对象
// 第二个参数表示这个 writer 对象负责向话题 test 写入消息
Writer writer(&ctx, "test");
也可以不额外使用 json 文件进行配置:
using namespace minieye::DDS;
// 直接写配置
const std::string config = R"({
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
})";
// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint());
Writer writer(&ctx, "test");
拥有 minieye::DDS::Writer
对象以后,就可以向客户端写入消息了:
// 写线程,负责写入数据到客户端
void funcWriter(Writer* writer) {
for (int i = 0; i < 30; i++) {
// 生产需要向客户端发送的消息
std::string payload = "Hello world " + std::to_string(i);
// 向 queue 写入数据
int ret = writer->Write(payload.c_str(), payload.size());
if (ret != DDSSuccess) {
printf("Write failed, ret = %d\n", ret);
}
std::cout << "Write data = " << payload << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
客户端
与服务端相同,客户端也需要使用一个 Context 来管理:
/** config.json
{
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
} */
using namespace minieye::DDS;
// 配置文件的路径
const std::string configPath = "/path/to/config.json";
// 通过配置文件实例化一个 Context 对象
Context ctx(configPath, true);
// 通过 Context 对象实例化一个 Reader 对象
// 第二个参数表示这个 reader 对象负责接收来自话题 test 的消息
Reader reader(&ctx, "test");
using namespace minieye::DDS;
// 直接写配置
const std::string config = R"({
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
})";
// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint(), true);
Reader reader(&ctx, "test");
INFO
- 由于 MinieyeDDS 提供了最低限度的服务发现能力,所以实际上客户端并不会实际使用
buff_num
字段和elem_max_size
字段,但是本文还是建议您在生产环境中为Reader
和Writer
提供相同的配置。 - 如果
Context
类的最后一个参数被设置为true
,那么 DDS 会在后台启动一个独立的线程,这个线程会用来处理 DDS 的回调函数,因此无论何时,都请不要在回调函数中执行耗时任务,否则会导致意料外的丢帧
MinieyeDDS 客户端支持您通过传入回调函数的方式接收来自服务端的消息,当客户端收到来自服务端的消息时,将会触发回调函数:
// 使用回调之前,需要首先定义一个回调函数
void on_read_data(
const char* topic, // 主题
size_t index, // array 索引
void* ptr, // 接收到的数据地址
size_t size, // 接收到的数据长度
void* user // 用户自定义数据
) {
std::string content((char*)ptr, size);
std::cout //
<< "package received, idx: "
<< index
<< ", frame size: "
<< size
<< ", content: "
<< content
<< std::endl;
}
// 使用回调函数还需要在 Context 初始化的时候说明
int main() {
// ...
Context ctx("config.json", true);
// 从字符串读取配置文件的情况
// Context ctx(config, ContentHint(), true);
Reader reader(&ctx, topic, on_read_data);
// ...
return 0;
}
配置
MinieyeDDS 的配置采用 JSON 格式,包含两个字段:
log_level
一个数字,表示需要启用的日志等级,可选:
1: Error(推荐), 2: Warn, 3: Info, 4: Debug, 5: Trace, 0: close
topics
一个数组,数组中每个元素都记录对应 topic 的信息:
{
"topic": "topic_name", // topic 名称,需要确保唯一
"dds_mode": "shm", // dds 模式,默认 shm
"buff_num": 200, // node 的数量
"elem_max_size": 1000 // 每个 node 的体积(bytes)
}
注意
DDS 的共享内存是惰性申请惰性释放的,意味着:
- 如果不主动实例化 Writer,DDS 不会申请共享内存
- 如果实例化时共享内存已存在,DDS 会服用共享内存
- 如果不主动释放共享内存,DDS 不会释放共享内存
每个 topic 使用的共享内存大小 = 固定协议头部大小 + buff_num * elem_max_size + buff_num * 索引头部大小
因此使用 DDS 之前请确保剩余共享内存容量充裕 🚀
完整配置
一个完整的 DDS 配置如下:
{
"log_level": 1,
"topics": [{
"topic": "test-1",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}, {
"topic": "test-2",
"dds_mode": "shm",
"buf_num": 200,
"elem_max_size": 10000
}]
}
最佳实践
发布&订阅
在 发布&订阅 中:
- 我们创建一个服务端,服务端会尝试创建话题 test,并且 Publish 到所有订阅了该话题的客户端
- 我们创建一个客户端,这个客户端会订阅话题 test,并且输出所有从服务端接收到的话题
配置文件如下:
// config.json
{
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
}
代码如下:
#include <iostream>
#include "dds_api.h"
using namespace minieye::DDS;
const std::string config = R"({
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
})";
// 写线程,负责写入数据到客户端
void ThreadWriter(Writer* writer) {
for (int i = 0; i < 30; i++) {
std::string payload = "Hello world " + std::to_string(i);
// 向 queue 写入数据
int ret = writer->Write(payload.c_str(), payload.size());
if (ret != DDSSuccess) {
printf("Write failed, ret = %d\n", ret);
}
std::cout << "Write data = " << payload << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
int main() {
// 从文件路径中载入
// const std::string configPath = "/path/to/config.json";
// Context ctx(configPath);
// 使用文件内容载入
Context ctx(config, ContentHint());
Writer writer(&ctx, "test");
ThreadWriter(&writer);
return 0;
}
#include <signal.h>
#include <iostream>
#include <thread>
#include "dds_api.h"
using namespace minieye::DDS;
void on_read_data(const char* topic, size_t index, void* ptr, size_t size, void* user) {
std::string content((char*)ptr, size);
std::cout //
<< "package received, idx: "
<< index
<< ", frame size: "
<< size
<< ", content: "
<< content
<< std::endl;
}
int main() {
// 从文件路径中载入
// const std::string configPath = "/path/to/config.json";
// Context ctx(configPath, true);
// 使用文件内容载入
Context ctx(config, ContentHint(), true);
Reader reader(&ctx, topic, on_read_data);
std::this_thread::sleep_for(std::chrono::seconds(30));
return 0;
}
关闭 Context
关闭上下文是一种独特的优化手段,关闭 Context 后,DDS Reader 将不再接收来自 Writer 的数据,同时会退出后台线程,直到再次打开 Context。
- 我们首先创建一个服务端,这个服务端会尝试创建话题 test,并且 Publish 到所有订阅了该话题的客户端
- 我们创建一个客户端,这个客户端会尝试订阅话题 test,并且显示所有从服务端接收到的消息,在第 10s 的时候,客户端会尝试关闭 Context,意味着客户端会断开与服务端之间的连接,在第 20s 的时候,客户端会重新打开 Context
#include <signal.h>
#include <string.h>
#include <iostream>
#include <thread>
#include "dds_api.h"
using namespace minieye::DDS;
const std::string config = R"({
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
})";
void ThreadWriter(Writer* writer) {
int idx = 0;
while (true) {
char acData[1024] = {0};
sprintf(acData, "shared memory, hello dds, %d", idx++);
// 向 queue 写入数据
int ret = writer->Write(acData, strlen(acData) + 1);
if (ret != DDSSuccess) {
printf("Write failed, iRet = %d\n", ret);
}
printf("Write data = %s\n", acData);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
int main() {
Context ctx(config, ContentHint());
Writer writer(&ctx, "test");
std::thread t1(ThreadWriter, &writer);
for (int i = 0; i < 60; i++) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
t1.join();
return 0;
}
#include <signal.h>
#include <iostream>
#include <thread>
#include "dds_api.h"
using namespace minieye::DDS;
const std::string config = R"({
"log_level": 1,
"topics": [{
"topic": "test",
"dds_mode": "shm",
"buff_num": 200,
"elem_max_size": 10000
}]
})";
void onData(const char* topic, size_t index, void* ptr, size_t size, void* user) {
printf("[topic: %s] array[%lu] = %d\n", topic, index, *(int*)ptr);
}
int main() {
Context ctx(config, ContentHint(), true);
Reader reader(&ctx, "test", onData);
for (int i = 0; i < 60; i++) {
if (i == 10) {
ctx.DisableEventLoop();
std::cout << "disable" << std::endl;
}
if (i == 20) {
ctx.EnableEventLoop();
std::cout << "enable" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}