-
Notifications
You must be signed in to change notification settings - Fork 382
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix (azure-iot-device): use paho disconnect/reconnect to refresh sas token #576
Changes from 2 commits
123c045
c0af01e
fe4398f
3575a55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -897,26 +897,46 @@ class ReconnectState(object): | |
""" | ||
Class which holds reconenct states as class variables. Created to make code that reads like an enum without using an enum. | ||
|
||
NEVER_CONNECTED: Ttransport has never been conencted. This state is necessary because some errors might be fatal or transient, | ||
depending on wether the transport has been connceted. For example, a failed conenction is a transient error if we've connected | ||
before, but it's fatal if we've never conencted. | ||
|
||
WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting. | ||
|
||
CONNECTED_OR_DISCONNECTED: The transport is either connected or disconencted. This stage doesn't really care which one, so | ||
it doesn't keep track. | ||
WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting. This state implies | ||
that the user wants the pipeline to be connected. ie. After a successful connection, the | ||
state will change to LOGICALLY_CONNECTED | ||
|
||
LOGICALLY_CONNECTED: The client wants the pipeline to be connected. This state is independent | ||
of the actual connection state since the pipeline could be logically connected but | ||
physically disconnected (this is a temporary condition though. If we're logically connected | ||
and physically disconnected, then we should be waiting to reconnect. | ||
|
||
LOGICALLY_DISCONNECTED: The client does not want the pipeline to be connected or the pipeline had | ||
a permanent errors error and was forced to disconnect. If the state is LOGICALLY_DISCONNECTED, then the pipeline | ||
should be physically disconnected since there is no reason to leave the pipeline connected in this state. | ||
""" | ||
|
||
NEVER_CONNECTED = "NEVER_CONNECTED" | ||
WAITING_TO_RECONNECT = "WAITING_TO_RECONNECT" | ||
CONNECTED_OR_DISCONNECTED = "CONNECTED_OR_DISCONNECTED" | ||
LOGICALLY_CONNECTED = "LOGICALLY_CONNECTED" | ||
LOGICALLY_DISCONNECTED = "LOGICALLY_DISCONNECTED" | ||
|
||
|
||
class ReconnectStage(PipelineStage): | ||
def __init__(self): | ||
super(ReconnectStage, self).__init__() | ||
self.reconnect_timer = None | ||
self.state = ReconnectState.NEVER_CONNECTED | ||
self.state = ReconnectState.LOGICALLY_DISCONNECTED | ||
# never_connected is important because some errors are handled differently the frist time | ||
# that we're connecting versus later connections. | ||
# | ||
# For example, if we get a "host not found" the first time we connect, it might mean: | ||
# 1. The connection string is wrong, in which case we don't want to retry | ||
# or it might mean: | ||
# 2. The connection string is correct and the host is just not available (network glitch?) | ||
# | ||
# If never_connected=True, we don't know if it's a permanent error (#1) or if it's | ||
# temporary (#2), so we take the conservative approach and assume it's a permanent error. | ||
# | ||
# If never_connected=False, we know that we've connected before, so the credentials were | ||
# valid at some point in the recent past. We still don't know with certainty if it's | ||
# a temporary error or if it's permanent (the hub may have been deallocated), but it has | ||
# worked in the past, so we assume it's a temporary error. | ||
self.never_connected = True | ||
# connect delay is hardcoded for now. Later, this comes from a retry policy | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also would like some doc surrounding this value, and it's significance. Clearly it is quite important to the logic, but just from the code here I can't tell why we have to care if something was never connected. I understand that a failure on the first connect is fatal (or so it says below), but the "why" isn't really captured anywhere here. #Closed |
||
self.reconnect_delay = 10 | ||
self.waiting_connect_ops = [] | ||
|
@@ -933,31 +953,42 @@ def _run_op(self, op): | |
self.waiting_connect_ops.append(op) | ||
else: | ||
logger.info( | ||
"{}({}): State is {}. Adding to wait list and sending new connect op down".format( | ||
"{}({}): State changes {}->LOGICALLY_CONNECTED. Adding to wait list and sending new connect op down".format( | ||
self.name, op.name, self.state | ||
) | ||
) | ||
self.state = ReconnectState.LOGICALLY_CONNECTED | ||
# We don't send this op down. Instead, we send a new connect op down. This way, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we already were Logically Connected? Should we perhaps have an additional conditional block to prevent us from trying to connect when we already are (logically) connected? #Resolved There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this train of thought, but it's taking me in a different direction. For now, ConnectionLockStage is above this one, so we only get a ConnectOperation here if we're physically disconnected. or something like that. Myabe PhysicalConnectOp is guaranteed and NonGuaranteedConnectOp is the one that isn't guaranteed. I'm not sure. Either way, that's rambing and not well thouht out. The code in here is good for now. In reply to: 439846074 [](ancestors = 439846074) |
||
# we can distinguish between connect ops that we're handling (they go into the | ||
# queue) and connect ops that we are sending down. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this indicates perhaps there should be a concrete separate op so this isn't just by convention. Something to consider going forth #WontFix |
||
# | ||
# Once we finally connect, we only have to complete the ops in the queue and we | ||
# never have to worry about completing the op that we sent down. The code is much | ||
# cleaner this way, especially when you take retries into account, trust me. | ||
self.waiting_connect_ops.append(op) | ||
self._send_new_connect_op_down() | ||
|
||
elif isinstance(op, pipeline_ops_base.DisconnectOperation): | ||
if self.state == ReconnectState.WAITING_TO_RECONNECT: | ||
logger.info( | ||
"{}({}): State is {}. Canceling waiting ops and sending disconnect down.".format( | ||
"{}({}): State changes {}->LOGICALLY_DISCONNECTED. Canceling waiting ops and sending disconnect down.".format( | ||
self.name, op.name, self.state | ||
) | ||
) | ||
self.state = ReconnectState.LOGICALLY_DISCONNECTED | ||
self._clear_reconnect_timer() | ||
self._complete_waiting_connect_ops( | ||
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Unrelated to the rest of this PR I guess, but I don't get what's going on here, and why this gets set as a waiting connect op There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if you selected the wrong line here. This code (DisconnectOperation, WAITING_TO_RECONNECT) covers the case where the connection was dropped and it's waiting to retry when the user calls disconnect(). In that case, any operations that were waiting for the reconnect to succeed need to be cancelled. By calling disconnect(), the user was effectively cancelling all operations that were pending. In reply to: 439686461 [](ancestors = 439686461) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure because the line you selected is the one that completes (cancels) all of the waiting connection ops. In reply to: 440422365 [](ancestors = 440422365,439686461) |
||
) | ||
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED | ||
op.complete() | ||
|
||
else: | ||
logger.info( | ||
"{}({}): State is {}. Sending op down.".format(self.name, op.name, self.state) | ||
"{}({}): State changes {}->LOGICALLY_DISCONNECTED. Sending op down.".format( | ||
self.name, op.name, self.state | ||
) | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here (and in the above blocks as well) we log the old state right before setting a new state. That's probably going to really confuse up the log statements, I assume it would be better for us to log AFTER adjusting the state #Resolved |
||
self.state = ReconnectState.LOGICALLY_DISCONNECTED | ||
self.send_op_down(op) | ||
|
||
else: | ||
|
@@ -966,18 +997,31 @@ def _run_op(self, op): | |
@pipeline_thread.runs_on_pipeline_thread | ||
def _handle_pipeline_event(self, event): | ||
if isinstance(event, pipeline_events_base.DisconnectedEvent): | ||
if self.pipeline_root.connected: | ||
logger.info( | ||
"{}({}): State is {}. Triggering reconnect timer".format( | ||
self.name, event.name, self.state | ||
) | ||
logger.debug( | ||
"{}({}): State is {} Connected is {}.".format( | ||
self.name, event.name, self.state, self.pipeline_root.connected | ||
) | ||
) | ||
|
||
if self.pipeline_root.connected and self.state == ReconnectState.LOGICALLY_CONNECTED: | ||
# When we get disconnected, we try to reconnect as soon as we can. We don't want | ||
# to reconnect right here bcause we're in a callback in the middle of being | ||
# disconnected and we want things to "settle down" a bit before reconnecting. | ||
# | ||
# We also use a timer to reconnect here because the "reconnect timer expired" | ||
# code path is well tested. If we tried to immediately reconnect here, there | ||
# would be an entire set of scenarios that would need to be tested for | ||
# this case, and these tests would be idential to the "reconnect timer expired" | ||
# tests. Likewise, if there were 2 reconnect code paths (one immediate and one | ||
# delayed), then both those paths would need to be maintained as separate | ||
# flows | ||
self.state = ReconnectState.WAITING_TO_RECONNECT | ||
self._start_reconnect_timer() | ||
self._start_reconnect_timer(0.01) | ||
|
||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
for an user disconnect the code path will turn up here right ? where the reconnect timer will not fire ? #Resolved There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
logger.info( | ||
"{}({}): State is {}. Doing nothing".format(self.name, event.name, self.state) | ||
) | ||
# If user manually disconnected, ReconnectState will be LOGICALLY_DISCONNECTED, and | ||
# no reconnect timer will be created. | ||
pass | ||
|
||
self.send_event_up(event) | ||
|
||
|
@@ -992,60 +1036,52 @@ def _send_new_connect_op_down(self): | |
def on_connect_complete(op, error): | ||
this = self_weakref() | ||
if this: | ||
logger.debug( | ||
"{}({}): on_connect_complete error={} state={} never_connected={} connected={} ".format( | ||
this.name, | ||
op.name, | ||
error, | ||
this.state, | ||
this.never_connected, | ||
this.pipeline_root.connected, | ||
) | ||
) | ||
if error: | ||
if this.state == ReconnectState.NEVER_CONNECTED: | ||
logger.info( | ||
"{}({}): error on first connection. Not triggering reconnection".format( | ||
this.name, op.name | ||
) | ||
) | ||
if this.never_connected: | ||
# any error on a first connection is assumed to e permanent error | ||
this.state = ReconnectState.LOGICALLY_DISCONNECTED | ||
this._clear_reconnect_timer() | ||
this._complete_waiting_connect_ops(error) | ||
elif type(error) in transient_connect_errors: | ||
logger.info( | ||
"{}({}): State is {}. Connect failed with transient error. Triggering reconnect timer".format( | ||
self.name, op.name, self.state | ||
) | ||
) | ||
# transient errors cause a reconnect attempt | ||
self.state = ReconnectState.WAITING_TO_RECONNECT | ||
self._start_reconnect_timer() | ||
|
||
elif this.state == ReconnectState.WAITING_TO_RECONNECT: | ||
logger.info( | ||
"{}({}): non-tranient error. Failing all waiting ops.n".format( | ||
this.name, op.name | ||
) | ||
) | ||
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED | ||
self._clear_reconnect_timer() | ||
this._complete_waiting_connect_ops(error) | ||
|
||
self._start_reconnect_timer(self.reconnect_delay) | ||
else: | ||
logger.info( | ||
"{}({}): State is {}. Connection failed. Not triggering reconnection".format( | ||
this.name, op.name, this.state | ||
) | ||
) | ||
# all others are permanent errors | ||
this.state = ReconnectState.LOGICALLY_DISCONNECTED | ||
this._clear_reconnect_timer() | ||
this._complete_waiting_connect_ops(error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This seems like it would include, as mentioned in a comment above, OperationCancelled operations, which represent a disconnect somehow. So, reaching a disconnect state by connection failure causes the disconnect operation to fail? I think I'm missing something here though There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related to my previous response. The DisconnectOperation is never set as a waiting_connectoin_op. The disconnect causes waiting ops to be cancelled, but it's never a waiting op itself. In reply to: 439687473 [](ancestors = 439687473) |
||
else: | ||
logger.info( | ||
"{}({}): State is {}. Connection succeeded".format( | ||
this.name, op.name, this.state | ||
) | ||
) | ||
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED | ||
self._clear_reconnect_timer() | ||
self._complete_waiting_connect_ops() | ||
# successfully connected | ||
this.never_connected = False | ||
this.state = ReconnectState.LOGICALLY_CONNECTED | ||
this._clear_reconnect_timer() | ||
this._complete_waiting_connect_ops() | ||
|
||
logger.info("{}: sending new connect op down".format(self.name)) | ||
logger.debug("{}: sending new connect op down".format(self.name)) | ||
op = pipeline_ops_base.ConnectOperation(callback=on_connect_complete) | ||
self.send_op_down(op) | ||
|
||
@pipeline_thread.runs_on_pipeline_thread | ||
def _start_reconnect_timer(self): | ||
def _start_reconnect_timer(self, delay): | ||
""" | ||
Set a timer to reconnect after some period of time | ||
""" | ||
logger.info("{}: State is {}. Starting reconnect timer".format(self.name, self.state)) | ||
logger.debug( | ||
"{}: State is {}. Connected={} Starting reconnect timer".format( | ||
self.name, self.state, self.pipeline_root.connected | ||
) | ||
) | ||
|
||
self._clear_reconnect_timer() | ||
|
||
|
@@ -1054,23 +1090,22 @@ def _start_reconnect_timer(self): | |
@pipeline_thread.invoke_on_pipeline_thread_nowait | ||
def on_reconnect_timer_expired(): | ||
this = self_weakref() | ||
this.reconnect_timer = None | ||
if this.state == ReconnectState.WAITING_TO_RECONNECT: | ||
logger.info( | ||
"{}: State is {}. Reconnect timer expired. Sending connect op down".format( | ||
this.name, this.state | ||
) | ||
logger.debug( | ||
"{}: Reconnect timer expired. State is {} Connected is {}.".format( | ||
self.name, self.state, self.pipeline_root.connected | ||
) | ||
this.state = ReconnectState.CONNECTED_OR_DISCONNECTED | ||
) | ||
|
||
this.reconnect_timer = None | ||
if ( | ||
this.state == ReconnectState.WAITING_TO_RECONNECT | ||
and not self.pipeline_root.connected | ||
): | ||
# if we're waiting to reconnect and not connected, we try again | ||
this.state = ReconnectState.LOGICALLY_CONNECTED | ||
this._send_new_connect_op_down() | ||
else: | ||
logger.info( | ||
"{}: State is {}. Reconnect timer expired. Doing nothing".format( | ||
this.name, this.state | ||
) | ||
) | ||
|
||
self.reconnect_timer = threading.Timer(self.reconnect_delay, on_reconnect_timer_expired) | ||
self.reconnect_timer = threading.Timer(delay, on_reconnect_timer_expired) | ||
self.reconnect_timer.start() | ||
|
||
@pipeline_thread.runs_on_pipeline_thread | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Big improvement here on the names #Resolved