亿迅智能制造网
工业4.0先进制造技术信息网站!
首页 | 制造技术 | 制造设备 | 工业物联网 | 工业材料 | 设备保养维修 | 工业编程 |
home  MfgRobots >> 亿迅智能制造网 >  >> Manufacturing Technology >> 工业技术

gRPC Python – 读写流程数据

本文介绍如何使用 Python 访问和编写简单的流程数据,并使用 gRPC 和 AXC F 3152。 (https://www.plcnext.help/te/Service_Components/gRPC_Introduction.htm)

先决条件

首先,我们必须在 PLC 之外准备所需的文件,例如在 Windows 机器上。

  1. 安装 Python 3.9(3.10 可能会导致错误)
  2. 安装所需的 Python 包以从 .proto 文件生成代码:pip install grpcio-tools==1.36.1
  3. 从 https://github.com/PLCnext/gRPC 下载并解压缩包含 .proto 文件的存储库

从 .proto 文件生成 _pb2.py 和 _pb2_grpc.py

接下来,我们必须从提供的 .proto 文件中生成所需的 python 文件。后者位于以下文件夹:gRPC-master/protobuf。

使用此代码在文件夹 gRPC-master 中创建 Python 脚本,例如 generate_grpc.py。脚本

  1. 生成所需的文件并将它们放在 gRPC-master/pxc_grpc 中
  2. 调整导入路径

import glob
import os
from pathlib import Path

# create the output directory
Path('pxc_grpc').mkdir(parents=True, exist_ok=True)

grpc_command_base = 'python -m grpc_tools.protoc -I./protobuf --python_out=pxc_grpc --grpc_python_out=pxc_grpc '

import_paths = set()

# generate the *_pb2.py and *_pb2_grpc.py files
for filename in glob.iglob('./protobuf/**', recursive=True):

    if filename.endswith('.proto'):
        
        # store the import path
        path_parts = filename.split(os.sep)
        import_paths.add('.'.join(path_parts[1:-1]))

        grpc_command = ''.join([grpc_command_base, os.path.join('.', os.path.relpath(filename))])
        stream = os.popen(grpc_command)
        output = stream.read()
        if output != '':
            print(''.join(['error/info for file ', os.path.relpath(filename), ' - ', output]))


# get the python files in the base directory
base_pys = set()

for (dirpath, dirnames, filenames) in os.walk('./pxc_grpc'):
    for f in filenames:
        base_pys.add(f.split('.py')[0])
    break

# reformat the stored paths to adapt the import statements
try:
    import_paths.remove('')
except:
    pass

import_paths = list(import_paths)
import_paths.sort(key=len)
import_paths.reverse()

# adapt the imports
for filename in glob.iglob('./pxc_grpc/**', recursive=True):

    if filename.endswith('.py'):

        new_lines = []

        with open(filename, 'r') as file:
            lines = file.readlines()
            for line in lines:
                if line.startswith('from'):
                    for import_path in import_paths:
                        if import_path in line:
                            line = line.replace(import_path, ''.join(['pxc_grpc.', import_path]), 1)
                            break
                elif line.startswith('import'):
                    parts = line.split()
                    if parts[1] in base_pys:
                        line = line.replace('import', 'from pxc_grpc import')
                
                new_lines.append(line)

        with open(filename, 'w') as file:
            file.write(''.join(new_lines))

打开一个shell并执行脚本:pyton generate_grpc.py

创建一个 PLCnext 演示项目

所示项目应仅演示 gRPC 接口如何与 GDS 交互。随意使用现有项目。对于单个项目,您必须相应地在以下 Python 脚本中编辑端口名称,例如 Arp.Plc.Eclr/MainInstance.strInput .

准备PLC

安装 pip 来管理你的 Python 包:

  1. 将 AXC F 3152 控制器连接到互联网。
  2. 输入命令 curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py。
  3. 然后输入命令python3 get-pip.py。

安装所需的包:pip install grpcio protobuf==3.20.0

在项目目录(/opt/plcnext/projects/)中创建一个文件夹“grpc_test”。

将包含 crated Python 文件的“pxc_grpc”文件夹复制到“grpc_test”,例如使用 WinSCP。

在 'grpc_test' 文件夹中创建一个名为 'grpc_test.py' 的 Python 脚本并插入以下代码:


import grpc
from pxc_grpc.Plc.Gds.IDataAccessService_pb2 import IDataAccessServiceReadSingleRequest, \
    IDataAccessServiceReadRequest, IDataAccessServiceWriteSingleRequest, IDataAccessServiceWriteRequest
from pxc_grpc.Plc.Gds.IDataAccessService_pb2_grpc import IDataAccessServiceStub
from pxc_grpc.Plc.Gds.WriteItem_pb2 import WriteItem


def write_single_string(stub, port_name, value):
    
    single_write_request = IDataAccessServiceWriteSingleRequest()
    single_write_request.data.PortName = port_name
    single_write_request.data.Value.TypeCode = 19
    single_write_request.data.Value.StringValue = value

    return stub.WriteSingle(single_write_request)


def write_single_int(stub, port_name, value):
    
    single_write_request = IDataAccessServiceWriteSingleRequest()
    single_write_request.data.PortName = port_name
    single_write_request.data.Value.TypeCode = 6
    single_write_request.data.Value.Int16Value = value

    return stub.WriteSingle(single_write_request)


def write_multiple_values(stub):

    write_request = IDataAccessServiceWriteRequest()

    wi1 = WriteItem()
    wi1.PortName = 'Arp.Plc.Eclr/MainInstance.strInput'
    wi1.Value.StringValue = "test1"
    wi1.Value.TypeCode = 19
    
    wi2 = WriteItem()
    wi2.PortName = 'Arp.Plc.Eclr/MainInstance.strInput2'
    wi2.Value.StringValue = "test2"
    wi2.Value.TypeCode = 19

    # add multiple WriteItems at once
    write_request.data.extend([wi1, wi2])

    # add WriteItems separately
    # response1.data.append(wi1)
    # response1.data.append(wi2)

    return stub.Write(write_request)


def read_single_value(stub, port_name):

    single_read_request = IDataAccessServiceReadSingleRequest()
    single_read_request.portName=port_name

    return stub.ReadSingle(single_read_request)


def read_multiple_values(stub, port_names):

    read_request = IDataAccessServiceReadRequest()
    read_request.portNames.extend(port_names)

    return stub.Read(read_request)


if __name__ == "__main__":
   
    # create channel and stub
    channel = grpc.insecure_channel('unix:/run/plcnext/grpc.sock')
    stub = IDataAccessServiceStub(channel)

    print(write_single_string(stub, 'Arp.Plc.Eclr/MainInstance.strInput', 'test123'))
    print(write_single_int(stub, 'Arp.Plc.Eclr/MainInstance.iInput', 18))

    print(write_multiple_values(stub))

    r = read_single_value(stub, 'Arp.Plc.Eclr/MainInstance.strInput')
    print(r)
    print(r._ReturnValue.Value.TypeCode)
    print(r._ReturnValue.Value.StringValue)

    r = read_multiple_values(stub, ['Arp.Plc.Eclr/MainInstance.iInput', 'Arp.Plc.Eclr/MainInstance.strInput'])
    for value in r._ReturnValue:
        print(value, value.Value.TypeCode)


将您的 PLC 连接到 PLCnext Engineer,下载项目并启动实时视图。

运行示例

现在开始示例。通过 ssh 登录 PLC 并导航到“grpc_test”,然后启动 Python 脚本:

  1. cd projects/grpc_test/
  2. python3 grpc_test.py

gRPC 支持与 GDS 变量的交互。

数据类型

要读取和写入变量,需要数据类型,例如 wi1.Value.TypeCode = 19 .类型在生成的文件 gRPC-master/pxc_grpc/ArpTypes_pb2.py 中描述 从第 242 行开始:

CT_None = 0
CT_End = 0
CT_Void = 1
CT_Boolean = 2
CT_Char = 3
CT_Int8 = 4
CT_Uint8 = 5
CT_Int16 = 6
CT_Uint16 = 7
CT_Int32 = 8
CT_Uint32 = 9
CT_Int64 = 10
CT_Uint64 = 11
CT_Real32 = 12
CT_Real64 = 13
CT_Struct = 18
CT_String = 19
CT_Utf8String = 19
CT_Array = 20
CT_DateTime = 23
CT_Version = 24
CT_Guid = 25
CT_AnsiString = 26
CT_Object = 28
CT_Utf16String = 30
CT_Stream = 34
CT_Enumerator = 35
CT_SecureString = 36
CT_Enum = 37
CT_Dictionary = 38
CT_SecurityToken = 39
CT_Exception = 40
CT_IecTime = 41
CT_IecTime64 = 42
CT_IecDate = 43
CT_IecDate64 = 44
CT_IecDateTime = 45
CT_IecDateTime64 = 46
CT_IecTimeOfDay = 47
CT_IecTimeOfDay64 = 48

对应的值变量,例如 r._ReturnValue.Value.StringValue , 可以在同一个文件中找到,从第 365 行开始,例如 BoolValue , Int8Value , StringValue .


工业技术

  1. 数字记忆术语和概念
  2. Python 数据类型
  3. 宇瞻:CV110-SD 和 CV110-MSD 卡在全球推出
  4. 宇瞻:工业级 SV250 SSD 系列,读写速度分别为 560 和 520MB/s
  5. 如何理解大数据:RTU 和过程控制应用
  6. 什么是铣削?- 定义、过程和操作
  7. 什么是钻孔?- 定义、过程和技巧
  8. 什么是粉末冶金?-定义和工艺
  9. 什么是拉削?- 工艺、加工和类型
  10. 什么是化学加工?- 工作和过程
  11. 什么是超声波加工?- 工作和过程
  12. 什么是喷焊?- 工艺和技术