diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 127e30a..d8b72c8 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted import Control.Concurrent.STM hiding (atomically) import Control.Concurrent.STM qualified as STM import Control.Concurrent.Thread.Lifted qualified as T -import Control.Exception (AsyncException (ThreadKilled)) +import Control.Exception (AsyncException (ThreadKilled), evaluate) import Control.Exception.Safe qualified as ES import Control.Monad import Control.Monad.Base @@ -22,6 +22,7 @@ import Data.Foldable qualified as F import Data.Function import Data.Int import Data.Map.Strict qualified as M +import Data.Maybe (catMaybes) import Data.Monoid.Utils import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer @@ -301,6 +302,7 @@ spawnDispatcher , MonadTime m , Show idx , ToSQL idx + , FromSQL idx ) => ConsumerConfig m idx job -> ConnectionSourceM m @@ -327,7 +329,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs loop :: Int -> m Bool loop limit = do - (batch, batchSize) <- reserveJobs limit + batch <- reserveJobs limit + let batchSize = length batch when (batchSize > 0) $ do logInfo "Processing batch" $ object @@ -357,36 +360,59 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: Int -> m [job] reserveJobs limit = runDBT cs ts $ do now <- currentTime - n <- - runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $ - smconcat - [ "UPDATE" <+> raw ccJobsTable <+> "SET" - , " reserved_by =" cid - , ", attempts = CASE" - , " WHEN finished_at IS NULL THEN attempts + 1" - , " ELSE 1" - , " END" - , "WHERE id IN (" <> reservedJobs now <> ")" - , "RETURNING" <+> mintercalate ", " ccJobSelectors - ] - -- Decode lazily as we want the transaction to be as short as possible. - (,n) . F.toList . fmap ccJobFetcher <$> queryResult - where - reservedJobs :: UTCTime -> SQL - reservedJobs now = - smconcat - [ "SELECT id FROM" <+> raw ccJobsTable - , "WHERE" - , " reserved_by IS NULL" - , " AND run_at IS NOT NULL" - , " AND run_at <= " now - , " ORDER BY run_at" - , "LIMIT" limit - , "FOR UPDATE SKIP LOCKED" - ] + runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $ + smconcat + [ "SELECT id FROM" <+> raw ccJobsTable + , "WHERE" + , " reserved_by IS NULL" + , " AND run_at IS NOT NULL" + , " AND run_at <= " now + , " ORDER BY run_at" + , "LIMIT" limit + , "FOR UPDATE SKIP LOCKED" + ] + jobIds :: [idx] <- fetchMany runIdentity + if null jobIds + then pure [] + else + handle + ( \(SomeException e) -> do + logAttention "Failure to fetch the jobs, will reenqueue for the next day" $ object ["error" .= show e, "job_ids" .= show jobIds] + rollback + lift $ fetchFailureHandler jobIds + pure [] + ) + ( do + runPreparedSQL_ (preparedSqlName "setReservation" ccJobsTable) $ + smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by =" cid + , ", attempts = CASE" + , " WHEN finished_at IS NULL THEN attempts + 1" + , " ELSE 1" + , " END" + , "WHERE id = ANY(" Array1 jobIds <+> ")" + , "RETURNING id, " <+> mintercalate ", " ccJobSelectors + ] + qr <- queryResult + results <- forM (F.toList qr) $ \(rawJobId :*: other) -> do + let jobId = runIdentity rawJobId + (Just <$> liftBase (evaluate $ ccJobFetcher other)) `catch` \(SomeException e) -> do + logAttention "Failure to fetch job, will reenqueue for the next day" $ object ["error" .= show e, "job_id" .= show jobId] + rollback + lift $ fetchFailureHandler [jobId] + pure Nothing + pure $ catMaybes results + ) + + fetchFailureHandler :: [idx] -> m () + fetchFailureHandler jobIds = do + let toUpdate :: [(idx, Result)] + toUpdate = (,Failed . RerunAfter . idays $ 1) <$> jobIds + updateJobs toUpdate -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index fd2b61d..471f863 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -103,6 +103,55 @@ test = do rowcount1 :: Int64 <- fetchOne runIdentity liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 + + -- Checking the failing mechanism for a single job. + putJob 1 >> commit + do + -- Move time forward again, since we enqueued new jobs. + modifyTestTime $ addUTCTime (2 * 60 * 60) + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerFailingSingleJobConfig connSource idleSignal + ) + $ do + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) + + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" + rowcount2 :: Int64 <- fetchOne runIdentity + runSQL_ "SELECT run_at from consumers_test_jobs" + newRunTime :: UTCTime <- fetchOne runIdentity + liftIO $ do + T.assertEqual "The failed job should still be in the table" 1 rowcount2 + areInSixHours [newRunTime] + -- Clean up. + runSQL_ "DELETE FROM consumers_test_jobs;" + + -- Checking the failing mechanism for multiple jobs + let nbOfJobsToFail = 5 + replicateM_ nbOfJobsToFail (putJob 1) >> commit + do + -- Move time forward again, since we enqueued new jobs. + modifyTestTime $ addUTCTime (2 * 60 * 60) + -- Jobs get processed one at a time, so we need to run this as many times as there are jobs + replicateM_ nbOfJobsToFail $ do + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerFailingAllJobsConfig connSource idleSignal + ) + $ do + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) + + runSQL_ "SELECT run_at from consumers_test_jobs" + newRunTimes :: [UTCTime] <- fetchMany runIdentity + liftIO $ do + T.assertEqual "All jobs should fail if the query to fetch them is wrong" 5 (length newRunTimes) + areInSixHours newRunTimes + -- Clean up. + runSQL_ "DELETE FROM consumers_test_jobs" + + -- All done. dropTables where waitUntilTrue tmvar = liftIO . atomically $ do @@ -114,6 +163,13 @@ test = do prog <- getProgName putStrLn $ "Usage: " <> prog <> " " + areInSixHours toCheck = do + let inXHours n = addUTCTime (60 * 60 * n) <$> currentTime + in5Hours <- inXHours 5 + in7Hours <- inXHours 7 + liftIO . T.assertBool "The failed jobs should be planned to run more than 5 hours later" $ all (in5Hours <) toCheck + liftIO . T.assertBool "The failed jobs should be planned to run less than 7 hours later" $ all (in7Hours >) toCheck + definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]} -- NB: order of migrations is important. migrations = @@ -156,6 +212,15 @@ test = do , ccJobLogData = \(i, _) -> ["job_id" .= i] } + simulatingFailure :: (Int64, Int32) -> (Int64, Int32) + simulatingFailure _ = error "Simulating row fetch error" + + consumerFailingSingleJobConfig = + consumerConfig {ccJobFetcher = simulatingFailure} + + consumerFailingAllJobsConfig = + consumerConfig {ccJobSelectors = ["id", "countdown::bigint"]} + putJob :: Int32 -> TestEnv () putJob countdown = localDomain "put" $ do now <- currentTime