2023-07-21-nodeRed-源码分析10-从node.send到'input'回调
1. 功能概述#
Node-RED 中消息的流转是通过连接不同的节点来完成的。当一个节点接收到消息时,它会将其处理后发送到下一个节点。下一个节点再对该消息进行处理,最终将结果传递到下一个节点,以此类推,直至消息到达最终节点完成处理。
在 Node-RED 中,消息可由源节点(如 mqtt input node、http input node)生成,也可由其他节点产生(如 function node、template node 等)。对于每一个消息,Node-RED 需要确定它应该发送到哪个节点。这个过程是基于节点之间的连接来进行的。
连接可以直接从一个节点到另一个节点,也可以有中间节点传输。在流程执行过程中,Node-RED 会根据流程图的连线信息,按照一定的顺序发送消息,以达到预期的流程目标。
2. 涉及文件#
- packages/node_modules/@node-red/runtime/lib/flows/Flow.js
- packages/node_modules/@node-red/runtime/lib/nodes/Node.js
3. 代码分析#
3.1. 从 Node.prototype.send 说起#
在节点中,有消息发出是会调用 node.send,例:
1 | // respond to inputs.... |
函数将消息处理后转由 this._flow.send(sendEvents)
3.2. Flow.send#
这里涉及到一个概念:消息路由。
消息路由即通过路由规则动态规划消息的传输路径,使消息按照过滤条件,从消息源路由到目标节点。通过消息路由,可实现对数据路由的灵活控制和提高数据安全性。
主要有两种方式:
堆栈式
类 Express 的 middleware,每新增加一个处理模块,会在处理堆栈的顶端,最终形成一个消息处理堆栈。Hook 式
以 Hook(钩子)的形式实现,这需要在特定的节点处,增加对事件的回调,在有事件触发时,按序执行回调。
堆栈式有一个问题,当以动态方式增加处理模块后,在不需要处理模块的情况下,删除增加的处理模块需要恢复原有的堆栈,增加了处理复杂度。以 Hook 方式动态的增加删除 Hook 不会影响原有消息路由。
这种选择也是根据 NodeRed 实际情况而来,在原有消息的传递机制上增加”关键点“的方式好实现。
NodeRed 实现了以下钩子函数:
1 | // onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object. |
在这个过程中有以下注意的点:
- 消息拷贝
在 Node-RED 中,通过消息拷贝来创建一个新的消息对象,目的可以是为了防止更改原始消息,或者为了将消息传递给多个节点。 - 异步消息传递与同步消息传递
在 Node-RED 中,消息传递有两种方式:同步和异步。- 同步消息传递
同步消息传递是指当一个节点收到消息后,先处理该消息,然后再将结果传递给下一个节点。在同步消息传递的情况下,下一个节点只有在当前节点处理完成后才能继续执行。
Node-RED 中大部分节点都使用同步消息传递方式,包括最常用的 function 和 switch 节点。在同步消息传递的情况下,消息传递的时序是可预测的,可以保证下一个节点执行时,前一个节点已经完成了处理。 - 异步消息传递
异步消息传递是指当一个节点收到消息后,不会立即处理该消息,而是将消息存储起来,然后立即返回。在异步消息传递的情况下,下一个节点可以在当前节点处理完成前继续执行。
Node-RED 中一些节点使用异步消息传递方式,例如 delay 和 template 节点等。在异步消息传递的情况下,消息传递的时序是不可预测的,需要在下一个节点中进行特殊处理,例如等待当前节点完成处理。
在异步消息传递的情况下,需要注意的是,由于下一个节点可以在当前节点处理完成前继续执行,因此需要保证消息的顺序性和一致性。如果消息的顺序和一致性不能得到保证,可能会导致消息处理错误或者数据不一致的情况发生。
- 同步消息传递
3.3. Node.proto.receive#
在 PreDeliver 钩子处理完成后,消息会由目的地节点处理,这里调用 Node.proto.receive
1 | function handlePreDeliver(flow,sendEvent, reportError) { |
在这里,我们看到了熟悉的‘input’事件
1 | if (!msg) { |
3.4. Node.prototype.emit#
经过一层封装,我们进到 Node.prototype._emitInput
1 | var node = this; |
3.5. Node.prototype._emitInput#
在这里,进入到.on(‘input’,callback)所配置的回调函数中,实现消息由上一节点到下一节点的流转。
1 | ... |
4. 总结#
消息路由是 NodeRed 中的重要一部分,通过 Hook 方式在消息生命周期节点上进行扩展, 增强了系统可扩展性。
2023-07-21-nodeRed-源码分析10-从node.send到'input'回调
https://blog.buqia.fun/2023/07/21/2023-07-21-nodeRed-源码分析10-从node.send到'input'回调/