2022-03-21-带着问题看源码7-NodeRed中的context模块

1. 此模块的意义#

NodeRed context 模块是运行期做数据保存及共享的一种机制。当节点间有相同内容时,可以考虑将相同的内容放入 context 以实现共享,既减少了空间,也有利于修改。

2. NodeRed 此模块功能#

context 有三类:global、flow、node,其中 global 是随着 context 模块初始化时创建,全局唯一;flow context 以 flowId 为标识,为所有具有相同 flowId 的节点做记录;node context 以 nodeId:flowId 为标识(代码中为 nodeId + ":" + flowId)。三者作用域不相同:全局、流、节点,不同流 context 数据不共享,不同节点 context 数据不共享。

context 支持以配置文件插件化方式实现功能:插件实现要求的接口,修改配置文件实现自定义功能。

3. NodeRed 中此模块的实现及参与者#

  1. global context
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Reset the module state and create the global context.
 *
 * @param {Object} _settings runtime settings; `functionGlobalContext`, when
 *        present, is used as the seed of the global context.
 */
function init(_settings) {
    settings = _settings;

    // Start from a clean slate: no contexts, no stores, nothing configured yet.
    contexts = {};
    stores = {};
    storeList = [];
    hasConfiguredStore = false;

    // The global context exists from initialisation onwards.
    contexts.global = createContext("global", settings.functionGlobalContext || {});

    // Always provide a default memory store. It is what unit tests rely on
    // when they skip the full `load()` initialisation sequence; if the user
    // has configured any stores it will be discarded.
    stores["_"] = new memory();
    defaultStore = "memory";
}

global context 在初始化时创建,默认使用 'memory' 方式实现 context 的读写。contexts 对象保存了所有的 context。

  1. flow context 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Return the context for a flow, creating and registering it on first use.
 *
 * @param {string} flowId id of the flow whose context is wanted
 * @param {string} parentFlowId id under which the parent context is registered
 * @returns {Object} the (possibly newly created) flow context
 */
function getFlowContext(flowId,parentFlowId) {
    // Already registered: hand back the existing instance.
    if (contexts.hasOwnProperty(flowId)) {
        return contexts[flowId];
    }

    // A missing parent is not treated as an error; a root context is
    // registered in its place so the new flow context always has a parent.
    if (!contexts[parentFlowId]) {
        contexts[parentFlowId] = createRootContext();
    }

    const flowContext = createContext(flowId, undefined, contexts[parentFlowId]);
    contexts[flowId] = flowContext;
    return flowContext;
}
  1. 节点 context 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Return the context of a node, creating and registering it on first use.
 *
 * The context is registered under `nodeId:flowId` when a flow id is given,
 * otherwise under `nodeId` alone. The returned object carries a read-only
 * `global` property and, when a flow id is given, a read-only `flow` property.
 *
 * @param {string} nodeId id of the node
 * @param {string} [flowId] id of the flow the node belongs to
 * @returns {Object} the (possibly newly created) node context
 */
function getContext(nodeId, flowId) {
    const contextId = flowId ? nodeId+":"+flowId : nodeId;
    if (contexts.hasOwnProperty(contextId)) {
        return contexts[contextId];
    }

    const nodeContext = createContext(contextId);
    const links = {};

    if (flowId) {
        if (!contexts[flowId]) {
            // Most likely a unit test for a node which doesn't initialise the
            // flow properly. To keep things working, initialise the missing
            // context. This *does not happen* in normal node-red operation.
            contexts[flowId] = createContext(flowId,undefined,createRootContext());
        }
        links.flow = { value: contexts[flowId] };
    }
    links.global = { value: contexts['global'] };

    // Descriptors with only `value` are non-writable, non-enumerable and
    // non-configurable, so the links cannot be reassigned by callers.
    Object.defineProperties(nodeContext, links);

    contexts[contextId] = nodeContext;
    return nodeContext;
}

节点 context 是可以获取到 global 和 flow context 的。

  1. 插件加载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
 * Load and open the context stores described by `settings.contextStorage`.
 *
 * For every entry in the configuration the function validates the name,
 * resolves the store module, instantiates it into `stores`, and then calls
 * `open()` on every store. It also decides which store is the default
 * (`stores["_"]` / `defaultStore`) and rebuilds `storeList`.
 *
 * @returns {Promise} resolves with the result of `Promise.all` over the
 *          `open()` calls; rejects with an Error carrying the
 *          "context.error-loading-module" message wrapping the original error.
 */
function load() {
return new Promise(function(resolve,reject) {
// load & init plugins in settings.contextStorage
var plugins = settings.contextStorage || {};
// Becomes true when `default` is a string naming another configured store
// (or when the first configured store is promoted to be the default).
var defaultIsAlias = false;
// Collects the promises returned by each store's open()
var promises = [];
if (plugins && Object.keys(plugins).length > 0) {
var hasDefault = plugins.hasOwnProperty('default');
// Name of the first non-alias store seen; only recorded when the
// configuration has no 'default' entry.
var defaultName;
for (var pluginName in plugins) {
if (plugins.hasOwnProperty(pluginName)) {
// "_" is a reserved name - do not allow it to be overridden
if (pluginName === "_") {
continue;
}
// Store names are restricted to letters, digits and underscore
if (!/^[a-zA-Z0-9_]+$/.test(pluginName)) {
return reject(new Error(log._("context.error-invalid-module-name", {name:pluginName})));
}

// Check if this is setting the 'default' context to be a named plugin
if (pluginName === "default" && typeof plugins[pluginName] === "string") {
// Check the 'default' alias exists before initialising anything
if (!plugins.hasOwnProperty(plugins[pluginName])) {
return reject(new Error(log._("context.error-invalid-default-module", {storage:plugins["default"]})));
}
defaultIsAlias = true;
// An alias has no module of its own to load
continue;
}
if (!hasDefault && !defaultName) {
defaultName = pluginName;
}
var plugin;
if (plugins[pluginName].hasOwnProperty("module")) {
// Get the provided config and copy in the 'approved' top-level settings (eg userDir)
var config = plugins[pluginName].config || {};
copySettings(config, settings);

if (typeof plugins[pluginName].module === "string") {
// This config identifies the module by name - assume it is a built-in one
// TODO: check it exists locally, if not, try to require it as-is
try {
// Resolved relative to this file's directory
plugin = require("./"+plugins[pluginName].module);
} catch(err) {
return reject(new Error(log._("context.error-loading-module2", {module:plugins[pluginName].module,message:err.toString()})));
}
} else {
// Assume `module` is an already-required module we can use
plugin = plugins[pluginName].module;
}
try {
// Create a new instance of the plugin by calling its module function
stores[pluginName] = plugin(config);
// Work out a printable description of the module for the log line:
// the name for string modules, the module's own toString() if it
// defines one, otherwise the literal "custom".
var moduleInfo = plugins[pluginName].module;
if (typeof moduleInfo !== 'string') {
if (moduleInfo.hasOwnProperty("toString")) {
moduleInfo = moduleInfo.toString();
} else {
moduleInfo = "custom";
}
}
log.info(log._("context.log-store-init", {name:pluginName, info:"module="+moduleInfo}));
} catch(err) {
return reject(new Error(log._("context.error-loading-module2",{module:pluginName,message:err.toString()})));
}
} else {
// Plugin does not specify a 'module'
return reject(new Error(log._("context.error-module-not-defined", {storage:pluginName})));
}
}
}

// Open all of the configured contexts
// (this iterates every key currently in `stores`, including "_" if present)
for (var plugin in stores) {
if (stores.hasOwnProperty(plugin)) {
promises.push(stores[plugin].open());
}
}
// There is a 'default' listed in the configuration
if (hasDefault) {
// If 'default' is an alias, point it at the right module - we have already
// checked that it exists. If it isn't an alias, then it will
// already be set to a configured store
if (defaultIsAlias) {
stores["_"] = stores[plugins["default"]];
defaultStore = plugins["default"];
} else {
stores["_"] = stores["default"];
defaultStore = "default";
}
} else if (defaultName) {
// No 'default' listed, so pick first in list as the default
stores["_"] = stores[defaultName];
defaultStore = defaultName;
defaultIsAlias = true;
} else {
// No stores were listed in the config object (e.g. only the reserved
// "_" entry was present) - default to a memory store.
// NOTE: this storeList value is immediately overwritten by the
// Object.keys(stores) filter a few lines below.
storeList = ["memory"];
defaultStore = "memory";
}
hasConfiguredStore = true;
// Public list of store names: hide the reserved "_" entry, and hide
// "default" when it is only an alias for another store.
storeList = Object.keys(stores).filter(n=>!(defaultIsAlias && n==="default") && n!== "_");
} else {
// No configured plugins
log.info(log._("context.log-store-init", {name:"default", info:"module=memory"}));
promises.push(stores["_"].open())
storeList = ["memory"];
defaultStore = "memory";
}
// Resolving with a promise makes the outer promise follow it, so a failing
// open() also ends up in the catch handler below.
return resolve(Promise.all(promises));
}).catch(function(err) {
// Re-wrap any failure in a single, translated "error loading module" message
throw new Error(log._("context.error-loading-module",{message:err.toString()}));
});
}

在 settings.js 中配置 contextStorage 变量,通过此函数加载模块。

  1. context 实例化 的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/**
 * Build a context object exposing `get`, `set` and `keys`.
 *
 * The three methods are defined as non-enumerable properties on the returned
 * object and forward to the context store selected by the `storage` argument
 * (or by the store part of the key, falling back to the default store "_").
 *
 * @param {string} id scope passed to the store for every operation
 * @param {Object} [seed] initial values; only set for the global context,
 *        sourced from functionGlobalContext. When given, the seed object
 *        itself becomes the context object and supplies values for keys the
 *        store does not have.
 * @param {Object} [parent] parent context, exposed as `$parent`
 * @returns {Object} the context object
 */
function createContext(id,seed,parent) {
    // Seed is only set for global context - sourced from functionGlobalContext
    var scope = id;
    var obj = seed || {};
    var seedKeys;
    var insertSeedValues;
    if (seed) {
        seedKeys = Object.keys(seed);
        // Replace every `undefined` entry of `values` with the value found in
        // the seed for the corresponding key. Only INVALID_EXPR errors from
        // the property lookup are propagated; any other lookup failure just
        // leaves the value undefined.
        insertSeedValues = function(keys,values) {
            if (!Array.isArray(keys)) {
                if (values[0] === undefined) {
                    try {
                        values[0] = util.getObjectProperty(seed,keys);
                    } catch(err) {
                        if (err.code === "INVALID_EXPR") {
                            throw err;
                        }
                        values[0] = undefined;
                    }
                }
            } else {
                for (var i=0;i<keys.length;i++) {
                    if (values[i] === undefined) {
                        try {
                            values[i] = util.getObjectProperty(seed,keys[i]);
                        } catch(err) {
                            if (err.code === "INVALID_EXPR") {
                                throw err;
                            }
                            values[i] = undefined;
                        }
                    }
                }
            }
        }
    }

    Object.defineProperties(obj, {
        get: {
            // get(key[, storage][, callback])
            // Without a callback the value is returned synchronously.
            value: function(key, storage, callback) {
                var context;
                if (!callback && typeof storage === 'function') {
                    callback = storage;
                    storage = undefined;
                }
                if (callback && typeof callback !== 'function'){
                    throw new Error("Callback must be a function");
                }
                if (!validateContextKey(key)) {
                    var err = new Error("Invalid context key");
                    if(callback) {
                        return callback(err);
                    } else {
                        throw err;
                    }
                }
                if (!Array.isArray(key)) {
                    var keyParts = util.parseContextStore(key);
                    key = keyParts.key;
                    if (!storage) {
                        storage = keyParts.store || "_";
                    }
                    // If the key is reported to belong to a parent context,
                    // delegate the lookup to that context.
                    var result = followParentContext(parent, key);
                    if (result) {
                        var [ctx, new_key] = result;
                        if (ctx && new_key) {
                            return ctx.get(new_key, storage, callback);
                        }
                        else {
                            if (callback) {
                                return callback(undefined);
                            }
                            else {
                                return undefined;
                            }
                        }
                    }
                } else {
                    if (!storage) {
                        storage = "_";
                    }
                }
                context = getContextStorage(storage);

                if (callback) {
                    if (!seed) {
                        context.get(scope,key,callback);
                    } else {
                        context.get(scope,key,function() {
                            if (arguments[0]) {
                                callback(arguments[0]);
                                return;
                            }
                            // Everything after the leading err argument
                            var results = Array.prototype.slice.call(arguments,1);
                            try {
                                insertSeedValues(key,results);
                            } catch(err) {
                                // Pass the error as the first argument;
                                // `callback.apply(err)` would only set `this`
                                // and the caller would never see the error.
                                callback(err);
                                return
                            }
                            // Put the err arg back
                            results.unshift(undefined);
                            callback.apply(null,results);
                        });
                    }
                } else {
                    // No callback, attempt to do this synchronously
                    var results = context.get(scope,key);
                    if (seed) {
                        if (Array.isArray(key)) {
                            insertSeedValues(key,results);
                        } else if (results === undefined){
                            try {
                                results = util.getObjectProperty(seed,key);
                            } catch(err) {
                                if (err.code === "INVALID_EXPR") {
                                    throw err;
                                }
                                results = undefined;
                            }
                        }
                    }
                    return results;
                }
            }
        },
        set: {
            // set(key, value[, storage][, callback])
            value: function(key, value, storage, callback) {
                var context;
                if (!callback && typeof storage === 'function') {
                    callback = storage;
                    storage = undefined;
                }
                if (callback && typeof callback !== 'function'){
                    throw new Error("Callback must be a function");
                }
                if (!validateContextKey(key)) {
                    var err = new Error("Invalid context key");
                    if(callback) {
                        return callback(err);
                    } else {
                        throw err;
                    }
                }
                if (!Array.isArray(key)) {
                    var keyParts = util.parseContextStore(key);
                    key = keyParts.key;
                    if (!storage) {
                        storage = keyParts.store || "_";
                    }
                    // Same parent delegation as in get()
                    var result = followParentContext(parent, key);
                    if (result) {
                        var [ctx, new_key] = result;
                        if (ctx && new_key) {
                            return ctx.set(new_key, value, storage, callback);
                        }
                        else {
                            if (callback) {
                                return callback();
                            }
                            return undefined;
                        }
                    }
                } else {
                    if (!storage) {
                        storage = "_";
                    }
                }
                context = getContextStorage(storage);

                context.set(scope, key, value, callback);
            }
        },
        keys: {
            // keys([storage][, callback])
            value: function(storage, callback) {
                var context;
                if (!storage && !callback) {
                    context = stores["_"];
                } else {
                    if (typeof storage === 'function') {
                        callback = storage;
                        storage = "_";
                    }
                    if (callback && typeof callback !== 'function') {
                        throw new Error("Callback must be a function");
                    }
                    context = getContextStorage(storage);
                }
                // For a seeded context the seed's own keys are merged (without
                // duplicates) into the store's keys, unless
                // settings.exportGlobalContextKeys is explicitly false.
                if (seed && settings.exportGlobalContextKeys !== false) {
                    if (callback) {
                        context.keys(scope, function(err,keys) {
                            callback(err,Array.from(new Set(seedKeys.concat(keys)).keys()));
                        });
                    } else {
                        var keys = context.keys(scope);
                        return Array.from(new Set(seedKeys.concat(keys)).keys())
                    }
                } else {
                    return context.keys(scope, callback);
                }
            }
        }
    });
    if (parent) {
        Object.defineProperty(obj, "$parent", {
            value: parent
        });
    }
    return obj;
}

有三个方法 get、set、keys。

4. NodeRed 为什么这么设计,这种设计的优劣有哪些#

  1. 与 Log 模块类似,留下接口给外部扩展,虽然复杂度略高,但扩展性好
  2. NodeRed 中大量使用的插件机制,利用的 JS 的模块加载,实现起来比静态语言方便很多。
  3. 若将 context 外置,存在运行数据被修改的风险

5. 应用场景分析#

  1. 以文件形式存储,不会因进程关闭而影响 context,可对 context 进行恢复
  2. 以文件形式存储,可提供运行时在外部修改进程 context,比如直接修改文件,改变 context 变量
  3. 以 Restful 形式发送到服务端,服务端进行处理和存储。

6. 实践#

存储#

  1. settings.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
contextStorage: {
default: {
module: 'localfilesystem',
config: {
base: 'context', // the base directory to use
// default: "context"
dir: '/home/freeman/.node-red/', // the directory to create the base directory in
// default: settings.userDir
cache: false, // whether to cache contents in memory
// default: true
flushInterval: 30, // if cache is enabled, the minimum interval
// between writes to storage, in seconds. This
// can be used to reduce wear on the underlying storage.
// default: 30 seconds
},
},
},

  1. flow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
[
{
"id": "cbd2b3b600a928eb",
"type": "tab",
"label": "流程 1",
"disabled": false,
"info": ""
},
{
"id": "e7b11ff31b0737d4",
"type": "inject",
"z": "cbd2b3b600a928eb",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": "0",
"topic": "",
"payloadType": "date",
"x": 120,
"y": 80,
"wires": [
[
"216def814448f2b8"
]
]
},
{
"id": "216def814448f2b8",
"type": "function",
"z": "cbd2b3b600a928eb",
"name": "",
"func": "var processSwapInt = function processSwapInt(id, buffer) {\n let wordlength = 2; //一个字占2字节\n \n var buf = Buffer.alloc(4);\n buf[0] = buffer[id * wordlength + 2];\n buf[1] = buffer[id * wordlength + 3];\n buf[2] = buffer[id * wordlength + 0];\n buf[3] = buffer[id * wordlength + 1];\n return buf.readIntBE(0,4);\n}\n\nvar processNoSwapInt = function (id, buffer) {\n let wordlength = 2; //一个字占2字节\n let value;\n var buf = buffer.subarray(id * wordlength, id * wordlength + 4);\n return buf.readIntBE(0,4);\n}\n\nvar processSwapFloat = function (id, buffer){\n let wordlength = 2; //一个字占2字节\n var buf = Buffer.alloc(4);\n buf[0] = buffer[id * wordlength + 2];\n buf[1] = buffer[id * wordlength + 3];\n buf[2] = buffer[id * wordlength + 0];\n buf[3] = buffer[id * wordlength + 1];\n return buf.readFloatBE(0);\n}\n\nvar processNoSwapFloat = function (id, buffer){\n let wordlength = 2; //一个字占2字节\n\tvar buf = buffer.subarray(id*wordlength,id*wordlength+4);\n\t\n\treturn buf.readFloatBE(0);\n}\n\nvar processNoSwapShort = function (id, buffer){\n let wordlength = 2; //一个字占2字节\n var buf = buffer.subarray(id * wordlength, id * wordlength + 2);\n return buf.readInt16BE(0,2);\n}\n\nglobal.set('processSwapInt', processSwapInt);\nglobal.set('processNoSwapInt', processNoSwapInt);\n\nglobal.set('processSwapFloat', processSwapFloat);\nglobal.set('processNoSwapFloat', processNoSwapFloat);\n\nglobal.set('processNoSwapShort', processNoSwapShort);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 350,
"y": 80,
"wires": [
[
"3f820a34fd1a8f6d"
]
]
},
{
"id": "3f820a34fd1a8f6d",
"type": "debug",
"z": "cbd2b3b600a928eb",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 640,
"y": 80,
"wires": []
},
{
"id": "5d1a746cb00206ce",
"type": "function",
"z": "cbd2b3b600a928eb",
"name": "",
"func": "\n\nreturn new Promise((reslove,reject)=>{\n global.get('processNoSwapInt',function(error, processNoSwapInt){\n \n var buf = Buffer.alloc(4);\n buf[0] = 1;\n buf[1] = 2;\n buf[2] = 3;\n buf[3] = 4;\n \n if(processNoSwapInt){\n msg.payload = processNoSwapInt(0, buf)\n }\n reslove(msg);\n })\n \n})\n\n ",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 300,
"y": 320,
"wires": [
[
"7dc9a6ff3a7be0b0"
]
]
},
{
"id": "dc0bde4b0474c180",
"type": "inject",
"z": "cbd2b3b600a928eb",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 110,
"y": 320,
"wires": [
[
"5d1a746cb00206ce"
]
]
},
{
"id": "7dc9a6ff3a7be0b0",
"type": "debug",
"z": "cbd2b3b600a928eb",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 540,
"y": 320,
"wires": []
}
]
  1. 源码(修改部分代码,实现函数的读写)
1
packages/node_modules/@node-red/runtime/lib/nodes/context/localfilesystem.js
  1. 结果

截图_选择区域_20220322155310

作者

lxmuyu

发布于

2022-03-21

更新于

2022-03-22

许可协议