2023-07-21-nodeRed-源码分析10-从node.send到'input'回调

1. 功能概述#

Node-RED 中消息的流转是通过连接不同的节点来完成的。当一个节点接收到消息时,它会将其处理后发送到下一个节点。下一个节点再对该消息进行处理,最终将结果传递到下一个节点,以此类推,直至消息到达最终节点完成处理。

在 Node-RED 中,消息可由源节点(如 mqtt input node、http input node)生成,也可由其他节点产生(如 function node、template node 等)。对于每一个消息,Node-RED 需要确定它应该发送到哪个节点。这个过程是基于节点之间的连接来进行的。

连接可以直接从一个节点到另一个节点,也可以有中间节点传输。在流程执行过程中,Node-RED 会根据流程图的连线信息,按照一定的顺序发送消息,以达到预期的流程目标。

2. 涉及文件#

  • packages/node_modules/@node-red/runtime/lib/flows/Flow.js
  • packages/node_modules/@node-red/runtime/lib/nodes/Node.js

3. 代码分析#

3.1. 从 Node.prototype.send 说起#

在节点中,有消息发出是会调用 node.send,例:

1
2
3
4
5
6
// respond to inputs....
this.on('input', function (msg) {
node.warn("I saw a payload: "+msg.payload);
// in this example just send it straight on... should process it here really
node.send(msg);
});

函数将消息处理后转由 this._flow.send(sendEvents)

3.2. Flow.send#

这里涉及到一个概念:消息路由。
消息路由即通过路由规则动态规划消息的传输路径,使消息按照过滤条件,从消息源路由到目标节点。通过消息路由,可实现对数据路由的灵活控制和提高数据安全性。

主要有两种方式:

  1. 堆栈式
    类 Express 的 middleware,每新增加一个处理模块,会在处理堆栈的顶端,最终形成一个消息处理堆栈。

  2. Hook 式
    以 Hook(钩子)的形式实现,这需要在特定的节点处,增加对事件的回调,在有事件触发时,按序执行回调。

堆栈式有一个问题,当以动态方式增加处理模块后,在不需要处理模块的情况下,删除增加的处理模块需要恢复原有的堆栈,增加了处理复杂度。以 Hook 方式动态的增加删除 Hook 不会影响原有消息路由。

这种选择也是根据 NodeRed 实际情况而来,在原有消息的传递机制上增加”关键点“的方式好实现。

NodeRed 实现了以下钩子函数:

1
2
3
4
5
6
7
// onSend - passed an array of SendEvent objects. The messages inside these objects are exactly what the node has passed to node.send - meaning there could be duplicate references to the same message object.
// preRoute - called once for each SendEvent object in turn
// preDeliver - the local router has identified the node it is going to send to. At this point, the message has been cloned if needed.
// postDeliver - the message has been dispatched to be delivered asynchronously (unless the sync delivery flag is set, in which case it would be continue as synchronous delivery)
// onReceive - a node is about to receive a message
// postReceive - the message has been passed to the node's input handler
// onDone, onError - the node has completed with a message or logged an error

在这个过程中有以下注意的点:

  1. 消息拷贝
    在 Node-RED 中,通过消息拷贝来创建一个新的消息对象,目的可以是为了防止更改原始消息,或者为了将消息传递给多个节点。
  2. 异步消息传递与同步消息传递
    在 Node-RED 中,消息传递有两种方式:同步和异步。
    1. 同步消息传递
      同步消息传递是指当一个节点收到消息后,先处理该消息,然后再将结果传递给下一个节点。在同步消息传递的情况下,下一个节点只有在当前节点处理完成后才能继续执行。
      Node-RED 中大部分节点都使用同步消息传递方式,包括最常用的 function 和 switch 节点。在同步消息传递的情况下,消息传递的时序是可预测的,可以保证下一个节点执行时,前一个节点已经完成了处理。
    2. 异步消息传递
      异步消息传递是指当一个节点收到消息后,不会立即处理该消息,而是将消息存储起来,然后立即返回。在异步消息传递的情况下,下一个节点可以在当前节点处理完成前继续执行。
      Node-RED 中一些节点使用异步消息传递方式,例如 delay 和 template 节点等。在异步消息传递的情况下,消息传递的时序是不可预测的,需要在下一个节点中进行特殊处理,例如等待当前节点完成处理。
      在异步消息传递的情况下,需要注意的是,由于下一个节点可以在当前节点处理完成前继续执行,因此需要保证消息的顺序性和一致性。如果消息的顺序和一致性不能得到保证,可能会导致消息处理错误或者数据不一致的情况发生。

3.3. Node.proto.receive#

在 PreDeliver 钩子处理完成后,消息会由目的地节点处理,这里调用 Node.proto.receive

1
2
3
4
5
function handlePreDeliver(flow,sendEvent, reportError) {
...
sendEvent.destination.node.receive
...
}

在这里,我们看到了熟悉的‘input’事件

1
2
3
4
5
6
7
if (!msg) {
msg = {};
}
if (!msg._msgid) {
msg._msgid = redUtil.generateId();
}
this.emit("input",msg);

3.4. Node.prototype.emit#

经过一层封装,我们进到 Node.prototype._emitInput

1
2
3
4
5
6
var node = this;
if (event === "input") {
this._emitInput.apply(this,args);
} else {
this._emit.apply(this,arguments);
}

3.5. Node.prototype._emitInput#

在这里,进入到.on(‘input’,callback)所配置的回调函数中,实现消息由上一节点到下一节点的流转。

1
2
3
4
5
6
...
node._inputCallback(
arg,
function() { node.send.apply(node,arguments) },
function(err) { node._complete(arg,err); }
...

4. 总结#

消息路由是 NodeRed 中的重要一部分,通过 Hook 方式在消息生命周期节点上进行扩展, 增强了系统可扩展性。

作者

lxmuyu

发布于

2023-07-21

更新于

2023-07-21

许可协议