Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (azure-iot-device): use paho disconnect/reconnect to refresh sas token #576

Merged
merged 4 commits into from
Jun 16, 2020

Conversation

BertKleewein
Copy link
Member

Thank you for helping us improve the Azure IoT Python SDK!

Need Support

  • Have a feature request for SDKs? Please post it on User Voice to help us prioritize
  • Have a technical question? Ask on Stack Overflow with tag "azure-iot-hub"
  • Need Support? Every customer with an active Azure subscription has access to support with guaranteed response time. Consider submitting a ticket and get assistance from Microsoft support team
  • Found a bug? Please help us fix it by thoroughly documenting it and filing an issue on GitHub (See below).

Here's a little checklist of things that will help it make its way to the repository: Note that you don't have to check all the boxes, we can help you with that.
This being said, the more you do, the quicker it'll go through our gated build!
-->

Checklist

  • I have read the contribution guidelines.
  • I added or modified the existing tests to cover the change (we do not allow our test coverage to go down).
  • If this is a modification that impacts the behavior of a public API
    • I edited the corresponding document in the devdoc folder and added or modified requirements.

Reference/Link to the issue solved with this PR (if any)

Description of the problem

Please be as precise as possible: what issue you experienced, how often...

Description of the solution

How you solved the issue and the other things you considered and maybe rejected

@@ -217,7 +217,9 @@ def on_disconnect(client, userdata, rc):
if not this:
# Paho will sometimes call this after we've been garbage collected, If so, we have to
# stop the loop to make sure the Paho thread shuts down.
logger.info("disconnected after garbage collection. stopping loop")
logger.info(
"on_disconnect called with this=None. Transport must have been garbage collected. stopping loop"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this=None [](start = 47, length = 9)

Do you want to say this specifically ? or say on_disconnect called with transport being None.. #Resolved

# connect delay is hardcoded for now. Later, this comes from a retry policy
self.reconnect_delay = 10
self.delayed_reconnect_delay = 10
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delayed_reconnect_delay [](start = 13, length = 23)

could you put some help docs over the two types of delay ? when do we choose one over the other ? #Resolved

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


In reply to: 437135351 [](ancestors = 437135351)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back to one delay variable and hardcoded the other. There's no reason to ever adjust the immediate delay and it's only used in a single case, so it seems less confusing if we only have a single delay variable.


In reply to: 439658750 [](ancestors = 439658750,437135351)

@@ -992,60 +1015,52 @@ def _send_new_connect_op_down(self):
def on_connect_complete(op, error):
this = self_weakref()
if this:
logger.debug(
"{}({}): on_connect_complete error={} state={} never_conencted={} connected={} ".format(
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never_conencted [](start = 67, length = 15)

spell #Resolved

self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()
self._start_reconnect_timer(self.immediate_reconnect_delay)

else:
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else [](start = 12, length = 4)

for an user disconnect the code path will turn up here right ? where the reconnect timer will not fire ? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrrect


In reply to: 437156650 [](ancestors = 437156650)

@olivakar
Copy link
Collaborator

olivakar commented Jun 9, 2020

class ReconnectStageInstantiationTests(ReconnectStageTestConfig):

initialize test for the never connected attribute to be set to true ? #Resolved


Refers to: azure-iot-device/tests/common/pipeline/test_pipeline_stages_base.py:2666 in 123c045. [](commit_id = 123c045, deletion_comment = False)

before, but it's fatal if we've never conencted.
WAITING_TO_RECONNECT: This stage is in a waiting period before reconnecting. This state implies
that the user wants the pipeline to be connected. ie. After a successful connection, the
state will change to LOGICALLY_CONNECED
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGICALLY_CONNECED [](start = 25, length = 18)

spell #Resolved

)

if self.pipeline_root.connected and self.state == ReconnectState.LOGICALLY_CONNECTED:
# when we get disconnected, we try to immediatly reconnect. If that fails,
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

immediatly [](start = 54, length = 10)

spell #Resolved

@pytest.mark.it(
"If and only if connected and logically connected, changes the state to WAITING_TO_RECONNECT"
)
def test_changes_state(self, stage, event, state, currently_connected):
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_changes_state [](start = 8, length = 18)

can u separate these tests for the currently_connected part ?
i see that this if blocks aare there in all tests and i understand that you may not want to repeat the tests..so never mind my comment.. #Resolved

if currently_connected and state == pipeline_stages_base.ReconnectState.LOGICALLY_CONNECTED:
if old_timer:
assert old_timer.cancel.call_count == 1
assert stage.reconnect_timer != old_timer
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!= [](start = 41, length = 3)

are we trying to assert that these 2 objects are different ? then we should probably use is not for unequal identities. #Resolved

pipeline_stages_base.ReconnectState.CONNECTED_OR_DISCONNECTED,
],
@pytest.mark.it(
"Changes the state to LOGICALLY_DISCONNECTED if the connection fails with an arbitrary error"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arbitrary [](start = 85, length = 9)

all these arbitrary errors are non transient errors right ? for me if the statement mentions non transient error it makes more sense.. #Resolved

@pytest.mark.it(
"Completes all waiting ops with the transient failure if the connection fails with a transient error"
"Completes all waiting ops with the transient failure if a first-time connection fails with a transient error"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transient error [](start = 102, length = 15)

for a first connection case ,it is not ONLY transient error right ? it includes transient and non transient error of all kinds ? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. The tests for permanent (arbitrary, non-transient) errors are above.


In reply to: 437760479 [](ancestors = 437760479)

@pytest.mark.it(
"Does not create a reconnect timer if the connection fails with a transient error"
"Does not create a reconnect timer if the connection fails with a transient error on a first-time connection"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transient error on a first-time connection [](start = 74, length = 42)

same comment as the first time connection case #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.


In reply to: 437760653 [](ancestors = 437760653)

@pytest.mark.it(
"Performs an MQTT reconnect via the MQTTTransport, using the pipeline root's SasToken as a password"
)
@pytest.mark.it("Performs an MQTT disconnect via the MQTTTransport")
def test_mqtt_connect(self, mocker, stage, op):
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_mqtt_connect [](start = 8, length = 17)

change test name #Resolved

@pytest.mark.it("Completes a pending ReauthorizeConnectionOperation successfully")
@pytest.mark.it(
"Ignores a pending ReathorizeConnectionOperation when the transport connected event fires"
)
def test_completes_pending_reconnect_op(self, mocker, stage):
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_completes_pending_reconnect_op [](start = 8, length = 35)

does not complete right ? and pending_reauthorize_op ? #Resolved

)
def test_cancels_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer):
def test_does_not_cancelcel_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer):
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_does_not_cancelcel_watchdog_on_pending_reauthorize [](start = 8, length = 55)

spell cancel #Resolved

@@ -763,7 +716,9 @@ def test_completes_pending_connect_op(self, mocker, stage):
assert op.error is None
assert stage._pending_connection_op is None

@pytest.mark.it("Completes a pending ReauthorizeConnectionOperation successfully")
@pytest.mark.it(
"Ignores a pending ReathorizeConnectionOperation when the transport connected event fires"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignores [](start = 9, length = 7)

does not complete sounds more clear.. #Resolved

id="Pending ReauthorizeConnectionOperation",
),
],
"Completes (unsuccessfully) a ConnectOoperation with a ConnectionDroppedError if no cause is provided for the disconnection"
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectOoperation [](start = 38, length = 17)

spell #Resolved

id="Pending ReauthorizeConnectionOperation",
),
],
"Completes (unsuccessfully) a ConnectOoperation with a ConnectionDroppedError if no cause is provided for the disconnection"
)
def test_comletes_with_connection_dropped_error_as_error_if_no_cause(
Copy link
Collaborator

@olivakar olivakar Jun 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comletes [](start = 13, length = 8)

spell #Resolved

)
def test_cancels_watchdog_on_pending_reauthorize(self, mocker, stage, mock_timer, cause):
def test_does_not_cancel_watchdog_on_pending_reauthorize(
Copy link
Collaborator

@olivakar olivakar Jun 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_does_not_cancel_watchdog_on_pending_reauthorize [](start = 8, length = 52)

i understand what you want to test , but on looking at the code of _on_mqtt_disconnected , regardless of what type of op it is we are always calling _cancel_connection_watchdog , so when the test name of description becomes does not cancel , it is not matching if we just read the code. #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For disconnect and reauthorize, there is no connection watchdog, so there's nothing to cancel. The call to _cancle_connection_watchdog just doesn't do anything in this case. I changed the test name to be a little more clear and, unfortunately, also a little big more awkward.


In reply to: 437786906 [](ancestors = 437786906)

WAITING_TO_RECONNECT = "WAITING_TO_RECONNECT"
CONNECTED_OR_DISCONNECTED = "CONNECTED_OR_DISCONNECTED"
LOGICALLY_CONNECTED = "LOGICALLY_CONNECTED"
LOGICALLY_DISCONNECTED = "LOGICALLY_DISCONNECTED"
Copy link
Member

@cartertinney cartertinney Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Big improvement here on the names #Resolved

self.state = ReconnectState.LOGICALLY_CONNECTED
# We don't send this op down. Instead, we send a new connect op down. This way,
# we can distinguish between connect ops that we're handling (they go into the
# queue) and connect ops that we are sending down.
Copy link
Member

@cartertinney cartertinney Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this indicates perhaps there should be a concrete separate op so this isn't just by convention. Something to consider going forth #WontFix

self.state = ReconnectState.WAITING_TO_RECONNECT
self._start_reconnect_timer()
self._start_reconnect_timer(self.immediate_reconnect_delay)
Copy link
Member

@cartertinney cartertinney Jun 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.immediate_reconnect_delay [](start = 44, length = 30)

Why not just directly initiate the action rather than starting a timer for 0.01 seconds that then triggers the action?

I assume it's to do with code readability and cleanliness? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In short, 2 reasons:

  1. When this is called, we are still "disconnecting" and not "completely disconnected", so it's better to let this event flow up before we try again.
  2. When I tried to call it immediately, the volume of test cases exploded. I'm normally against changing code to make testing easier, but this is a situation where the tests showed me that calling connect here had more implications than you would think.

In reply to: 439685542 [](ancestors = 439685542)

@@ -947,17 +960,18 @@ def _run_op(self, op):
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_DISCONNECTED
self._clear_reconnect_timer()
self._complete_waiting_connect_ops(
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline_exceptions.OperationCancelled("Explicit disconnect invoked") [](start = 20, length = 69)

Unrelated to the rest of this PR I guess, but I don't get what's going on here, and why this gets set as a waiting connect op

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you selected the wrong line here. This code (DisconnectOperation, WAITING_TO_RECONNECT) covers the case where the connection was dropped and it's waiting to retry when the user calls disconnect(). In that case, any operations that were waiting for the reconnect to succeed need to be cancelled. By calling disconnect(), the user was effectively cancelling all operations that were pending.


In reply to: 439686461 [](ancestors = 439686461)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure because the line you selected is the one that completes (cancels) all of the waiting connection ops.


In reply to: 440422365 [](ancestors = 440422365,439686461)

)
# all others are fatal
this.state = ReconnectState.LOGICALLY_DISCONNECTED
this._clear_reconnect_timer()
this._complete_waiting_connect_ops(error)
Copy link
Member

@cartertinney cartertinney Jun 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this._complete_waiting_connect_ops(error) [](start = 24, length = 41)

This seems like it would include, as mentioned in a comment above, OperationCancelled operations, which represent a disconnect somehow.

So, reaching a disconnect state by connection failure causes the disconnect operation to fail?

I think I'm missing something here though

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my previous response. The DisconnectOperation is never set as a waiting_connectoin_op. The disconnect causes waiting ops to be cancelled, but it's never a waiting op itself.


In reply to: 439687473 [](ancestors = 439687473)

@@ -937,6 +942,14 @@ def _run_op(self, op):
self.name, op.name, self.state
)
)
self.state = ReconnectState.LOGICALLY_CONNECTED
# We don't send this op down. Instead, we send a new connect op down. This way,
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we already were Logically Connected? Should we perhaps have an additional conditional block to prevent us from trying to connect when we already are (logically) connected? #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this train of thought, but it's taking me in a different direction. For now, ConnectionLockStage is above this one, so we only get a ConnectOperation here if we're physically disconnected.
In the longer term, your comment somewhere else in here makes me wonder if we want to have different types of connect ops:
PhysicalConnectOp - for calling into transport
LogicalConnectOp - for setting LogicallyConnected
GuaranteedPhysicalConnectOp - physical connect op with retry

or something like that. Myabe PhysicalConnectOp is guaranteed and NonGuaranteedConnectOp is the one that isn't guaranteed. I'm not sure.

Either way, that's rambing and not well thouht out. The code in here is good for now.


In reply to: 439846074 [](ancestors = 439846074)

self._clear_reconnect_timer()
self._complete_waiting_connect_ops(
pipeline_exceptions.OperationCancelled("Explicit disconnect invoked")
)
self.state = ReconnectState.CONNECTED_OR_DISCONNECTED
op.complete()

else:
logger.info(
"{}({}): State is {}. Sending op down.".format(self.name, op.name, self.state)
)
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here (and in the above blocks as well) we log the old state right before setting a new state. That's probably going to really confuse up the log statements, I assume it would be better for us to log AFTER adjusting the state #Resolved



class ReconnectStage(PipelineStage):
def __init__(self):
super(ReconnectStage, self).__init__()
self.reconnect_timer = None
self.state = ReconnectState.NEVER_CONNECTED
self.state = ReconnectState.LOGICALLY_DISCONNECTED
self.never_connected = True
# connect delay is hardcoded for now. Later, this comes from a retry policy
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also would like some doc surrounding this value, and it's significance. Clearly it is quite important to the logic, but just from the code here I can't tell why we have to care if something was never connected. I understand that a failure on the first connect is fatal (or so it says below), but the "why" isn't really captured anywhere here. #Closed

)
)
if this.never_connected:
# any error on a first connection is fatal
Copy link
Member

@cartertinney cartertinney Jun 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fatal [](start = 61, length = 5)

Is it? It doesn't look fatal. The client doesn't need to be scrapped as far as I can tell. #Resolved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"fatal" is maybe a bad error. I changed it to "transient error" or "permanent error". I think this is better.


In reply to: 439846621 [](ancestors = 439846621)

Copy link
Member

@cartertinney cartertinney left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

@BertKleewein BertKleewein merged commit acb6a46 into Azure:master Jun 16, 2020
@BertKleewein BertKleewein deleted the bertk-reconnect branch June 21, 2020 15:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants