Skip to content

Libflow

📦 发布文件

  • libflow.so
  • flow.hpp

原理介绍

Libflow 是一个基于 C++ 开发的跨平台 IPC 库,可以在应用程序中通过 WebSocket 协议实现消息的发布与订阅

通信协议

Libflow 分为两个部分:服务端(Server)和客户端(Client),在服务端上,Libflow 会监听一个特定的端口,并且利用这个端口提供 WebSocket 服务。

客户端通过这个端口连接到服务端后可以通过 WebSocket 的二进制数据帧来相互传递消息,消息采用了 MessagePack 进行数据序列化。

服务端与客户端通信的每条消息应至少包含三个字段:

  1. 消息发送者的名称(source)
  2. 消息的主题(topic),类似于频道名称
  3. 消息的内容(data)

这些字段会被打包成一个 MessagePack 的映射(map)对象,以方便在客户端和服务端之间传输。

消息传输

Libflow 采用了 发布-订阅(Pub-Sub)模式作为消息传递的范式,服务器可以在多个主题上发布消息(Publish),同样的,客户端也可以订阅接收(Subscribe)多个主题的消息。

消息订阅

当客户端初始化并成功连接到服务器后,通常需要向服务器发送一条消息,告知服务器它希望订阅哪些主题。这样服务器就知道将哪些消息发送给该客户端。

客户端需要订阅某个特定的主题时,需要在主题:subscribe 上将目标主题放入 data 字段并且发送给服务端:

json
{
  "source": "client_name",
  "topic":  "subscribe",
  "data":   "topic to subscribe"
}

当客户端不再需要订阅某个主题时,需要在主题:unsubscribe 上将目标主题放入 data 字段并且发送给服务端:

json
{
  "source": "client_name",
  "topic":  "unsubscribe",
  "data":   "topic to unsubscribe"
}

举例:

json
// 客户端 face_id_api_user 需要订阅 dms.face_id_api_server
{
  "source": "face_id_api_user",
  "topic":  "subscribe",
  "data":   "dms.face_id_api_server"
}
json
// 客户端 face_id_api_user 需要取消订阅 dms.face_id_api_server
{
  "source": "face_id_api_user",
  "topic":  "unsubscribe",
  "data":   "dms.face_id_api_server"
}

开发指南

引入Libflow

Libflow 提供了头文件 flow.hpp 和动态库 libflow.so,因此我们只需要在开发的时候确保头文件和动态库文件被正确引入即可:

服务端

每个服务端都由一个 Context 管理,每一个 Context 需要传入一个配置,我们可以用类似以下的代码获得一个服务端程序:

cpp
// 设置 libflow 服务需要监听的地址和端口
const char* host = "127.0.0.1";
const char* port = "24012";

// 通过配置实例化一个 context
flow::ctx0 = flow::Context({ 
    {"addr", host},
    {"port", port},
});

// 启动 libflow 服务端
if (flow::ctx0.start() != 0) { 
  exit(1);
}

// ... 其他代码

// 记得关闭 Context
flow::ctx0.stop();

服务端也可以监听来自客户端的消息,为了实现这个功能,我们需要实现 flow::Receiver 接口:

cpp
// MyReceiver 接口继承自 flow::Receiver 并且重写 recv 方法
// MyReceiver 监听来自客户端的消息,并且将接收到的消息打印到标准错误输出
class MyReceiver : public flow::Receiver {
  void recv(const char* source,   // '\0' terminated string
            const char* topic,    // any binary data
            const char* data,     // any binary data
            size_t size) final {  // < 2^32
    fprintf(
      stderr, 
      "MyReceiver::recv(%s, %s, %s)\n", 
      source, 
      topic, 
      std::string(data, size).c_str()
    );
  }
};

接下来,为了让 Context 知道自己需要处理来自客户端的消息,我们需要将 MyReceiver 注册到 Context:

cpp
// 设置 libflow 服务需要监听的地址和端口
const char* host = "127.0.0.1";
const char* port = "24012";

// 通过配置实例化一个 context
flow::ctx0 = flow::Context({ 
    {"addr", host},
    {"port", port},
});

// 实例化 receiver 并且注册到 Context //
MyReceiver receiver; 
flow::ctx0.add_receiver(receiver);

// 启动 libflow 服务端
if (flow::ctx0.start() != 0) { 
  exit(1);
}

// ... 其他代码

// 记得关闭 Context
flow::ctx0.stop();

服务端除了可以接收消息,也可以发送消息:

cpp
void func_sender() {
  // 通过 Context 实例化一个 sender,并且给这个 sender 命名为 sender1
  flow::Sender sender(&flow::ctx0, "sender1");

  for (int i = 0; i < 30; i++) {
    std::string text = "Hello, world!" + std::to_string(i);
    // 向 topic "test.foo" 发送数据 text
    // !注意 send 是异步操作
    sender.send("test.foo", text, text.c_str(), text.size()); 
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  }

  fprintf(stderr, "the sender thread is done\n");
}

sender 是不需要注册的,直接使用即可:

cpp
// 设置 libflow 服务需要监听的地址和端口
const char* host = "127.0.0.1";
const char* port = "24012";

// 通过配置实例化一个 context
flow::ctx0 = flow::Context({ 
    {"addr", host},
    {"port", port},
});

// 启动 libflow 服务端
if (flow::ctx0.start() != 0) { 
  exit(1);
}

// 发送消息到客户端 //
func_sender();

// ... 其他代码

// 记得关闭 Context
flow::ctx0.stop();

客户端

与服务端相似,客户端也是通过 Context 的方式管理的,我们首先创建一个 Context:

cpp
// 初始化 Context
flow::ctx0 = flow::Context(flow::Config{
    {"servers", "none"}, // 禁用 Servers 功能
});

// 初始化客户端配置文件
flow::ClientConfig config = {
    "FlowClient",           // 客户端名称
    "ws://127.0.0.1:24012", // 服务端 URI,格式为:ws://<host>:<port>
    "*"                     // 需要订阅的话题,* 表示全部订阅
                            // eg:test.foo, test.*, *
};

if (flow::ctx0.start() != 0) {
  fprintf(stderr, "Error: cannot start flow context\n");
  exit(1);
}

客户端也可以监听来自服务端的消息,我们需要实现 flow::Client 接口:

cpp
class MyClient : public flow::Client {
 public:
  explicit MyClient(const flow::ClientConfig& config) : flow::Client(config) {}

  // This function will be run in an event loop.
  // DO NOT do blocking I/O or heavy computation in this function.
  // The data will be dropped after calling this function. If you
  // want to use it in future time, make your own copy.
  void recv(const char* source,      // '\0' terminated string
            const char* topic,       // any binary data
            const char* data,        // any binary data
            size_t size) override {  // < 2^32
    fprintf(stderr, "MyClient::recv(%s, %s, %s)\n", source, topic, std::string(data, size).c_str());
  }
};

然后我们将这个接口注册到 Context:

cpp
// 初始化 Context
flow::ctx0 = flow::Context(flow::Config{
    {"servers", "none"}, // 禁用 Servers 功能
});

// 初始化客户端配置文件
flow::ClientConfig config = {
    "FlowClient",           // 客户端名称
    "ws://127.0.0.1:24012", // 服务端 URI,格式为:ws://<host>:<port>
    "*"                     // 需要订阅的话题,* 表示全部订阅
};

MyClient client(config);        // 实例化 client //
flow::ctx0.add_client(&client); // 注册 client 到 Context

if (flow::ctx0.start() != 0) {
  fprintf(stderr, "Error: cannot start flow context\n");
  exit(1);
}

客户端也可以和服务端通信:

cpp
// 初始化 Context
flow::ctx0 = flow::Context(flow::Config{
    {"servers", "none"}, // 禁用 Servers 功能
});

// 初始化客户端配置文件
flow::ClientConfig config = {
    "FlowClient",           // 客户端名称
    "ws://127.0.0.1:24012", // 服务端 URI,格式为:ws://<host>:<port>
    "*"                     // 需要订阅的话题,* 表示全部订阅
};

MyClient client(config);        // 实例化 client 
flow::ctx0.add_client(&client); // 注册 client 到 Context

if (flow::ctx0.start() != 0) {
  fprintf(stderr, "Error: cannot start flow context\n");
  exit(1);
}

for (int i = 0; i < 30; i++) {
  std::string payload = "hello world" + std::to_string(i);
  client.send("test.foo", payload.c_str(), payload.size());
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

配置选项

除了上述例子中展示的 Context 使用的配置,Context 还支持一些高级的配置:

1. addr

Server监听地址, 默认值: 127.0.0.1

2. port

监听端口, 默认值: 24012

3. servers

设为 none 可以关闭服务

4. pusher.socket.type

内部线程间通信方式,默认值: tcp,也可以设为 udp

5. sender.serialize

消息序列化格式,默认值: msgpack

设为 raw 则不做序列化,直接将原始数据发给客户端。服务端和客户端必须将此配置项设为相同值。

6. queue.topic.capacity

消息队列容量,默认值: 8

其中 topic 为具体发送的消息主题,每个 topic 可以设置不同的队列容量。较小的队列容量意味着低延迟、高丢包率,较大的队列容量意味着高延迟、低丢包率。

示例代码

Pub-Sub

在《Pub-Sub》中

  1. 我们尝试创建一个服务端,服务端有三个 Sender(th1, th2 和 th3),这三个 Sender 制造 topic 并借由服务端 Publish 到所有订阅了对应话题的客户端。
  2. 我们创建了一个客户端,这个客户端先连接到服务端,然后订阅了服务端所有的 topic。
cpp
// 服务器,作为数据发布者

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <chrono>
#include <thread>

#include "flow.hpp"

// th1 负责制造 test.foo 话题
void func_th1() {
  flow::Sender sender(&flow::ctx0, "th1");

  for (;;) {
    const char* text = "Hello, world!";
    // 向 topic "test.foo" 发送数据 text
    sender.send("test.foo", text, strlen(text));
    std::this_thread::sleep_for(std::chrono::milliseconds(1200));
  }

  fprintf(stderr, "the th1 thread is done\n");
}

// th2 负责制造 test.bar 话题
void func_th2() {
  flow::Sender sender(&flow::ctx0, "th2");

  for (;;) {
    const char* text = "Are you OK?";
    // 向 topic "test.bar" 发送数据 text
    sender.send("test.bar", text, strlen(text));
    std::this_thread::sleep_for(std::chrono::milliseconds(800));
  }

  fprintf(stderr, "the th2 thread is done\n");
}

// th3 负责制造 test1.bar 话题
void func_th3() {
  flow::Sender sender(&flow::ctx0, "th3");

  for (;;) {
    const char* text = "Welcome to libflow";
    // 向 topic "test1.bar" 发送数据 text
    sender.send("test1.bar", text, strlen(text));
    std::this_thread::sleep_for(std::chrono::milliseconds(800));
  }

  fprintf(stderr, "the th3 thread is done\n");
}

int main() {
  // 服务器监听的地址和端口
  const char* host = "127.0.0.1";
  const char* port = "24012";

  flow::ctx0 = flow::Context({
      {"addr", host},
      {"port", port},
  });

  if (flow::ctx0.start() != 0) {
    exit(1);
  }

  printf("Listening on %s:%s\n", host, port);

  typedef void (*thread_func_t)(void);
  thread_func_t funcs[3] = {
      func_th1,
      func_th2,
      func_th3,
  };

  std::thread threads[3];

  for (int i = 0; i < 3; i += 1) {
    threads[i] = std::thread(funcs[i]);
  }

  for (int i = 0; i < 3; i += 1) {
    threads[i].join();
  }

  flow::ctx0.stop();
  return 0;
}
cpp
// 客户端,作为数据的订阅者

#include <stdio.h>
#include <string.h>

#include <chrono>
#include <string>
#include <thread>

#include "flow.hpp"

class MyClient : public flow::Client {
 public:
  explicit MyClient(const flow::ClientConfig& config) : flow::Client(config) {}

  // This function will be run in an event loop.
  // DO NOT do blocking I/O or heavy computation in this function.
  // The data will be dropped after calling this function. If you
  // want to use it in future time, make your own copy.
  virtual void recv(const char* source,  // '\0' terminated string
                    const char* topic,   // any binary data
                    const char* data,    // any binary data
                    size_t size) {       // < 2^32
    printf("--------------------------------------------------\n");
    fprintf(stderr, "MyClient::recv(%s, %s, %s)\n", source, topic, std::string(data, size).c_str());
  }
};

int main(int argc, char* argv[]) {
  // 这里设置 {"servers", "none"} 是为了关闭服务端功能
  flow::Config ctx_config{{"servers", "none"}};
  flow::ctx0 = flow::Context(ctx_config);

  std::string uri = "ws://127.0.0.1:24012";

  // FlowClient 是客户端名字
  // uri 是服务器的地址和端口
  // "*" 表示订阅所有的 topic
  flow::ClientConfig config = {"FlowClient", uri, "*"};

  MyClient client(config);
  flow::ctx0.add_client(&client);

  if (flow::ctx0.start() != 0) {
    fprintf(stderr, "Error: cannot start flow context\n");
    exit(1);
  }

  while (1) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  flow::ctx0.stop();
  return 0;
}

广播

在《广播》中:

  1. 我们尝试创建一个服务端,这个服务端接受并广播所有来自客户端的消息
  2. 我们创建一个客户端 client1,这个客户端会尝试向服务端发送消息并且接收所有来自服务端的消息
  3. 我们创建一个客户端 client2,这个客户端会接收来自服务端的消息并且展示出来
cpp
// 服务器,作为数据发布者

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <chrono>

#include "flow.hpp"
#include "mpmc_queue.hpp"

struct Message {
  std::string source;
  std::string topic;
  std::string data;
};

// 消息通道,负责在不同的线程中通信
mpmc_queue<Message> channel(20);

// receiver 负责接收来自 client 的消息
class MyReceiver : public flow::Receiver {
  void recv(const char* source,   // '\0' terminated string
            const char* topic,    // any binary data
            const char* data,     // any binary data
            size_t size) final {  // < 2^32
    fprintf(stderr, "MyReceiver::recv(%s, %s, %s)\n", source, topic, std::string(data, size).c_str());

    // 将读取到的消息放入消息通道中
    channel.push(Message{
        .source = source,
        .topic = topic,
        .data = std::string(data, size),
    });
  }
};

void func_broadcast() {
  flow::Sender sender(&flow::ctx0, "broadcast");

  // 从消息通道中取出消息并将其广播到所有的客户端
  Message payload;
  while (channel.pop(payload)) {
    // 接收到 close 的时候关闭服务端
    if (payload.data == "close") {
      channel.stop();
    }
    sender.send(payload.topic, payload.data.c_str(), payload.data.size());
  }
}

int main() {
  // 服务器监听的地址和端口
  const char* host = "127.0.0.1";
  const char* port = "24012";

  flow::ctx0 = flow::Context({
      {"addr", host},
      {"port", port},
  });

  // 将 receiver 注册到 context 中
  auto receiver = new MyReceiver;
  flow::ctx0.add_receiver(receiver);

  if (flow::ctx0.start() != 0) {
    exit(1);
  }

  printf("Listening on %s:%s\n", host, port);

  // 启动广播并等待其结束
  func_broadcast();
  flow::ctx0.stop();
  delete receiver;

  return 0;
}
cpp
// 客户端 1,作为消息的制造者与消费者

#include <chrono>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

#include "flow.hpp"

class MyClient : public flow::Client {
 public:
  explicit MyClient(const flow::ClientConfig& config) : flow::Client(config) {}

  // This function will be run in an event loop.
  // DO NOT do blocking I/O or heavy computation in this function.
  // The data will be dropped after calling this function. If you
  // want to use it in future time, make your own copy.
  void recv(const char* source,      // '\0' terminated string
            const char* topic,       // any binary data
            const char* data,        // any binary data
            size_t size) override {  // < 2^32
    printf("--------------------------------------------------\n");
    fprintf(stderr, "MyClient::recv(%s, %s, %s)\n", source, topic, std::string(data, size).c_str());
  }
};

int main(int /*argc*/, char* /*argv*/[]) {
  // 启动 libflow 客户端
  flow::Config ctx_config{{"servers", "none"}};
  flow::ctx0 = flow::Context(ctx_config);

  // 连接到服务端并且订阅所有消息
  std::string uri = "ws://127.0.0.1:24012";
  flow::ClientConfig config = {"FlowClient", uri, "*"};

  // 将 MyClient 注册到 context
  MyClient client(config);
  flow::ctx0.add_client(&client);

  // 启动 context
  if (flow::ctx0.start() != 0) {
    fprintf(stderr, "Error: cannot start flow context\n");
    exit(1);
  }

  // 向服务端连续发送 30 条消息,服务端接收到这些消息以后将尝试广播
  for (int i = 0; i < 30; i++) {
    std::string data = "Hello world " + std::to_string(i);
    client.send("test.foo", data.c_str(), data.size());
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  // 客户端退出,通知服务端也退出
  std::string close = "close";
  client.send("test.foo", close.c_str(), close.size());
  std::this_thread::sleep_for(std::chrono::seconds(1));

  // 关闭 context
  flow::ctx0.stop();

  return 0;
}
cpp
// 客户端 2,作为消息的消费者
// 内容与示例代码 1 中客户端相同,

#include <chrono>
#include <cstdio>
#include <cstring>
#include <string>
#include <thread>

#include "flow.hpp"

class MyClient : public flow::Client {
 public:
  explicit MyClient(const flow::ClientConfig& config) : flow::Client(config) {}

  // This function will be run in an event loop.
  // DO NOT do blocking I/O or heavy computation in this function.
  // The data will be dropped after calling this function. If you
  // want to use it in future time, make your own copy.
  void recv(const char* source,      // '\0' terminated string
            const char* topic,       // any binary data
            const char* data,        // any binary data
            size_t size) override {  // < 2^32
    printf("--------------------------------------------------\n");
    fprintf(stderr, "MyClient::recv(%s, %s, %s)\n", source, topic, std::string(data, size).c_str());
  }
};

int main(int /*argc*/, char* /*argv*/[]) {
  flow::Config ctx_config{{"servers", "none"}};
  flow::ctx0 = flow::Context(ctx_config);

  std::string uri = "ws://127.0.0.1:24012";
  flow::ClientConfig config = {"FlowClient", uri, "*"};

  MyClient client(config);
  flow::ctx0.add_client(&client);

  if (flow::ctx0.start() != 0) {
    fprintf(stderr, "Error: cannot start flow context\n");
    exit(1);
  }

  while (1) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  flow::ctx0.stop();

  return 0;
}

附录

名词解释

  1. WebSocket: rfc6455
  2. Binary Data Frame: rfc6455
  3. MessagePack: msgpack.org
  4. Pub-Sub(Publish-Subscribe): Pub-Sub

智能汽车赋能者