From c3b6563a489b18d567dbb030896da3285caebe06 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:08 +0200 Subject: [PATCH 01/11] Add wrapper on msgs between listen and dispatch --- .../PostgreSQL/Consumers/Components.hs | 19 +++++++++---------- .../Database/PostgreSQL/Consumers/Utils.hs | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 316a177..4f4e33d 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -90,7 +90,7 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal logInfo_ "ccMaxRunningJobs < 1, not starting the consumer" pure $ pure () | otherwise = do - semaphore <- newMVar () + (triggerNotification, listenNotification) <- mkNotification runningJobsInfo <- liftBase $ newTVarIO M.empty runningJobs <- liftBase $ newTVarIO 0 @@ -102,7 +102,7 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal cid <- registerConsumer cc cs localData ["consumer_id" .= show cid] $ do - listener <- spawnListener cc cs semaphore + listener <- spawnListener cc cs triggerNotification monitor <- localDomain "monitor" $ spawnMonitor cc cs cid dispatcher <- localDomain "dispatcher" $ @@ -110,7 +110,7 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal cc cs cid - semaphore + listenNotification runningJobsInfo runningJobs mIdleSignal @@ -184,9 +184,9 @@ spawnListener :: (MonadBaseControl IO m, MonadMask m) => ConsumerConfig m idx job -> ConnectionSourceM m - -> MVar () + -> TriggerNotification m -> m ThreadId -spawnListener cc cs semaphore = +spawnListener cc cs outbox = forkP "listener" $ case ccNotificationChannel cc of Just chan -> @@ -204,8 +204,7 @@ spawnListener cc cs semaphore = liftBase . threadDelay $ ccNotificationTimeout cc signalDispatcher where - signalDispatcher = do - liftBase $ tryPutMVar semaphore () + signalDispatcher = triggerNotification outbox noTs = defaultTransactionSettings @@ -309,14 +308,14 @@ spawnDispatcher => ConsumerConfig m idx job -> ConnectionSourceM m -> ConsumerID - -> MVar () + -> ListenNotification m -> TVar (M.Map ThreadId idx) -> TVar Int -> Maybe (TMVar Bool) -> m ThreadId -spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs mIdleSignal = +spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mIdleSignal = forkP "dispatcher" . forever $ do - void $ takeMVar semaphore + listenNotification inbox someJobWasProcessed <- loop 1 if someJobWasProcessed then setIdle False diff --git a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs index bd7a830..e2d5108 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs @@ -5,6 +5,9 @@ module Database.PostgreSQL.Consumers.Utils , forkP , gforkP , preparedSqlName + , TriggerNotification (triggerNotification) + , ListenNotification (listenNotification) + , mkNotification ) where import Control.Concurrent.Lifted @@ -86,3 +89,17 @@ forkImpl ffork tname m = E.mask $ \release -> do preparedSqlName :: T.Text -> RawSQL () -> QueryName preparedSqlName baseName tableName = QueryName . T.take 63 $ baseName <> "$" <> unRawSQL tableName + +---------------------------------------- + +newtype TriggerNotification m = TriggerNotification {triggerNotification :: m ()} + +newtype ListenNotification m = ListenNotification {listenNotification :: m ()} + +mkNotification :: MonadBaseControl IO m => m (TriggerNotification m, ListenNotification m) +mkNotification = do + notificationRef <- newEmptyMVar + pure + ( TriggerNotification $ tryPutMVar notificationRef () >> pure () + , ListenNotification $ takeMVar notificationRef + ) From f0799018dfa43d5d102933738d95ce84148a7682 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:27 +0200 Subject: [PATCH 02/11] Allow passing pg parameters via ENV in tests --- consumers/test/Test.hs | 47 ++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index fd2b61d..f37af03 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -1,5 +1,6 @@ module Main where +import Control.Applicative ((<|>)) import Control.Concurrent.STM import Control.Exception import Control.Monad @@ -54,19 +55,43 @@ runTestEnv connSource logger = main :: IO () main = void . T.runTestTT $ T.TestCase test +getConnectionString :: IO T.Text +getConnectionString = do + connectionParamsString <- (<|>) <$> paramsFromGithub <*> paramsFromEnvironmentVariables + allArgs <- getArgs + case connectionParamsString of + Just params -> pure (stringFromParams params) + _ -> case allArgs of + connString : _args -> pure (T.pack connString) + [] -> printUsage *> exitFailure + where + printUsage = do + prog <- getProgName + putStrLn $ "Usage: " <> prog <> " " + + paramsFromGithub = + lookupEnv "GITHUB_ACTIONS" >>= \case + Just "true" -> pure $ Just ("postgres", "postgres", "postgres") + _ -> pure $ Nothing + paramsFromEnvironmentVariables = do + variables <- + sequence + [ lookupEnv "PGHOST" + , lookupEnv "PGUSER" + , lookupEnv "PGDATABASE" + ] + case variables of + [Just host, Just user, Just database] -> pure $ Just (host, user, database) + _ -> pure $ Nothing + stringFromParams (host, user, database) = + (T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database)) + test :: IO () test = do - connString <- - getArgs >>= \case - connString : _args -> pure $ T.pack connString - [] -> - lookupEnv "GITHUB_ACTIONS" >>= \case - Just "true" -> pure "host=postgres user=postgres password=postgres" - _ -> printUsage >> exitFailure - + connectionParamsString <- getConnectionString let connSettings = defaultConnectionSettings - { csConnInfo = connString + { csConnInfo = connectionParamsString } ConnectionSource connSource = simpleSource connSettings @@ -110,10 +135,6 @@ test = do True -> pure () False -> retry - printUsage = do - prog <- getProgName - putStrLn $ "Usage: " <> prog <> " " - definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]} -- NB: order of migrations is important. migrations = From cbacdca690987de06e2bfdbf99e2c4141626e365 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:34 +0200 Subject: [PATCH 03/11] Ensure tables are cleaned up on test wrap-up --- consumers/consumers.cabal | 2 ++ consumers/test/Test.hs | 76 +++++++++++++++++++++------------------ 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/consumers/consumers.cabal b/consumers/consumers.cabal index 94a4a27..ac475ca 100644 --- a/consumers/consumers.cabal +++ b/consumers/consumers.cabal @@ -105,6 +105,8 @@ test-suite consumers-test monad-time, mtl, stm, + tasty, + tasty-hunit, text, time, transformers, diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index f37af03..bc20cda 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -2,7 +2,6 @@ module Main where import Control.Applicative ((<|>)) import Control.Concurrent.STM -import Control.Exception import Control.Monad import Control.Monad.Base import Control.Monad.Catch @@ -22,6 +21,8 @@ import Log.Backend.StandardOutput import System.Environment import System.Exit import Test.HUnit qualified as T +import Test.Tasty +import Test.Tasty.HUnit data TestEnvSt = TestEnvSt { teCurrentTime :: UTCTime @@ -53,7 +54,19 @@ runTestEnv connSource logger = . unTestEnv main :: IO () -main = void . T.runTestTT $ T.TestCase test +main = do + connectionParamsString <- getConnectionString + let connectionSettings = defaultConnectionSettings {csConnInfo = connectionParamsString} + defaultMain (allTests connectionSettings) + +allTests :: ConnectionSettings -> TestTree +allTests connectionSource = + testGroup + "consumers" + [ testCase "can consume queue" (testPipeline connectionSource) + ] + +-------------------- getConnectionString :: IO T.Text getConnectionString = do @@ -86,23 +99,30 @@ getConnectionString = do stringFromParams (host, user, database) = (T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database)) -test :: IO () -test = do - connectionParamsString <- getConnectionString - let connSettings = - defaultConnectionSettings - { csConnInfo = connectionParamsString - } - ConnectionSource connSource = simpleSource connSettings - +testPipeline :: ConnectionSettings -> IO () +testPipeline connectionSettings = do + let ConnectionSource connSource = simpleSource connectionSettings withStdOutLogger $ \logger -> runTestEnv connSource logger $ do - createTables - idleSignal <- liftIO newEmptyTMVarIO - putJob 10 >> commit + bracket createTables (const dropTables) $ \_ -> do + idleSignal <- liftIO newEmptyTMVarIO + putJob 10 >> commit + + forM_ [1 .. 10 :: Int] $ \_ -> do + -- Move time forward 2hours, because jobs are scheduled 1 hour into future + modifyTestTime $ addUTCTime (2 * 60 * 60) + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerConfig connSource idleSignal + ) + $ do + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) - forM_ [1 .. 10 :: Int] $ \_ -> do - -- Move time forward 2hours, because jobs are scheduled 1 hour into future + -- Each job creates 2 new jobs, so there should be 1024 jobs in table. + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" + rowcount0 :: Int64 <- fetchOne runIdentity + -- Move time 2 hours forward modifyTestTime $ addUTCTime (2 * 60 * 60) finalize ( localDomain "process" $ @@ -110,25 +130,11 @@ test = do ) $ do waitUntilTrue idleSignal - currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) - - -- Each job creates 2 new jobs, so there should be 1024 jobs in table. - runSQL_ "SELECT COUNT(*) from consumers_test_jobs" - rowcount0 :: Int64 <- fetchOne runIdentity - -- Move time 2 hours forward - modifyTestTime $ addUTCTime (2 * 60 * 60) - finalize - ( localDomain "process" $ - runConsumerWithIdleSignal consumerConfig connSource idleSignal - ) - $ do - waitUntilTrue idleSignal - -- Jobs are designed to double only 10 times, so there should be no jobs left now. - runSQL_ "SELECT COUNT(*) from consumers_test_jobs" - rowcount1 :: Int64 <- fetchOne runIdentity - liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 - liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 - dropTables + -- Jobs are designed to double only 10 times, so there should be no jobs left now. + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" + rowcount1 :: Int64 <- fetchOne runIdentity + liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 + liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 where waitUntilTrue tmvar = liftIO . atomically $ do takeTMVar tmvar >>= \case From 522083ee770df280ae0422d74b0ac34e1f01c9cd Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:45 +0200 Subject: [PATCH 04/11] Split off test utilities in own module --- consumers/consumers.cabal | 1 + consumers/test/Test.hs | 308 +++++++++----------------------------- consumers/test/Util.hs | 211 ++++++++++++++++++++++++++ 3 files changed, 285 insertions(+), 235 deletions(-) create mode 100644 consumers/test/Util.hs diff --git a/consumers/consumers.cabal b/consumers/consumers.cabal index ac475ca..1332f60 100644 --- a/consumers/consumers.cabal +++ b/consumers/consumers.cabal @@ -116,3 +116,4 @@ test-suite consumers-test type: exitcode-stdio-1.0 main-is: Test.hs + other-modules: Util diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index bc20cda..2b15108 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -1,57 +1,25 @@ module Main where -import Control.Applicative ((<|>)) +import Control.Concurrent (threadDelay) import Control.Concurrent.STM import Control.Monad -import Control.Monad.Base import Control.Monad.Catch import Control.Monad.IO.Class -import Control.Monad.State.Strict +import Control.Monad.RWS import Control.Monad.Time -import Control.Monad.Trans.Control +import Data.IORef import Data.Int import Data.Text qualified as T import Data.Time import Database.PostgreSQL.Consumers import Database.PostgreSQL.PQTypes -import Database.PostgreSQL.PQTypes.Checks import Database.PostgreSQL.PQTypes.Model import Log import Log.Backend.StandardOutput -import System.Environment -import System.Exit import Test.HUnit qualified as T import Test.Tasty import Test.Tasty.HUnit - -data TestEnvSt = TestEnvSt - { teCurrentTime :: UTCTime - , teMonotonicTime :: Double - } - -type InnerTestEnv = StateT TestEnvSt (DBT (LogT IO)) - -newtype TestEnv a = TestEnv {unTestEnv :: InnerTestEnv a} - deriving (Applicative, Functor, Monad, MonadLog, MonadDB, MonadThrow, MonadCatch, MonadMask, MonadIO, MonadBase IO, MonadState TestEnvSt) - -instance MonadBaseControl IO TestEnv where - type StM TestEnv a = StM InnerTestEnv a - liftBaseWith f = TestEnv $ liftBaseWith (\run -> f $ run . unTestEnv) - restoreM = TestEnv . restoreM - -instance MonadTime TestEnv where - currentTime = gets teCurrentTime - monotonicTime = gets teMonotonicTime - -modifyTestTime :: MonadState TestEnvSt m => (UTCTime -> UTCTime) -> m () -modifyTestTime modtime = modify (\te -> te {teCurrentTime = modtime . teCurrentTime $ te}) - -runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestEnv a -> IO a -runTestEnv connSource logger = - runLogT "consumers-test" logger defaultLogLevel - . runDBT connSource defaultTransactionSettings - . (\m' -> fst <$> runStateT m' (TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) - . unTestEnv +import Util main :: IO () main = do @@ -63,136 +31,88 @@ allTests :: ConnectionSettings -> TestTree allTests connectionSource = testGroup "consumers" - [ testCase "can consume queue" (testPipeline connectionSource) + [ testCase "can grow the number of jobs ran concurrently" (testJobScheduleGrowth connectionSource) ] -------------------- -getConnectionString :: IO T.Text -getConnectionString = do - connectionParamsString <- (<|>) <$> paramsFromGithub <*> paramsFromEnvironmentVariables - allArgs <- getArgs - case connectionParamsString of - Just params -> pure (stringFromParams params) - _ -> case allArgs of - connString : _args -> pure (T.pack connString) - [] -> printUsage *> exitFailure - where - printUsage = do - prog <- getProgName - putStrLn $ "Usage: " <> prog <> " " - - paramsFromGithub = - lookupEnv "GITHUB_ACTIONS" >>= \case - Just "true" -> pure $ Just ("postgres", "postgres", "postgres") - _ -> pure $ Nothing - paramsFromEnvironmentVariables = do - variables <- - sequence - [ lookupEnv "PGHOST" - , lookupEnv "PGUSER" - , lookupEnv "PGDATABASE" - ] - case variables of - [Just host, Just user, Just database] -> pure $ Just (host, user, database) - _ -> pure $ Nothing - stringFromParams (host, user, database) = - (T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database)) - -testPipeline :: ConnectionSettings -> IO () -testPipeline connectionSettings = do +testJobScheduleGrowth :: ConnectionSettings -> IO () +testJobScheduleGrowth connectionSettings = do let ConnectionSource connSource = simpleSource connectionSettings - withStdOutLogger $ \logger -> - runTestEnv connSource logger $ do - bracket createTables (const dropTables) $ \_ -> do - idleSignal <- liftIO newEmptyTMVarIO - putJob 10 >> commit - - forM_ [1 .. 10 :: Int] $ \_ -> do - -- Move time forward 2hours, because jobs are scheduled 1 hour into future - modifyTestTime $ addUTCTime (2 * 60 * 60) - finalize - ( localDomain "process" $ - runConsumerWithIdleSignal consumerConfig connSource idleSignal - ) - $ do - waitUntilTrue idleSignal - currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) - - -- Each job creates 2 new jobs, so there should be 1024 jobs in table. - runSQL_ "SELECT COUNT(*) from consumers_test_jobs" - rowcount0 :: Int64 <- fetchOne runIdentity - -- Move time 2 hours forward + withStdOutLogger $ \logger -> do + let additionalColumns = + [ tblColumn + { colName = "countdown" + , colType = IntegerT + , colNullable = False + } + ] + runTestEnv connSource logger (TestSetup "test_job_schedule_growth" additionalColumns) $ do + consumerConfig <- getConsumerConfig + TestEnvSt {..} <- get + idleSignal <- liftIO newEmptyTMVarIO + putJob 10 >> commit + + rowCountGrowth :: [Int64] <- forM [1 .. 10 :: Int] $ \_ -> do + -- Move time forward 2hours, because jobs are scheduled 1 hour into future modifyTestTime $ addUTCTime (2 * 60 * 60) finalize ( localDomain "process" $ runConsumerWithIdleSignal consumerConfig connSource idleSignal ) - $ do - waitUntilTrue idleSignal - -- Jobs are designed to double only 10 times, so there should be no jobs left now. - runSQL_ "SELECT COUNT(*) from consumers_test_jobs" - rowcount1 :: Int64 <- fetchOne runIdentity - liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 - liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 - where - waitUntilTrue tmvar = liftIO . atomically $ do - takeTMVar tmvar >>= \case - True -> pure () - False -> retry - - definitions = emptyDbDefinitions {dbTables = [consumersTable, jobsTable]} - -- NB: order of migrations is important. - migrations = - [ createTableMigration consumersTable - , createTableMigration jobsTable - ] - - createTables :: TestEnv () - createTables = do - migrateDatabase - defaultExtrasOptions - definitions - migrations - checkDatabase - defaultExtrasOptions - definitions + $ waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) - dropTables :: TestEnv () - dropTables = do - migrateDatabase - defaultExtrasOptions - emptyDbDefinitions - [ dropTableMigration jobsTable - , dropTableMigration consumersTable - ] - - consumerConfig = - ConsumerConfig - { ccJobsTable = "consumers_test_jobs" - , ccConsumersTable = "consumers_test_consumers" - , ccJobSelectors = ["id", "countdown"] - , ccJobFetcher = id - , ccJobIndex = \(i :: Int64, _ :: Int32) -> i - , ccNotificationChannel = Just "consumers_test_chan" - , -- select some small timeout - ccNotificationTimeout = 100 * 1000 -- 100 msec - , ccMaxRunningJobs = 20 - , ccProcessJob = processJob - , ccOnException = handleException - , ccJobLogData = \(i, _) -> ["job_id" .= i] - } + -- Each job creates 2 new jobs, so there should be 1024 jobs in table. + runSQL_ ("SELECT COUNT(*) from " <> raw teJobTableName) + fetchOne runIdentity + + -- Move time 2 hours forward + modifyTestTime $ addUTCTime (2 * 60 * 60) + finalize + ( localDomain "process" $ + runConsumerWithIdleSignal consumerConfig connSource idleSignal + ) + $ waitUntilTrue idleSignal + + -- Jobs are designed to double only 10 times, so there should be no jobs left now. + runSQL_ ("SELECT COUNT(*) from " <> raw teJobTableName) + rowcount1 :: Int64 <- fetchOne runIdentity + liftIO $ T.assertEqual "Number of jobs in table after 10 steps grows exponentially" [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] rowCountGrowth + liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 + where + getConsumerConfig :: TestEnv (ConsumerConfig TestEnv Int64 (Int64, Int32)) + getConsumerConfig = do + TestEnvSt {..} <- get + pure $ + ConsumerConfig + { ccJobsTable = teJobTableName + , ccConsumersTable = teConsumerTableName + , ccJobSelectors = ["id", "countdown"] + , ccJobFetcher = id + , ccJobIndex = \(i :: Int64, _ :: Int32) -> i + , ccNotificationChannel = Just teNotificationChannel + , -- select some small timeout + ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccMaxRunningJobs = 20 + , ccProcessJob = processJob + , ccOnException = handleException + , ccJobLogData = \(i, _) -> ["job_id" .= i] + , ccMutexColumn = Nothing + } putJob :: Int32 -> TestEnv () putJob countdown = localDomain "put" $ do + TestEnvSt {..} <- get now <- currentTime runSQL_ $ - "INSERT INTO consumers_test_jobs " + "INSERT INTO " + <> raw teJobTableName <> "(run_at, finished_at, reserved_by, attempts, countdown) " <> "VALUES (" now <> " + interval '1 hour', NULL, NULL, 0, " countdown <> ")" - notify "consumers_test_chan" "" + notify teNotificationChannel "" processJob :: (Int64, Int32) -> TestEnv Result processJob (_idx, countdown) = do @@ -202,93 +122,11 @@ testPipeline connectionSettings = do commit pure (Ok Remove) - handleException :: SomeException -> (Int64, Int32) -> TestEnv Action - handleException _ _ = pure . RerunAfter $ imicroseconds 500000 - -jobsTable :: Table -jobsTable = - tblTable - { tblName = "consumers_test_jobs" - , tblVersion = 1 - , tblColumns = - [ tblColumn - { colName = "id" - , colType = BigSerialT - , colNullable = False - } - , tblColumn - { colName = "run_at" - , colType = TimestampWithZoneT - , colNullable = True - } - , tblColumn - { colName = "finished_at" - , colType = TimestampWithZoneT - , colNullable = True - } - , tblColumn - { colName = "reserved_by" - , colType = BigIntT - , colNullable = True - } - , tblColumn - { colName = "attempts" - , colType = IntegerT - , colNullable = False - } - , -- The only non-obligatory field: - tblColumn - { colName = "countdown" - , colType = IntegerT - , colNullable = False - } - ] - , tblPrimaryKey = pkOnColumn "id" - , tblForeignKeys = - [ (fkOnColumn "reserved_by" "consumers_test_consumers" "id") - { fkOnDelete = ForeignKeySetNull - } - ] - } - -consumersTable :: Table -consumersTable = - tblTable - { tblName = "consumers_test_consumers" - , tblVersion = 1 - , tblColumns = - [ tblColumn - { colName = "id" - , colType = BigSerialT - , colNullable = False - } - , tblColumn - { colName = "name" - , colType = TextT - , colNullable = False - } - , tblColumn - { colName = "last_activity" - , colType = TimestampWithZoneT - , colNullable = False - } - ] - , tblPrimaryKey = pkOnColumn "id" - } - -createTableMigration :: MonadDB m => Table -> Migration m -createTableMigration tbl = - Migration - { mgrTableName = tblName tbl - , mgrFrom = 0 - , mgrAction = StandardMigration $ do - createTable True tbl - } +waitUntilTrue :: MonadIO m => TMVar Bool -> m () +waitUntilTrue tmvar = liftIO . atomically $ do + takeTMVar tmvar >>= \case + True -> pure () + False -> retry -dropTableMigration :: Table -> Migration m -dropTableMigration tbl = - Migration - { mgrTableName = tblName tbl - , mgrFrom = 1 - , mgrAction = DropTableMigration DropTableRestrict - } +handleException :: SomeException -> k -> TestEnv Action +handleException _ _ = pure . RerunAfter $ imicroseconds 500000 diff --git a/consumers/test/Util.hs b/consumers/test/Util.hs new file mode 100644 index 0000000..17dc37f --- /dev/null +++ b/consumers/test/Util.hs @@ -0,0 +1,211 @@ +{-# LANGUAGE BangPatterns #-} + +module Util where + +import Control.Applicative ((<|>)) +import Control.Monad.Base +import Control.Monad.Catch +import Control.Monad.IO.Class +import Control.Monad.State.Strict +import Control.Monad.Time +import Control.Monad.Trans.Control +import Data.Text qualified as T +import Data.Time +import Database.PostgreSQL.PQTypes +import Database.PostgreSQL.PQTypes.Checks +import Database.PostgreSQL.PQTypes.Model +import Log +import System.Environment +import System.Exit + +getConnectionString :: IO T.Text +getConnectionString = do + connectionParamsString <- (<|>) <$> paramsFromGithub <*> paramsFromEnvironmentVariables + allArgs <- getArgs + case connectionParamsString of + Just params -> pure (stringFromParams params) + _ -> case allArgs of + connString : _args -> pure (T.pack connString) + [] -> printUsage *> exitFailure + where + printUsage = do + prog <- getProgName + putStrLn $ "Usage: " <> prog <> " " + + paramsFromGithub = + lookupEnv "GITHUB_ACTIONS" >>= \case + Just "true" -> pure $ Just ("postgres", "postgres", "postgres") + _ -> pure $ Nothing + paramsFromEnvironmentVariables = do + variables <- + sequence + [ lookupEnv "PGHOST" + , lookupEnv "PGUSER" + , lookupEnv "PGDATABASE" + ] + case variables of + [Just host, Just user, Just database] -> pure $ Just (host, user, database) + _ -> pure $ Nothing + stringFromParams (host, user, database) = + (T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database)) + +data TestEnvSt = TestEnvSt + { teCurrentTime :: UTCTime + , teMonotonicTime :: Double + , teJobTableName :: RawSQL () + , teConsumerTableName :: RawSQL () + , teNotificationChannel :: Channel + , teAdditionalCols :: [TableColumn] + } + +type InnerTestEnv = StateT TestEnvSt (DBT (LogT IO)) + +newtype TestEnv a = TestEnv {unTestEnv :: InnerTestEnv a} + deriving (Applicative, Functor, Monad, MonadLog, MonadDB, MonadThrow, MonadCatch, MonadMask, MonadIO, MonadBase IO, MonadState TestEnvSt) + +instance MonadBaseControl IO TestEnv where + type StM TestEnv a = StM InnerTestEnv a + liftBaseWith f = TestEnv $ liftBaseWith (\run -> f $ run . unTestEnv) + restoreM = TestEnv . restoreM + +instance MonadTime TestEnv where + currentTime = gets teCurrentTime + monotonicTime = gets teMonotonicTime + +data TestSetup = TestSetup + { tsTestSuffix :: RawSQL () + , tsAdditionalCols :: [TableColumn] + } + +modifyTestTime :: MonadState TestEnvSt m => (UTCTime -> UTCTime) -> m () +modifyTestTime modtime = modify (\te -> te {teCurrentTime = modtime . teCurrentTime $ te}) + +runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestSetup -> TestEnv a -> IO a +runTestEnv connSource logger TestSetup {..} test = do + runLogT "consumers-test" logger defaultLogLevel $ + runDBT connSource defaultTransactionSettings $ + (\m' -> fst <$> runStateT m' testEnvironment) $ + unTestEnv $ + bracket createTables (const dropTables) (const test) + where + jobTableName = "jobs_" <> tsTestSuffix + consumerTableName = "consumers_" <> tsTestSuffix + notificationChannelName = "notification_" <> tsTestSuffix + testEnvironment = + TestEnvSt + (UTCTime (ModifiedJulianDay 0) 0) + 0 + jobTableName + consumerTableName + (Channel notificationChannelName) + tsAdditionalCols + jobTable = mkJobsTable jobTableName consumerTableName tsAdditionalCols + consumerTable = mkConsumersTable consumerTableName + definitions = emptyDbDefinitions {dbTables = [consumerTable, jobTable]} + -- NB: order of migrations is important. + migrations = + [ createTableMigration consumerTable + , createTableMigration jobTable + ] + + createTables :: TestEnv () + createTables = do + migrateDatabase + defaultExtrasOptions + definitions + migrations + checkDatabase + defaultExtrasOptions + definitions + + dropTables :: TestEnv () + dropTables = do + migrateDatabase + defaultExtrasOptions + emptyDbDefinitions + [ dropTableMigration jobTable + , dropTableMigration consumerTable + ] + +mkJobsTable :: RawSQL () -> RawSQL () -> [TableColumn] -> Table +mkJobsTable tableName consumerTableName additionalCols = + tblTable + { tblName = tableName + , tblVersion = 1 + , tblColumns = + [ tblColumn + { colName = "id" + , colType = BigSerialT + , colNullable = False + } + , tblColumn + { colName = "run_at" + , colType = TimestampWithZoneT + , colNullable = True + } + , tblColumn + { colName = "finished_at" + , colType = TimestampWithZoneT + , colNullable = True + } + , tblColumn + { colName = "reserved_by" + , colType = BigIntT + , colNullable = True + } + , tblColumn + { colName = "attempts" + , colType = IntegerT + , colNullable = False + } + ] + ++ additionalCols + , tblPrimaryKey = pkOnColumn "id" + , tblForeignKeys = + [ (fkOnColumn "reserved_by" consumerTableName "id") + { fkOnDelete = ForeignKeySetNull + } + ] + } + +mkConsumersTable :: RawSQL () -> Table +mkConsumersTable tableName = + tblTable + { tblName = tableName + , tblVersion = 1 + , tblColumns = + [ tblColumn + { colName = "id" + , colType = BigSerialT + , colNullable = False + } + , tblColumn + { colName = "name" + , colType = TextT + , colNullable = False + } + , tblColumn + { colName = "last_activity" + , colType = TimestampWithZoneT + , colNullable = False + } + ] + , tblPrimaryKey = pkOnColumn "id" + } + +createTableMigration :: MonadDB m => Table -> Migration m +createTableMigration tbl = + Migration + { mgrTableName = tblName tbl + , mgrFrom = 0 + , mgrAction = StandardMigration $ do + createTable True tbl + } + +dropTableMigration :: Table -> Migration m +dropTableMigration tbl = + Migration + { mgrTableName = tblName tbl + , mgrFrom = 1 + , mgrAction = DropTableMigration DropTableRestrict + } From 662a08c963f2f9d0003debd0e628616aef7ddf1d Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:46 +0200 Subject: [PATCH 05/11] Add haddock to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 2fd2c86..e735e0b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ /dist-newstyle/ /cabal.project.local /cabal.project.freeze +/haddocks/ TAGS .ghc.environment.* .cabal-sandbox From 039fe1b3ccfa19d21398fcd00d3194acb7a1621e Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:47 +0200 Subject: [PATCH 06/11] Log how many items it was possible to batch --- consumers/src/Database/PostgreSQL/Consumers/Components.hs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 4f4e33d..cd5b50a 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -335,7 +335,9 @@ spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mId logInfo "Processing batch" $ object [ "batch_size" .= batchSize + , "limit" .= limit ] + -- Update runningJobs before forking so that we can adjust -- maxBatchSize appropriately later. We also need to mask asynchronous -- exceptions here as we rely on correct value of runningJobs to From 82000742272f7444d53fc6cf8a1862b49607e141 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:47 +0200 Subject: [PATCH 07/11] Extract more helpers for writing tests --- consumers/test/Test.hs | 51 +++++++++++++++++++++--------------------- consumers/test/Util.hs | 16 ++++++++----- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index 2b15108..c99d2b0 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -1,16 +1,13 @@ module Main where -import Control.Concurrent (threadDelay) import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Control.Monad.RWS import Control.Monad.Time -import Data.IORef import Data.Int import Data.Text qualified as T -import Data.Time import Database.PostgreSQL.Consumers import Database.PostgreSQL.PQTypes import Database.PostgreSQL.PQTypes.Model @@ -36,6 +33,8 @@ allTests connectionSource = -------------------- +-- | Test that when a batch is submitted, it is consumed completely and in an +-- accelerated fashion that grows the batch size exponentially. testJobScheduleGrowth :: ConnectionSettings -> IO () testJobScheduleGrowth connectionSettings = do let ConnectionSource connSource = simpleSource connectionSettings @@ -53,9 +52,9 @@ testJobScheduleGrowth connectionSettings = do idleSignal <- liftIO newEmptyTMVarIO putJob 10 >> commit - rowCountGrowth :: [Int64] <- forM [1 .. 10 :: Int] $ \_ -> do + rowCountGrowth :: [Int64] <- replicateM (10 :: Int) $ do -- Move time forward 2hours, because jobs are scheduled 1 hour into future - modifyTestTime $ addUTCTime (2 * 60 * 60) + shiftTestTimeHours 2 finalize ( localDomain "process" $ runConsumerWithIdleSignal consumerConfig connSource idleSignal @@ -68,7 +67,7 @@ testJobScheduleGrowth connectionSettings = do fetchOne runIdentity -- Move time 2 hours forward - modifyTestTime $ addUTCTime (2 * 60 * 60) + shiftTestTimeHours 2 finalize ( localDomain "process" $ runConsumerWithIdleSignal consumerConfig connSource idleSignal @@ -82,24 +81,7 @@ testJobScheduleGrowth connectionSettings = do liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 where getConsumerConfig :: TestEnv (ConsumerConfig TestEnv Int64 (Int64, Int32)) - getConsumerConfig = do - TestEnvSt {..} <- get - pure $ - ConsumerConfig - { ccJobsTable = teJobTableName - , ccConsumersTable = teConsumerTableName - , ccJobSelectors = ["id", "countdown"] - , ccJobFetcher = id - , ccJobIndex = \(i :: Int64, _ :: Int32) -> i - , ccNotificationChannel = Just teNotificationChannel - , -- select some small timeout - ccNotificationTimeout = 100 * 1000 -- 100 msec - , ccMaxRunningJobs = 20 - , ccProcessJob = processJob - , ccOnException = handleException - , ccJobLogData = \(i, _) -> ["job_id" .= i] - , ccMutexColumn = Nothing - } + getConsumerConfig = defaultConsumerConfig processJob ["id", "countdown"] fst putJob :: Int32 -> TestEnv () putJob countdown = localDomain "put" $ do @@ -128,5 +110,24 @@ waitUntilTrue tmvar = liftIO . atomically $ do True -> pure () False -> retry -handleException :: SomeException -> k -> TestEnv Action +handleException :: Applicative m => SomeException -> k -> m Action handleException _ _ = pure . RerunAfter $ imicroseconds 500000 + +defaultConsumerConfig :: (FromRow job, Applicative m) => (job -> m Result) -> [SQL] -> (job -> idx) -> TestEnv (ConsumerConfig m idx job) +defaultConsumerConfig processJob jobSelectors jobIndex = do + TestEnvSt {..} <- get + pure $ + ConsumerConfig + { ccJobsTable = teJobTableName + , ccConsumersTable = teConsumerTableName + , ccJobSelectors = jobSelectors + , ccJobFetcher = id + , ccJobIndex = jobIndex + , ccNotificationChannel = Just teNotificationChannel + , -- select some small timeout + ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccMaxRunningJobs = 20 + , ccProcessJob = processJob + , ccOnException = handleException + , ccJobLogData = const [] + } diff --git a/consumers/test/Util.hs b/consumers/test/Util.hs index 17dc37f..6383725 100644 --- a/consumers/test/Util.hs +++ b/consumers/test/Util.hs @@ -34,20 +34,21 @@ getConnectionString = do paramsFromGithub = lookupEnv "GITHUB_ACTIONS" >>= \case - Just "true" -> pure $ Just ("postgres", "postgres", "postgres") - _ -> pure $ Nothing + Just "true" -> pure $ Just ("postgres", "postgres", "postgres", "postgres") + _ -> pure Nothing paramsFromEnvironmentVariables = do variables <- sequence [ lookupEnv "PGHOST" , lookupEnv "PGUSER" , lookupEnv "PGDATABASE" + , lookupEnv "PGPASSWORD" ] case variables of - [Just host, Just user, Just database] -> pure $ Just (host, user, database) - _ -> pure $ Nothing - stringFromParams (host, user, database) = - (T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database)) + [Just host, Just user, Just database, Just password] -> pure $ Just (host, user, database, password) + _ -> pure Nothing + stringFromParams (host, user, database, pass) = + T.pack ("host=" <> host <> " user=" <> user <> " dbname=" <> database <> " password=" <> pass) data TestEnvSt = TestEnvSt { teCurrentTime :: UTCTime @@ -80,6 +81,9 @@ data TestSetup = TestSetup modifyTestTime :: MonadState TestEnvSt m => (UTCTime -> UTCTime) -> m () modifyTestTime modtime = modify (\te -> te {teCurrentTime = modtime . teCurrentTime $ te}) +shiftTestTimeHours :: MonadState TestEnvSt m => NominalDiffTime -> m () +shiftTestTimeHours hr = modifyTestTime $ addUTCTime (hr * 60 * 60) + runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestSetup -> TestEnv a -> IO a runTestEnv connSource logger TestSetup {..} test = do runLogT "consumers-test" logger defaultLogLevel $ From c0e5879f13c9847fe22b1fcd1b181713e2cff116 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:48 +0200 Subject: [PATCH 08/11] Apply hlint hints --- .../src/Database/PostgreSQL/Consumers/Utils.hs | 3 ++- consumers/test/Util.hs | 17 ++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs index e2d5108..f095094 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs @@ -17,6 +17,7 @@ import Control.Exception.Lifted qualified as E import Control.Monad.Base import Control.Monad.Catch import Control.Monad.Trans.Control +import Data.Functor (void) import Data.Maybe import Data.Text qualified as T import Database.PostgreSQL.PQTypes.Class @@ -100,6 +101,6 @@ mkNotification :: MonadBaseControl IO m => m (TriggerNotification m, ListenNotif mkNotification = do notificationRef <- newEmptyMVar pure - ( TriggerNotification $ tryPutMVar notificationRef () >> pure () + ( TriggerNotification . void $ tryPutMVar notificationRef () , ListenNotification $ takeMVar notificationRef ) diff --git a/consumers/test/Util.hs b/consumers/test/Util.hs index 6383725..2c7db77 100644 --- a/consumers/test/Util.hs +++ b/consumers/test/Util.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE BangPatterns #-} - module Util where import Control.Applicative ((<|>)) @@ -85,12 +83,10 @@ shiftTestTimeHours :: MonadState TestEnvSt m => NominalDiffTime -> m () shiftTestTimeHours hr = modifyTestTime $ addUTCTime (hr * 60 * 60) runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestSetup -> TestEnv a -> IO a -runTestEnv connSource logger TestSetup {..} test = do - runLogT "consumers-test" logger defaultLogLevel $ - runDBT connSource defaultTransactionSettings $ - (\m' -> fst <$> runStateT m' testEnvironment) $ - unTestEnv $ - bracket createTables (const dropTables) (const test) +runTestEnv connSource logger TestSetup {..} test = + (runLogT "consumers-test" logger defaultLogLevel . runDBT connSource defaultTransactionSettings) + . (`evalStateT` testEnvironment) + $ unTestEnv (bracket createTables (const dropTables) (const test)) where jobTableName = "jobs_" <> tsTestSuffix consumerTableName = "consumers_" <> tsTestSuffix @@ -123,7 +119,7 @@ runTestEnv connSource logger TestSetup {..} test = do definitions dropTables :: TestEnv () - dropTables = do + dropTables = migrateDatabase defaultExtrasOptions emptyDbDefinitions @@ -202,8 +198,7 @@ createTableMigration tbl = Migration { mgrTableName = tblName tbl , mgrFrom = 0 - , mgrAction = StandardMigration $ do - createTable True tbl + , mgrAction = StandardMigration $ createTable True tbl } dropTableMigration :: Table -> Migration m From 3e2592bde8e0c978ed7488f2f90b20155182a30c Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:48 +0200 Subject: [PATCH 09/11] Expose inlining info so users can specialize --- .../Database/PostgreSQL/Consumers/Components.hs | 16 ++++++++++++++-- .../src/Database/PostgreSQL/Consumers/Utils.hs | 6 ++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index cd5b50a..17358d5 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -50,6 +50,7 @@ runConsumer -- ^ The consumer. -> ConnectionSourceM m -> m (m ()) +{-# INLINEABLE runConsumer #-} runConsumer cc cs = runConsumerWithMaybeIdleSignal cc cs Nothing runConsumerWithIdleSignal @@ -67,6 +68,7 @@ runConsumerWithIdleSignal -> ConnectionSourceM m -> TMVar Bool -> m (m ()) +{-# INLINEABLE runConsumerWithIdleSignal #-} runConsumerWithIdleSignal cc cs idleSignal = runConsumerWithMaybeIdleSignal cc cs (Just idleSignal) -- | Run the consumer and also signal whenever the consumer is waiting for @@ -85,6 +87,7 @@ runConsumerWithMaybeIdleSignal -> ConnectionSourceM m -> Maybe (TMVar Bool) -> m (m ()) +{-# INLINEABLE runConsumerWithMaybeIdleSignal #-} runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal | ccMaxRunningJobs cc < 1 = do logInfo_ "ccMaxRunningJobs < 1, not starting the consumer" @@ -186,6 +189,7 @@ spawnListener -> ConnectionSourceM m -> TriggerNotification m -> m ThreadId +{-# INLINEABLE spawnListener #-} spawnListener cc cs outbox = forkP "listener" $ case ccNotificationChannel cc of @@ -227,6 +231,7 @@ spawnMonitor -> ConnectionSourceM m -> ConsumerID -> m ThreadId +{-# INLINEABLE spawnMonitor #-} spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do runDBT cs ts $ do now <- currentTime @@ -313,9 +318,13 @@ spawnDispatcher -> TVar Int -> Maybe (TMVar Bool) -> m ThreadId +{-# INLINEABLE spawnDispatcher #-} spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mIdleSignal = forkP "dispatcher" . forever $ do listenNotification inbox + -- When awoken, we always start slow, processing only a single job in a + -- batch. Each time we can fill a batch completely with jobs, we grow the maximum + -- batch size. someJobWasProcessed <- loop 1 if someJobWasProcessed then setIdle False @@ -350,9 +359,11 @@ spawnDispatcher ConsumerConfig {..} cs cid inbox runningJobsInfo runningJobs mId . forkP "batch processor" . (`finally` subtractJobs) . restore - $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + $ mapM startJob batch >>= mapM joinJob >>= updateJobs + -- Induce some backpressure. If the number of running jobs by all batch + -- processors exceed the global limit, we wait. If it does not, start a + -- new iteration with a double the limit when (batchSize == limit) $ do maxBatchSize <- atomically $ do jobs <- readTVar runningJobs @@ -434,6 +445,7 @@ updateJobsQuery -> [(idx, Result)] -> UTCTime -> SQL +{-# INLINEABLE updateJobsQuery #-} updateJobsQuery jobsTable results now = smconcat [ "WITH removed AS (" diff --git a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs index f095094..d2712b5 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Utils.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Utils.hs @@ -26,6 +26,7 @@ import Database.PostgreSQL.PQTypes.SQL.Raw -- | Run an action 'm' that returns a finalizer and perform the returned -- finalizer after the action 'action' completes. finalize :: (MonadMask m, MonadBase IO m) => m (m ()) -> m a -> m a +{-# INLINEABLE finalize #-} finalize m action = do finalizer <- newEmptyMVar flip finally (tryTakeMVar finalizer >>= fromMaybe (pure ())) $ do @@ -53,6 +54,7 @@ instance Exception ThrownFrom -- | Stop execution of a thread. stopExecution :: MonadBase IO m => ThreadId -> m () +{-# INLINEABLE stopExecution #-} stopExecution = flip throwTo StopExecution ---------------------------------------- @@ -60,6 +62,7 @@ stopExecution = flip throwTo StopExecution -- | Modified version of 'fork' that propagates thrown exceptions to the parent -- thread. forkP :: MonadBaseControl IO m => String -> m () -> m ThreadId +{-# INLINEABLE forkP #-} forkP = forkImpl fork -- | Modified version of 'TG.fork' that propagates thrown exceptions to the @@ -70,6 +73,7 @@ gforkP -> String -> m () -> m (ThreadId, m (T.Result ())) +{-# INLINEABLE gforkP #-} gforkP = forkImpl . TG.fork ---------------------------------------- @@ -80,6 +84,7 @@ forkImpl -> String -> m () -> m a +{-# INLINEABLE forkImpl #-} forkImpl ffork tname m = E.mask $ \release -> do parent <- myThreadId ffork $ @@ -98,6 +103,7 @@ newtype TriggerNotification m = TriggerNotification {triggerNotification :: m () newtype ListenNotification m = ListenNotification {listenNotification :: m ()} mkNotification :: MonadBaseControl IO m => m (TriggerNotification m, ListenNotification m) +{-# INLINEABLE mkNotification #-} mkNotification = do notificationRef <- newEmptyMVar pure From 50074395e1a192147d448edc9cccdc82c6aa4704 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:48 +0200 Subject: [PATCH 10/11] Update changelog --- consumers/CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/consumers/CHANGELOG.md b/consumers/CHANGELOG.md index 36e7fd2..5aea51b 100644 --- a/consumers/CHANGELOG.md +++ b/consumers/CHANGELOG.md @@ -1,3 +1,12 @@ +# consumers-2.3.3.2 (XXX-XX-XX) +* Log batch size limits when processing. +* Split off testing utilities into separate module. +* Ensure tables are cleaned up on test-teardown. +* Allow passing in PG variables through environment variables. +* Expose inlining information so it is possible to specialize consumers at + call-sites. +* Bump Ubuntu image used in CI. + # consumers-2.3.3.1 (2025-04-03) * Do not prepare query that updates jobs in the monitor thread. From 9cb469bb44bd975a3b01d7f07cb64f8c92405e19 Mon Sep 17 00:00:00 2001 From: Curtis Chin Jen Sem Date: Mon, 1 Sep 2025 15:04:49 +0200 Subject: [PATCH 11/11] Bump Ubuntu image used in GitHub Actions --- .github/workflows/haskell-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 663bb73..cc6c1ef 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -23,7 +23,7 @@ on: jobs: linux: name: Haskell-CI - Linux - ${{ matrix.compiler }} - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 container: