-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathtransaction.ts
116 lines (108 loc) · 3.45 KB
/
transaction.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import { wrapMultiResult, noop } from "./utils";
import asCallback from "standard-as-callback";
import Pipeline from "./Pipeline";
import { Callback } from "./types";
import { ChainableCommander } from "./utils/RedisCommander";
/**
 * Type-level contract for the transaction helpers that
 * `addTransactionSupport` attaches to a client object.
 *
 * The order of the `multi` overloads matters for overload resolution;
 * keep the `{ pipeline: false }` signature first.
 */
export interface Transaction {
/**
 * Creates a pipeline. When `commands` is an array it is handed to the
 * pipeline's `addBatch`.
 * NOTE(review): each entry is presumably `[commandName, ...args]` — confirm
 * against `Pipeline.addBatch`.
 */
pipeline(commands?: unknown[][]): ChainableCommander;
/**
 * With `pipeline: false` no pipeline object is created: the call is
 * forwarded to the client's original `multi` and its result is returned.
 */
multi(options: { pipeline: false }): Promise<"OK">;
/** Starts a pipeline-backed transaction with nothing queued beyond the initial `multi()` call. */
multi(): ChainableCommander;
/** Same as calling `multi()` without arguments; the pipeline-backed mode is requested explicitly. */
multi(options: { pipeline: true }): ChainableCommander;
/** Starts a pipeline-backed transaction and queues `commands` through the pipeline's `addBatch`. */
multi(commands?: unknown[][]): ChainableCommander;
}
/**
 * Turns the raw reply list of a MULTI pipeline into the transaction result.
 *
 * The last entry of `replies` is treated as the `[error, value]` pair of the
 * final (EXEC) call.
 *
 * @param replies Per-command `[error, value]` pairs produced by the pipeline.
 * @returns The EXEC value passed through `wrapMultiResult`.
 * @throws Error when there is no last entry, i.e. the pipeline had already
 *   been executed.
 * @throws The EXEC error itself, decorated with a `previousErrors` array
 *   holding the errors of all earlier entries.
 */
function unwrapTransactionReply(replies: any[]): any[] | null {
  const execReply = replies[replies.length - 1];
  if (execReply === undefined) {
    throw new Error(
      "Pipeline cannot be used to send any commands when the `exec()` has been called on it."
    );
  }

  const execError = execReply[0];
  if (!execError) {
    return wrapMultiResult(execReply[1]);
  }

  // Attach every error reported before EXEC so callers can see what
  // went wrong while the commands were being queued.
  execError.previousErrors = [];
  for (const reply of replies.slice(0, -1)) {
    if (reply[0]) {
      execError.previousErrors.push(reply[0]);
    }
  }
  throw execError;
}

/**
 * Installs `pipeline`, `multi` and `exec` on the given client object.
 *
 * - `pipeline(commands?)` builds a `Pipeline` bound to the caller and queues
 *   `commands` when it is an array.
 * - `multi(commands?, options?)` either forwards to the client's previous
 *   `multi` (when `options.pipeline === false`) or returns a `Pipeline` that
 *   has already had `multi()` called on it and whose `exec`/`execBuffer` are
 *   replaced so they resolve with the unwrapped EXEC result.
 * - `exec(callback?)` wraps the client's previous `exec`, passing array
 *   results through `wrapMultiResult`.
 *
 * @param redis Object to patch in place; its existing `multi` and `exec`
 *   are captured and reused by the replacements.
 */
export function addTransactionSupport(redis) {
  redis.pipeline = function (commands) {
    const batch = new Pipeline(this);
    if (Array.isArray(commands)) {
      batch.addBatch(commands);
    }
    return batch;
  };

  const nativeMulti = redis.multi;
  redis.multi = function (commands, options) {
    // A lone non-array argument is the options bag, not a command list.
    let queued = commands;
    let settings = options;
    if (settings === undefined && !Array.isArray(queued)) {
      settings = queued;
      queued = null;
    }

    if (settings && settings.pipeline === false) {
      return nativeMulti.call(this);
    }

    const transaction = new Pipeline(this);
    // @ts-expect-error
    transaction.multi();
    if (Array.isArray(queued)) {
      transaction.addBatch(queued);
    }

    const baseExec = transaction.exec;
    transaction.exec = function (callback: Callback) {
      // In cluster mode node information is required first, so hold off
      // until the client reports ready and then run this method again.
      const clusterNotReady = this.isCluster && !this.redis.slots.length;
      if (clusterNotReady) {
        if (this.redis.status === "wait") {
          this.redis.connect().catch(noop);
        }
        const whenReady = new Promise((resolve, reject) => {
          this.redis.delayUntilReady((err) => {
            if (err) {
              reject(err);
            } else {
              // NOTE(review): the pipeline object is passed in the callback
              // position here; kept exactly as in the original.
              this.exec(transaction).then(resolve, reject);
            }
          });
        });
        return asCallback(whenReady, callback);
      }

      // While a transaction is still open, the first underlying exec call
      // is made for its side effect only; its return value is ignored.
      if (this._transactions > 0) {
        baseExec.call(transaction);
      }

      // On repeated invocations (retries) hand back the underlying result
      // untouched instead of unwrapping it again.
      if (this.nodeifiedPromise) {
        return baseExec.call(transaction);
      }

      return asCallback(
        baseExec.call(transaction).then(unwrapTransactionReply),
        callback
      );
    };

    // @ts-expect-error
    const { execBuffer: baseExecBuffer } = transaction;
    // @ts-expect-error
    transaction.execBuffer = function (callback: Callback) {
      if (this._transactions > 0) {
        baseExecBuffer.call(transaction);
      }
      return transaction.exec(callback);
    };

    return transaction;
  };

  const nativeExec = redis.exec;
  redis.exec = function (callback: Callback): Promise<any[] | null> {
    const wrapped = nativeExec
      .call(this)
      .then((results: any[] | null) =>
        Array.isArray(results) ? wrapMultiResult(results) : results
      );
    return asCallback(wrapped, callback);
  };
}