亿迅智能制造网
工业4.0先进制造技术信息网站!
首页 | 制造技术 | 制造设备 | 工业物联网 | 工业材料 | 设备保养维修 | 工业编程 |
home  MfgRobots >> 亿迅智能制造网 >  >> Manufacturing Technology >> 工业技术

通过 MQTT 将 PLCnext Control 连接到 Apache Kafka

技术背景

卡夫卡

Apache Kafka 是一个用于数据摄取、存储、处理和重新分配的框架。如今,它已广泛应用于世界各地的公司。 Kafka 的官方网站提供了有关其想法以及如何部署它的更多信息。它的一个关键特性是已经存在大量连接到其他应用程序和通信协议(如 MQTT)的连接器。

MQTT

MQTT 是一种基于 TCP 的轻量级消息传递协议,由于其健壮性和占用空间小,通常用于 IoT 通信。 OASIS 标准 MQTT 的详细信息可以在其网站上找到。

在这里,您可以找到有关如何为 PLCnext 交叉编译 mosquitto 的 Makers 博客文章,PLCnext 是 Eclipse 的 MQTT 实现。或者,PLCnext Store 提供现成的 MQTT 应用程序。

要求

设置

下图显示了我们将要实现的设置的概述,以将数据从 PLCnext 控件摄取到 Kafka。虽然可以将 Confluent 的 MQTT 代理用于他们的 Kafka (2) 版本,但我们将专注于更通用的解决方案 (1)。它包括客户端连接并发布消息的 MQTT 代理和订阅代理上的主题、处理消息并将它们转发到 Kafka 的连接器。

创建连接器

在本教程中,我们的连接器基于来自 GitHub 的 evokly/kafka-connect-mqtt 存储库,根据 MIT 许可证(详细的许可证信息)获得许可。首先,我们下载并解压存储库。由于最新的存储库版本是 2016 年底,我们更新了 build.gradle 文件,通过用新版本替换旧依赖项:

ext { kafkaVersion = '2.6.0' }
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.13' 
    compile "org.apache.kafka:connect-api:$kafkaVersion"
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' 
    compile 'org.bouncycastle:bcprov-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpkix-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpg-jdk15on:1.67' 
    compile 'commons-io:commons-io:2.8.0' 
    compile 'org.slf4j:slf4j-api:1.7.30' 
    testCompile 'org.slf4j:slf4j-simple:1.7.30'
}

在本例中,我们将向 Kafka 发送纯字符串消息。因此我们必须编辑Java类DumbProcessor.java 在文件夹 /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt ,这是默认的消息处理器:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {

    return new SourceRecord[]{new SourceRecord(null, //sourcePartition
                   null,                //sourceOffset
                   kafkaTopic,          //topic
                   null,                //partition
                   null,                //keySchema
                   mTopic,              //key
                   null,                //valueSchema
                   mMessage.toString(), //value
                   new Long(123L))};    //long timestamp
}

此后,我们构建了一个包含依赖项的 Java 存档文件 (JAR):./gradlew clean jar .我们复制输出 JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar 可以在文件夹 /kafka-connect-mqtt-master/build/libs 中找到 到 libs Kafka目录。

我们还需要 Kafka 的 libs 目录中的 org.eclipse.paho.client.mqttv3-1.2.5.jar 存档的副本。我们可以在这里下载。

此外,我们必须为连接器 mqtt.properties 创建一个配置文件 在 Kafka 的 config 文件夹。文件内容如下:

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

kafka.topic=test_in                     # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123

mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://172.17.0.1:1883  # address of the MQTT broker
mqtt.topic=test/#                       # MQTT topic where the messages should be collected

#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor

本地测试

现在我们可以在本地测试我们的连接器了。进入 Kafka 的目录,启动 ZooKeeper 和 Broker 实例:

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start Kafka:
bin/kafka-server-start.sh config/server.properties

# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto 

# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true

# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123

该消息显示在控制台使用者中。


工业技术

  1. 电机控制电路
  2. 控制电路
  3. Eclipse Hono 支持 Apache Kafka 进行消息传递
  4. 通过英特尔居里的运动敏感电路控制
  5. Arduino - 通过网络控制灯泡
  6. 远程生产控制的5个优点
  7. 过孔的阻抗控制及其对PCB设计中信号完整性的影响
  8. 通过 SNMP 管理 PLCnext 控制设备
  9. PLCnext 上的集群管理?
  10. PLCnext Tableau 仪表板
  11. PLCnext Power BI 报告
  12. PLCnext 控件上的 Java 应用程序