Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Request Queue Behavior #6 #860

Merged
merged 7 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/controller/helpers/request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {SendRequestWhen, SendPolicy} from '../tstype';
import {SendPolicy} from '../tstype';
import * as Zcl from '../../zcl';

/* eslint-disable-next-line @typescript-eslint/no-explicit-any*/
Expand Down Expand Up @@ -34,16 +34,14 @@ class Request<Type = any> {
frame: Zcl.ZclFrame;
expires: number;
sendPolicy: SendPolicy;
sendWhen: SendRequestWhen;
private resolveQueue: Array<(value: Type) => void>;
private rejectQueue: Array <(error: Error) => void>;
private lastError: Error;
constructor (func: (frame: Zcl.ZclFrame) => Promise<Type>, frame: Zcl.ZclFrame, timeout: number,
sendWhen?: SendRequestWhen, sendPolicy?: SendPolicy, lastError?: Error,
sendPolicy?: SendPolicy, lastError?: Error,
resolve?:(value: Type) => void, reject?: (error: Error) => void) {
this.func = func;
this.frame = frame;
this.sendWhen = sendWhen ?? 'active',
this.expires = timeout + Date.now();
this.sendPolicy = sendPolicy ?? (typeof frame.getCommand !== 'function' ?
undefined : Request.defaultSendPolicy[frame.getCommand().ID]);
Expand Down
2 changes: 1 addition & 1 deletion src/controller/helpers/requestQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RequestQueue extends Set<Request> {
`${this.size}, ${fastPolling})`);

for (const request of this) {
if (fastPolling || (request.sendWhen !== 'fastpoll' && request.sendPolicy !== 'bulk')) {
if (fastPolling || request.sendPolicy !== 'bulk') {
try {
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
Expand Down
64 changes: 20 additions & 44 deletions src/controller/model/device.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {KeyValue, DatabaseEntry, DeviceType, SendRequestWhen} from '../tstype';
import {KeyValue, DatabaseEntry, DeviceType} from '../tstype';
import {Events as AdapterEvents} from '../../adapter';
import ZclTransactionSequenceNumber from '../helpers/zclTransactionSequenceNumber';
import Endpoint from './endpoint';
Expand Down Expand Up @@ -53,7 +53,6 @@ class Device extends Entity {
private _skipDefaultResponse: boolean;
private _skipTimeResponse: boolean;
private _deleted: boolean;
private _defaultSendRequestWhen?: SendRequestWhen;
private _lastDefaultResponseSequenceNumber: number;
private _checkinInterval: number;
private _pendingRequestTimeout: number;
Expand Down Expand Up @@ -102,10 +101,6 @@ class Device extends Entity {
set skipDefaultResponse(skipDefaultResponse: boolean) {this._skipDefaultResponse = skipDefaultResponse;}
get skipTimeResponse(): boolean {return this._skipTimeResponse;}
set skipTimeResponse(skipTimeResponse: boolean) {this._skipTimeResponse = skipTimeResponse;}
get defaultSendRequestWhen(): SendRequestWhen {return this._defaultSendRequestWhen;}
set defaultSendRequestWhen(defaultSendRequestWhen: SendRequestWhen) {
this._defaultSendRequestWhen = defaultSendRequestWhen;
}
get checkinInterval(): number {return this._checkinInterval;}
get pendingRequestTimeout(): number {return this._pendingRequestTimeout;}
set pendingRequestTimeout(pendingRequestTimeout: number) {this._pendingRequestTimeout = pendingRequestTimeout;}
Expand Down Expand Up @@ -137,7 +132,7 @@ class Device extends Entity {
manufacturerID: number, endpoints: Endpoint[], manufacturerName: string,
powerSource: string, modelID: string, applicationVersion: number, stackVersion: number, zclVersion: number,
hardwareVersion: number, dateCode: string, softwareBuildID: string, interviewCompleted: boolean, meta: KeyValue,
lastSeen: number, defaultSendRequestWhen: SendRequestWhen, checkinInterval: number,
lastSeen: number, checkinInterval: number,
pendingRequestTimeout: number
) {
super();
Expand All @@ -162,7 +157,6 @@ class Device extends Entity {
this._skipTimeResponse = false;
this.meta = meta;
this._lastSeen = lastSeen;
this._defaultSendRequestWhen = defaultSendRequestWhen;
this._checkinInterval = checkinInterval;
this._pendingRequestTimeout = pendingRequestTimeout;
}
Expand Down Expand Up @@ -270,12 +264,12 @@ class Device extends Entity {
fastPollTimeout: 0,
};
debug.log(`check-in from ${this.ieeeAddr}: accepting fast-poll`);
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendPolicy: 'immediate'});

// This is a good time to read the checkin interval if we haven't stored it previously
if (this._checkinInterval === undefined) {
const pollPeriod =
await endpoint.read('genPollCtrl', ['checkinInterval'], {sendWhen: 'immediate'});
await endpoint.read('genPollCtrl', ['checkinInterval'], {sendPolicy: 'immediate'});
this._checkinInterval = pollPeriod.checkinInterval / 4; // convert to seconds
this.pendingRequestTimeout = this._checkinInterval * 1000; // milliseconds
debug.log(`Request Queue (${
Expand All @@ -285,14 +279,14 @@ class Device extends Entity {
// We *must* end fast-poll when we're done sending things. Otherwise
// we cause undue power-drain.
debug.log(`check-in from ${this.ieeeAddr}: stopping fast-poll`);
await endpoint.command(frame.Cluster.ID, 'fastPollStop', {}, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'fastPollStop', {}, {sendPolicy: 'immediate'});
} else {
const payload = {
startFastPolling: false,
fastPollTimeout: 0,
};
debug.log(`check-in from ${this.ieeeAddr}: declining fast-poll`);
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendWhen: 'immediate'});
await endpoint.command(frame.Cluster.ID, 'checkinRsp', payload, {sendPolicy: 'immediate'});
}
} catch (error) {
/* istanbul ignore next */
Expand Down Expand Up @@ -344,19 +338,6 @@ class Device extends Entity {
throw new Error('Cannot load device from group');
}

let defaultSendRequestWhen: SendRequestWhen = entry.defaultSendRequestWhen;
/* istanbul ignore next */
if (defaultSendRequestWhen == null) {
// Guess defaultSendRequestWhen based on old useImplicitCheckin/defaultSendWhenActive
if (entry.hasOwnProperty('useImplicitCheckin') && !entry.useImplicitCheckin) {
defaultSendRequestWhen = 'fastpoll';
} else if (entry.hasOwnProperty('defaultSendWhenActive') && entry.defaultSendWhenActive) {
defaultSendRequestWhen = 'active';
} else {
defaultSendRequestWhen = 'immediate';
}
}

// default: no timeout (messages expire immediately after first send attempt)
let pendingRequestTimeout = 0;
if((endpoints.filter((e): boolean => e.supportsInputCluster('genPollCtrl'))).length > 0) {
Expand All @@ -374,7 +355,7 @@ class Device extends Entity {
entry.id, entry.type, ieeeAddr, networkAddress, entry.manufId, endpoints,
entry.manufName, entry.powerSource, entry.modelId, entry.appVersion,
entry.stackVersion, entry.zclVersion, entry.hwVersion, entry.dateCode, entry.swBuildId,
entry.interviewCompleted, meta, entry.lastSeen || null, defaultSendRequestWhen, entry.checkinInterval,
entry.interviewCompleted, meta, entry.lastSeen || null, entry.checkinInterval,
pendingRequestTimeout
);
}
Expand All @@ -392,8 +373,7 @@ class Device extends Entity {
modelId: this.modelID, epList, endpoints, appVersion: this.applicationVersion,
stackVersion: this.stackVersion, hwVersion: this.hardwareVersion, dateCode: this.dateCode,
swBuildId: this.softwareBuildID, zclVersion: this.zclVersion, interviewCompleted: this.interviewCompleted,
meta: this.meta, lastSeen: this.lastSeen, defaultSendRequestWhen: this.defaultSendRequestWhen,
checkinInterval: this.checkinInterval
meta: this.meta, lastSeen: this.lastSeen, checkinInterval: this.checkinInterval
};
}

Expand Down Expand Up @@ -461,7 +441,7 @@ class Device extends Entity {
const device = new Device(
ID, type, ieeeAddr, networkAddress, manufacturerID, endpointsMapped, manufacturerName,
powerSource, modelID, undefined, undefined, undefined, undefined, undefined, undefined,
interviewCompleted, {}, null, 'immediate', undefined, 0
interviewCompleted, {}, null, undefined, 0
);

Entity.database.insert(device.toDatabaseEntry());
Expand Down Expand Up @@ -612,7 +592,7 @@ class Device extends Entity {
try {
const endpoint = Endpoint.create(1, undefined, undefined, [], [], this.networkAddress, this.ieeeAddr);
const result = await endpoint.read('genBasic', ['modelId', 'manufacturerName'],
{sendWhen: 'immediate'});
{sendPolicy: 'immediate'});
Object.entries(result)
.forEach((entry) => Device.ReportablePropertiesMapping[entry[0]].set(entry[1], this));
} catch (error) {
Expand Down Expand Up @@ -663,7 +643,7 @@ class Device extends Entity {
try {
let result: KeyValue;
try {
result = await endpoint.read('genBasic', [key], {sendWhen: 'immediate'});
result = await endpoint.read('genBasic', [key], {sendPolicy: 'immediate'});
} catch (error) {
// Reading attributes can fail for many reason, e.g. it could be that device rejoins
// while joining like in:
Expand All @@ -673,7 +653,7 @@ class Device extends Entity {
debug.log(`Interview - first ${item.key} retrieval attempt failed, ` +
`retrying after 10 seconds...`);
await Wait(10000);
result = await endpoint.read('genBasic', [key], {sendWhen: 'immediate'});
result = await endpoint.read('genBasic', [key], {sendPolicy: 'immediate'});
} else {
throw error;
}
Expand All @@ -698,14 +678,15 @@ class Device extends Entity {
for (const endpoint of this.endpoints.filter((e): boolean => e.supportsInputCluster('ssIasZone'))) {
debug.log(`Interview - IAS - enrolling '${this.ieeeAddr}' endpoint '${endpoint.ID}'`);

const stateBefore = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'], {sendWhen: 'immediate'});
const stateBefore = await endpoint.read(
'ssIasZone', ['iasCieAddr', 'zoneState'], {sendPolicy: 'immediate'});
debug.log(`Interview - IAS - before enrolling state: '${JSON.stringify(stateBefore)}'`);

// Do not enroll when device has already been enrolled
if (stateBefore.zoneState !== 1 || stateBefore.iasCieAddr !== coordinator.ieeeAddr) {
debug.log(`Interview - IAS - not enrolled, enrolling`);

await endpoint.write('ssIasZone', {'iasCieAddr': coordinator.ieeeAddr}, {sendWhen: 'immediate'});
await endpoint.write('ssIasZone', {'iasCieAddr': coordinator.ieeeAddr}, {sendPolicy: 'immediate'});
debug.log(`Interview - IAS - wrote iasCieAddr`);

// There are 2 enrollment procedures:
Expand All @@ -717,14 +698,14 @@ class Device extends Entity {
await Wait(500);
debug.log(`IAS - '${this.ieeeAddr}' sending enroll response (auto enroll)`);
const payload = {enrollrspcode: 0, zoneid: 23};
await endpoint.command('ssIasZone', 'enrollRsp', payload,
{disableDefaultResponse: true, sendWhen: 'immediate'});
await endpoint.command('ssIasZone', 'enrollRsp', payload,
{disableDefaultResponse: true, sendPolicy: 'immediate'});

let enrolled = false;
for (let attempt = 0; attempt < 20; attempt++) {
await Wait(500);
const stateAfter = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'],
{sendWhen: 'immediate'});
const stateAfter = await endpoint.read('ssIasZone', ['iasCieAddr', 'zoneState'],
{sendPolicy: 'immediate'});
debug.log(`Interview - IAS - after enrolling state (${attempt}): '${JSON.stringify(stateAfter)}'`);
if (stateAfter.zoneState === 1) {
enrolled = true;
Expand All @@ -749,14 +730,9 @@ class Device extends Entity {
for (const endpoint of this.endpoints.filter((e): boolean => e.supportsInputCluster('genPollCtrl'))) {
debug.log(`Interview - Poll control - binding '${this.ieeeAddr}' endpoint '${endpoint.ID}'`);
await endpoint.bind('genPollCtrl', coordinator.endpoints[0]);
const pollPeriod = await endpoint.read('genPollCtrl', ['checkinInterval'], {sendWhen: 'immediate'});
const pollPeriod = await endpoint.read('genPollCtrl', ['checkinInterval'], {sendPolicy: 'immediate'});
this._checkinInterval = pollPeriod.checkinInterval / 4; // convert to seconds
this.pendingRequestTimeout = this._checkinInterval * 1000; // milliseconds
if (pollPeriod.checkinInterval <= 2400) {// 10 minutes
this.defaultSendRequestWhen = 'fastpoll';
} else {
this.defaultSendRequestWhen = 'active';
}
}
} catch (error) {
/* istanbul ignore next */
Expand Down
24 changes: 17 additions & 7 deletions src/controller/model/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,31 @@ class Endpoint extends Entity {
options.disableResponse, options.disableRecovery, options.srcEndpoint) as Promise<Type>;
}): Promise<Type> {
const logPrefix = `Request Queue (${this.deviceIeeeAddress}/${this.ID}): `;
const request = new Request(func, frame, this.getDevice().pendingRequestTimeout, options.sendWhen,
options.sendPolicy);

if(options.sendWhen) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also remove this part? (maybe in a next PR?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely! And yes, that will be another PR:-)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many thanks for your dedication 😄

if ((options.sendWhen === 'immediate') && (this.getDevice().pendingRequestTimeout > 0)) {
debug.info (logPrefix
+ "sendWhen is deprecated. Interpreting sendwhen='immediate' as sendPolicy='immediate'");
options.sendPolicy = 'immediate';
} else {
debug.info (logPrefix + "sendWhen is deprecated and will be ignored.");
}
}

const request = new Request(func, frame, this.getDevice().pendingRequestTimeout, options.sendPolicy);

if (request.sendPolicy !== 'bulk') {
// Check if such a request is already in the queue and remove the old one(s) if necessary
this.pendingRequests.filter(request);
}

// send without queueing if sendWhen or sendPolicy is 'immediate' or if the device has no timeout set
if (request.sendWhen === 'immediate' || request.sendPolicy === 'immediate'
// send without queueing if sendPolicy is 'immediate' or if the device has no timeout set
if (request.sendPolicy === 'immediate'
|| !this.getDevice().pendingRequestTimeout) {
if (this.getDevice().defaultSendRequestWhen !=='immediate')
if (this.getDevice().pendingRequestTimeout > 0)
{
debug.info(logPrefix + `send ${frame.getCommand().name} request immediately ` +
`(sendWhen=${options.sendWhen})`);
`(sendPolicy=${options.sendPolicy})`);
}
return request.send();
}
Expand Down Expand Up @@ -820,7 +830,7 @@ class Endpoint extends Entity {
): Options {
const providedOptions = options || {};
return {
sendWhen: this.getDevice().defaultSendRequestWhen,
sendWhen: undefined,
timeout: 10000,
disableResponse: false,
disableRecovery: false,
Expand Down
Loading