Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (azure-iot-device): use paho disconnect/reconnect to refresh sas token #576

Merged
merged 4 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 3 additions & 32 deletions azure-iot-device/azure/iot/device/common/mqtt_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ def on_disconnect(client, userdata, rc):
if not this:
# Paho will sometimes call this after we've been garbage collected, If so, we have to
# stop the loop to make sure the Paho thread shuts down.
logger.info("disconnected after garbage collection. stopping loop")
logger.info(
"on_disconnect called with transport==None. Transport must have been garbage collected. stopping loop"
)
client.loop_stop()
else:
if this.on_mqtt_disconnected_handler:
Expand Down Expand Up @@ -428,37 +430,6 @@ def connect(self, password=None):
raise _create_error_from_rc_code(rc)
self._mqtt_client.loop_start()

def reauthorize_connection(self, password=None):
    """
    Reauthorize with the MQTT broker, using username set at instantiation.

    Connect should have previously been called in order to use this function.

    The password is not required if the transport was instantiated with an x509 certificate.

    :param str password: The password for reauthorizing with the MQTT broker (Optional).

    :raises: ConnectionFailedError if connection could not be established.
    :raises: ConnectionDroppedError if connection is dropped during execution.
    :raises: UnauthorizedError if there is an error authenticating.
    :raises: ProtocolClientError if there is some other client error.
    """
    logger.info("reauthorizing MQTT client")
    # Install the (possibly refreshed) credentials so Paho uses them on reconnect.
    self._mqtt_client.username_pw_set(username=self._username, password=password)
    try:
        rc = self._mqtt_client.reconnect()
    except Exception as e:
        logger.info("reconnect raised {}".format(e))
        # Tear down transport state before surfacing the failure so a
        # subsequent connect attempt starts from a clean slate.
        self._cleanup_transport_on_error()
        raise exceptions.ConnectionDroppedError(
            message="Unexpected Paho failure during reconnect", cause=e
        )
    logger.debug("_mqtt_client.reconnect returned rc={}".format(rc))
    if rc:
        # A nonzero rc indicates failure; map it to the appropriate exception.
        # This could result in ConnectionFailedError, ConnectionDroppedError, UnauthorizedError
        # or ProtocolClientError
        raise _create_error_from_rc_code(rc)

def disconnect(self):
"""
Disconnect from the MQTT broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,26 +897,46 @@ class ReconnectState(object):
"""
Class which holds reconnect states as class variables. Created to make code that reads like an enum without using an enum.

NEVER_CONNECTED: Transport has never been connected. This state is necessary because some errors might be fatal or transient,
depending on whether the transport has been connected. For example, a failed connection is a transient error if we've connected
before, but it's fatal if we've never connected.

WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting.

CONNECTED_OR_DISCONNECTED: The transport is either connected or disconnected. This stage doesn't really care which one, so
it doesn't keep track.
WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting. This state implies
that the user wants the pipeline to be connected. ie. After a successful connection, the
state will change to LOGICALLY_CONNECTED

LOGICALLY_CONNECTED: The client wants the pipeline to be connected. This state is independent
of the actual connection state since the pipeline could be logically connected but
physically disconnected (this is a temporary condition though. If we're logically connected
and physically disconnected, then we should be waiting to reconnect.)

LOGICALLY_DISCONNECTED: The client does not want the pipeline to be connected or the pipeline had
a permanent error and was forced to disconnect. If the state is LOGICALLY_DISCONNECTED, then the pipeline
should be physically disconnected since there is no reason to leave the pipeline connected in this state.
"""

NEVER_CONNECTED = "NEVER_CONNECTED"
WAITING_TO_RECONNECT = "WAITING_TO_RECONNECT"
CONNECTED_OR_DISCONNECTED = "CONNECTED_OR_DISCONNECTED"
LOGICALLY_CONNECTED = "LOGICALLY_CONNECTED"
LOGICALLY_DISCONNECTED = "LOGICALLY_DISCONNECTED"
Copy link
Member

@cartertinney cartertinney Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Big improvement here on the names #Resolved



class ReconnectStage(PipelineStage):
def __init__(self):
super(ReconnectStage, self).__init__()
self.reconnect_timer = None
self.state = ReconnectState.NEVER_CONNECTED
self.state = ReconnectState.LOGICALLY_DISCONNECTED
# never_connected is important because some errors are handled differently the first time
# that we're connecting versus later connections.
#
# For example, if we get a "host not found" the first time we connect, it might mean:
# 1. The connection string is wrong, in which case we don't want to retry
# or it might mean:
# 2. The connection string is correct and the host is just not available (network glitch?)
#
# If never_connected=True, we don't know if it's a permanent error (#1) or if it's
# temporary (#2), so we take the conservative approach and assume it's a permanent error.
#
# If never_connected=False, we know that we've connected before, so the credentials were
# valid at some point in the recent past. We still don't know with certainty if it's
# a temporary error or if it's permanent (the hub may have been deallocated), but it has
# worked in the past, so we assume it's a temporary error.
self.never_connected = True
# connect delay is hardcoded for now. Later, this comes from a retry policy
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also would like some doc surrounding this value, and it's significance. Clearly it is quite important to the logic, but just from the code here I can't tell why we have to care if something was never connected. I understand that a failure on the first connect is fatal (or so it says below), but the "why" isn't really captured anywhere here. #Closed

self.reconnect_delay = 10
self.waiting_connect_ops = []
Expand All @@ -933,31 +953,42 @@ def _run_op(self, op):
self.waiting_connect_ops.append(op)
else:
logger.info(
"{}({}): State is {}. Adding to wait list and sending new connect op down".format(
"{}({}): State changes {}->LOGICALLY_CONNECTED. Adding to wait list and sending new connect op down".format(
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_CONNECTED
# We don't send this op down. Instead, we send a new connect op down. This way,
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we already were Logically Connected? Should we perhaps have an additional conditional block to prevent us from trying to connect when we already are (logically) connected? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this train of thought, but it's taking me in a different direction. For now, ConnectionLockStage is above this one, so we only get a ConnectOperation here if we're physically disconnected.
In the longer term, your comment somewhere else in here makes me wonder if we want to have different types of connect ops:
PhysicalConnectOp - for calling into transport
LogicalConnectOp - for setting LogicallyConnected
GuaranteedPhysicalConnectOp - physical connect op with retry

or something like that. Maybe PhysicalConnectOp is guaranteed and NonGuaranteedConnectOp is the one that isn't guaranteed. I'm not sure.

Either way, that's rambling and not well thought out. The code in here is good for now.


In reply to: 439846074 [](ancestors = 439846074)

# we can distinguish between connect ops that we're handling (they go into the
# queue) and connect ops that we are sending down.
Copy link
Member

@cartertinney cartertinney Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this indicates perhaps there should be a concrete separate op so this isn't just by convention. Something to consider going forth #WontFix

#
# Once we finally connect, we only have to complete the ops in the queue and we
# never have to worry about completing the op that we sent down. The code is much
# cleaner this way, especially when you take retries into account, trust me.
self.waiting_connect_ops.append(op)
self._send_new_connect_op_down()

elif isinstance(op, pipeline_ops_base.DisconnectOperation):
if self.state == ReconnectState.WAITING_TO_RECONNECT:
logger.info(
"{}({}): State is {}. Canceling waiting ops and sending disconnect down.".format(
"{}({}): State changes {}->LOGICALLY_DISCONNECTED. Canceling waiting ops and sending disconnect down.".format(
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_DISCONNECTED
self._clear_reconnect_timer()
self._complete_waiting_connect_ops(
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline_exceptions.OperationCancelled("Explicit disconnect invoked") [](start = 20, length = 69)

Unrelated to the rest of this PR I guess, but I don't get what's going on here, and why this gets set as a waiting connect op

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you selected the wrong line here. This code (DisconnectOperation, WAITING_TO_RECONNECT) covers the case where the connection was dropped and it's waiting to retry when the user calls disconnect(). In that case, any operations that were waiting for the reconnect to succeed need to be cancelled. By calling disconnect(), the user was effectively cancelling all operations that were pending.


In reply to: 439686461 [](ancestors = 439686461)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure because the line you selected is the one that completes (cancels) all of the waiting connection ops.


In reply to: 440422365 [](ancestors = 440422365,439686461)

)
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED
op.complete()

else:
logger.info(
"{}({}): State is {}. Sending op down.".format(self.name, op.name, self.state)
"{}({}): State changes {}->LOGICALLY_DISCONNECTED. Sending op down.".format(
self.name, op.name, self.state
)
)
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here (and in the above blocks as well) we log the old state right before setting a new state. That's probably going to really confuse up the log statements, I assume it would be better for us to log AFTER adjusting the state #Resolved

self.state = ReconnectState.LOGICALLY_DISCONNECTED
self.send_op_down(op)

else:
Expand All @@ -966,18 +997,31 @@ def _run_op(self, op):
@pipeline_thread.runs_on_pipeline_thread
def _handle_pipeline_event(self, event):
if isinstance(event, pipeline_events_base.DisconnectedEvent):
if self.pipeline_root.connected:
logger.info(
"{}({}): State is {}. Triggering reconnect timer".format(
self.name, event.name, self.state
)
logger.debug(
"{}({}): State is {} Connected is {}.".format(
self.name, event.name, self.state, self.pipeline_root.connected
)
)

if self.pipeline_root.connected and self.state == ReconnectState.LOGICALLY_CONNECTED:
# When we get disconnected, we try to reconnect as soon as we can. We don't want
# to reconnect right here because we're in a callback in the middle of being
# disconnected and we want things to "settle down" a bit before reconnecting.
#
# We also use a timer to reconnect here because the "reconnect timer expired"
# code path is well tested. If we tried to immediately reconnect here, there
# would be an entire set of scenarios that would need to be tested for
# this case, and these tests would be identical to the "reconnect timer expired"
# tests. Likewise, if there were 2 reconnect code paths (one immediate and one
# delayed), then both those paths would need to be maintained as separate
# flows
self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()
self._start_reconnect_timer(0.01)

else:
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else [](start = 12, length = 4)

for an user disconnect the code path will turn up here right ? where the reconnect timer will not fire ? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct


In reply to: 437156650 [](ancestors = 437156650)

logger.info(
"{}({}): State is {}. Doing nothing".format(self.name, event.name, self.state)
)
# If user manually disconnected, ReconnectState will be LOGICALLY_DISCONNECTED, and
# no reconnect timer will be created.
pass

self.send_event_up(event)

Expand All @@ -992,60 +1036,52 @@ def _send_new_connect_op_down(self):
def on_connect_complete(op, error):
this = self_weakref()
if this:
logger.debug(
"{}({}): on_connect_complete error={} state={} never_connected={} connected={} ".format(
this.name,
op.name,
error,
this.state,
this.never_connected,
this.pipeline_root.connected,
)
)
if error:
if this.state == ReconnectState.NEVER_CONNECTED:
logger.info(
"{}({}): error on first connection. Not triggering reconnection".format(
this.name, op.name
)
)
if this.never_connected:
# any error on a first connection is assumed to be a permanent error
this.state = ReconnectState.LOGICALLY_DISCONNECTED
this._clear_reconnect_timer()
this._complete_waiting_connect_ops(error)
elif type(error) in transient_connect_errors:
logger.info(
"{}({}): State is {}. Connect failed with transient error. Triggering reconnect timer".format(
self.name, op.name, self.state
)
)
# transient errors cause a reconnect attempt
self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()

elif this.state == ReconnectState.WAITING_TO_RECONNECT:
logger.info(
"{}({}): non-tranient error. Failing all waiting ops.n".format(
this.name, op.name
)
)
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED
self._clear_reconnect_timer()
this._complete_waiting_connect_ops(error)

self._start_reconnect_timer(self.reconnect_delay)
else:
logger.info(
"{}({}): State is {}. Connection failed. Not triggering reconnection".format(
this.name, op.name, this.state
)
)
# all others are permanent errors
this.state = ReconnectState.LOGICALLY_DISCONNECTED
this._clear_reconnect_timer()
this._complete_waiting_connect_ops(error)
Copy link
Member

@cartertinney cartertinney Jun 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this._complete_waiting_connect_ops(error) [](start = 24, length = 41)

This seems like it would include, as mentioned in a comment above, OperationCancelled operations, which represent a disconnect somehow.

So, reaching a disconnect state by connection failure causes the disconnect operation to fail?

I think I'm missing something here though

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my previous response. The DisconnectOperation is never set as a waiting_connectoin_op. The disconnect causes waiting ops to be cancelled, but it's never a waiting op itself.


In reply to: 439687473 [](ancestors = 439687473)

else:
logger.info(
"{}({}): State is {}. Connection succeeded".format(
this.name, op.name, this.state
)
)
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED
self._clear_reconnect_timer()
self._complete_waiting_connect_ops()
# successfully connected
this.never_connected = False
this.state = ReconnectState.LOGICALLY_CONNECTED
this._clear_reconnect_timer()
this._complete_waiting_connect_ops()

logger.info("{}: sending new connect op down".format(self.name))
logger.debug("{}: sending new connect op down".format(self.name))
op = pipeline_ops_base.ConnectOperation(callback=on_connect_complete)
self.send_op_down(op)

@pipeline_thread.runs_on_pipeline_thread
def _start_reconnect_timer(self):
def _start_reconnect_timer(self, delay):
"""
Set a timer to reconnect after some period of time
"""
logger.info("{}: State is {}. Starting reconnect timer".format(self.name, self.state))
logger.debug(
"{}: State is {}. Connected={} Starting reconnect timer".format(
self.name, self.state, self.pipeline_root.connected
)
)

self._clear_reconnect_timer()

Expand All @@ -1054,23 +1090,22 @@ def _start_reconnect_timer(self):
@pipeline_thread.invoke_on_pipeline_thread_nowait
def on_reconnect_timer_expired():
this = self_weakref()
this.reconnect_timer = None
if this.state == ReconnectState.WAITING_TO_RECONNECT:
logger.info(
"{}: State is {}. Reconnect timer expired. Sending connect op down".format(
this.name, this.state
)
logger.debug(
"{}: Reconnect timer expired. State is {} Connected is {}.".format(
self.name, self.state, self.pipeline_root.connected
)
this.state = ReconnectState.CONNECTED_OR_DISCONNECTED
)

this.reconnect_timer = None
if (
this.state == ReconnectState.WAITING_TO_RECONNECT
and not self.pipeline_root.connected
):
# if we're waiting to reconnect and not connected, we try again
this.state = ReconnectState.LOGICALLY_CONNECTED
this._send_new_connect_op_down()
else:
logger.info(
"{}: State is {}. Reconnect timer expired. Doing nothing".format(
this.name, this.state
)
)

self.reconnect_timer = threading.Timer(self.reconnect_delay, on_reconnect_timer_expired)
self.reconnect_timer = threading.Timer(delay, on_reconnect_timer_expired)
self.reconnect_timer.start()

@pipeline_thread.runs_on_pipeline_thread
Expand Down
Loading