From bce55bf30197e5670b3ea5c89943f6a508a69181 Mon Sep 17 00:00:00 2001
From: Marc Scholten
Date: Sat, 8 Jan 2022 15:38:05 +0100
Subject: [PATCH] refactored job runner system to run more reliably in dev mode

---
 IHP/Job/Queue.hs  |  19 ++---
 IHP/Job/Runner.hs | 194 ++++++++++++++++++++++++++--------------------
 IHP/Job/Types.hs  |  20 ++++-
 IHP/Server.hs     |  63 +++++++--------
 4 files changed, 166 insertions(+), 130 deletions(-)

diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs
index 5cc86e4b8..6015449c0 100644
--- a/IHP/Job/Queue.hs
+++ b/IHP/Job/Queue.hs
@@ -68,18 +68,15 @@ fetchNextJob workerId = do
 -- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
 -- You will see that @"Something changed in the projects table"@ is printed onto the screen.
 --
-watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> IO () -> IO PGListener.Subscription
-watchForJob pgListener tableName pollInterval handleJob = do
+watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
+watchForJob pgListener tableName pollInterval onNewJob = do
     let tableNameBS = cs tableName
     sqlExec (createNotificationTrigger tableNameBS) ()
 
-    poller <- pollForJob tableName pollInterval handleJob
-    subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const handleJob)
+    poller <- pollForJob tableName pollInterval onNewJob
+    subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))
 
-    -- When the thread, we also want to stop the poller
-    Async.link poller
-
-    pure subscription
+    pure (subscription, poller)
 
 -- | Periodically checks the queue table for open jobs. Calls the callback if there are any.
 --
@@ -89,15 +86,15 @@ watchForJob pgListener tableName pollInterval handleJob = do
 --
 -- This function returns a Async. Call 'cancel' on the async to stop polling the database.
 --
-pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> IO () -> IO (Async.Async ())
-pollForJob tableName pollInterval handleJob = do
+pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
+pollForJob tableName pollInterval onNewJob = do
     let query = "SELECT COUNT(*) FROM ? WHERE ((status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds')) AND locked_by IS NULL AND run_at <= NOW() LIMIT 1"
     let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry)
 
     Async.asyncBound do
         forever do
             count :: Int <- sqlQueryScalar query params
 
-            when (count > 0) handleJob
+            when (count > 0) (Concurrent.putMVar onNewJob JobAvailable)
 
             -- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
             jitter <- Random.randomRIO (0, 2000000)
diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs
index 158a9154d..1fa4b72b6 100644
--- a/IHP/Job/Runner.hs
+++ b/IHP/Job/Runner.hs
@@ -21,71 +21,102 @@ import qualified System.Timeout as Timeout
 import qualified Control.Concurrent.Async.Pool as Pool
 import qualified IHP.PGListener as PGListener
 
-runJobWorkers :: [JobWorker] -> Script
-runJobWorkers jobWorkers = do
-    runJobWorkersWithExitHandler jobWorkers waitExitHandler
-    forever (Concurrent.threadDelay maxBound)
-
-runJobWorkersKillOnExit :: [JobWorker] -> Script
-runJobWorkersKillOnExit jobWorkers = runJobWorkersWithExitHandler jobWorkers stopExitHandler
+import IHP.Log.Types
+import qualified IHP.Log as Log
 
-runJobWorkersWithExitHandler :: [JobWorker] -> (JobWorkerArgs -> IO () -> IO ()) -> Script
-runJobWorkersWithExitHandler jobWorkers withExitHandler = do
+-- | Used by the RunJobs binary
+runJobWorkers :: [JobWorker] -> Script
+runJobWorkers jobWorkers = dedicatedProcessMainLoop jobWorkers
+
+-- | This job worker main loop is used when the job workers run as part of their own binary
+--
+-- In dev mode the IHP dev server uses 'devServerMainLoop' instead. We have two main loops
+-- because the stop handling works differently in those two cases.
+--
+dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
+dedicatedProcessMainLoop jobWorkers = do
+    threadId <- Concurrent.myThreadId
+    exitSignalsCount <- newIORef 0
     workerId <- UUID.nextRandom
-    allJobs <- newIORef []
-    let oneSecond = 1000000
+    let logger = ?context |> get #logger
 
-    putStrLn ("Starting worker " <> tshow workerId)
+    Log.info ("Starting worker " <> tshow workerId)
 
     -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
     -- could overload the main PGListener connection. In that case we still want jobs to be
     -- run independent of the system being very busy.
     pgListener <- PGListener.init ?modelContext
+    stopSignal <- Concurrent.newEmptyMVar
+    waitForExitSignal <- installSignalHandlers
+
+    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }
+
+    processes <- jobWorkers
+        |> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
+
+    waitForExitSignal
+
+    Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)
 
-    -- Todo: When do we call `PGListener.stop pgListener` ?
+    -- Stop the subscriptions and pollers right away
+    -- This stops all producers for the queue MVar
+    forEach processes \JobWorkerProcess { poller, subscription, action } -> do
+        PGListener.unsubscribe subscription pgListener
+        Async.cancel poller
+        Concurrent.putMVar action Stop
 
-    let jobWorkerArgs = JobWorkerArgs { allJobs, workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }
-    withExitHandler jobWorkerArgs do
-        listenAndRuns <- jobWorkers
-            |> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
+    PGListener.stop pgListener
 
-        forEach listenAndRuns Async.link
+    -- While waiting for all jobs to complete, we also wait for another exit signal
+    -- If the user sends a second exit signal, we just kill all processes
+    async do
+        waitForExitSignal
+
+        Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)
+
+        forEach processes \JobWorkerProcess { runners } -> do
+            forEach runners Async.cancel
+
+        Concurrent.throwTo threadId Exit.ExitSuccess
 
         pure ()
 
-waitExitHandler JobWorkerArgs { .. } main = do
-    threadId <- Concurrent.myThreadId
-    exitSignalsCount <- newIORef 0
-    let catchHandler = do
-            exitSignalsCount' <- readIORef exitSignalsCount
-            modifyIORef exitSignalsCount ((+) 1)
-            allJobs' <- readIORef allJobs
-            allJobsCompleted <- allJobs'
-                |> mapM Pool.poll
-                >>= pure . filter isNothing
-                >>= pure . null
-            if allJobsCompleted
-                then Concurrent.throwTo threadId Exit.ExitSuccess
-                else if exitSignalsCount' == 0
-                    then do
-                        putStrLn "Waiting for jobs to complete. CTRL+C again to force exit"
-                        forEach allJobs' Pool.wait
-                        Concurrent.throwTo threadId Exit.ExitSuccess
-                    else if exitSignalsCount' == 1 then do
-                        putStrLn "Canceling all running jobs. CTRL+C again to force exit"
-                        forEach allJobs' Pool.cancel
-
-                        Concurrent.throwTo threadId Exit.ExitSuccess
-                    else Concurrent.throwTo threadId Exit.ExitSuccess
+    -- Wait for all runners to complete
+    forEach processes \JobWorkerProcess { runners } -> do
+        forEach runners Async.wait
+
+    Concurrent.throwTo threadId Exit.ExitSuccess
+
+devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
+devServerMainLoop frameworkConfig pgListener jobWorkers = do
+    workerId <- UUID.nextRandom
+    let ?context = frameworkConfig
+    let logger = frameworkConfig |> get #logger
+
+    Log.info ("Starting worker " <> tshow workerId)
+
+    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }
+
+    processes <- jobWorkers
+        |> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
+
+    (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
+        forEach processes \JobWorkerProcess { poller, subscription, runners, action } -> do
+            Concurrent.putMVar action Stop
+            Async.cancel poller
+            forEach runners Async.cancel
 
+-- | Installs signal handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
+installSignalHandlers :: IO (IO ())
+installSignalHandlers = do
+    exitSignal <- Concurrent.newEmptyMVar
+
+    let catchHandler = Concurrent.putMVar exitSignal ()
 
     Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
     Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing
-    main
-
-    pure ()
-
+    pure (Concurrent.takeMVar exitSignal)
 
 stopExitHandler JobWorkerArgs { .. } main = main
 
@@ -128,44 +159,41 @@ jobWorkerFetchAndRunLoop :: forall job.
     , CanUpdate job
     , Show job
     , Table job
-    ) => JobWorkerArgs -> IO (Async.Async ())
+    ) => JobWorkerArgs -> IO JobWorkerProcess
 jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
     let ?context = frameworkConfig
     let ?modelContext = modelContext
 
-    -- TOOD: now that we have maxConcurrency a better approach would be to
-    -- put all jobs in a MVar and then start maxConcurrency asyncs taking jobs from there
-    async do
-        Pool.withTaskGroup (maxConcurrency @job) \taskGroup -> do
-            -- This loop schedules all jobs that are in the queue.
-            -- It will be initally be called when first starting up this job worker
-            -- and after that it will be called when something has been inserted into the queue (or changed to retry)
-            let startLoop = do
-                    putStrLn "STARTING ASYNC JOB"
-                    asyncJob <- Pool.async taskGroup do
-                        Exception.mask $ \restore -> do
-                            maybeJob <- Queue.fetchNextJob @job workerId
-                            case maybeJob of
-                                Just job -> do
-                                    putStrLn ("Starting job: " <> tshow job)
-                                    let ?job = job
-                                    let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
-                                    resultOrException <- Exception.try (Timeout.timeout timeout (restore (perform job)))
-                                    case resultOrException of
-                                        Left exception -> Queue.jobDidFail job exception
-                                        Right Nothing -> Queue.jobDidTimeout job
-                                        Right (Just _) -> Queue.jobDidSucceed job
-
-                                    startLoop
-                                Nothing -> pure ()
-                    Pool.link asyncJob
-                    modifyIORef allJobs (asyncJob:)
-
-            -- Start all jobs in the queue
-            startLoop
-
-            -- Start a job when a new job is added to the table or when it's set to retry
-            watcher <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) startLoop
-
-            -- Keep the task group alive until the outer async is killed
-            forever (Concurrent.threadDelay maxBound)
+    action <- Concurrent.newMVar JobAvailable
+    runners <- forM [1..(maxConcurrency @job)] \index -> async do
+        let loop = do
+                receivedAction <- Concurrent.takeMVar action
+
+                case receivedAction of
+                    JobAvailable -> do
+                        maybeJob <- Queue.fetchNextJob @job workerId
+                        case maybeJob of
+                            Just job -> do
+                                Log.info ("Starting job: " <> tshow job)
+
+                                let ?job = job
+                                let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
+                                resultOrException <- Exception.try (Timeout.timeout timeout (perform job))
+                                case resultOrException of
+                                    Left exception -> Queue.jobDidFail job exception
+                                    Right Nothing -> Queue.jobDidTimeout job
+                                    Right (Just _) -> Queue.jobDidSucceed job
+
+                                loop
+                            Nothing -> loop
+                    Stop -> do
+                        -- Put the stop signal back in to stop the other runners as well
+                        Concurrent.putMVar action Stop
+                        pure ()
+
+        loop
+
+    (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) action
+
+
+    pure JobWorkerProcess { runners, subscription, poller, action }
\ No newline at end of file
diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs
index 0cd2f2ce5..05e29855a 100644
--- a/IHP/Job/Types.hs
+++ b/IHP/Job/Types.hs
@@ -5,6 +5,8 @@ module IHP.Job.Types
 , JobWorker (..)
 , JobStatus (..)
 , Worker (..)
+, JobWorkerProcess (..)
+, JobWorkerProcessMessage (..)
 ) where
 
@@ -13,6 +15,7 @@ import IHP.FrameworkConfig
 import qualified Control.Concurrent.Async as Async
 import qualified Control.Concurrent.Async.Pool as Pool
 import qualified IHP.PGListener as PGListener
+import qualified Control.Concurrent as Concurrent
 
 class Job job where
     perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()
@@ -41,14 +44,13 @@ class Worker application where
     workers :: application -> [JobWorker]
 
 data JobWorkerArgs = JobWorkerArgs
-    { allJobs :: IORef [Pool.Async ()]
-    , workerId :: UUID
+    { workerId :: UUID
     , modelContext :: ModelContext
     , frameworkConfig :: FrameworkConfig
     , pgListener :: PGListener.PGListener
     }
 
-newtype JobWorker = JobWorker (JobWorkerArgs -> IO (Async.Async ()))
+newtype JobWorker = JobWorker (JobWorkerArgs -> IO JobWorkerProcess)
 
 -- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql:
 --
@@ -61,3 +63,15 @@ data JobStatus
     | JobStatusSucceeded
     | JobStatusRetry
     deriving (Eq, Show, Read, Enum)
+
+data JobWorkerProcess
+    = JobWorkerProcess
+    { runners :: [Async ()]
+    , subscription :: PGListener.Subscription
+    , poller :: Async ()
+    , action :: Concurrent.MVar JobWorkerProcessMessage
+    }
+
+data JobWorkerProcessMessage
+    = JobAvailable
+    | Stop
\ No newline at end of file
diff --git a/IHP/Server.hs b/IHP/Server.hs
index e0a2b3d7b..db73c1541 100644
--- a/IHP/Server.hs
+++ b/IHP/Server.hs
@@ -37,45 +37,42 @@ import qualified System.Directory as Directory
 
 run :: (FrontController RootApplication, Job.Worker RootApplication) => ConfigBuilder -> IO ()
 run configBuilder = do
-    frameworkConfig <- buildFrameworkConfig configBuilder
-
-    sessionVault <- Vault.newKey
-    modelContext <- initModelContext frameworkConfig
-    pgListener <- PGListener.init modelContext
-    autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)
-
-    let ?modelContext = modelContext
-    let ?applicationContext = ApplicationContext { modelContext = ?modelContext, session = sessionVault, autoRefreshServer, frameworkConfig, pgListener }
-
-    sessionMiddleware <- initSessionMiddleware sessionVault frameworkConfig
-    staticMiddleware <- initStaticMiddleware frameworkConfig
-    let corsMiddleware = initCorsMiddleware frameworkConfig
-    let requestLoggerMiddleware = get #requestLoggerMiddleware frameworkConfig
-
-    let run = withBackgroundWorkers frameworkConfig $
-            runServer frameworkConfig $
-            staticMiddleware $
-            corsMiddleware $
-            sessionMiddleware $
-            requestLoggerMiddleware $
-            methodOverridePost $
-            application
-
-    run `finally` do
-        frameworkConfig |> get #logger |> get #cleanup
-        PGListener.stop pgListener
+    let withFrameworkConfig = Exception.bracket (buildFrameworkConfig configBuilder) (\frameworkConfig -> frameworkConfig |> get #logger |> get #cleanup)
+
+    withFrameworkConfig \frameworkConfig -> do
+        modelContext <- initModelContext frameworkConfig
+        let withPGListener = Exception.bracket (PGListener.init modelContext) PGListener.stop
+
+        withPGListener \pgListener -> do
+            sessionVault <- Vault.newKey
+
+            autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)
+
+            let ?modelContext = modelContext
+            let ?applicationContext = ApplicationContext { modelContext = ?modelContext, session = sessionVault, autoRefreshServer, frameworkConfig, pgListener }
+
+            sessionMiddleware <- initSessionMiddleware sessionVault frameworkConfig
+            staticMiddleware <- initStaticMiddleware frameworkConfig
+            let corsMiddleware = initCorsMiddleware frameworkConfig
+            let requestLoggerMiddleware = get #requestLoggerMiddleware frameworkConfig
+
+            withBackgroundWorkers pgListener frameworkConfig $
+                runServer frameworkConfig $
+                staticMiddleware $
+                corsMiddleware $
+                sessionMiddleware $
+                requestLoggerMiddleware $
+                methodOverridePost $
+                application
 {-# INLINABLE run #-}
 
-withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => FrameworkConfig -> IO a -> IO a
-withBackgroundWorkers frameworkConfig app = do
+withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => PGListener.PGListener -> FrameworkConfig -> IO a -> IO a
+withBackgroundWorkers pgListener frameworkConfig app = do
     let jobWorkers = Job.workers RootApplication
     let isDevelopment = get #environment frameworkConfig == Env.Development
     if isDevelopment && not (isEmpty jobWorkers)
-        then do
-            workerAsync <- async (let ?context = frameworkConfig in Job.runJobWorkersKillOnExit jobWorkers)
-            Async.link workerAsync
-            app
+        then Exception.bracket (async (Job.devServerMainLoop frameworkConfig pgListener jobWorkers)) (Async.cancel) (const app)
        else app
 
 -- | Returns a middleware that returns files stored in the app's @static/@ directory and IHP's own @static/@ directory
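
For review context, the sketch below distills the MVar hand-off this patch builds around: `watchForJob`/`pollForJob` act as producers that put `JobAvailable` into the per-worker `action` MVar, the runner threads started in `jobWorkerFetchAndRunLoop` block on `takeMVar`, and a `Stop` message is re-published by each runner so a single shutdown signal reaches all of them. It is a minimal, self-contained Haskell sketch using only `base` and `async`; the names `Message` and `runner`, the three-runner count, and the delays are illustrative stand-ins for `JobWorkerProcessMessage`, the runner loop, and `pollForJob`, not IHP APIs.

```haskell
{-# LANGUAGE BlockArguments #-}

module Main where

import Control.Concurrent (MVar, newMVar, putMVar, takeMVar, threadDelay)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Monad (forM, forM_, forever)

-- Stand-in for JobWorkerProcessMessage
data Message = JobAvailable | Stop

main :: IO ()
main = do
    -- Plays the role of the `action` MVar in JobWorkerProcess:
    -- it starts out full so a runner checks the queue once on startup.
    action <- newMVar JobAvailable

    -- Consumers: one runner per unit of allowed concurrency (3 here).
    runners <- forM [1 .. 3 :: Int] \index -> async (runner index action)

    -- Producer: stands in for pollForJob / the pg_notify subscription.
    poller <- async $ forever do
        threadDelay 1000000 -- 1 second, instead of queuePollInterval plus jitter
        putMVar action JobAvailable

    -- Shutdown, mirroring dedicatedProcessMainLoop: stop the producer first,
    -- then publish a single Stop that fans out to every runner.
    threadDelay 5000000
    cancel poller
    putMVar action Stop

    -- Wait until every runner has seen the Stop message.
    forM_ runners wait

runner :: Int -> MVar Message -> IO ()
runner index action = loop
  where
    loop = do
        message <- takeMVar action
        case message of
            JobAvailable -> do
                -- In the patch this is Queue.fetchNextJob + perform; here we only log.
                putStrLn ("runner " <> show index <> ": checking the queue")
                loop
            Stop ->
                -- Re-publish Stop so the other runners shut down as well.
                putMVar action Stop
```

The re-`putMVar` of `Stop` is the detail that lets one shutdown message reach every runner, while each `JobAvailable` is consumed by exactly one runner at a time, which is what keeps concurrency bounded by the number of runner threads.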