[MM-22787] Convert status sync job to use cluster job API #43
Conversation
I have to check the code more closely, but wouldn't this solution spawn a process on each cluster node that gets blocked by the mutex, and when the mutex is released (for example, when the cancelled channel is closed), one of the processes on another node would just get released and start executing?
@larkox Yes, that is the intended functionality. The job could be closed for a few reasons:
Almost everything seems nice. See the comments and let me know what you think.
@mickmister, thanks for the chance to provide an early review. It's been helpful to get the context into this particular usage.
You asked specifically about testing the activeJob stuff. I can see the dilemma here, as the logic is complex, and yet somewhat specific to this use case (needing to run on 5 minute intervals relative to the clock, so that the status has since updated from MS Calendar). Let me give this some more thought in case there's an opportunity to refactor the cluster Job code slightly to facilitate this kind of usage.
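For reference, a rough sketch of how that clock-aligned scheduling might look with the cluster job API. The job key and callback are placeholders, and MakeWaitForRoundedInterval is assumed to be (or become) available in the cluster package:

```go
// Sketch only: run the status sync on 5-minute clock boundaries, coordinated
// across the cluster. If MakeWaitForRoundedInterval isn't available, a custom
// cluster.NextWaitInterval func that waits until the next 5-minute mark would
// serve the same purpose.
job, err := cluster.Schedule(
	papi,
	"status_sync",
	cluster.MakeWaitForRoundedInterval(5*time.Minute),
	func() {
		// do the MS Calendar status sync work here
	},
)
if err != nil {
	return err
}
defer job.Close()
```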
server/jobs/job.go
Outdated
// Get mutex to gain access to sensitive section
lock := cluster.NewMutex(papi, job.id)
lock.Lock()
I know it's still pending PR review, but is the plan to use the new LockWithContext here? (In your case, you would just need a context with cancellation created when the job is constructed, and then cancelled as part of close.)
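If that lands, a minimal sketch of the wiring, assuming LockWithContext ends up with a LockWithContext(ctx context.Context) error shape (the stop field is illustrative):

```go
// When the job is constructed, create a cancellable context and keep the
// cancel func so close() can interrupt a pending lock acquisition.
ctx, stop := context.WithCancel(context.Background())
job.stop = stop

lock := cluster.NewMutex(papi, job.id)
if err := lock.LockWithContext(ctx); err != nil {
	// close() cancelled the context before the lock was acquired; the job
	// is shutting down, so just return without running the work.
	return
}
defer lock.Unlock()
// ... sensitive section ...
```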
Yes, that would make the cancel and cancelled channels I've put on activeJob obsolete, correct?
@lieut-data Is there a way I can use context.Context with cluster.Schedule, as we planned with LockWithContext?
server/jobs/job.go
Outdated
for {
	select {
	case <-ticker.C:
		go job.work(getEnv())
Running as a goroutine introduces the possibility that the job is executed concurrently with itself. Is this intentional?
Seems like the best approach is to call it without go. I was attempting to make it so that if a job lasted longer than INTERVAL, the next job would not be offset from its intended run time. But I think the solution was incorrect.
Maybe if a given iteration goes overtime, we just skip the next run? It would depend on the use case of the job of course.
Maybe if a given iteration goes overtime, we just skip the next run?
+1
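A minimal sketch of that idea, reusing the loop from the diff above (cancel and getEnv are the names already in this PR):

```go
// Call work synchronously. time.Ticker keeps at most one pending tick and
// drops the rest for slow receivers, so an iteration that overruns the
// interval never overlaps with the next one; later runs are simply skipped.
for {
	select {
	case <-ticker.C:
		job.work(getEnv())
	case <-job.cancel:
		return
	}
}
```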
server/jobs/job_manager.go
Outdated
cancel:    make(chan struct{}),
cancelled: make(chan struct{}),
}
actJob.start(jm.getEnv, jm.papi)
Would it be cleaner to expose a job constructor that automatically started the job, i.e.:
activeJob := newActiveJob(jm.getEnv, jm.papi, job)
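Something along these lines, perhaps -- the field set mirrors the current diff, and the embedded RegisteredJob is just an assumption about the struct layout:

```go
// newActiveJob builds the bookkeeping struct and immediately schedules the
// job, so callers can't construct one and forget to start it.
func newActiveJob(getEnv func() mscalendar.Env, papi cluster.JobPluginAPI, job RegisteredJob) *activeJob {
	actJob := &activeJob{
		RegisteredJob: job,
		cancel:        make(chan struct{}),
		cancelled:     make(chan struct{}),
	}
	actJob.start(getEnv, papi)
	return actJob
}
```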
Also, getEnv seems like an unfortunate pattern -- it effectively requires the job infrastructure to know more than necessary about the internal job function. I understand better your previous ask about folding this kind of scheduling into the cluster.Job. I'm happy to revisit that discussion if it would help!
see my comment above about the config service.
// OnConfigurationChange activates/deactivates jobs based on their current state, and the current plugin config.
func (jm *JobManager) OnConfigurationChange(env mscalendar.Env) error {
	jm.env = env
OnConfigurationChange can get called concurrently as a hook at least -- do we need to guard jm.env or other internal bits of the job manager with a mutex?
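If we want a guard before any bigger refactor, a small sketch of what it could look like (the envLock field and getEnv accessor are hypothetical, not in this diff):

```go
type JobManager struct {
	envLock sync.RWMutex // hypothetical guard for env
	env     mscalendar.Env
	// ... existing fields ...
}

func (jm *JobManager) OnConfigurationChange(env mscalendar.Env) error {
	jm.envLock.Lock()
	jm.env = env
	jm.envLock.Unlock()
	// ... activate/deactivate jobs based on the new config ...
	return nil
}

func (jm *JobManager) getEnv() mscalendar.Env {
	jm.envLock.RLock()
	defer jm.envLock.RUnlock()
	return jm.env
}
```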
Yes, also we already have a ticket to refactor closer to -workflow, and use a config "service".
server/plugin/plugin.go
Outdated
e.Logger.Debugf("Enabling user status sync job")
e.statusSyncJob = mscalendar.NewStatusSyncJob(e.Env)
go e.statusSyncJob.Start()
if p.env.jobManager == nil {
As noted above, OnConfigurationChange might be executed concurrently. I'd suggest initializing the job manager and jobs unconditionally when the plugin is constructed.
p.env.jobManager is being assigned here in a callback passed to p.updateEnv, which is guarded by a Mutex. So the initialization will only happen once, and any call to jm.OnConfigurationChange will also be guarded by a Mutex.
I think having two persistent pointers to env (one in Plugin, and one in JobManager) is the main anti-pattern here. Any job handling functionality should be stateless, as far as the config dependencies are concerned. I can see Plugin passing a func(callback func()) Env param to initialize the JobManager. This would be guarded by the Mutex just like updateEnv. What do you think @lieut-data?
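Roughly what I mean, in sketch form (envLock, the embedded Env, and the JobManager fields here are illustrative, not what the diff currently has):

```go
// Plugin side: the single authoritative Env stays behind the mutex that
// updateEnv already uses, and is exposed through an accessor.
func (p *Plugin) getEnv() mscalendar.Env {
	p.envLock.RLock()
	defer p.envLock.RUnlock()
	return p.env.Env
}

// JobManager side: only the accessor is stored, so every run reads the
// current config and there is no second persistent pointer to env.
type JobManager struct {
	getEnv func() mscalendar.Env
	papi   cluster.JobPluginAPI
}
```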
This is being initialized in p.updateEnv, the same mutex-controlled area where other resources are initialized, such as the plugin's NotificationProcessor. I think rearranging where they are initialized is out of scope for this PR, but I can make a follow-up ticket if it's appropriate.
Generally LGTM; I'd like to see @lieut-data's comments addressed or resolved.
cancelOnce sync.Once
}
// NewJobManager creates a JobManager to let plugin.go coordinate with the scheduled jobs.
At this point I'm slightly inclined to focus on getting the functionality in, and maybe filing a ticket to refactor for simplicity. Generally though I agree that the less plugin-side complexity we have here, the better. Specifically, sacrificing a couple minutes of the notification latency/precision appears acceptable.
Codecov Report
@@            Coverage Diff             @@
##           master      #43      +/-   ##
==========================================
+ Coverage   20.66%   23.02%   +2.35%
==========================================
  Files          53       54       +1
  Lines        1674     1724      +50
==========================================
+ Hits          346      397      +51
+ Misses       1300     1293       -7
- Partials       28       34       +6
Continue to review full report at Codecov.
Just a small thing that I do not understand. The rest LGTM
@@ -23,12 +29,22 @@ type RegisteredJob struct {
	isEnabledByConfig func(env mscalendar.Env) bool
}
var scheduleFunc = func(api cluster.JobPluginAPI, id string, wait cluster.NextWaitInterval, cb func()) (io.Closer, error) { |
I do not get why you are declaring this function here, instead of using `cluster.Schedule(api, id, wait, cb)` directly whenever you need it.
It's so the test can mock the function. I can instead make it into an interface, but since the cluster package exposes it as a flat function, I chose this method instead.
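For context, this is roughly how a test can swap it out (the stub closer and the test name are illustrative):

```go
type stubCloser struct{}

func (stubCloser) Close() error { return nil }

func TestSchedulesRegisteredJobs(t *testing.T) {
	var scheduledIDs []string

	// Replace the package-level scheduleFunc for the duration of the test,
	// then restore the real cluster.Schedule wrapper.
	original := scheduleFunc
	defer func() { scheduleFunc = original }()
	scheduleFunc = func(api cluster.JobPluginAPI, id string, wait cluster.NextWaitInterval, cb func()) (io.Closer, error) {
		scheduledIDs = append(scheduledIDs, id)
		return stubCloser{}, nil
	}

	// ... build the JobManager, trigger OnConfigurationChange, and assert
	// scheduledIDs contains the expected job IDs ...
}
```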
👍 for simpler code -- deferring the job manager discussion to @levb's discretion. Let me know if I can help further!
Merging now, will be tested on master per @DHaussermann's suggestion offline.
Summary
This PR uses Schedule from mattermost-plugin-api to synchronize with other nodes running the same plugin in the same HA environment. The job has been moved to a jobs package, along with a JobManager to handle config changes.
Ticket Link
https://mattermost.atlassian.net/browse/MM-22787
Fixes #41