Skip to content

Commit

Permalink
fix: Use QOS1 for rarely sent discovery and availability messages (#1…
Browse files Browse the repository at this point in the history
…8756)

* Use QOS1 for rarely sent messages

Specifically discovery and availability depending on the set up, it's possible for these messages to be silently discarded by the broker.

Sending them as QOS1 means the mqtt library used will resend them until it gets an ACK.
Ensuring delivery is particularly important for messages that are rarely sent like discovery or availability as they are only sent when there are changes, and nondelivery or dropped messages for them impede proper functioning in home assistant.

* Fix tests for discovery and availability using QOS1
  • Loading branch information
ruifung committed Aug 27, 2023
1 parent 677db10 commit d1e50ce
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 80 deletions.
4 changes: 2 additions & 2 deletions lib/extension/availability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export default class Availability extends Extension {
override async start(): Promise<void> {
this.eventBus.onEntityRenamed(this, (data) => {
if (utils.isAvailabilityEnabledForEntity(data.entity, settings.get())) {
this.mqtt.publish(`${data.from}/availability`, null, {retain: true, qos: 0});
this.mqtt.publish(`${data.from}/availability`, null, {retain: true, qos: 1});
this.publishAvailability(data.entity, false, true);
}
});
Expand Down Expand Up @@ -164,7 +164,7 @@ export default class Availability extends Extension {
const topic = `${entity.name}/availability`;
const payload = utils.availabilityPayload(available ? 'online' : 'offline', settings.get());
this.availabilityCache[entity.ID] = available;
this.mqtt.publish(topic, payload, {retain: true, qos: 0});
this.mqtt.publish(topic, payload, {retain: true, qos: 1});

if (!skipGroups && entity.isDevice()) {
this.zigbee.groups().filter((g) => g.hasMember(entity))
Expand Down
10 changes: 5 additions & 5 deletions lib/extension/homeassistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ export default class HomeAssistant extends Extension {
@bind onDeviceRemoved(data: eventdata.DeviceRemoved): void {
logger.debug(`Clearing Home Assistant discovery topic for '${data.name}'`);
this.discovered[data.ieeeAddr]?.topics.forEach((topic) => {
this.mqtt.publish(topic, null, {retain: true, qos: 0}, this.discoveryTopic, false, false);
this.mqtt.publish(topic, null, {retain: true, qos: 1}, this.discoveryTopic, false, false);
});

delete this.discovered[data.ieeeAddr];
Expand Down Expand Up @@ -1049,7 +1049,7 @@ export default class HomeAssistant extends Extension {
if (data.homeAssisantRename) {
for (const config of this.getConfigs(data.entity)) {
const topic = this.getDiscoveryTopic(config, data.entity);
this.mqtt.publish(topic, null, {retain: true, qos: 0}, this.discoveryTopic, false, false);
this.mqtt.publish(topic, null, {retain: true, qos: 1}, this.discoveryTopic, false, false);
}

// Make sure Home Assistant deletes the old entity first otherwise another one (_2) is created
Expand Down Expand Up @@ -1422,7 +1422,7 @@ export default class HomeAssistant extends Extension {
}

const topic = this.getDiscoveryTopic(config, entity);
this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 0}, this.discoveryTopic, false, false);
this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 1}, this.discoveryTopic, false, false);
this.discovered[discoverKey].topics.add(topic);
this.discovered[discoverKey].objectIDs.add(config.object_id);
config.mockProperties?.forEach((mockProperty) =>
Expand Down Expand Up @@ -1482,7 +1482,7 @@ export default class HomeAssistant extends Extension {
if (clear) {
logger.debug(`Clearing Home Assistant config '${data.topic}'`);
const topic = data.topic.substring(this.discoveryTopic.length + 1);
this.mqtt.publish(topic, null, {retain: true, qos: 0}, this.discoveryTopic, false, false);
this.mqtt.publish(topic, null, {retain: true, qos: 1}, this.discoveryTopic, false, false);
}
} else if ((data.topic === this.statusTopic || data.topic === defaultStatusTopic) &&
data.message.toLowerCase() === 'online') {
Expand Down Expand Up @@ -1603,7 +1603,7 @@ export default class HomeAssistant extends Extension {
origin: this.discoveryOrigin,
};

await this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 0}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 1}, this.discoveryTopic, false, false);
this.discoveredTriggers[device.ieeeAddr].add(discoveredKey);
}

Expand Down
34 changes: 17 additions & 17 deletions test/availability.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ describe('Availability', () => {

it('Should publish availabilty on startup for device where it is enabled for', async () => {
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
expect(MQTT.publish).not.toHaveBeenCalledWith('zigbee2mqtt/bulb_color_2/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
});

it('Should publish offline for active device when not seen for 10 minutes', async () => {
Expand All @@ -77,7 +77,7 @@ describe('Availability', () => {
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(1);
expect(devices.bulb_color.ping).toHaveBeenNthCalledWith(1, true);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Shouldnt do anything for a device when availability: false is set for device', async () => {
Expand All @@ -93,7 +93,7 @@ describe('Availability', () => {
await advancedTime(utils.hours(26));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Should reset ping timer when device last seen changes for active device', async () => {
Expand All @@ -111,7 +111,7 @@ describe('Availability', () => {
expect(devices.bulb_color.ping).toHaveBeenCalledTimes(1);
expect(devices.bulb_color.ping).toHaveBeenNthCalledWith(1, true);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Should ping again when first ping fails', async () => {
Expand All @@ -132,7 +132,7 @@ describe('Availability', () => {
expect(devices.bulb_color.ping).toHaveBeenNthCalledWith(1, true);
expect(devices.bulb_color.ping).toHaveBeenNthCalledWith(2, false);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Should reset ping timer when device last seen changes for passive device', async () => {
Expand All @@ -150,21 +150,21 @@ describe('Availability', () => {
await advancedTime(utils.hours(3));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Should immediately mark device as online when it lastSeen changes', async () => {
MQTT.publish.mockClear();

await advancedTime(utils.minutes(15));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));

devices.bulb_color.lastSeen = Date.now();
await zigbeeHerdsman.events.lastSeenChanged({device: devices.bulb_color});
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
});

it('Should allow to change availability timeout via device options', async () => {
Expand Down Expand Up @@ -283,12 +283,12 @@ describe('Availability', () => {
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_color/availability',
null, {retain: true, qos: 0}, expect.any(Function));
null, {retain: true, qos: 1}, expect.any(Function));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_new_name/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
await advancedTime(utils.hours(12));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/bulb_new_name/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
});

it('Should publish availabiltiy payload in JSON format', async () => {
Expand All @@ -298,7 +298,7 @@ describe('Availability', () => {
await advancedTime(utils.hours(26));
expect(devices.remote.ping).toHaveBeenCalledTimes(0);
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/remote/availability',
stringify({state: 'offline'}), {retain: true, qos: 0}, expect.any(Function));
stringify({state: 'offline'}), {retain: true, qos: 1}, expect.any(Function));
});

it('Deprecated - should allow to block via advanced.availability_blocklist', async () => {
Expand Down Expand Up @@ -332,16 +332,16 @@ describe('Availability', () => {
settings.set(['devices', devices.bulb_color_2.ieeeAddr, 'availability'], true);
await resetExtension();
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/group_tradfri_remote/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
MQTT.publish.mockClear();
await advancedTime(utils.minutes(12));
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/group_tradfri_remote/availability',
'offline', {retain: true, qos: 0}, expect.any(Function));
'offline', {retain: true, qos: 1}, expect.any(Function));
MQTT.publish.mockClear();
devices.bulb_color_2.lastSeen = Date.now();
await zigbeeHerdsman.events.lastSeenChanged({device: devices.bulb_color_2});
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledWith('zigbee2mqtt/group_tradfri_remote/availability',
'online', {retain: true, qos: 0}, expect.any(Function));
'online', {retain: true, qos: 1}, expect.any(Function));
});
});
Loading

0 comments on commit d1e50ce

Please sign in to comment.