Skip to content

Commit

Permalink
refactored job runner system to run more reliably in dev mode
Browse files Browse the repository at this point in the history
  • Loading branch information
mpscholten committed Jan 8, 2022
1 parent 9e89be1 commit bce55bf
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 130 deletions.
19 changes: 8 additions & 11 deletions IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,15 @@ fetchNextJob workerId = do
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> IO () -> IO PGListener.Subscription
watchForJob pgListener tableName pollInterval handleJob = do
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob pgListener tableName pollInterval onNewJob = do
let tableNameBS = cs tableName
sqlExec (createNotificationTrigger tableNameBS) ()

poller <- pollForJob tableName pollInterval handleJob
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const handleJob)
poller <- pollForJob tableName pollInterval onNewJob
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))

-- When the thread stops, we also want to stop the poller
Async.link poller

pure subscription
pure (subscription, poller)

-- | Periodically checks the queue table for open jobs. Calls the callback if there are any.
--
Expand All @@ -89,15 +86,15 @@ watchForJob pgListener tableName pollInterval handleJob = do
--
-- This function returns a Async. Call 'cancel' on the async to stop polling the database.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> IO () -> IO (Async.Async ())
pollForJob tableName pollInterval handleJob = do
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob tableName pollInterval onNewJob = do
let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1"
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry)
Async.asyncBound do
forever do
count :: Int <- sqlQueryScalar query params

when (count > 0) handleJob
when (count > 0) (Concurrent.putMVar onNewJob JobAvailable)

-- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
jitter <- Random.randomRIO (0, 2000000)
Expand Down
194 changes: 111 additions & 83 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,71 +21,102 @@ import qualified System.Timeout as Timeout
import qualified Control.Concurrent.Async.Pool as Pool
import qualified IHP.PGListener as PGListener

runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = do
runJobWorkersWithExitHandler jobWorkers waitExitHandler
forever (Concurrent.threadDelay maxBound)

runJobWorkersKillOnExit :: [JobWorker] -> Script
runJobWorkersKillOnExit jobWorkers = runJobWorkersWithExitHandler jobWorkers stopExitHandler
import IHP.Log.Types
import qualified IHP.Log as Log

runJobWorkersWithExitHandler :: [JobWorker] -> (JobWorkerArgs -> IO () -> IO ()) -> Script
runJobWorkersWithExitHandler jobWorkers withExitHandler = do
-- | Used by the RunJobs binary
runJobWorkers :: [JobWorker] -> Script
runJobWorkers jobWorkers = dedicatedProcessMainLoop jobWorkers

-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works differently in those cases.
--
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop jobWorkers = do
threadId <- Concurrent.myThreadId
exitSignalsCount <- newIORef 0
workerId <- UUID.nextRandom
allJobs <- newIORef []
let oneSecond = 1000000
let logger = ?context |> get #logger

putStrLn ("Starting worker " <> tshow workerId)
Log.info ("Starting worker " <> tshow workerId)

-- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
-- could overload the main PGListener connection. In that case we still want jobs to be
-- run independent of the system being very busy.
pgListener <- PGListener.init ?modelContext
stopSignal <- Concurrent.newEmptyMVar
waitForExitSignal <- installSignalHandlers

let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)

waitForExitSignal

Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

-- Todo: When do we call `PGListener.stop pgListener` ?
-- Stop subscriptions and poller already
-- This will stop all producers for the queue MVar
forEach processes \JobWorkerProcess { poller, subscription, action } -> do
PGListener.unsubscribe subscription pgListener
Async.cancel poller
Concurrent.putMVar action Stop

let jobWorkerArgs = JobWorkerArgs { allJobs, workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }
withExitHandler jobWorkerArgs do
listenAndRuns <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
PGListener.stop pgListener

forEach listenAndRuns Async.link
-- While waiting for all jobs to complete, we also wait for another exit signal
-- If the user sends two exit signals, we just kill all processes
async do
waitForExitSignal

Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

forEach processes \JobWorkerProcess { runners } -> do
forEach runners Async.cancel

Concurrent.throwTo threadId Exit.ExitSuccess

pure ()

waitExitHandler JobWorkerArgs { .. } main = do
threadId <- Concurrent.myThreadId
exitSignalsCount <- newIORef 0
let catchHandler = do
exitSignalsCount' <- readIORef exitSignalsCount
modifyIORef exitSignalsCount ((+) 1)
allJobs' <- readIORef allJobs
allJobsCompleted <- allJobs'
|> mapM Pool.poll
>>= pure . filter isNothing
>>= pure . null
if allJobsCompleted
then Concurrent.throwTo threadId Exit.ExitSuccess
else if exitSignalsCount' == 0
then do
putStrLn "Waiting for jobs to complete. CTRL+C again to force exit"
forEach allJobs' Pool.wait
Concurrent.throwTo threadId Exit.ExitSuccess
else if exitSignalsCount' == 1 then do
putStrLn "Canceling all running jobs. CTRL+C again to force exit"
forEach allJobs' Pool.cancel

Concurrent.throwTo threadId Exit.ExitSuccess
else Concurrent.throwTo threadId Exit.ExitSuccess
-- Wait for all runners to complete
forEach processes \JobWorkerProcess { runners } -> do
forEach runners Async.wait

Concurrent.throwTo threadId Exit.ExitSuccess

devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
workerId <- UUID.nextRandom
let ?context = frameworkConfig
let logger = frameworkConfig |> get #logger

Log.info ("Starting worker " <> tshow workerId)

let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)

(forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
forEach processes \JobWorkerProcess { poller, subscription, runners, action } -> do
Concurrent.putMVar action Stop
Async.cancel poller
forEach runners Async.cancel

-- | Installs signals handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
exitSignal <- Concurrent.newEmptyMVar

let catchHandler = Concurrent.putMVar exitSignal ()

Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing

main

pure ()

pure (Concurrent.takeMVar exitSignal)

stopExitHandler JobWorkerArgs { .. } main = main

Expand Down Expand Up @@ -128,44 +159,41 @@ jobWorkerFetchAndRunLoop :: forall job.
, CanUpdate job
, Show job
, Table job
) => JobWorkerArgs -> IO (Async.Async ())
) => JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
let ?context = frameworkConfig
let ?modelContext = modelContext

-- TODO: now that we have maxConcurrency a better approach would be to
-- put all jobs in a MVar and then start maxConcurrency asyncs taking jobs from there
async do
Pool.withTaskGroup (maxConcurrency @job) \taskGroup -> do
-- This loop schedules all jobs that are in the queue.
-- It will initially be called when first starting up this job worker
-- and after that it will be called when something has been inserted into the queue (or changed to retry)
let startLoop = do
putStrLn "STARTING ASYNC JOB"
asyncJob <- Pool.async taskGroup do
Exception.mask $ \restore -> do
maybeJob <- Queue.fetchNextJob @job workerId
case maybeJob of
Just job -> do
putStrLn ("Starting job: " <> tshow job)
let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.try (Timeout.timeout timeout (restore (perform job)))
case resultOrException of
Left exception -> Queue.jobDidFail job exception
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

startLoop
Nothing -> pure ()
Pool.link asyncJob
modifyIORef allJobs (asyncJob:)

-- Start all jobs in the queue
startLoop

-- Start a job when a new job is added to the table or when it's set to retry
watcher <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) startLoop

-- Keep the task group alive until the outer async is killed
forever (Concurrent.threadDelay maxBound)
action <- Concurrent.newMVar JobAvailable
runners <- forM [0..(maxConcurrency @job)] \index -> async do
let loop = do
receivedAction <- Concurrent.takeMVar action

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)

let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.try (Timeout.timeout timeout (perform job))
case resultOrException of
Left exception -> Queue.jobDidFail job exception
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

loop
Nothing -> loop
Stop -> do
-- Put the stop signal back in to stop the other runners as well
Concurrent.putMVar action Stop
pure ()

loop

(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action


pure JobWorkerProcess { runners, subscription, poller, action }
20 changes: 17 additions & 3 deletions IHP/Job/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module IHP.Job.Types
, JobWorker (..)
, JobStatus (..)
, Worker (..)
, JobWorkerProcess (..)
, JobWorkerProcessMessage (..)
)
where

Expand All @@ -13,6 +15,7 @@ import IHP.FrameworkConfig
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Pool as Pool
import qualified IHP.PGListener as PGListener
import qualified Control.Concurrent as Concurrent

class Job job where
perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()
Expand Down Expand Up @@ -41,14 +44,13 @@ class Worker application where
workers :: application -> [JobWorker]

data JobWorkerArgs = JobWorkerArgs
{ allJobs :: IORef [Pool.Async ()]
, workerId :: UUID
{ workerId :: UUID
, modelContext :: ModelContext
, frameworkConfig :: FrameworkConfig
, pgListener :: PGListener.PGListener
}

newtype JobWorker = JobWorker (JobWorkerArgs -> IO (Async.Async ()))
newtype JobWorker = JobWorker (JobWorkerArgs -> IO JobWorkerProcess)

-- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql:
--
Expand All @@ -61,3 +63,15 @@ data JobStatus
| JobStatusSucceeded
| JobStatusRetry
deriving (Eq, Show, Read, Enum)

data JobWorkerProcess
= JobWorkerProcess
{ runners :: [Async ()]
, subscription :: PGListener.Subscription
, poller :: Async ()
, action :: Concurrent.MVar JobWorkerProcessMessage
}

data JobWorkerProcessMessage
= JobAvailable
| Stop
63 changes: 30 additions & 33 deletions IHP/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,42 @@ import qualified System.Directory as Directory

run :: (FrontController RootApplication, Job.Worker RootApplication) => ConfigBuilder -> IO ()
run configBuilder = do
frameworkConfig <- buildFrameworkConfig configBuilder

sessionVault <- Vault.newKey
modelContext <- initModelContext frameworkConfig
pgListener <- PGListener.init modelContext
autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)

let ?modelContext = modelContext
let ?applicationContext = ApplicationContext { modelContext = ?modelContext, session = sessionVault, autoRefreshServer, frameworkConfig, pgListener }

sessionMiddleware <- initSessionMiddleware sessionVault frameworkConfig
staticMiddleware <- initStaticMiddleware frameworkConfig
let corsMiddleware = initCorsMiddleware frameworkConfig
let requestLoggerMiddleware = get #requestLoggerMiddleware frameworkConfig

let run = withBackgroundWorkers frameworkConfig $
runServer frameworkConfig $
staticMiddleware $
corsMiddleware $
sessionMiddleware $
requestLoggerMiddleware $
methodOverridePost $
application

run `finally` do
frameworkConfig |> get #logger |> get #cleanup
PGListener.stop pgListener
let withFrameworkConfig = Exception.bracket (buildFrameworkConfig configBuilder) (\frameworkConfig -> frameworkConfig |> get #logger |> get #cleanup)

withFrameworkConfig \frameworkConfig -> do
modelContext <- initModelContext frameworkConfig
let withPGListener = Exception.bracket (PGListener.init modelContext) PGListener.stop

withPGListener \pgListener -> do
sessionVault <- Vault.newKey

autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)

let ?modelContext = modelContext
let ?applicationContext = ApplicationContext { modelContext = ?modelContext, session = sessionVault, autoRefreshServer, frameworkConfig, pgListener }

sessionMiddleware <- initSessionMiddleware sessionVault frameworkConfig
staticMiddleware <- initStaticMiddleware frameworkConfig
let corsMiddleware = initCorsMiddleware frameworkConfig
let requestLoggerMiddleware = get #requestLoggerMiddleware frameworkConfig

withBackgroundWorkers pgListener frameworkConfig $
runServer frameworkConfig $
staticMiddleware $
corsMiddleware $
sessionMiddleware $
requestLoggerMiddleware $
methodOverridePost $
application

{-# INLINABLE run #-}

withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => FrameworkConfig -> IO a -> IO a
withBackgroundWorkers frameworkConfig app = do
withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => PGListener.PGListener -> FrameworkConfig -> IO a -> IO a
withBackgroundWorkers pgListener frameworkConfig app = do
let jobWorkers = Job.workers RootApplication
let isDevelopment = get #environment frameworkConfig == Env.Development
if isDevelopment && not (isEmpty jobWorkers)
then do
workerAsync <- async (let ?context = frameworkConfig in Job.runJobWorkersKillOnExit jobWorkers)
Async.link workerAsync
app
then Exception.bracket (async (Job.devServerMainLoop frameworkConfig pgListener jobWorkers)) (Async.cancel) (const app)
else app

-- | Returns a middleware that returns files stored in the app's @static/@ directory and IHP's own @static/@ directory
Expand Down

0 comments on commit bce55bf

Please sign in to comment.