Skip to content

MinieyeDDS

注意

本文档仅适用于 v1.18.0(不包含) 之前版本

📦 发布文件

  • libdds.so
  • libdds_api.so
  • libnanomsg.so
  • dds_api.h
  • dds_capi.h
  • dds_common.h
  • config.json

原理介绍

MinieyeDDS 是一个基于 Rust 构建的消息中间件组件,它提供了包括 C、C++、Python、Rust 等接口,拥有良好的跨平台特性。

通信协议

MinieyeDDS 底层是通过共享内存进行通信的,同时引入了 Nanomsg 作为实现数据的发布订阅功能的组件。

消息传输

一个比较典型的传输场景是这样的:

  1. 启动生产者,初始化 MinieyeDDS,创建共享内存和 nanomsg 发布者
  2. 启动消费者,初始化 MinieyeDDS,尝试连接到共享内存、创建 nanomsg 订阅者并且订阅相关 topic
  3. 生产者发送数据,将数据写入共享内存,然后通过 nanomsg 发布消息
  4. 消费者通过 nanomsg 收到消息,访问共享内存以获取相关数据

架构设计

如下图所示,MinieyeDDS 在底层提供对不同的共享内存类型的封装,对上层提供统一的接口。上层可以基于共享内存的统一接口实现各种不同的数据结构。目前常见的数据结构有队列,数组,原子变量等等。再向上一层,是 MinieyeDDS 为了方便使用者快速入门而提供的一套接口,也可以称为 DDS API。DDS API 提供不同语言的封装。DDS API 目前是基于 nanomsg queue 这个数据结构实现的。


image2


如下图所示,生产者会创建一个共享内存,同时会创建一个 nanomsg 的发布者。消费者会连接到共享内存上,同时创建一个 nanomsg 的订阅者。

MinieyeDDS 的共享内存分为两个部分:

  1. 索引数组:存储每个数据的唯一标识、数据的下标和数据的长度等信息
  2. 数据数组:存储用户写入的具体数据

Alt text


开发指南

引入 MinieyeDDS

使用 C++ 开发时,需要引入以下两个头文件

cpp
#include "dds_api.h"
#include "dds_common.h"

同理,使用 C 开发时,需要引入以下两个头文件

cpp
#include "dds_capi.h"
#include "dds_common.h"

除了引入上述的头文件以外,还需要引入共享库 libdds.solibdds_api.so libnanomsg.so

服务端

与 Libflow 类似,每个服务端都需要由一个 Context 管理,并且在一个进程中,只能初始化一个 Context,额外的初始化可能会导致程序的崩溃。

启动一个 MinieyeDDS 服务端之前,我们需要为其添加一个配置文件,配置文件应为 JSON 格式:

json
// config.json

{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
}

有了配置文件以后,我们可以利用它实例化一个 MinieyeDDS 服务端程序:

cpp
using namespace minieye::DDS;

// 配置文件的路径
const std::string configPath = "/path/to/config.json";

// 通过配置文件实例化一个 Context 对象
Context ctx(configPath);

// 通过 Context 对象实例化一个 Writer 对象
// 第二个参数表示这个 writer 对象负责向话题 test 写入消息
Writer writer(&ctx, "test");

也可以不额外使用 json 文件进行配置:

cpp
using namespace minieye::DDS;

// 直接写配置
const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint());
Writer writer(&ctx, "test");

拥有 minieye::DDS::Writer 对象以后,就可以向客户端写入消息了:

cpp
// 写线程,负责写入数据到客户端
void funcWriter(Writer* writer) {
  for (int i = 0; i < 30; i++) {
    // 生产需要向客户端发送的消息
    std::string payload = "Hello world " + std::to_string(i);

    // 向 queue 写入数据
    int ret = writer->Write(payload.c_str(), payload.size());
    if (ret != DDSSuccess) {
      printf("Write failed, ret = %d\n", ret);
    }

    std::cout << "Write data = " << payload << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

客户端

与服务端相同,客户端也需要使用一个 Context 来管理:

cpp
/** config.json
{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
} */

using namespace minieye::DDS;

// 配置文件的路径
const std::string configPath = "/path/to/config.json";

// 通过配置文件实例化一个 Context 对象
Context ctx(configPath);

// 通过 Context 对象实例化一个 Reader 对象
// 第二个参数表示这个 reader 对象负责向话题 test 写入消息
Reader reader(&ctx, "test");
cpp
using namespace minieye::DDS;

// 直接写配置
const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 实例化 ctx 对象,第一个参数是配置参数
// 第二个参数是一个占位符,表示直接从字符串读取配置
Context ctx(config, ContentHint());
Reader reader(&ctx, "test");

请注意

因为服务端和客户端会连接到同一块共享内存,因此请确保服务端和客户端的配置文件相同

MinieyeDDS 客户端提供了两种方式来读取从服务端发送的消息:

cpp
// 使用回调之前,需要首先定义一个回调函数
void on_read_data(
    const char* topic,  // 主题
    size_t index,       // array 索引
    void* ptr,          // 接收到的数据地址
    size_t size,        // 接收到的数据长度
    void* user          // 用户自定义数据
) {
  std::string content((char*)ptr, size);
  std::cout << "package received, idx: " 
            << index 
            << ", frame size: " 
            << size 
            << ", content: " 
            << content 
            << std::endl;
}

// 使用回调函数还需要在 Context 初始化的时候说明
int main() {
  // ...  
  
  Context ctx("config.json", true);
  // 从字符串读取配置文件的情况
  // Context ctx(config, ContentHint(), true);
  Reader reader(&ctx, topic, on_read_data);
  
  // ...
  return 0;
}
cpp
void ThreadReader(Reader* reader) {
  while(true) {
    char acBuf[1024] = {0};
    size_t numLength = sizeof(acBuf);
    size_t id = 0;
    size_t index = 0;
    int ret = reader->ReadNotify(acBuf, &numLength, &id, &index);
    if (ret != DDSSuccess) {
      printf("ReadNotify failed, iRet = %d\n", ret);
      continue;
    }

    // 获取 topic array 的容量
    size_t shm_capacity = 0;
    reader->Capacity(&shm_capacity);

    printf("ReadNotify %zu [id:%zu, index:%zu] data = %s\n", shm_capacity, id, index, acBuf);
  }
}

配置文件

MinieyeDDS 的配置文件是 JSON 格式的,其中有三个字段比较重要:

log_level

表示需要启用的日志等级,可选:

1: Error(推荐), 2: Warn, 3: Info, 4: Debug, 5: Trace, 0: close

topics

表示需要申请的共享内存,一个 topic 对应一个共享内存,topics 是一个数组,由 n 个如下结构的元素构成:

json
{
    "topic":            "topic_name", // topic 名称,需要确保唯一
    "dds_mode":         "shm",        // dds 模式,默认 shm
    "buff_num":         200,          // node 的数量
    "elem_max_size":    1000,         // 每个 node 的最大体积(bytes)
                                      // 注意:buff_num x elem_max_size 
                                      //      并不等于实际使用共享内存大小
    "url": "tcp://127.0.0.1:8088",    // 通信连接地址 nanomsg url (非必须)
}

示例代码

Pub-Sub

在 Pub-Sub 中:

  1. 我们创建一个服务端,服务端会尝试创建话题 test 并且 Publish 到所有订阅了该话题的客户端
  2. 我们创建一个客户端,这个客户端会订阅话题 test,并且显示所有从服务端接收到的话题

配置文件如下:

json
// config.json

{
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
}

代码如下:

cpp
#include <iostream>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

// 写线程,负责写入数据到客户端
void ThreadWriter(Writer* writer) {
  for (int i = 0; i < 30; i++) {
    std::string payload = "Hello world " + std::to_string(i);

    // 向 queue 写入数据
    int ret = writer->Write(payload.c_str(), payload.size());
    if (ret != DDSSuccess) {
      printf("Write failed, ret = %d\n", ret);
    }

    std::cout << "Write data = " << payload << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

int main() {
  const std::string configPath = "/path/to/config.json";
  
  Context ctx(configPath);
  Writer writer(&ctx, "test");

  ThreadWriter(&writer);
  return 0;
}
cpp
#include <signal.h>

#include <iostream>
#include <thread>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

void on_read_data(const char* topic, size_t index, void* ptr, size_t size, void* user) {
  std::string content((char*)ptr, size);
  std::cout << "package received, idx: " 
            << index 
            << ", frame size: " 
            << size 
            << ", content: " 
            << content 
            << std::endl;
}

int main() {
  const std::string configPath = "/path/to/config.json";

  Context ctx(configPath, true);
  Reader reader(&ctx, topic, on_read_data);

  std::this_thread::sleep_for(std::chrono::seconds(30));

  return 0;
}
cpp
#include <iostream>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

// 读线程,负责从 reader 中读取消息
void ThreadReader(Reader* reader) {
  while(true) {
    char acBuf[1024] = {0};
    size_t numLength = sizeof(acBuf);
    size_t id = 0;
    size_t index = 0;

    // 从 queue 中读取消息
    int ret = reader->ReadNotify(acBuf, &numLength, &id, &index);
    if (ret != DDSSuccess) {
      printf("ReadNotify failed, iRet = %d\n", ret);
      continue;
    }

    // 获取 topic array 的容量
    size_t shm_capacity = 0;
    reader->Capacity(&shm_capacity);

    printf("ReadNotify %zu [id:%zu, index:%zu] data = %s\n", shm_capacity, id, index, acBuf);
  }
}

int main() {
  const std::string configPath = "/path/to/config.json";
  
  Context ctx(configPath);
  Reader reader(&ctx, "test");

  ThreadReader(&reader);
  return 0;
}

当然,我们也可以从字符串读取配置

cpp
#include <iostream>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 写线程,负责写入数据到客户端
void ThreadWriter(Writer* writer) {
  for (int i = 0; i < 30; i++) {
    std::string payload = "Hello world " + std::to_string(i);

    // 向 queue 写入数据
    int ret = writer->Write(payload.c_str(), payload.size());
    if (ret != DDSSuccess) {
      printf("Write failed, ret = %d\n", ret);
    }

    std::cout << "Write data = " << payload << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

int main() {
  Context ctx(config, ContentHint());
  Writer writer(&ctx, "test");

  ThreadWriter(&writer);
  return 0;
}
cpp
#include <signal.h>

#include <iostream>
#include <thread>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

void on_read_data(const char* topic, size_t index, void* ptr, size_t size, void* user) {
  std::string content((char*)ptr, size);
  std::cout << "package received, idx: " 
            << index 
            << ", frame size: " 
            << size 
            << ", content: " 
            << content 
            << std::endl;
}

int main() {
  Context ctx(config, ContentHint(), true);
  Reader reader(&ctx, topic, on_read_data);

  std::this_thread::sleep_for(std::chrono::seconds(30));

  return 0;
}
cpp
#include <iostream>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

// 读线程,负责从 reader 中读取消息
void ThreadReader(Reader* reader) {
  while(true) {
    char acBuf[1024] = {0};
    size_t numLength = sizeof(acBuf);
    size_t id = 0;
    size_t index = 0;

    // 从 queue 中读取消息
    int ret = reader->ReadNotify(acBuf, &numLength, &id, &index);
    if (ret != DDSSuccess) {
      printf("ReadNotify failed, iRet = %d\n", ret);
      continue;
    }

    // 获取 topic array 的容量
    size_t shm_capacity = 0;
    reader->Capacity(&shm_capacity);

    printf("ReadNotify %zu [id:%zu, index:%zu] data = %s\n", shm_capacity, id, index, acBuf);
  }
}

int main() {
  Context ctx(config, ContentHint());
  Reader reader(&ctx, "test");

  ThreadReader(&reader);
  return 0;
}

关闭 topic

在《关闭 topic》中:

  1. 我们创建一个服务端,这个服务端会尝试创建话题 test,并且 Publish 到所有订阅了该话题的客户端,在第 10s 时,服务端会关闭这个 topic,并且在第 20s 时重新打开这个话题
  2. 我们创建一个客户端,这个客户端会尝试订阅话题 test,并且显示所有从服务端接收到的消息,在第 30s 时,客户端会尝试关闭回调功能,意味着尽管客户端接收到这个消息了,它也不会做出相应的回调处理,在第 40s 时,客户端会重新打开回调功能
cpp
#include <signal.h>
#include <string.h>

#include <iostream>
#include <thread>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

void ThreadWriter(Writer* writer) {
  int idx = 0;
  while (true) {
    char acData[1024] = {0};
    sprintf(acData, "shared memory, hello dds, %d", idx++);

    // 向 queue 写入数据
    int ret = writer->Write(acData, strlen(acData) + 1);
    if (ret != DDSSuccess) {
      printf("Write failed, iRet = %d\n", ret);
    }

    printf("Write data = %s\n", acData);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }
}

int main() {
  // 先把之前旧的从系统中删除。
  // rshm_unlink_multi_posix(topic, 2);
  // rshm_unlink_multi_svpic(topic, 2);

  Context ctx(config, ContentHint());
  Writer writer(&ctx, "test");
  std::thread t1(ThreadWriter, &writer);

  for (int i = 0; i < 60; i++) {
    if (i == 10) {
      writer.disable();
      std::cout << "disable" << std::endl;
    }
    if (i == 20) {
      writer.enable();
      std::cout << "enable" << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  t1.join();

  return 0;
}
cpp
#include <signal.h>

#include <iostream>
#include <thread>

#include "dds_api.h"
#include "dds_common.h"

using namespace minieye::DDS;

const std::string config = R"({
    "log_level": 1,
    "topics": [{
        "topic": "test",
        "dds_mode": "shm",
        "buff_num": 200,
        "elem_max_size": 10000
    }]
})";

void onData(const char* topic, size_t index, void* ptr, size_t size, void* user) {
  printf("[topic: %s] array[%lu] = %d\n", topic, index, *(int*)ptr);
}

int main() {
  // 先把之前旧的从系统中删除。
  // rshm_unlink_multi_posix(topic, 2);

  Context ctx(config,ContentHint(), true);
  Reader reader(&ctx, "test", onData);

  for (int i = 0; i < 60; i++) {
    if (i == 30) {
      reader.disable();
      std::cout << "disable" << std::endl;
    }
    if (i == 40) {
      reader.enable();
      std::cout << "enable" << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  return 0;
}

智能汽车赋能者