bug 1562412 - Notarization poller worker #121
Conversation
Moving to review-ready before I've fully tested, since I'm a ways away from in-tree patches. I'm hoping to avoid the situation where I'm all ready to land, but am missing review during the holidays. I can try to keep track of the interdiff between the reviewed revision and any new changes. @mitchhentges do you have time / headspace for giving this a review pass? If not, possibly @JohanLorenzo ? |
Oof, this is a big patch. I'll look at this later this week if I have time :) |
Yeah. @tomprince said he could take a look too. |
Looks great to me! I like how multiple tasks are handled in a single worker. I haven't found any major issue.
I haven't looked too closely at the project configuration (tox, flake8 and all), because I assumed it's a copy-pasta of the other projects.
👍 👍 👍
exit_code = STATUSES["internal-error"]


class RetryError(WorkerError):
Nit: we can reuse ClientError here:

class RetryError(ClientError):

I wonder if we should move the other exceptions there too. What do you think?
Hm. This is a bit different in that we're setting a resource-unavailable status. Other than that, possibly.
I've left a bunch of comments on the implementation with suggestions on some restructuring that I think would make the code easier to reason about, and in one case make the notarization code clearly separate from the taskcluster worker code.
It is probably worth chatting about this before addressing the comments.
new_tasks = await self._run_cancellable(claim_work(self.config, queue, num_tasks=num_tasks_to_claim))
self.last_claim_work = arrow.utcnow()
for claim_task in new_tasks.get("tasks", []):
    new_task = Task(self.config, claim_task)
If Task has a .start method that returns a future, you can add a done_callback to remove it from running_tasks, rather than periodically pruning it.
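For illustration, a minimal sketch of that done_callback approach; Task.start() returning a future, and the surrounding self, claim_task, and running_tasks names, are assumptions about this code rather than the actual patch.

```python
# Hypothetical helper inside the claim loop: track the task, and remove it
# from running_tasks as soon as its future finishes, instead of pruning the
# list periodically.
def _start_and_track(self, claim_task):
    new_task = Task(self.config, claim_task)  # Task as used in the snippet above
    fut = new_task.start()                    # assumed to return an asyncio.Future/Task
    self.running_tasks.append(new_task)
    fut.add_done_callback(lambda _fut, task=new_task: self.running_tasks.remove(task))
```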
I haven't done this. The current approach seems to be working. Not sure if this was a blocker?
self.running_tasks.append(new_task)
await self.prune_running_tasks()
sleep_time = self.last_claim_work.timestamp + self.config["claim_work_interval"] - arrow.utcnow().timestamp
sleep_time > 0 and await self._run_cancellable(sleep(sleep_time))
I think if the body of this function is wrapped in an asyncio.Task, then you don't need _run_cancellable, and cancelling the task will propagate to the awaited futures here.
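For illustration, a rough sketch of that structure under assumed class, method, and attribute names; only claim_work and the claim_work_interval config key come from the patch itself.

```python
import asyncio


class ClaimWorkLoop:
    """Sketch: wrap the claim-work loop in a single asyncio.Task.

    Cancelling self.claim_fut propagates CancelledError into whatever the
    loop is currently awaiting, so no _run_cancellable wrapper is needed.
    """

    def __init__(self, config, queue):
        self.config = config
        self.queue = queue
        self.claim_fut = None

    async def _loop_body(self):
        while True:
            # claim_work is the existing coroutine used in the snippet above.
            await claim_work(self.config, self.queue, num_tasks=1)
            await asyncio.sleep(self.config["claim_work_interval"])

    def invoke(self):
        self.claim_fut = asyncio.ensure_future(self._loop_body())

    def cancel(self):
        if self.claim_fut:
            self.claim_fut.cancel()
```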
I think I've done this in the latest working patchset.
I still see _run_cancellable here. It exists to propagate cancellation to the sleep and claim_work calls. If you don't have that function, and cancel the future from invoke, it should automatically propagate the cancellation to the sleep and claim_work calls.

That said, this seems like something that can be left for a followup.
Tom's wip here. I'll push up my latest changes here after I have a chance to clean them up. I haven't split the queue interactions from the apple interactions, but I may do so to support integration tests better (since we can easily "mock" apple interactions out by instantiating a different class for the task logic). |
I'm planning on merging and deploying this. We can follow up with any needed changes. |
/notarization_poller has errors:
else:
    log.exception("reclaim_task unexpected exception: %s %s", self.task_id, self.run_id)
    self.status = STATUSES["internal-error"]
    self.task_fut and self.task_fut.cancel()
I'd be slightly inclined to switch the creation of task_fut and reclaim_fut above, and drop the "self.task_fut and" here, but probably not important.
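For illustration, a sketch of that reordering; run_task and reclaim_task are hypothetical stand-ins for the coroutines in this class.

```python
import asyncio


def invoke(self):
    # Create task_fut first, so it is guaranteed to exist by the time the
    # reclaim loop hits an error.
    self.task_fut = asyncio.ensure_future(self.run_task())
    self.reclaim_fut = asyncio.ensure_future(self.reclaim_task())


def handle_reclaim_error(self):
    self.status = STATUSES["internal-error"]
    # No "self.task_fut and" guard needed any more.
    self.task_fut.cancel()
```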
log.info("SIGTERM received; shutting down")
nonlocal done
done = True
await running_tasks.cancel()
Since add_signal_handler doesn't handle futures, I think it would be better if _handle_sigterm were sync and handled anything that needed to be async itself.

That said, it looks like it is async so that running_tasks.cancel can wait on the task futures. Since nothing is waiting on the result of that, I don't think that await actually has any effect. Thus, I think this function and running_tasks.cancel can both be sync.

fae37f8#diff-5fb318e79005ec63cfed70064fa1e861R71 is where I handled waiting on all of those tasks to finish in my sketch.
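For illustration, a sketch of the fully synchronous handler; running_tasks stands in for the worker's task tracker, and its cancel() is assumed here to be a plain sync method that calls .cancel() on each task future.

```python
import asyncio
import logging
import signal

log = logging.getLogger(__name__)


async def async_main(running_tasks):
    done = False

    def _handle_sigterm():
        # add_signal_handler takes a plain callback, so keep this sync.
        nonlocal done
        log.info("SIGTERM received; shutting down")
        done = True
        running_tasks.cancel()

    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, _handle_sigterm)

    while not done:
        await asyncio.sleep(1)
```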
I mainly copied over these coroutines. I'm almost leaning towards tearing out this code, since the worst that can happen with notarization_poller dying is we wait a bit longer on tasks that just poll and wait. Do you have a preference between tearing these out and leaving them as-is?
I'm fine either way. My main concerns are:
- the code is more complicated than it needs to be
- the code makes it look like it will wait for the futures to complete when it likely won't. (Whether they will or not depends on the order of callbacks in the event loop, and may also depend on how much, if any, work they do in being cancelled.) One way to actually wait is sketched below.
Long-term, I think we should improve the code here and in scriptworker to handle this case well. But, I'm fine landing without this code or as-is.
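For illustration, a sketch of one way to actually wait for the cancelled futures to finish, roughly what the fae37f8 sketch linked above does; futures stands in for the running tasks' futures.

```python
import asyncio


async def cancel_and_wait(futures):
    for fut in futures:
        fut.cancel()
    # gather(..., return_exceptions=True) only returns once every future has
    # finished handling its cancellation, rather than just requesting it.
    await asyncio.gather(*futures, return_exceptions=True)
```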
Right. My main concerns are: I have done a significant amount of testing with the code as-is, and making non-trivial changes is likely to introduce new errors just as I'm about to roll out.
One minor comment (about handling graceful shutdown well), and a number of possible followups.
@@ -76,9 +76,12 @@ def start(self):
         except TaskError:
             self.status = STATUSES["malformed-payload"]
             self.task_log(traceback.format_exc(), level=logging.CRITICAL)
+        except asyncio.CancelledError:
+            # We already dealt with self.status in reclaim_task
+            self.task_log(traceback.format_exc(), level=logging.CRITICAL)
(followup) We may want to consider not logging here, since we'll have already logged elsewhere. It probably isn't worth worrying about until after this has been deployed and is in production for a while though.
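For illustration, a possible shape of that followup; run_task is a hypothetical stand-in for the work awaited in start().

```python
import asyncio


async def start(self):
    try:
        await self.run_task()
    except asyncio.CancelledError:
        # self.status and the logging were already handled in reclaim_task,
        # so skip the redundant traceback here.
        pass
```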
new_task.start()
self.running_tasks.append(new_task)
await self.prune_running_tasks()
sleep_time = self.last_claim_work.timestamp + self.config["claim_work_interval"] - arrow.utcnow().timestamp
(potential followup) Since we don't do anything async[1] between setting last_claim_work and now, I suspect we could use claim_work_interval directly here, but it doesn't hurt to do this calculation. (It would be slightly more interesting if we took the time before calling claim_work, so it was the time between calls to claim_work.)

[1] We await prune_running_tasks, so it may take several event loop iterations, but we won't wait on I/O or anything.
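For illustration, a sketch of that "time between calls to claim_work" variant: take the timestamp before claiming, so the interval measures call-to-call time. claim_work, queue, and the config keys come from the surrounding code; the method name is a stand-in.

```python
import asyncio

import arrow


async def claim_iteration(self, queue, num_tasks_to_claim):
    self.last_claim_work = arrow.utcnow()  # timestamp taken *before* claiming
    new_tasks = await claim_work(self.config, queue, num_tasks=num_tasks_to_claim)
    for claim_task in new_tasks.get("tasks", []):
        ...  # start and track the newly claimed task
    sleep_time = self.last_claim_work.timestamp + self.config["claim_work_interval"] - arrow.utcnow().timestamp
    if sleep_time > 0:
        await asyncio.sleep(sleep_time)
```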
This is the poller worker, as described in the user story for bug 1562412. We need to be able to claim and track multiple concurrent tasks, poll Apple for each of their statuses, and resolve tasks when Apple's ready.
It looks like we could potentially abstract away some of this logic into a base python worker module, and share it with scriptworker. I've explicitly decided not to do that this time around.
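For illustration, a high-level sketch of the claim/poll/resolve flow described above; the helper names (apple_notarization_complete, POLL_INTERVAL, max_concurrent_tasks) are illustrative assumptions, not the actual layout of this patch.

```python
import asyncio

POLL_INTERVAL = 30  # seconds; illustrative


async def poll_one_task(queue, claim_task):
    """Poll Apple until notarization finishes, then resolve the task."""
    while not await apple_notarization_complete(claim_task):  # hypothetical helper
        await asyncio.sleep(POLL_INTERVAL)
    await queue.reportCompleted(claim_task["status"]["taskId"], claim_task["runId"])


async def main_loop(config, queue):
    running = set()
    while True:
        # Claim up to the configured number of tasks, then track each poller.
        claimed = await claim_work(config, queue, num_tasks=config["max_concurrent_tasks"])
        for claim_task in claimed.get("tasks", []):
            fut = asyncio.ensure_future(poll_one_task(queue, claim_task))
            running.add(fut)
            fut.add_done_callback(running.discard)
        await asyncio.sleep(config["claim_work_interval"])
```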