Skip to content

MinieyeDDS

注意

本文档仅适用于 v1.18.1 及后续版本,之前版本请阅读: MinieyeDDS 归档文档

📦 发布文件

  • libdds.so
  • dds_api.h
  • dds_capi.h
  • dds_common.h
  • config.json

原理介绍

MinieyeDDS 是一个基于 Rust 开发的消息中间件组件,它提供了包括 C、C++、Python、Rust 等接口,拥有良好的跨平台特性。

通信协议

MinieyeDDS 底层是通过共享内存进行通信的,同时引入了 Nanomsg 作为实现数据的发布订阅功能的组件。

消息传输

一个比较典型的传输场景是这样的:

  1. 启动生产者,初始化 MinieyeDDS,创建共享内存和 nanomsg 发布者
  2. 启动消费者,初始化 MinieyeDDS,尝试连接到共享内存、创建 nanomsg 订阅者并且订阅相关 topic
  3. 生产者发送数据,将数据写入共享内存,然后通过 nanomsg 发布消息
  4. 消费者通过 nanomsg 收到消息,访问共享内存以获取相关数据

架构设计

如下图所示,MinieyeDDS 在底层提供对不同的共享内存类型的封装,对上层提供统一的接口。上层可以基于共享内存的统一接口实现各种不同的数据结构。目前常见的数据结构有队列,数组,原子变量等等。再向上一层,是 MinieyeDDS 为了方便使用者快速入门而提供的一套接口,也可以称为 DDS API。DDS API 提供不同语言的封装。DDS API 目前是基于 nanomsg queue 这个数据结构实现的。


image2


如下图所示,生产者会创建一个共享内存,同时会创建一个 nanomsg 的发布者。消费者会连接到共享内存上,同时创建一个 nanomsg 的订阅者。

MinieyeDDS 的共享内存分为两个部分:

  1. 索引数组:存储每个数据的唯一标识、数据的下标和数据的长度等信息
  2. 数据数组:存储用户写入的具体数据

Alt text


开发指南

引入依赖

使用 C++ 开发时,需要引入以下头文件:

cpp
#include "dds_api.h"

同理,使用 C 开发时,需要引入以下头文件:

cpp
#include "dds_capi.h"

除了引入头文件,还需要链接动态库 libdds.so

服务端

与 Libflow 类似,DDS 的 Reader 和 Writer 都是由 Context 统一管理,当 Context 析构时,所有的 Reader 和 Writer 都将失效。

启动一个 MinieyeDDS 服务端之前,我们需要为其添加一个配置文件,配置文件应为 JSON 格式:

json
// config.json

{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
}

有了配置文件以后,我们可以利用它实例化一个 MinieyeDDS 服务端程序:

cpp
using namespace minieye::DDS;

// 配置文件的路径
const std::string configPath = "/path/to/config.json";

// 通过配置文件实例化一个 Context 对象
Context ctx(configPath);

// 通过 Context 对象实例化一个 Writer 对象
// 第二个参数表示这个 writer 对象负责向话题 test 写入消息
Writer writer(&ctx, "test");

也可以不额外使用 json 文件进行配置:

cpp
using namespace minieye::DDS;

// 直接写配置
const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint());
Writer writer(&ctx, "test");

拥有 minieye::DDS::Writer 对象以后,就可以向客户端写入消息了:

cpp
// 写线程,负责写入数据到客户端
void funcWriter(Writer* writer) {
  for (int i = 0; i < 30; i++) {
    // 生产需要向客户端发送的消息
    std::string payload = "Hello world " + std::to_string(i);

    // 向 queue 写入数据
    int ret = writer->Write(payload.c_str(), payload.size());
    if (ret != DDSSuccess) {
      printf("Write failed, ret = %d\n", ret);
    }

    std::cout << "Write data = " << payload << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

客户端

与服务端相同,客户端也需要使用一个 Context 来管理:

cpp
/** config.json
{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
} */

using namespace minieye::DDS;

// 配置文件的路径
const std::string configPath = "/path/to/config.json";

// 通过配置文件实例化一个 Context 对象
Context ctx(configPath, true);

// 通过 Context 对象实例化一个 Reader 对象
// 第二个参数表示这个 reader 对象负责接收来自话题 test 的消息
Reader reader(&ctx, "test");
cpp
using namespace minieye::DDS;

// 直接写配置
const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint(), true);
Reader reader(&ctx, "test");

INFO

  1. 由于 MinieyeDDS 提供了最低限度的服务发现能力,所以实际上客户端并不会实际使用 buff_num 字段和 elem_max_size 字段,但是本文还是建议您在生产环境中为 ReaderWriter 提供相同的配置。
  2. 如果 Context 类的最后一个参数被设置为 true,那么 DDS 会在后台启动一个独立的线程,这个线程会用来处理 DDS 的回调函数,因此无论何时,都请不要在回调函数中执行耗时任务,否则会导致意料外的丢帧

MinieyeDDS 客户端支持您通过传入回调函数的方式接收来自服务端的消息,当客户端收到来自服务端的消息时,将会触发回调函数:

cpp
// 使用回调之前,需要首先定义一个回调函数
void on_read_data(
    const char* topic,  // 主题
    size_t index,       // array 索引
    void* ptr,          // 接收到的数据地址
    size_t size,        // 接收到的数据长度
    void* user          // 用户自定义数据
) {
  std::string content((char*)ptr, size);
  std::cout //
    << "package received, idx: " 
    << index 
    << ", frame size: " 
    << size 
    << ", content: " 
    << content 
    << std::endl;
}

// 使用回调函数还需要在 Context 初始化的时候说明
int main() {
  // ...  
  
  Context ctx("config.json", true);
  // 从字符串读取配置文件的情况
  // Context ctx(config, ContentHint(), true);
  Reader reader(&ctx, topic, on_read_data);
  
  // ...
  return 0;
}

配置

MinieyeDDS 的配置采用 JSON 格式,包含两个字段:

log_level

一个数字,表示需要启用的日志等级,可选:

1: Error(推荐), 2: Warn, 3: Info, 4: Debug, 5: Trace, 0: close

topics

一个数组,数组中每个元素都记录对应 topic 的信息:

json
{
    "topic":            "topic_name", // topic 名称,需要确保唯一
    "dds_mode":         "shm",        // dds 模式,默认 shm
    "buff_num":         200,          // node 的数量
    "elem_max_size":    1000          // 每个 node 的体积(bytes)
}

注意

DDS 的共享内存是惰性申请惰性释放的,意味着:

  1. 如果不主动实例化 Writer,DDS 不会申请共享内存
  2. 如果实例化时共享内存已存在,DDS 会服用共享内存
  3. 如果不主动释放共享内存,DDS 不会释放共享内存

每个 topic 使用的共享内存大小 = 固定协议头部大小 + buff_num * elem_max_size + buff_num * 索引头部大小

因此使用 DDS 之前请确保剩余共享内存容量充裕 🚀

完整配置

一个完整的 DDS 配置如下:

json
{
    "log_level": 1,
    "topics": [{
        "topic": "test-1",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }, {
        "topic": "test-2",
        "dds_mode": "shm",
        "buf_num": 200,
        "elem_max_size": 10000
    }]
}

最佳实践

发布&订阅

在 发布&订阅 中:

  1. 我们创建一个服务端,服务端会尝试创建话题 test,并且 Publish 到所有订阅了该话题的客户端
  2. 我们创建一个客户端,这个客户端会订阅话题 test,并且输出所有从服务端接收到的话题

配置文件如下:

json
// config.json

{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
}

代码如下:

cpp
#include <iostream>

#include "dds_api.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 写线程,负责写入数据到客户端
void ThreadWriter(Writer* writer) {
  for (int i = 0; i < 30; i++) {
    std::string payload = "Hello world " + std::to_string(i);

    // 向 queue 写入数据
    int ret = writer->Write(payload.c_str(), payload.size());
    if (ret != DDSSuccess) {
      printf("Write failed, ret = %d\n", ret);
    }

    std::cout << "Write data = " << payload << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

int main() {
  // 从文件路径中载入
  // const std::string configPath = "/path/to/config.json";
  // Context ctx(configPath);

  // 使用文件内容载入
  Context ctx(config, ContentHint());

  Writer writer(&ctx, "test");

  ThreadWriter(&writer);
  return 0;
}
cpp
#include <signal.h>

#include <iostream>
#include <thread>

#include "dds_api.h"

using namespace minieye::DDS;

void on_read_data(const char* topic, size_t index, void* ptr, size_t size, void* user) {
  std::string content((char*)ptr, size);
  std::cout //
    << "package received, idx: " 
    << index 
    << ", frame size: " 
    << size 
    << ", content: " 
    << content 
    << std::endl;
}

int main() {
  // 从文件路径中载入
  // const std::string configPath = "/path/to/config.json";
  // Context ctx(configPath, true);

  // 使用文件内容载入
  Context ctx(config, ContentHint(), true);

  Reader reader(&ctx, topic, on_read_data);

  std::this_thread::sleep_for(std::chrono::seconds(30));

  return 0;
}

关闭 Context

关闭上下文是一种独特的优化手段,关闭 Context 后,DDS Reader 将不再接收来自 Writer 的数据,同时会退出后台线程,直到再次打开 Context。

  1. 我们首先创建一个服务端,这个服务端会尝试创建话题 test,并且 Publish 到所有订阅了该话题的客户端
  2. 我们创建一个客户端,这个客户端会尝试订阅话题 test,并且显示所有从服务端接收到的消息,在第 10s 的时候,客户端会尝试关闭 Context,意味着客户端会断开与服务端之间的连接,在第 20s 的时候,客户端会重新打开 Context
cpp
#include <signal.h>
#include <string.h>

#include <iostream>
#include <thread>

#include "dds_api.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

void ThreadWriter(Writer* writer) {
  int idx = 0;
  while (true) {
    char acData[1024] = {0};
    sprintf(acData, "shared memory, hello dds, %d", idx++);

    // 向 queue 写入数据
    int ret = writer->Write(acData, strlen(acData) + 1);
    if (ret != DDSSuccess) {
      printf("Write failed, iRet = %d\n", ret);
    }

    printf("Write data = %s\n", acData);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

int main() {
  Context ctx(config, ContentHint());
  Writer writer(&ctx, "test");
  std::thread t1(ThreadWriter, &writer);

  for (int i = 0; i < 60; i++) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  t1.join();

  return 0;
}
cpp
#include <signal.h>

#include <iostream>
#include <thread>

#include "dds_api.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

void onData(const char* topic, size_t index, void* ptr, size_t size, void* user) {
  printf("[topic: %s] array[%lu] = %d\n", topic, index, *(int*)ptr);
}

int main() {
  Context ctx(config, ContentHint(), true);
  Reader reader(&ctx, "test", onData);

  for (int i = 0; i < 60; i++) {
    if (i == 10) {
      ctx.DisableEventLoop();  
      std::cout << "disable" << std::endl;
    }
    if (i == 20) {
      ctx.EnableEventLoop();  
      std::cout << "enable" << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  return 0;
}

智能汽车赋能者