智能物联网集成与Akenza和Python – CodesCode

在这篇博客文章中,我探讨了Akenza和Python的结合,为智能、实时的物联网集成和监控开辟了新的途径

物联网平台 Akenza 本身擅长收集和管理来自众多物联网设备的数据。但是,与企业资源规划 (ERP)、客户关系管理 (CRM) 平台、工作流管理或环境监测工具等其他系统的集成能够实现对整个组织景观的完整视图。

为了增加 Akenza 的功能并实现平滑集成,Python 编程的多功能性起到了关键作用。由于 Python 的灵活性,它是连接 Akenza 和组织独特需求之间的桥梁的自然选择。

本文讲述的是如何将两者:Akenza 和 Python 结合起来。在完成本文后,您将拥有以下内容:

  • 使用 Python 和 WebSockets 与 Akenza 进行双向连接。
  • 一个 Python 服务,通过 Akenza 订阅并接收物联网设备的事件。
  • 一个 Python 服务,将数据发送到通过 Akenza 连接的物联网设备。

由于 WebSocket 连接是持久的,它们的使用增强了物联网应用的响应性,进而实现实时数据交换,促进了一个动态而灵活的集成生态系统。

Python 和 Akenza WebSocket 连接

首先,让我们看一下完整的 Python 代码,稍后会对其进行讨论。

 # -*- coding: utf-8 -*-# Zatofrom zato.server.service import WSXAdapter# ################################################################################################ ###############################################################################################if 0:    from zato.server.generic.api.outconn.wsx.common import OnClosed, \        OnConnected, OnMessageReceived# ################################################################################################ ###############################################################################################class DemoAkenza(WSXAdapter):    # 我们的名称    name = 'demo.akenza'    def on_connected(self, ctx:'OnConnected') -> 'None':        self.logger.info('Akenza OnConnected -> %s', ctx)# ###############################################################################################    def on_message_received(self, ctx:'OnMessageReceived') -> 'None':        # 确认接收到的内容        self.logger.info('Akenza OnMessageReceived -> %s', ctx.data)        # 这表示我们已经连接成功...        if ctx.data['type'] == 'connected':            # ...为了测试目的,使用一个固定的资产 ID...            asset_id:'str' = 'abc123'            # ...构建我们的订阅消息...            data = {'type': 'subscribe', 'subscriptions': [{'assetId': asset_id, 'topic': '*'}]}            ctx.conn.send(data)        else:            # ...如果我们在这里,表示接收到的消息不是"type"为"connected"的消息。            self.logger.info('Akenza message (other than "connected") -> %s', ctx.data)# ##############################################################################################    def on_closed(self, ctx:'OnClosed') -> 'None':        self.logger.info('Akenza OnClosed -> %s', ctx)# ############################################################################################### ##############################################################################################

现在,将代码部署到 Zato 并创建一个新的传出 WebSocket 连接。将 API 密钥替换为您自己的,并确保将数据格式设置为 JSON。

从 WebSockets 接收消息

您编写的 WebSocket Python 服务具有三个感兴趣的方法,每个方法都对特定事件做出反应:

  • on_connected:在 WebSocket 连接打开后立即调用。请注意,这是一个低级事件,在 Akenza 的情况下,并不能立即发送或接收消息。
  • on_message_received:您大部分时间用于工作的主要方法。每当远程 WebSocket 发送或推送事件到您的服务时都会调用此方法。与 Akenza 一起使用时,每当 Akenza 有需要通知您的事情时都会调用此方法,例如您订阅了消息。
  • on_closed:在 WebSocket 被关闭后调用。一旦 WebSocket 被关闭,就无法再使用它。

让我们重点关注 on_message_received 方法,其中发生了大部分操作。它接收类型为 OnMessageReceived 的单个参数,描述了接收消息的上下文。也就是说,在 “ctx” 中,您可以找到当前请求以及可以用来回复消息的 WebSocket 连接的句柄。

上下文对象的两个重要属性是:

  • ctx.data:Akenza发送给您的数据字典。
  • ctx.conn:数据通过的底层WebSocket连接,通过它可以发送响应。

现在,代码30-40行的逻辑很清晰:

  • 首先,我们检查Akenza是否确认我们已连接(type==’connected’)。每次Akenza向您发送消息时,您都需要检查消息的类型并做出相应反应。
  • 接下来,因为我们知道我们已经连接(例如,我们的API密钥有效),我们可以订阅给定IoT资产的事件。为了测试目的,资产ID直接在源代码中给出,但实际上,这些信息会从配置文件或数据库中读取。
  • 最后,对于任何其他类型的消息,我们只是记录其详细信息。当然,完整的集成将根据具体情况处理这些消息,例如将它们转换并推送到其他应用程序或管理系统。

Akenza的示例消息将如下所示。

INFO - WebSocketClient -  Akenza消息(非“connected”) -> {'type': 'subscribed','replyTo': None, 'timeStamp': '2023-11-20T13:32:50.028Z','subscriptions': [{'assetId': 'abc123', 'topic': '*', 'tagId': None, 'valid': True}],'message': None}

如何向WebSocket发送消息

不容忽视的一个方面是在另一个方向上进行通信,即向WebSocket发送消息。例如,您可以通过REST API或者调度程序调用服务,并且它们的任务是将这些调用转换为IoT设备的配置命令。

下面是这样一个服务的核心部分,重复使用同一个Akenza WebSocket连接:

 # -*- coding: utf-8 -*-# Zatofrom zato.server.service import Service# ############################################################################################### ##############################################################################################class DemoAkenzaSend(Service):    # Our name    name = 'demo.akenza.send'    def handle(self) -> 'None':        # The connection to use        conn_name = 'Akenza'        # Get a connection ..        with self.out.wsx[conn_name].conn.client() as client:            # .. and send data through it.            client.send('Hello')# ############################################################################################### ##############################################################################################

请注意,对于发送到Akenza的消息的响应将使用您的第一个服务的on_message_received方法接收。基于WebSockets的消息传递是异步的,通道是独立的。

现在,我们对Akenza和WebSockets的实时IoT连接有了一个完整的图景。我们能够建立持久的、响应迅速的资产连接,我们可以订阅和发送设备消息,从而构建使用强大的新兴技术的智能自动化和集成架构。


Leave a Reply

Your email address will not be published. Required fields are marked *