
[MM-22787] Convert status sync job to use cluster job API #43

Merged (5 commits, Mar 16, 2020)

Conversation

@mickmister (Contributor) commented Mar 3, 2020

Summary

This PR uses Schedule from mattermost-plugin-api to synchronize with other nodes running the same plugin in the same HA environment.

The job has been moved to a jobs package, along with a JobManager to handle config changes.
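
For reference, a minimal sketch of how a cluster-wide job can be scheduled with cluster.Schedule from mattermost-plugin-api; the wrapper function name, job key, and 5-minute rounded interval below are illustrative assumptions, not necessarily the exact code in this PR:

```go
package jobs

import (
	"time"

	"github.com/mattermost/mattermost-plugin-api/cluster"
)

// scheduleStatusSync is a hypothetical wrapper showing the cluster.Schedule call.
// Schedule coordinates across all nodes running the plugin so the callback fires
// on only one node per interval.
func scheduleStatusSync(papi cluster.JobPluginAPI, run func()) (*cluster.Job, error) {
	return cluster.Schedule(
		papi,
		"status_sync", // illustrative job key
		cluster.MakeWaitForRoundedInterval(5*time.Minute), // clock-aligned 5-minute intervals
		run,
	)
}
```

The returned *cluster.Job implements io.Closer, so a JobManager can stop the job on a config change or plugin shutdown by calling Close.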

Ticket Link

https://mattermost.atlassian.net/browse/MM-22787
Fixes #41

@mickmister added the "2: Dev Review (Requires review by a core committer)" and "3: QA Review (Requires review by a QA tester)" labels Mar 3, 2020
@larkox (Contributor) commented Mar 3, 2020

I still need to look at the code more closely, but wouldn't this solution spawn a process on each node in the cluster that blocks on the mutex, so that when the mutex is released (for example, when the cancelled channel is closed) one of the processes on another node is unblocked and starts executing?

@mickmister (Contributor, Author):

@larkox Yes, that is the intended behavior. The job could be closed for a few reasons:

  • The plugin is shutting down (plugin disabled or server shutdown), or an admin disabled the job
    • Sibling nodes (those waiting for the mutex) will be canceled. They may temporarily gain control of the mutex; I think this is the problem you are referring to. We may want to add a small wait just after the mutex.Lock() call so we avoid running the job when a cancel signal is about to arrive (see the sketch below).
  • The current node crashed
    • Another node will claim the mutex and begin running
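
A rough sketch of that guard, under the assumptions of a cancel channel on the active job and a hypothetical two-second grace period (this is not code from the PR):

```go
package jobs

import (
	"time"

	"github.com/mattermost/mattermost-plugin-api/cluster"
)

// runOnce acquires the cluster mutex, then gives a pending cancel a brief
// window before doing any work, covering the case where this node was only
// unblocked because the previous mutex holder shut down.
func runOnce(lock *cluster.Mutex, cancel <-chan struct{}, work func()) {
	lock.Lock()
	defer lock.Unlock()

	select {
	case <-cancel:
		// A cancel was already on its way; skip this run.
		return
	case <-time.After(2 * time.Second): // illustrative grace period
	}

	work()
}
```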

@larkox (Contributor) left a comment

Almost everything looks good. See the comments and let me know what you think.

Resolved review threads: go.mod (outdated), server/jobs/job_manager.go (three threads, two outdated), server/jobs/status_sync_job.go
@lieut-data self-requested a review March 6, 2020 13:24
@lieut-data (Member) left a comment

@mickmister, thanks for the chance to provide an early review. It's been helpful to get context on this particular usage.

You asked specifically about testing the activeJob code. I can see the dilemma here, as the logic is complex, yet somewhat specific to this use case (needing to run on 5-minute intervals relative to the clock, so that the status has since been updated from MS Calendar). Let me give this some more thought in case there's an opportunity to refactor the cluster Job code slightly to facilitate this kind of usage.


// Get mutex to gain access to sensitive section
lock := cluster.NewMutex(papi, job.id)
lock.Lock()
Member:

I know it's still pending PR review, but is the plan to use the new LockWithContext here? (In your case, you would just need a context with cancellation created when the job is constructed, then cancelled as part of close.)
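
A hypothetical sketch of that suggestion, assuming LockWithContext lands as described: create a cancellable context when the job is constructed, pass it to LockWithContext, and cancel it on Close. The type and method names here are schematic, not the PR's actual code:

```go
package jobs

import (
	"context"

	"github.com/mattermost/mattermost-plugin-api/cluster"
)

// activeJob sketch: a cancellable context is created at construction time,
// used for LockWithContext, and cancelled on Close.
type activeJob struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newActiveJob() *activeJob {
	ctx, cancel := context.WithCancel(context.Background())
	return &activeJob{ctx: ctx, cancel: cancel}
}

// run blocks on the cluster mutex until it is acquired or the job is closed.
func (j *activeJob) run(lock *cluster.Mutex, work func()) error {
	if err := lock.LockWithContext(j.ctx); err != nil {
		return err // cancelled while waiting; do not run
	}
	defer lock.Unlock()

	work()
	return nil
}

// Close cancels the context, releasing any goroutine blocked in LockWithContext.
func (j *activeJob) Close() error {
	j.cancel()
	return nil
}
```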

Contributor Author:

Yes, that would make the cancel and cancelled channels I've put on activeJob obsolete, correct?

Contributor Author:

@lieut-data Is there a way I can use a context.Context with cluster.Schedule, as we planned with LockWithContext?

Resolved review threads: server/jobs/job.go (two threads, outdated)
for {
	select {
	case <-ticker.C:
		go job.work(getEnv())
Member:

Running as a goroutine introduces the possibility that the job is executed concurrently with itself. Is this intentional?

Contributor Author:

It seems the best approach is to call it without go. I was trying to ensure that if a job ran longer than INTERVAL, the next run would not be offset from its intended start time, but I think that solution was incorrect.

Maybe if a given iteration goes overtime, we just skip the next run? It would depend on the use case of the job, of course. (See the sketch below.)
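
A sketch of the "skip the next run if an iteration goes overtime" idea: run the work synchronously and drain any tick that fired while it was running. The function and parameter names are illustrative:

```go
package jobs

import "time"

// runLoop runs the job synchronously on each tick and, if an iteration outlasts
// the interval, drains the tick that fired meanwhile so the next run waits for
// a fresh interval.
func runLoop(ticker *time.Ticker, cancel <-chan struct{}, work func()) {
	for {
		select {
		case <-ticker.C:
			work() // synchronous: no overlapping executions

			// time.Ticker buffers at most one pending tick; drop it if present.
			select {
			case <-ticker.C:
			default:
			}
		case <-cancel:
			return
		}
	}
}
```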

Contributor:

Maybe if a given iteration goes overtime, we just skip the next run?

+1

cancel: make(chan struct{}),
cancelled: make(chan struct{}),
}
actJob.start(jm.getEnv, jm.papi)
Member:

Would it be cleaner to expose a job constructor that automatically started the job, i.e.:

activeJob := newActiveJob(jm.getEnv, jm.papi, job)

Member:

Also, getEnv seems like an unfortunate pattern -- it effectively requires the job infrastructure to know more than necessary about the internal job function. I now better understand your earlier ask about folding this kind of scheduling into cluster.Job. I'm happy to revisit that discussion if it would help!

Contributor:

see my comment above about the config service.


// OnConfigurationChange activates/deactivates jobs based on their current state, and the current plugin config.
func (jm *JobManager) OnConfigurationChange(env mscalendar.Env) error {
jm.env = env
Member:

OnConfigurationChange can get called concurrently as a hook, at least -- do we need to guard jm.env or other internal bits of the job manager with a mutex?
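
A sketch of one way such a guard could look, assuming an RWMutex inside the JobManager; this is one possible shape, not necessarily what the PR ends up doing, and the mscalendar import path is assumed:

```go
package jobs

import (
	"sync"

	// assumed import path for the plugin's mscalendar package
	"github.com/mattermost/mattermost-plugin-mscalendar/server/mscalendar"
)

// JobManager sketch: env is guarded by an RWMutex because OnConfigurationChange
// may be invoked concurrently as a plugin hook.
type JobManager struct {
	mu  sync.RWMutex
	env mscalendar.Env
}

func (jm *JobManager) OnConfigurationChange(env mscalendar.Env) error {
	jm.mu.Lock()
	defer jm.mu.Unlock()
	jm.env = env
	// ...activate or deactivate registered jobs based on the new config...
	return nil
}

func (jm *JobManager) getEnv() mscalendar.Env {
	jm.mu.RLock()
	defer jm.mu.RUnlock()
	return jm.env
}
```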

Contributor:

Yes; also, we already have a ticket to refactor closer to -workflow and use a config "service".

Resolved review threads: server/jobs/job_manager.go (two threads, outdated)
e.Logger.Debugf("Enabling user status sync job")
e.statusSyncJob = mscalendar.NewStatusSyncJob(e.Env)
go e.statusSyncJob.Start()
if p.env.jobManager == nil {
Member:

As noted above, OnConfigurationChange might be executed concurrently. I'd suggest initializing the job manager and jobs unconditionally when the plugin is constructed.

Contributor Author:

p.env.jobManager is being assigned here in a callback passed to p.updateEnv, which is guarded by a mutex. So the initialization will only happen once, and any call to jm.OnConfigurationChange will also be guarded by that mutex.

I think having two persistent pointers to env (one in Plugin, and one in JobManager) is the main anti-pattern here. Any job-handling functionality should be stateless as far as the config dependencies are concerned. I can see Plugin passing a func(callback func()) Env param to initialize the JobManager; this would be guarded by the mutex just like updateEnv. What do you think @lieut-data?
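
A loose sketch of the pattern described above, with deliberately simplified types: updateEnv runs the callback under the plugin's env mutex, so one-time initialization of the job manager (and later config updates) are serialized. All names and types here are schematic placeholders, not the plugin's actual definitions:

```go
package plugin

import "sync"

type JobManager struct{}

type Env struct {
	jobManager *JobManager // placeholder for the real jobs.JobManager
}

type Plugin struct {
	envLock sync.Mutex
	env     Env
}

// updateEnv mutates the env under the lock and returns the resulting copy.
func (p *Plugin) updateEnv(update func(*Env)) Env {
	p.envLock.Lock()
	defer p.envLock.Unlock()

	update(&p.env)
	return p.env
}

func (p *Plugin) onConfigChange() {
	p.updateEnv(func(e *Env) {
		if e.jobManager == nil {
			e.jobManager = &JobManager{} // initialized exactly once, under the lock
		}
	})
}
```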

Contributor Author:

This is being initialized in p.updateEnv, the same mutex-controlled area where other resources, such as the plugin's NotificationProcessor, are initialized. I think rearranging where they are initialized is out of scope for this PR, but I can create a follow-up ticket if appropriate.

Resolved review thread: server/jobs/job.go (outdated)
@levb (Contributor) previously approved these changes Mar 15, 2020 and left a comment

Generally LGTM; I'd like to see @lieut-data's comments addressed or resolved.


cancelOnce sync.Once
}

// NewJobManager creates a JobManager for to let plugin.go coordinate with the scheduled jobs.
Contributor:

At this point I'm slightly inclined to focus on getting the functionality in, and maybe filing a ticket to refactor for simplicity. Generally, though, I agree that the less plugin-side complexity we have here, the better. Specifically, sacrificing a couple of minutes of notification latency/precision appears acceptable.



@codecov-io commented Mar 16, 2020

Codecov Report

Merging #43 into master will increase coverage by 2.35%.
The diff coverage is 64.55%.


@@            Coverage Diff             @@
##           master      #43      +/-   ##
==========================================
+ Coverage   20.66%   23.02%   +2.35%     
==========================================
  Files          53       54       +1     
  Lines        1674     1724      +50     
==========================================
+ Hits          346      397      +51     
+ Misses       1300     1293       -7     
- Partials       28       34       +6     
Impacted Files                   Coverage Δ
server/jobs/status_sync_job.go   0.00% <0.00%> (ø)
server/jobs/job_manager.go       78.46% <78.46%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update be9a3a8...d61f8f4.

@larkox (Contributor) left a comment

Just one small thing that I do not understand. The rest LGTM.

@@ -23,12 +29,22 @@ type RegisteredJob struct {
isEnabledByConfig func(env mscalendar.Env) bool
}

var scheduleFunc = func(api cluster.JobPluginAPI, id string, wait cluster.NextWaitInterval, cb func()) (io.Closer, error) {
Contributor:

I do not get why you are declaring this function here, instead of directly using `cluster.Schedule(api, id, wait, cb)` whenever you need it.

Contributor Author:

It's so the tests can mock the function. I could make it an interface instead, but since the cluster package exposes it as a plain function, I chose this approach. (See the sketch below.)
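
A skeleton of how a test could swap out the package-level scheduleFunc variable; the test name and stub closer are hypothetical, and the JobManager setup is omitted, so this is only the mocking pattern, not a complete test:

```go
package jobs

import (
	"io"
	"testing"

	"github.com/mattermost/mattermost-plugin-api/cluster"
)

// nopCloser is a trivial io.Closer stub for the test.
type nopCloser struct{}

func (nopCloser) Close() error { return nil }

func TestScheduleIsCalled(t *testing.T) {
	var scheduledID string

	original := scheduleFunc
	defer func() { scheduleFunc = original }()

	scheduleFunc = func(api cluster.JobPluginAPI, id string, wait cluster.NextWaitInterval, cb func()) (io.Closer, error) {
		scheduledID = id
		return nopCloser{}, nil
	}

	// ...construct the JobManager here and enable the status sync job...

	if scheduledID == "" {
		t.Fatal("expected a job to be scheduled")
	}
}
```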

@lieut-data (Member) left a comment

👍 for simpler code -- deferring the job manager discussion to @levb's discretion. Let me know if I can help further!

@mickmister requested a review from larkox March 16, 2020 18:25
@mickmister removed the "2: Dev Review (Requires review by a core committer)" label Mar 16, 2020
@mickmister (Contributor, Author):

Merging now; it will be tested on master per @DHaussermann's suggestion made offline.

@mickmister merged commit 7320402 into master Mar 16, 2020
@mickmister deleted the GH-41-cluster-job branch March 16, 2020 19:23
@hanzei added the "4: Reviews Complete (All reviewers have approved the pull request)" label and removed the "3: QA Review (Requires review by a QA tester)" label Jun 9, 2020
Labels: 4: Reviews Complete (All reviewers have approved the pull request)
May close: Convert status sync job to use cluster-safe job
6 participants