fix (azure-iot-device): use paho disconnect/reconnect to refresh sas token #576
…to ReconnectStage
ed7db9f
to
123c045
Compare
@@ -217,7 +217,9 @@ def on_disconnect(client, userdata, rc):
if not this:
# Paho will sometimes call this after we've been garbage collected, If so, we have to
# stop the loop to make sure the Paho thread shuts down.
logger.info("disconnected after garbage collection. stopping loop")
logger.info(
"on_disconnect called with this=None. Transport must have been garbage collected. stopping loop"
this=None [](start = 47, length = 9)
Do you want to say "this" specifically? Or say "on_disconnect called with transport being None"? #Resolved
# connect delay is hardcoded for now. Later, this comes from a retry policy
self.reconnect_delay = 10
self.delayed_reconnect_delay = 10
delayed_reconnect_delay [](start = 13, length = 23)
Could you add some documentation for the two types of delay? When do we choose one over the other? #Resolved
I went back to one delay variable and hardcoded the other. There's no reason to ever adjust the immediate delay and it's only used in a single case, so it seems less confusing if we only have a single delay variable.
In reply to: 439658750 [](ancestors = 439658750,437135351)
@@ -992,60 +1015,52 @@ def _send_new_connect_op_down(self):
def on_connect_complete(op, error):
this = self_weakref()
if this:
logger.debug(
"{}({}): on_connect_complete error={} state={} never_conencted={} connected={} ".format(
never_conencted [](start = 67, length = 15)
spell #Resolved
self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()
self._start_reconnect_timer(self.immediate_reconnect_delay)

else:
else [](start = 12, length = 4)
For a user disconnect, the code path will end up here, right? Where the reconnect timer will not fire? #Resolved
before, but it's fatal if we've never conencted.
WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting. This state implies
that the user wants the pipeline to be connected. ie. After a successful connection, the
state will change to LOGICALLY_CONNECED
LOGICALLY_CONNECED [](start = 25, length = 18)
spell #Resolved
)

if self.pipeline_root.connected and self.state == ReconnectState.LOGICALLY_CONNECTED:
# when we get disconnected, we try to immediatly reconnect. If that fails,
immediatly [](start = 54, length = 10)
spell #Resolved
@pytest.mark.it(
"If and only if connected and logically connected, changes the state to WAITING_TO_RECONNECT"
)
def test_changes_state(self, stage, event, state, currently_connected):
test_changes_state [](start = 8, length = 18)
Can you separate these tests for the currently_connected part?
I see that these if blocks are there in all the tests, and I understand that you may not want to repeat the tests, so never mind my comment. #Resolved
if currently_connected and state == pipeline_stages_base.ReconnectState.LOGICALLY_CONNECTED:
if old_timer:
assert old_timer.cancel.call_count == 1
assert stage.reconnect_timer != old_timer
!= [](start = 41, length = 3)
Are we trying to assert that these two objects are different? Then we should probably use "is not", which compares identities rather than values. #Resolved
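To illustrate the point about identity versus equality, here is a minimal sketch. `FakeTimer` is a hypothetical class invented for this example (it is not part of the SDK or its tests); it defines `__eq__` so that two distinct objects compare equal, which is the case where `!=` and `is not` give different answers.

```python
class FakeTimer:
    """Hypothetical timer stand-in that compares equal by its delay value."""

    def __init__(self, delay):
        self.delay = delay

    def __eq__(self, other):
        return isinstance(other, FakeTimer) and self.delay == other.delay


old_timer = FakeTimer(10)
new_timer = FakeTimer(10)  # a different object with the same delay

# Identity check: passes, because these are two separate objects.
assert new_timer is not old_timer

# Equality check: "!=" is False here, so an assertion written with "!="
# would fail even though the timer really was replaced.
assert not (new_timer != old_timer)
```

For plain objects that do not override `__eq__`, both checks behave the same, so `is not` mainly documents the intent and stays correct if equality is ever customized.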
pipeline_stages_base.ReconnectState.CONNECTED_OR_DISCONNECTED,
],
@pytest.mark.it(
"Changes the state to LOGICALLY_DISCONNECTED if the connection fails with an arbitrary error"
arbitrary [](start = 85, length = 9)
All of these arbitrary errors are non-transient errors, right? To me, the statement makes more sense if it says "non-transient error". #Resolved
@pytest.mark.it(
"Completes all waiting ops with the transient failure if the connection fails with a transient error"
"Completes all waiting ops with the transient failure if a first-time connection fails with a transient error"
transient error [](start = 102, length = 15)
For the first-connection case, it is not ONLY transient errors, right? It includes transient and non-transient errors of all kinds? #Resolved
That's correct. The tests for permanent (arbitrary, non-transient) errors are above.
In reply to: 437760479 [](ancestors = 437760479)
@pytest.mark.it(
"Does not create a reconnect timer if the connection fails with a transient error"
"Does not create a reconnect timer if the connection fails with a transient error on a first-time connection"
transient error on a first-time connection [](start = 74, length = 42)
Same comment as for the first-time connection case. #Resolved
@pytest.mark.it(
"Performs an MQTT reconnect via the MQTTTransport, using the pipeline root's SasToken as a password"
)
@pytest.mark.it("Performs an MQTT disconnect via the MQTTTransport")
def test_mqtt_connect(self, mocker, stage, op):
test_mqtt_connect [](start = 8, length = 17)
change test name #Resolved
@pytest.mark.it("Completes a pending ReauthorizeConnectionOperation successfully")
@pytest.mark.it(
"Ignores a pending ReathorizeConnectionOperation when the transport connected event fires"
)
def test_completes_pending_reconnect_op(self, mocker, stage):
test_completes_pending_reconnect_op [](start = 8, length = 35)
It does not complete, right? And pending_reauthorize_op rather than pending_reconnect_op? #Resolved
)
def test_cancels_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer):
def test_does_not_cancelcel_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer):
test_does_not_cancelcel_watchdog_on_pending_reauthorize [](start = 8, length = 55)
spell cancel #Resolved
@@ -763,7 +716,9 @@ def test_completes_pending_connect_op(self, mocker, stage):
assert op.error is None
assert stage._pending_connection_op is None

@pytest.mark.it("Completes a pending ReauthorizeConnectionOperation successfully")
@pytest.mark.it(
"Ignores a pending ReathorizeConnectionOperation when the transport connected event fires"
Ignores [](start = 9, length = 7)
"Does not complete" sounds clearer. #Resolved
id="Pending ReauthorizeConnectionOperation",
),
],
"Completes (unsuccessfully) a ConnectOoperation with a ConnectionDroppedError if no cause is provided for the disconnection"
ConnectOoperation [](start = 38, length = 17)
spell #Resolved
id="Pending ReauthorizeConnectionOperation",
),
],
"Completes (unsuccessfully) a ConnectOoperation with a ConnectionDroppedError if no cause is provided for the disconnection"
)
def test_comletes_with_connection_dropped_error_as_error_if_no_cause(
comletes [](start = 13, length = 8)
spell #Resolved
)
def test_cancels_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer, cause):
def test_does_not_cancel_watchdog_on_pending_reauthorize(
test_does_not_cancel_watchdog_on_pending_reauthorize [](start = 8, length = 52)
I understand what you want to test, but looking at the code of _on_mqtt_disconnected, we always call _cancel_connection_watchdog regardless of the type of op. So when the test name or description says "does not cancel", it does not match what you see if you just read the code. #Resolved
For disconnect and reauthorize, there is no connection watchdog, so there's nothing to cancel. The call to _cancel_connection_watchdog just doesn't do anything in this case. I changed the test name to be a little more clear and, unfortunately, also a little bit more awkward.
In reply to: 437786906 [](ancestors = 437786906)
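The reply above describes a cancel call that is safe to make unconditionally. A minimal sketch of that pattern follows; `WatchdogOwnerSketch` and its attribute names are hypothetical and only illustrate the idea, they are not the SDK's actual implementation.

```python
import threading


class WatchdogOwnerSketch:
    """Hypothetical owner of an optional connection watchdog timer."""

    def __init__(self):
        # No watchdog exists until a connect attempt starts one.
        self._connection_watchdog = None

    def _start_connection_watchdog(self, timeout, on_timeout):
        self._connection_watchdog = threading.Timer(timeout, on_timeout)
        self._connection_watchdog.daemon = True
        self._connection_watchdog.start()

    def _cancel_connection_watchdog(self):
        # Safe to call for any op type: when no watchdog was started
        # (as assumed here for disconnect and reauthorize), this does nothing.
        if self._connection_watchdog is not None:
            self._connection_watchdog.cancel()
            self._connection_watchdog = None
```

With this shape, a test for the reauthorize case can assert that no timer was cancelled even though the cancel method was called.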
WAITING_TO_RECONNECT = "WAITING_TO_RECONNECT"
CONNECTED_OR_DISCONNECTED = "CONNECTED_OR_DISCONNECTED"
LOGICALLY_CONNECTED = "LOGICALLY_CONNECTED"
LOGICALLY_DISCONNECTED = "LOGICALLY_DISCONNECTED"
Big improvement here on the names #Resolved
self.state = ReconnectState.LOGICALLY_CONNECTED
# We don't send this op down. Instead, we send a new connect op down. This way,
# we can distinguish between connect ops that we're handling (they go into the
# queue) and connect ops that we are sending down.
I wonder if this indicates that there should be a separate concrete op, so this isn't just by convention. Something to consider going forward. #WontFix
self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()
self._start_reconnect_timer(self.immediate_reconnect_delay)
self.immediate_reconnect_delay [](start = 44, length = 30)
Why not just directly initiate the action rather than starting a timer for 0.01 seconds that then triggers the action?
I assume it's to do with code readability and cleanliness? #Resolved
In short, 2 reasons:
- When this is called, we are still "disconnecting" and not "completely disconnected", so it's better to let this event flow up before we try again.
- When I tried to call it immediately, the volume of test cases exploded. I'm normally against changing code to make testing easier, but this is a situation where the tests showed me that calling connect here had more implications than you would think.
In reply to: 439685542 [](ancestors = 439685542)
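The first reason above (let the disconnect event finish flowing before retrying) can be sketched with a short timer. This is only an illustration under assumptions: `ReconnectTimerSketch`, its constructor arguments, and `on_disconnected` are hypothetical names; the 0.01 second default mirrors the delay mentioned in the question.

```python
import threading


class ReconnectTimerSketch:
    """Hypothetical stage that defers its reconnect attempt to a timer thread."""

    def __init__(self, connect, immediate_reconnect_delay=0.01):
        self._connect = connect  # callable that starts a new connect attempt
        self.immediate_reconnect_delay = immediate_reconnect_delay
        self.reconnect_timer = None

    def _clear_reconnect_timer(self):
        if self.reconnect_timer is not None:
            self.reconnect_timer.cancel()
            self.reconnect_timer = None

    def _start_reconnect_timer(self, delay):
        self._clear_reconnect_timer()
        self.reconnect_timer = threading.Timer(delay, self._connect)
        self.reconnect_timer.daemon = True
        self.reconnect_timer.start()

    def on_disconnected(self):
        # Do not call connect here: the disconnect is still being processed
        # by the caller. Schedule the attempt so this handler returns first.
        self._start_reconnect_timer(self.immediate_reconnect_delay)
```

The connect attempt then runs on the timer thread after the handler has returned, rather than re-entering the pipeline from inside the disconnect event.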
@@ -947,17 +960,18 @@ def _run_op(self, op):
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_DISCONNECTED
self._clear_reconnect_timer()
self._complete_waiting_connect_ops(
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked")
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked") [](start = 20, length = 69)
Unrelated to the rest of this PR I guess, but I don't get what's going on here, and why this gets set as a waiting connect op
Not sure if you selected the wrong line here. This code (DisconnectOperation, WAITING_TO_RECONNECT) covers the case where the connection was dropped and it's waiting to retry when the user calls disconnect(). In that case, any operations that were waiting for the reconnect to succeed need to be cancelled. By calling disconnect(), the user was effectively cancelling all operations that were pending.
In reply to: 439686461 [](ancestors = 439686461)
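A minimal sketch of the case described above, where the user calls disconnect() while the stage is waiting to retry. The class and attribute names are hypothetical, states are plain strings, and waiting ops are modeled as callbacks that receive the completion error; the real stage also clears its reconnect timer at this point.

```python
class OperationCancelled(Exception):
    """Stand-in for pipeline_exceptions.OperationCancelled."""


class WaitingStageSketch:
    """Hypothetical stage that starts out waiting to reconnect."""

    def __init__(self):
        self.state = "WAITING_TO_RECONNECT"
        self.waiting_connect_ops = []  # callbacks taking the completion error

    def disconnect(self):
        if self.state == "WAITING_TO_RECONNECT":
            self.state = "LOGICALLY_DISCONNECTED"
            # Ops that were waiting for the reconnect can never succeed now,
            # so each one is completed with a cancellation error.
            ops, self.waiting_connect_ops = self.waiting_connect_ops, []
            for complete in ops:
                complete(OperationCancelled("Explicit disconnect invoked"))
```

Note that the disconnect itself is never placed in the waiting list; it only drains it.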
I'm not sure because the line you selected is the one that completes (cancels) all of the waiting connection ops.
In reply to: 440422365 [](ancestors = 440422365,439686461)
)
# all others are fatal
this.state = ReconnectState.LOGICALLY_DISCONNECTED
this._clear_reconnect_timer()
this._complete_waiting_connect_ops(error)
this._complete_waiting_connect_ops(error) [](start = 24, length = 41)
This seems like it would include, as mentioned in a comment above, OperationCancelled operations, which represent a disconnect somehow.
So, reaching a disconnect state by connection failure causes the disconnect operation to fail?
I think I'm missing something here though
Related to my previous response. The DisconnectOperation is never set as a waiting_connection_op. The disconnect causes waiting ops to be cancelled, but it's never a waiting op itself.
In reply to: 439687473 [](ancestors = 439687473)
@@ -937,6 +942,14 @@ def _run_op(self, op):
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_CONNECTED
# We don't send this op down. Instead, we send a new connect op down. This way,
What if we already were Logically Connected? Should we perhaps have an additional conditional block to prevent us from trying to connect when we already are (logically) connected? #Resolved
I like this train of thought, but it's taking me in a different direction. For now, ConnectionLockStage is above this one, so we only get a ConnectOperation here if we're physically disconnected.
In the longer term, your comment somewhere else in here makes me wonder if we want to have different types of connect ops:
- PhysicalConnectOp - for calling into transport
- LogicalConnectOp - for setting LogicallyConnected
- GuaranteedPhysicalConnectOp - physical connect op with retry
or something like that. Maybe PhysicalConnectOp is guaranteed and NonGuaranteedConnectOp is the one that isn't guaranteed. I'm not sure.
Either way, that's rambling and not well thought out. The code in here is good for now.
In reply to: 439846074 [](ancestors = 439846074)
self._clear_reconnect_timer()
self._complete_waiting_connect_ops(
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked")
)
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED
op.complete()

else:
logger.info(
"{}({}): State is {}. Sending op down.".format(self.name, op.name, self.state)
)
Here (and in the above blocks as well) we log the old state right before setting a new state. That's probably going to make the log statements really confusing. I assume it would be better for us to log AFTER adjusting the state. #Resolved
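One way to address the logging-order concern is to record the old state, change it, and then log both values in a single message. The sketch below is an assumption about how that could look; `StageSketch`, `run_connect`, the logger name, and the message format are hypothetical, not the PR's code.

```python
import logging

logger = logging.getLogger("reconnect_sketch")


class StageSketch:
    """Hypothetical stage that logs after its state has changed."""

    name = "ReconnectStage"

    def __init__(self):
        self.state = "LOGICALLY_DISCONNECTED"

    def run_connect(self, op_name):
        old_state = self.state
        self.state = "LOGICALLY_CONNECTED"
        # Logged after the assignment, so the message reflects the state
        # the stage is actually in, plus where it came from.
        logger.info(
            "{}({}): state changed from {} to {}".format(
                self.name, op_name, old_state, self.state
            )
        )
```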
class ReconnectStage(PipelineStage):
def __init__(self):
super(ReconnectStage, self).__init__()
self.reconnect_timer = None
self.state = ReconnectState.NEVER_CONNECTED
self.state = ReconnectState.LOGICALLY_DISCONNECTED
self.never_connected = True
# connect delay is hardcoded for now. Later, this comes from a retry policy
I would also like some documentation for this value and its significance. Clearly it is quite important to the logic, but just from the code here I can't tell why we have to care if something was never connected. I understand that a failure on the first connect is fatal (or so it says below), but the "why" isn't really captured anywhere here. #Closed
)
)
if this.never_connected:
# any error on a first connection is fatal
fatal [](start = 61, length = 5)
Is it? It doesn't look fatal. The client doesn't need to be scrapped as far as I can tell. #Resolved
"fatal" is maybe a bad term. I changed it to "transient error" or "permanent error". I think this is better.
In reply to: 439846621 [](ancestors = 439846621)
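Pulling together the test descriptions and code comments quoted in this review, the connect-completion handling might be sketched as follows. This is an assumption-laden illustration, not the PR's code: `TransientError`, `ReconnectDecisionSketch`, and `timer_delays` are hypothetical, states are plain strings, the success branch is a guess, and the 10 second delay is the hardcoded value shown in the diff.

```python
class TransientError(Exception):
    """Hypothetical marker for errors that are worth retrying."""


class ReconnectDecisionSketch:
    """Hypothetical model of how a connect result maps to the next state."""

    def __init__(self):
        self.state = "LOGICALLY_DISCONNECTED"
        self.never_connected = True
        self.waiting_connect_ops = []  # callbacks taking an error or None
        self.timer_delays = []  # records each reconnect delay requested

    def _complete_waiting_connect_ops(self, error):
        ops, self.waiting_connect_ops = self.waiting_connect_ops, []
        for complete in ops:
            complete(error)

    def on_connect_complete(self, error):
        if error is None:
            # Assumed success path: remember that a connection has happened.
            self.never_connected = False
            self.state = "LOGICALLY_CONNECTED"
            self._complete_waiting_connect_ops(None)
        elif self.never_connected or not isinstance(error, TransientError):
            # Any error on a first-time connection, or a permanent error
            # later on: report it to the waiting ops and do not retry.
            self.state = "LOGICALLY_DISCONNECTED"
            self._complete_waiting_connect_ops(error)
        else:
            # Transient error after a previous success: wait, then retry.
            self.state = "WAITING_TO_RECONNECT"
            self.timer_delays.append(10)
```

In this reading, `never_connected` exists so that a first connect attempt reports its failure to the caller instead of retrying silently in the background.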