Skip to content

Commit

Permalink
[mqtt.homie] handle exceptions parsing attributes (openhab#12254)
Browse files Browse the repository at this point in the history
fixes openhab#10711

technically this code is in mqtt.generic, but it's only used by Homie.

in particular, if an incoming string doesn't match an enum, this will now
just ignore the value instead of raising an exception to be caught somewhere inside
of Hive MQTT, and eventually timing out and logging that mandatory topics weren't
received, instead of logging a pointer to the actual problem. this makes it so that
if there's a homie $datatype openhab doesn't understand (like duration), it will be
able to get to the point of just choosing a string channel

also did some minor debug logging cleanup for mqtt:
 * fixed a typo
 * when logging homie device name from the thing handler, use the config deviceid,
   since we likely don't have the attributes from MQTT yet

Signed-off-by: Cody Cutrer <cody@cutrer.us>
Signed-off-by: Nick Waterton <n.waterton@outlook.com>
  • Loading branch information
ccutrer authored and NickWaterton committed Apr 27, 2022
1 parent bc7188c commit 1629a49
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public boolean isStateful() {
}

private void internalStop() {
logger.debug("Unsubscribed channel {} form topic: {}", this.channelUID, config.stateTopic);
logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
this.connection = null;
this.channelStateUpdateListener = null;
hasSubscribed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,35 @@ public void processMessage(String topic, byte[] payload) {
}

String valueStr = new String(payload, StandardCharsets.UTF_8);
String originalValueStr = valueStr;

// Check if there is a manipulation annotation attached to the field
final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
Object value;
if (transform != null) {
// Add a prefix/suffix to the value
valueStr = transform.prefix() + valueStr + transform.suffix();
// Split the value if the field is an array. Convert numbers/enums if necessary.
value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
: numberConvert(valueStr, field.getType());
} else if (field.getType().isArray()) {
throw new IllegalArgumentException("No split character defined!");
} else {
// Convert numbers/enums if necessary
value = numberConvert(valueStr, field.getType());
try {
final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
Object value;
if (transform != null) {
// Add a prefix/suffix to the value
valueStr = transform.prefix() + valueStr + transform.suffix();
// Split the value if the field is an array. Convert numbers/enums if necessary.
value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
: numberConvert(valueStr, field.getType());
} else if (field.getType().isArray()) {
throw new IllegalArgumentException("No split character defined!");
} else {
// Convert numbers/enums if necessary
value = numberConvert(valueStr, field.getType());
}
receivedValue = true;
changeConsumer.fieldChanged(field, value);
future.complete(null);
} catch (IllegalArgumentException e) {
if (mandatory) {
future.completeExceptionally(e);
} else {
logger.warn("Unable to interpret {} from topic {}", originalValueStr, topic);
future.complete(null);
}
}
receivedValue = true;
changeConsumer.fieldChanged(field, value);
future.complete(null);
}

void timeoutReached() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,25 @@ public void subscribeAndReceive() throws IllegalArgumentException, IllegalAccess

assertThat(future.isDone(), is(true));
}

@Test
public void ignoresInvalidEnum() throws IllegalArgumentException, IllegalAccessException {
final Attributes attributes = spy(new Attributes());

doAnswer(this::createSubscriberAnswer).when(attributes).createSubscriber(any(), any(), anyString(),
anyBoolean());

verify(connection, times(0)).subscribe(anyString(), any());

// Subscribe now to all fields
CompletableFuture<Void> future = attributes.subscribeAndReceive(connection, executor, "homie/device123",
fieldChangedObserver, 10);
assertThat(future.isDone(), is(true));

SubscribeFieldToMQTTtopic field = attributes.subscriptions.stream().filter(f -> f.field.getName() == "state")
.findFirst().get();
field.processMessage(field.topic, "garbage".getBytes());
verify(fieldChangedObserver, times(0)).attributeChanged(any(), any(), any(), any(), anyBoolean());
assertThat(attributes.state.toString(), is("unknown"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ protected void setInternalObjects(Device device, DelayedBatchProcessing<Object>

@Override
public void initialize() {
logger.debug("About to initialize Homie device {}", device.attributes.name);
config = getConfigAs(HandlerConfiguration.class);
logger.debug("About to initialize Homie device {}", config.deviceid);
if (config.deviceid.isEmpty()) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
return;
Expand All @@ -119,7 +119,7 @@ public void handleRemoval() {

@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
logger.debug("About to start Homie device {}", device.attributes.name);
logger.debug("About to start Homie device {}", config.deviceid);
if (connection.getQos() != 1) {
// QoS 1 is required.
logger.warn(
Expand All @@ -129,13 +129,13 @@ public void handleRemoval() {
return device.subscribe(connection, scheduler, attributeReceiveTimeout).thenCompose((Void v) -> {
return device.startChannels(connection, scheduler, attributeReceiveTimeout, this);
}).thenRun(() -> {
logger.debug("Homie device {} fully attached (start)", device.attributes.name);
logger.debug("Homie device {} fully attached (start)", config.deviceid);
});
}

@Override
protected void stop() {
logger.debug("About to stop Homie device {}", device.attributes.name);
logger.debug("About to stop Homie device {}", config.deviceid);
final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
if (heartBeatTimer != null) {
heartBeatTimer.cancel(false);
Expand Down Expand Up @@ -227,7 +227,7 @@ public void accept(@Nullable List<Object> t) {
final MqttBrokerConnection connection = this.connection;
if (connection != null) {
device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
logger.debug("Homie device {} fully attached (accept)", device.attributes.name);
logger.debug("Homie device {} fully attached (accept)", config.deviceid);
});
}
}
Expand Down

0 comments on commit 1629a49

Please sign in to comment.