Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 56 additions & 30 deletions consumers/src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.Thread.Lifted qualified as T
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception (AsyncException (ThreadKilled), evaluate)
import Control.Exception.Safe qualified as ES
import Control.Monad
import Control.Monad.Base
Expand All @@ -22,6 +22,7 @@ import Data.Foldable qualified as F
import Data.Function
import Data.Int
import Data.Map.Strict qualified as M
import Data.Maybe (catMaybes)
import Data.Monoid.Utils
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
Expand Down Expand Up @@ -301,6 +302,7 @@ spawnDispatcher
, MonadTime m
, Show idx
, ToSQL idx
, FromSQL idx
)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
Expand All @@ -327,7 +329,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs

loop :: Int -> m Bool
loop limit = do
(batch, batchSize) <- reserveJobs limit
batch <- reserveJobs limit
let batchSize = length batch
when (batchSize > 0) $ do
logInfo "Processing batch" $
object
Expand Down Expand Up @@ -357,36 +360,59 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs

pure (batchSize > 0)

-- Reserve up to `limit` due, unreserved jobs for this consumer (`cid`) inside
-- a single `runDBT cs ts` block and return them decoded with `ccJobFetcher`.
-- NOTE(review): this span interleaves both sides of a diff (two signatures and
-- two reservation strategies); the comments describe each statement as written.
reserveJobs :: Int -> m ([job], Int)
reserveJobs :: Int -> m [job]
reserveJobs limit = runDBT cs ts $ do
now <- currentTime
-- Single-statement strategy: one UPDATE reserves the rows picked by the
-- `reservedJobs` subquery and returns the job columns; `n` is bound to the
-- result of `runPreparedSQL`.
n <-
runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
smconcat
[ "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by =" <?> cid
, ", attempts = CASE"
, " WHEN finished_at IS NULL THEN attempts + 1"
, " ELSE 1"
, " END"
, "WHERE id IN (" <> reservedJobs now <> ")"
, "RETURNING" <+> mintercalate ", " ccJobSelectors
]
-- Decode lazily as we want the transaction to be as short as possible.
(,n) . F.toList . fmap ccJobFetcher <$> queryResult
where
-- Subquery selecting the ids of unreserved jobs whose `run_at` is not later
-- than `now`, ordered by `run_at` and capped at `limit`.
reservedJobs :: UTCTime -> SQL
reservedJobs now =
smconcat
[ "SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
-- Two-step strategy, step 1: select the ids of the due jobs first, so that
-- they are known even if reserving or decoding the rows fails afterwards.
runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $
smconcat
[ "SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
jobIds :: [idx] <- fetchMany runIdentity
-- Nothing is due: skip the reservation query entirely.
if null jobIds
then pure []
else
-- Step 2 runs under a catch-all handler: on any exception the error is
-- logged, the transaction is rolled back, every selected id is passed to
-- `fetchFailureHandler`, and an empty batch is returned.
handle
( \(SomeException e) -> do
logAttention "Failure to fetch the jobs, will reenqueue for the next day" $ object ["error" .= show e, "job_ids" .= show jobIds]
rollback
lift $ fetchFailureHandler jobIds
pure []
)
( do
-- Reserve exactly the selected ids; `id` is returned ahead of the job
-- columns so that each row can be attributed to its job.
runPreparedSQL_ (preparedSqlName "setReservation" ccJobsTable) $
smconcat
[ "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by =" <?> cid
, ", attempts = CASE"
, " WHEN finished_at IS NULL THEN attempts + 1"
, " ELSE 1"
, " END"
, "WHERE id = ANY(" <?> Array1 jobIds <+> ")"
, "RETURNING id, " <+> mintercalate ", " ccJobSelectors
]
qr <- queryResult
-- Decode row by row. `evaluate` forces each decoded job so that an exception
-- raised by `ccJobFetcher` surfaces here; it forces to weak head normal form
-- only, so errors nested deeper in the job value are not caught at this point.
results <- forM (F.toList qr) $ \(rawJobId :*: other) -> do
let jobId = runIdentity rawJobId
(Just <$> liftBase (evaluate $ ccJobFetcher other)) `catch` \(SomeException e) -> do
logAttention "Failure to fetch job, will reenqueue for the next day" $ object ["error" .= show e, "job_id" .= show jobId]
-- NOTE(review): this `rollback` runs inside the row loop; presumably it also
-- undoes the reservation UPDATE for rows that decoded successfully and are
-- still returned by `catMaybes` below -- confirm this is intended.
rollback
lift $ fetchFailureHandler [jobId]
pure Nothing
-- Keep only the rows that decoded without an exception.
pure $ catMaybes results
)

-- Hand every given job id to 'updateJobs' paired with the result
-- @Failed (RerunAfter (idays 1))@.
fetchFailureHandler :: [idx] -> m ()
fetchFailureHandler jobIds = updateJobs (map failForADay jobIds)
  where
    -- Pair a job id with the failure result recorded for it.
    failForADay :: idx -> (idx, Result)
    failForADay jobId = (jobId, Failed (RerunAfter (idays 1)))

-- Spawn each job in a separate thread.
startJob :: job -> m (job, m (T.Result Result))
Expand Down
65 changes: 65 additions & 0 deletions consumers/test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,55 @@ test = do
rowcount1 :: Int64 <- fetchOne runIdentity
liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0
liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1

-- Checking the failing mechanism for a single job.
putJob 1 >> commit
do
-- Move time forward again, since we enqueued new jobs.
modifyTestTime $ addUTCTime (2 * 60 * 60)
finalize
( localDomain "process" $
runConsumerWithIdleSignal consumerFailingSingleJobConfig connSource idleSignal
)
$ do
waitUntilTrue idleSignal
currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show)

runSQL_ "SELECT COUNT(*) from consumers_test_jobs"
rowcount2 :: Int64 <- fetchOne runIdentity
runSQL_ "SELECT run_at from consumers_test_jobs"
newRunTime :: UTCTime <- fetchOne runIdentity
liftIO $ do
T.assertEqual "The failed job should still be in the table" 1 rowcount2
areInSixHours [newRunTime]
-- Clean up.
runSQL_ "DELETE FROM consumers_test_jobs;"

-- Checking the failing mechanism for multiple jobs
let nbOfJobsToFail = 5
replicateM_ nbOfJobsToFail (putJob 1) >> commit
do
-- Move time forward again, since we enqueued new jobs.
modifyTestTime $ addUTCTime (2 * 60 * 60)
-- Jobs get processed one at a time, so we need to run this as many times as there are jobs
replicateM_ nbOfJobsToFail $ do
finalize
( localDomain "process" $
runConsumerWithIdleSignal consumerFailingAllJobsConfig connSource idleSignal
)
$ do
waitUntilTrue idleSignal
currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show)

runSQL_ "SELECT run_at from consumers_test_jobs"
newRunTimes :: [UTCTime] <- fetchMany runIdentity
liftIO $ do
T.assertEqual "All jobs should fail if the query to fetch them is wrong" 5 (length newRunTimes)
areInSixHours newRunTimes
-- Clean up.
runSQL_ "DELETE FROM consumers_test_jobs"

-- All done.
dropTables
where
waitUntilTrue tmvar = liftIO . atomically $ do
Expand All @@ -114,6 +163,13 @@ test = do
prog <- getProgName
putStrLn $ "Usage: " <> prog <> " <connection info string>"

-- Assert that every timestamp in @toCheck@ lies strictly between five and
-- seven hours after the time reported by 'currentTime'.
areInSixHours toCheck = do
  lowerBound <- hoursFromNow 5
  upperBound <- hoursFromNow 7
  liftIO $
    T.assertBool
      "The failed jobs should be planned to run more than 5 hours later"
      (all (> lowerBound) toCheck)
  liftIO $
    T.assertBool
      "The failed jobs should be planned to run less than 7 hours later"
      (all (< upperBound) toCheck)
  where
    -- Clock reading shifted forward by @n@ hours; the clock is read on each call.
    hoursFromNow n = addUTCTime (60 * 60 * n) <$> currentTime

definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]}
-- NB: order of migrations is important.
migrations =
Expand Down Expand Up @@ -156,6 +212,15 @@ test = do
, ccJobLogData = \(i, _) -> ["job_id" .= i]
}

-- Stand-in job fetcher: ignores its argument and yields a value that throws
-- as soon as it is forced.
simulatingFailure :: (Int64, Int32) -> (Int64, Int32)
simulatingFailure = const (error "Simulating row fetch error")

-- 'consumerConfig' with its job fetcher swapped for 'simulatingFailure', so
-- that forcing any fetched job throws.
consumerFailingSingleJobConfig =
  consumerConfig
    { ccJobFetcher = simulatingFailure
    }

-- 'consumerConfig' with the selected columns replaced so that @countdown@ is
-- cast to @bigint@.
-- NOTE(review): presumably the cast makes the returned rows mismatch what the
-- job fetcher decodes, so fetching the whole batch fails -- confirm against
-- the definition of 'consumerConfig'.
consumerFailingAllJobsConfig =
  consumerConfig
    { ccJobSelectors =
        [ "id"
        , "countdown::bigint"
        ]
    }

putJob :: Int32 -> TestEnv ()
putJob countdown = localDomain "put" $ do
now <- currentTime
Expand Down
Loading