亿迅智能制造网
工业4.0先进制造技术信息网站!
首页 | 制造技术 | 制造设备 | 工业物联网 | 工业材料 | 设备保养维修 | 工业编程 |
home  MfgRobots >> 亿迅智能制造网 >  >> Industrial Internet of Things >> 嵌入式

为反应式系统实现 MQTT 客户端

MQTT-Reactive 是源自 LiamBindle 的 MQTT-C 库的 MQTT v3.1.1 客户端。 MQTT-Reactive 的目标是提供一个用 C 编写的可移植和非阻塞的 MQTT 客户端,以便在反应式嵌入式系统中使用。首先,本文解释了什么是反应式系统。然后,它描述了如何为这种系统设计合适的软件结构。最后,本文展示了如何通过使用状态机和事件驱动范式在反应式系统中使用 MQTT-Reactive 库。为此,本文以真实的物联网设备为例,通过状态机、交互和结构等 UML 图解释了其软件结构和基于状态的行为。本文还提供了使用 C 语言实现 IoT 设备的 MQTT-Reactive 客户端的指南。

许多嵌入式系统是反应性的,即它们对内部或外部事件做出反应。一旦这些反应完成,软件就会返回等待下一个事件。这就是为什么事件驱动系统也被称为反应式系统。

事件驱动编程,或反应式编程,是为反应式系统实现灵活、可预测和可维护的软件的最合适的编程范式之一。在这种范式中,程序的流程由事件决定。通常,反应式软件的结构由多个并发单元组成,称为活动对象,它们等待和处理不同类型的事件。每个活动对象都拥有一个控制线程和一个事件队列,通过它处理传入的事件。在反应式系统中,活动对象通常具有在状态图中定义的基于状态的行为。

为了探索如何在具有多个并发任务并同时使用状态机和事件驱动范式的反应式系统中使用 MQTT-Reactive 库,我们以物联网设备为例。

使用 MQTT 协议的想法诞生于为铁路公司开发物联网设备时。该设备是一个清晰的反应系统,能够:

  1. 检测并存储多个数字输入的变化
  2. 获取、过滤和存储多个模拟信号
  3. 定期将存储的信息发送到远程服务器
  4. 通过 GSM 网络通过 MQTT 协议发送和接收信息

选择 MQTT 是因为它是一种基于发布者-订阅者的轻量级消息传递协议,常用于物联网和网络应用程序,其中需要高延迟和低数据速率的链接,例如 GSM 网络。

上述 IoT 设备的 MQTT 功能是通过使用 LiamBindle 的 MQTT-C 的修改版本来实现的。由于该设备的软件被设计为反应式软件,因此必须修改 MQTT-C 以通过交换异步事件与系统的其余部分进行通信。这些事件用于通过网络接收和发送流量,以及连接敏感信息并将其发布到服务器。由此产生的软件库被称为 MQTT-Reactive。

状态机

MQTT-Reactive 通过状态机使用,如图 1 所示,它模拟了 MQTT-Reactive 客户端的基本行为。它是一个名为 MqttMgr(MQTT 管理器)的活动对象。图 1 中的状态机操作演示了如何从状态机中使用 MQTT-Reactive 库。尽管图 1 中使用 C 语言作为动作语言,但可以使用任何计算机或形式语言。

点击查看大图

图 1. MQTT-Reactive 客户端的状态机(来源:VortexMakes)

图 1 中的状态机从 WaitingForNetConnection 状态开始。在与服务器建立网络连接后,WaitingForNetConnection 接收到 Activate 事件,然后状态机转换到 WaitingForSync 状态。只有在这种状态下,状态机才能将 MQTT 消息分别通过 Connect 和 Publish 事件发送给代理,例如 CONNECT 或 PUBLISH。同步状态使用 UML 的特殊机制来延迟发布事件,该事件由同步状态的内部隔间中包含的 defer 关键字指定。如果 Publish 事件在 Sync 处于当前状态时发生,它将被保存(推迟)以备将来处理,直到 SM 进入 Publish 事件不在其推迟事件列表(例如 WaitingForSync 或 WaitingForNetConnection)中的状态。进入此类状态后,状态机将自动调用任何保存的 Publish 事件,然后根据转换目标状态消耗或丢弃此事件。

状态机每隔 SyncTime 毫秒转换到 Sync 复合状态,该状态通过将 Receive 和 Send 事件发布到网络管理器来实际发送和接收来自网络的流量。它是处理网络问题的并发实体。

尽管引入的 MqttMgr 仅支持 CONNECT 和 PUBLISH 数据包,但它可以通过相当简单的更改来支持 SUBSCRIBE 数据包。

状态机操作通过使用 params 关键字访问消费事件的参数。例如,在下面的转换中,Connect 事件携带了两个参数,clientId 和 keepAlive,它们的值用于更新对应的 MqttMgr 对象的属性:

Connect(clientId, keepAlive)/ me->clientId =params->clientId;我->keepAlive =params->keepAlive; me->operRes =mqtt_connect(&me->client, me->clientId, NULL, NULL, 0, NULL, NULL, 0, me->keepAlive);

在此示例中, Connect(clientId, keepAlive) 事件是转换的触发器,而 mqtt_connect() 调用是作为结果执行的操作的一部分。换句话说,当 MqttMgr 对象接收到参数为 'publishing_client' 和 '400'、Connect(“publishing_client”, 400) 的 Connect(clientId, keepAlive) 事件时,MqttMgr 的 clientId 和 keepAlive 属性将更新为值 '因此,publishing_client' 和 '400'。

为了创建和发送事件,状态机的操作使用 GEN() 宏。例如,以下语句向 Collector 对象发送一个 Receive 事件,该对象被 itsCollector 指针引用为 MqttMgr 对象的属性:

GEN(me->itsCollector, Receive());

GEN() 语句的第一个参数是接收事件的对象,而第二个参数是正在发送的事件,包括事件参数(如果有的话)。参数必须与事件参数一致。例如,下面的语句会生成一个 ConnRefused(code) 事件,并将其发送给 Collector 对象,并传递代理返回的代码作为事件参数:

GEN(me->itsCollector, ConRefused(code));

使用 params 关键字访问消耗事件的参数和使用 GEN() 宏从操作生成事件的想法是从 Rational Rhapsody Developer 的代码生成器中采用的,仅用于说明目的。

图 1 中状态机的默认操作设置回调,每当从代理接收到连接接受时,MQTT-Reactive 都会调用该回调。这个回调应该在 MqttMgr 代码中实现。此回调必须生成 ConnAccepted 或 ConnRefused(code) 事件以发送到收集器对象,如下所示。

静态 无效 connack_response_callback (枚举 MQTTConnackReturnCode return_code){ /*...*/ if (return_code ==MQTT_CONNACK_ACCEPTED) { GEN(me->itsCollector, ConnAccepted()); } 其他 { GEN(me->itsCollector, ConnRefused(return_code)); }}

模型实现

图 1 中的模型可以通过使用您最喜欢的软件工具或仅使用您自己的状态机实现以 C 或 C++ 实现。互联网上有不同的工具可以做到这一点,例如 RKH 框架、QP 框架、Yakindu Statechart Tool 或 Rational Rhapsody Developer 等。它们都支持状态图和 C/C++ 语言。此外,其中一些包含用于绘制状态图并从中生成代码的工具。

这个状态机是从一个名为 MqttMgr(MQTT Manager)的活动对象执行的,它提供了对 MQTT-Reactive 代码的严格封装,并且它是唯一允许调用任何 MQTT-Reactive 函数或访问 MQTT-Reactive 数据的实体。系统中的其他并发实体以及任何 ISR 只能通过与 MqttMgr 交换事件来间接使用 MQTT-Reactive。使用这种机制来同步并发实体并在它们之间共享数据避免了处理传统阻塞机制(如信号量、互斥量、延迟或事件标志)的危险。这些机制可能会导致难以诊断和修复的意外故障。

MqttMgr 活动对象将其属性封装为一组数据项。数据项用名称和类型指定变量,其中类型实际上是数据类型。 MqttMgr 对象的数据项映射到对象结构的成员。成员的名称和类型与对象数据的名称和类型相同。例如,MqttMgr 对象类型的客户端属性作为数据成员按值嵌入到 MqttMgr 结构内部:

结构 MqttMgr { /* ... */ 结构 mqtt_client 客户; /* 属性客户端 */ LocalRecvAll localRecv; /* 属性 localRecv */ };

直接访问和修改 MqttMgr 对象的数据,无需使用访问器或修改器操作。比如client和localRecv通过me指针访问,指向MqttMgr的一个实例。

mqtt_recvMsgError(&me->client, &me->localRecv);

MqttMgr 的属性列表如表 1 所示。

表 1. MqttMgr 属性

图 2 中的结构有助于记住相关参与者之间的关系。它们是: Collector 对象,它想向代理发送信息;处理网络的 NetMgr 对象;和 MqttMgr 对象。


图 2. 物联网系统架构草图(来源:VortexMakes)

图 3 中的序列图显示了当需要打开与 MQTT 服务器的会话时 MqttMgr 对象如何与系统的其余部分交互。在此图中,MqttMgr 状态和交换的异步消息在 Collector、MqttMgr 和 NetMgr 参与者之间展示。


图 3. 连接到 MQTT 代理(来源:VortexMakes)

在 NetMgr 对象与代理建立网络连接后,从 MqttMgr 发送到 MQTT 服务器的第一个数据包必须是 CONNECT 数据包。因此,收集器参与者向 MqttMgr 参与者发送 Connect(clientId, keepAlive) 事件。此事件必须携带客户端标识符和保持活动时间间隔。如果服务器接受连接请求,MqttMgr actor 将发送 ConnAccepted 事件到 Collector actor 以通知这种情况。从那时起,收集器参与者可以向该代理发布信息消息。

如果服务器拒绝连接请求,MqttMgr 角色将向收集器角色发送 ConnRefused 事件。该事件带有通知拒绝原因的代码,如图 4 所示。参见 MQTT v3.1.1 第 3.2.2.3 节。


图 4. Broker 拒绝连接请求(来源:VortexMakes)

图 5 显示了消息发布时的交互流程。为了做到这一点,收集器参与者发送一个 Publish(data, size, topic, qos) 事件,该事件携带要发布的信息(数据),以字节为单位的信息长度(大小),要发布到的主题名称信息将被发布(主题)以及传递此消息的保证级别 (qos)。在前面提到的 IoT 设备中,发布的信息是使用 JSON 规范格式化的。它是一种开放的标准格式,包含具有人类可读文本中的属性值对的数据对象。这种格式是使用 jWrite 完成的,jWrite 是一个用 C 编写的简单轻量级的库。


图 5. 将数据发布到代理(来源:VortexMakes)

图 6 显示了一个场景,其中向网络接收和发送 MQTT 消息失败。如果网络管理器无法从网络接收流量,它将向 MqttMgr 参与者发送 ReceiveFail。同样,如果网络管理器无法向网络发送数据,则会向 MqttMgr 参与者发送 SendFail。


图 6. 网络故障(来源:VortexMakes)

表 2 总结了所示场景中涉及的事件。

表 2. 事件

结论

通过避免传统阻塞机制(如信号量、互斥、延迟或事件标志)的危险,本文中提出的 MQTT-Reactive 库、状态机和软件架构允许反应式嵌入式系统以新颖的方式实现 MQTT 客户端大大地。它是通过将 MQTT-Reactive 代码封装在一个称为 active object 的并发单元中来实现的 ,其基于状态的行为在建议的状态机中定义。这个主动对象通过交换异步事件与系统的其余部分进行通信:不仅用于通过网络接收和发送流量,还用于连接和发布信息到物联网应用程序的服务器。


嵌入式

  1. IIoT 分类法
  2. 为工业 4.0 构建灵活的制造系统
  3. 微控制器和嵌入式系统的 IC 技术简介
  4. 成功实施 RTLS 解决方案的 6 个防弹技巧
  5. 项目探索物联网安全的可信设计和验证流程
  6. Würth Elektronik eiSos 展示智能系统的新组件
  7. 为机器人系统设计电机控制
  8. Syslogic:适用于工程机械的坚固型计算机和 HMI 系统
  9. 控创和 SYSGO:用于安全关键系统的 SAFe-VX 计算平台
  10. 电路的所需状态配置
  11. 企业为智能系统设定最后期限
  12. 制造商的十大工作流程