2022-03-09-带着问题看源码4-NodeRed中消息路由实现

NodeRed 提供了一个 Hook 模块,此模块向消息生命周期的节点注册,控制消息的流转。

1. 消息路由是什么? 有几种实现方式 ?#

消息路由即通过路由规则动态规划消息的传输路径,使消息按照过滤条件,从消息源路由到目标节点。通过消息路由,可实现对数据路由的灵活控制和提高数据安全性。

主要有两种方式:

  1. 堆栈式
    类 Express 的 middleware,每新增加一个处理模块,会在处理堆栈的顶端,最终形成一个消息处理堆栈。

  2. Hook 式
    以 Hook(钩子)的形式实现,这需要在特定的节点处,增加对事件的回调,在有事件触发时,按序执行回调。

2. NodeRed 中消息如何流转#

2.1. NodeRed 消息流转图#

message-router-events
从消息发送到处理完毕,有以下几个事件:

  1. onSend

  2. preRoute

  3. preDeliver

  4. postDeliver

  5. onReceive

  6. postReceive

  7. onComplete

2.2. SendEvent object#

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"msg": "<message object>",
"source": {
"id": "<node-id>",
"node": "<node-object>",
"port": "<index of port being sent on>",
},
"destination": {
"id": "<node-id>",
"node": undefined,
},
"cloneMessage": "true|false"
}

2.3. ReceiveEvent object#

1
2
3
4
5
6
7
{
"msg": "<message object>",
"destination": {
"id": "<node-id>",
"node": "<node-object>",
}
}

2.4. CompleteEvent object#

1
2
3
4
5
6
7
8
{
"msg": "<message object>",
"node": {
"id": "<node-id>",
"node": "<node-object>"
},
"error": "<error passed to done, otherwise, undefined>"
}

2.5. 有哪些参与者#

2.5.1. packages/node_modules/@node-red/util/lib/hooks.js#

  1. 新增 Hook(以链表存储 Hook )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function add(hookId, callback) {
...
let tailItem = hooks[id];
if (tailItem === undefined) {
hooks[id] = hookItem;
} else {
while(tailItem.nextHook !== null) {
tailItem = tailItem.nextHook
}
tailItem.nextHook = hookItem;
hookItem.previousHook = tailItem;
}

if (label) {
labelledHooks[label] = labelledHooks[label]||{};
labelledHooks[label][id] = hookItem;
}

...
}
  1. 触发消息流
    支持回调方式,Promise 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
function trigger(hookId, payload, done) {
let hookItem = hooks[hookId];
if (!hookItem) { //没有钩子执行回调或返回
if (done) {
done();
return;
} else {
return Promise.resolve();
}
}
if (!done) { //有钩子没有回调,以微任务方式处理所有Hook
return new Promise((resolve,reject) => {
invokeStack(hookItem,payload,function(err) {
if (err !== undefined && err !== false) {
if (!(err instanceof Error)) {
err = new Error(err);
}
err.hook = hookId
reject(err);
} else {
resolve(err);
}
})
});
} else { //有钩子有回调,处理所有Hook
invokeStack(hookItem,payload,done)
}
}
  1. 处理 Hook
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
function invokeStack(hookItem,payload,done) {
function callNextHook(err) {
if (!hookItem || err) {
done(err);
return;
}
if (hookItem.removed) {
hookItem = hookItem.nextHook;
callNextHook();
return;
}
const callback = hookItem.cb;
if (callback.length === 1) { //回调函数一个参数
try {
let result = callback(payload);
if (result === false) {
// Halting the flow
done(false);
return
}
if (result && typeof result.then === 'function') {
result.then(handleResolve, callNextHook)
return;
}
hookItem = hookItem.nextHook;
callNextHook();
} catch(err) {
done(err);
return;
}
} else { //回调函数两个参数
try {
callback(payload,handleResolve)
} catch(err) {
done(err);
return;
}
}
}
function handleResolve(result) {
if (result === undefined) {
hookItem = hookItem.nextHook;
callNextHook();
} else {
done(result);
}
}
callNextHook();
}

2.5.2. packages/node_modules/@node-red/runtime/lib/flows/Flow.js#

2.6. 流程哪些#

%% 配置
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#FFFFFF'}}}%%
sequenceDiagram
autonumber

%%实现
title: NodeRed中的节点模块管理-初始阶段

participant flow
participant hooks
participant node
flow ->> flow: send消息数组,调用handleOnSend
flow ->> hooks: hooks.trigger("onSend"
hooks -->> flow: 有异常返回异常,无异常继续消息传递
flow ->> hooks: hooks.trigger("preRoute"
hooks -->> flow: 有异常返回异常,无异常继续消息传递
flow -->> flow: 拷贝消息,准备路由handlePreDeliver
flow ->> hooks: hooks.trigger("preDeliver"
flow -->> flow: 拷贝消息,准备路由handlePreDeliver
flow ->> hooks: hooks.trigger("postDeliver"
hooks -->> flow: 有异常返回异常,无异常继续消息传递
flow ->> node: 调用receive方法,发送Input事件
node ->> hooks: hooks.trigger("onReceive"
hooks -->> node: 有异常返回异常,无异常继续消息传递
node ->> hooks: hooks.trigger("postReceive"
node ->> node: node._complete onComplete
hooks -->> node: 有异常返回异常,无异常结束消息传

3. NodeRed 为什么这么设计,这种设计的优劣有哪些#

堆栈式有一个问题,当以动态方式增加处理模块后,在不需要处理模块的情况下,删除增加的处理模块需要恢复原有的堆栈,增加了处理复杂度。以 Hook 方式动态的增加删除 Hook 不会影响原有消息路由。

这种选择也是根据 NodeRed 实际情况而来,在原有消息的传递机制上增加”关键点“的方式好实现。

4. Hook 模块与其他模块的关系#

Node、Flow 模块调用 Hook 模块,触发 Hook 流转。
自定义模块调用 Hook 模块,增加 Hook 处理。

作者

lxmuyu

发布于

2022-03-09

更新于

2022-03-11

许可协议