通过 MQTT 将 PLCnext Control 连接到 Apache Kafka
技术背景
卡夫卡
Apache Kafka 是一个用于数据摄取、存储、处理和重新分配的框架。如今,它已广泛应用于世界各地的公司。 Kafka 的官方网站提供了有关其想法以及如何部署它的更多信息。它的一个关键特性是已经存在大量连接到其他应用程序和通信协议(如 MQTT)的连接器。
MQTT
MQTT 是一种基于 TCP 的轻量级消息传递协议,由于其健壮性和占用空间小,通常用于 IoT 通信。 OASIS 标准 MQTT 的详细信息可以在其网站上找到。
在这里,您可以找到有关如何为 PLCnext 交叉编译 mosquitto 的 Makers 博客文章,PLCnext 是 Eclipse 的 MQTT 实现。或者,PLCnext Store 提供现成的 MQTT 应用程序。
要求
- PLCnext 上的 MQTT 客户端(有关实施提示,请参见上一节)
- 控制器连接到 PC/VM
- PC/VM 上的 MQTT 代理(例如 mosquitto)
- PC/VM 上的 Kafka 实例(请参阅 Kafka 的快速入门指南)
设置
下图显示了我们将要实现的设置的概述,以将数据从 PLCnext 控件摄取到 Kafka。虽然可以将 Confluent 的 MQTT 代理用于他们的 Kafka (2) 版本,但我们将专注于更通用的解决方案 (1)。它包括客户端连接并发布消息的 MQTT 代理和订阅代理上的主题、处理消息并将它们转发到 Kafka 的连接器。
创建连接器
在本教程中,我们的连接器基于来自 GitHub 的 evokly/kafka-connect-mqtt 存储库,根据 MIT 许可证(详细的许可证信息)获得许可。首先,我们下载并解压存储库。由于最新的存储库版本是 2016 年底,我们更新了 build.gradle
文件,通过用新版本替换旧依赖项:
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
在本例中,我们将向 Kafka 发送纯字符串消息。因此我们必须编辑Java类DumbProcessor.java
在文件夹 /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
,这是默认的消息处理器:
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
此后,我们构建了一个包含依赖项的 Java 存档文件 (JAR):./gradlew clean jar
.我们复制输出 JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar
可以在文件夹 /kafka-connect-mqtt-master/build/libs
中找到 到 libs
Kafka目录。
我们还需要 Kafka 的 libs 目录中的 org.eclipse.paho.client.mqttv3-1.2.5.jar 存档的副本。我们可以在这里下载。
此外,我们必须为连接器 mqtt.properties
创建一个配置文件 在 Kafka 的 config
文件夹。文件内容如下:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
本地测试
现在我们可以在本地测试我们的连接器了。进入 Kafka 的目录,启动 ZooKeeper 和 Broker 实例:
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
该消息显示在控制台使用者中。
工业技术