From 63062dbf37b3738417dd433c40cbbcbbca507ad7 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 15 Dec 2025 11:04:24 +0100 Subject: [PATCH 1/4] Update github tests workflow --- .github/workflows/test.yml | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2c2a63f..cbe838a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,50 +5,55 @@ on: branches: - master pull_request: - schedule: - # monthly - - cron: "0 0 1 * *" + types: + - opened + - synchronize env: #bump to clear caches ACTION_CACHE_VERSION: 'v1' +concurrency: + group: ${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: test-clj: strategy: matrix: - java: ['11', '17', '21', '24'] - runs-on: ubuntu-latest + java: ['11', '21', '25'] + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/setup-java@v5 with: distribution: 'zulu' java-version: ${{ matrix.java }} - uses: DeLaGuardo/setup-clojure@13.4 with: - cli: 1.12.0.1530 + cli: 1.12.3.1577 - run: clojure -M:dev -m promesa.tests.main test-cljs: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 + - uses: actions/setup-java@v5 with: distribution: 'zulu' - java-version: '24' + java-version: '25' - uses: actions/setup-node@v4 with: - node-version: '22.15.0' + node-version: '24.12.0' + cache: 'npm' - uses: DeLaGuardo/setup-clojure@13.4 with: - cli: 1.12.0.1530 + cli: 1.12.3.1577 - run: | corepack enable @@ -57,7 +62,7 @@ jobs: yarn run test test-bb: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 From fb20718ec45ba98293874af290a876a36a24d315 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 15 Dec 2025 11:52:44 +0100 Subject: [PATCH 2/4] Update deps.edn --- deps.edn | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/deps.edn b/deps.edn index e172ec8..0af5136 100644 --- a/deps.edn +++ b/deps.edn @@ -13,11 +13,16 @@ :extra-paths ["test" "dev"]} :repl - {:main-opts ["-m" "rebel-readline.main"]} + {:main-opts ["-m" "rebel-readline.main"] + :jvm-opts ["--sun-misc-unsafe-memory-access=allow" + "-XX:+UnlockExperimentalVMOptions" + "-XX:CompileCommand=blackhole,criterium.blackhole.Blackhole::consume"]} :shadow-cljs {:main-opts ["-m" "shadow.cljs.devtools.cli"] - :jvm-opts ["--sun-misc-unsafe-memory-access=allow"]} + :jvm-opts ["--sun-misc-unsafe-memory-access=allow" + "-XX:+UnlockExperimentalVMOptions" + "-XX:CompileCommand=blackhole,criterium.blackhole.Blackhole::consume"]} :codox {:extra-deps From 03d01732a2150bdd68f459e1c9e1bbe0cfdf983b Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 15 Dec 2025 11:52:55 +0100 Subject: [PATCH 3/4] Update package.json --- package.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index dcc1e9b..4e2ca85 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "promesa", - "version": "11.0", + "version": "12.0.0", "description": "-", "author": "-", "license": "SEE LICENSE IN ", @@ -10,9 +10,9 @@ }, "scripts": { "test:watch": "clojure -M:dev:shadow-cljs watch test", - "test:compile": "clojure -M:dev:shadow-cljs compile test --config-merge '{:autorun false}'", + "test:build": "clojure -M:dev:shadow-cljs compile test --config-merge '{:autorun false}'", "test:run": "node target/tests.js", - "test": "yarn run test:compile && yarn run test:run" + "test": "yarn run test:build && yarn run test:run" }, "devDependencies": { "source-map-support": "^0.5.21" From f152066dbca7b8ad7e5daae20fd1baead63b60b4 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 15 Dec 2025 11:56:31 +0100 Subject: [PATCH 4/4] Mark CSP not experimental --- CHANGELOG.md | 10 + src/promesa/core.cljc | 4 +- src/promesa/exec.cljc | 4 +- src/promesa/exec/bulkhead.clj | 10 +- src/promesa/exec/csp.cljc | 134 ++++++----- src/promesa/exec/csp/buffers.cljc | 50 ++--- src/promesa/exec/csp/channel.cljc | 210 +++++++++--------- src/promesa/exec/csp/mutable_list.cljc | 19 +- src/promesa/exec/semaphore.clj | 34 ++- src/promesa/protocols.cljc | 66 +++--- src/promesa/util.cljc | 18 +- test/promesa/tests/exec_csp_buffers_test.cljc | 66 +++--- 12 files changed, 324 insertions(+), 301 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf6bb25..b78020f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog # +## Version 12.0.0-RC2 (Unreleased) + +- Mark the `promesa.exec.csp` namespace no longer experimental and unify all + related protocols with the current naming style, removing the usage of `!` + suffix from protocol function names. This is tecnicaly a **breaking change** + but CSP was experimental and this kind of changes was expected. Still several + csp helpers are marked as experimental on docs and they continue have that + label. + + ## Version 12.0.0-RC1 BREAKING CHANGES: diff --git a/src/promesa/core.cljc b/src/promesa/core.cljc index 73f815d..c6c22a7 100644 --- a/src/promesa/core.cljc +++ b/src/promesa/core.cljc @@ -387,7 +387,7 @@ (pt/-fnly (pt/-promise p) (fn [v exception] - (pt/-lock! lock) + (pt/-lock lock) (try (if exception (when-not (:resolved? @state) @@ -409,7 +409,7 @@ #?(:clj (c/run! pt/-cancel! pending)) (resolve v)))) (finally - (pt/-unlock! lock))))))))))) + (pt/-unlock lock))))))))))) (defn wait-all* "Given an array of promises, return a promise that is fulfilled when diff --git a/src/promesa/exec.cljc b/src/promesa/exec.cljc index e2ca83a..be7bc50 100644 --- a/src/promesa/exec.cljc +++ b/src/promesa/exec.cljc @@ -1195,6 +1195,6 @@ {:no-doc true :deprecated true} ([o] - (pt/-close! o)) + (pt/-close o)) ([o reason] - (pt/-close! o reason))) + (pt/-close o reason))) diff --git a/src/promesa/exec/bulkhead.clj b/src/promesa/exec/bulkhead.clj index 7224b00..f6d5959 100644 --- a/src/promesa/exec/bulkhead.clj +++ b/src/promesa/exec/bulkhead.clj @@ -62,7 +62,7 @@ (try (.run ^Runnable f) (finally - (psm/release! semaphore :permits 1) + (psm/release semaphore :permits 1) (log "cmd:" "Task/run" "f:" (hash f) "task:" (hash this) "permits:" (.availablePermits ^Semaphore semaphore) @@ -121,10 +121,10 @@ (log "cmd:" "Bulkhead/run" "queue:" (.size queue) "permits:" (.availablePermits semaphore)) (loop [] (log "cmd:" "Bulkhead/run$loop1" "queue:" (.size queue) "permits:" (.availablePermits semaphore)) - (when-let [task (when (pt/-try-acquire! semaphore) + (when-let [task (when (pt/-try-acquire semaphore) (if-let [task (-poll queue)] task - (pt/-release! semaphore)))] + (pt/-release semaphore)))] (log "cmd:" "Bulkhead/run$loop2" "task:" (hash task) "available-permits:" (.availablePermits semaphore)) (.execute ^Executor executor ^Runnable task) (recur))))) @@ -170,11 +170,11 @@ (throw (ex-info hint props)))) (try - (if (psm/acquire! semaphore :permits 1 :timeout timeout) + (if (psm/acquire semaphore :permits 1 :timeout timeout) (try (.run ^Runnable f) (finally - (psm/release! semaphore))) + (psm/release semaphore))) (let [props {:type :bulkhead-error :code :capacity-limit-reached :timeout timeout}] diff --git a/src/promesa/exec/csp.cljc b/src/promesa/exec/csp.cljc index 83a5163..28c5055 100644 --- a/src/promesa/exec/csp.cljc +++ b/src/promesa/exec/csp.cljc @@ -21,9 +21,7 @@ This code is implemented in CLJS for make available the channel abstraction to the CLJS, but the main use case for this ns is targeted to the JVM, where you will be able to take advantage of - virtual threads and seamless blocking operations on channels. - - **EXPERIMENTAL API**" + virtual threads and seamless blocking operations on channels." (:refer-clojure :exclude [take merge]) (:require [promesa.core :as p] @@ -58,8 +56,8 @@ [bindings & body] `(go (loop ~bindings ~@body))) -(declare offer!) -(declare close!) +(declare offer) +(declare close) (declare chan) (defmacro go-chan @@ -71,8 +69,8 @@ (->> (p/thread-call *executor* f#) (p/fnly (fn [v# e#] (if e# - (close! c# e#) - (offer! c# v#))))) + (close c# e#) + (offer c# v#))))) c#)) (defmacro thread-chan @@ -84,8 +82,8 @@ (->> (p/thread-call :thread f#) (p/fnly (fn [v# e#] (if e# - (close! c# e#) - (offer! c# v#))))) + (close c# e#) + (offer c# v#))))) c#)) (defn chan @@ -136,21 +134,21 @@ (defn put! "A blocking version of `put`." ([port val] - (p/await! (put port val))) + (p/join (put port val))) ([port val timeout-duration] - (p/await! (put port val timeout-duration nil))) + (p/join (put port val timeout-duration nil))) ([port val timeout-duration timeout-value] - (p/await! (put port val timeout-duration timeout-value))))) + (p/join (put port val timeout-duration timeout-value))))) #?(:clj (defn >! "A convenience alias for `put!`." ([port val] - (p/await! (put port val))) + (p/join (put port val))) ([port val timeout-duration] - (p/await! (put port val timeout-duration nil))) + (p/join (put port val timeout-duration nil))) ([port val timeout-duration timeout-value] - (p/await! (put port val timeout-duration timeout-value))))) + (p/join (put port val timeout-duration timeout-value))))) (defn take "Schedules a take operation on the channel. Returns a promise instance @@ -172,21 +170,21 @@ (defn take! "Blocking version of `take`." ([port] - (p/await! (take port))) + (p/join (take port))) ([port timeout-duration] - (p/await! (take port timeout-duration nil))) + (p/join (take port timeout-duration nil))) ([port timeout-duration timeout-value] - (p/await! (take port timeout-duration timeout-value))))) + (p/join (take port timeout-duration timeout-value))))) #?(:clj (defn handler o)) + (pt/-put port val (channel/volatile->handler o)) (first @o))) (defn offer! @@ -342,7 +340,7 @@ the value if succeeded, `nil` otherwise." [port] (let [o (volatile! nil)] - (pt/-take! port (channel/volatile->handler o)) + (pt/-take port (channel/volatile->handler o)) (let [[v c] (deref o)] (if c (throw c) @@ -369,7 +367,7 @@ (go-loop [] (let [v (take! from)] (if (nil? v) - (if close? (pt/-close! to)) + (if close? (pt/-close to)) (if (put! to v) (recur))))) :cljs @@ -378,7 +376,7 @@ (p/mcat (fn [v] (if (nil? v) (do - (when close? (pt/-close! to)) + (when close? (pt/-close to)) (p/resolved nil)) (->> (put to v) (p/map (fn [res] @@ -400,7 +398,7 @@ (if (and coll (put! ch (first coll))) (recur (next coll)) (when close? - (close! ch)))) + (close ch)))) :cljs (->> (p/loop [items (seq coll)] @@ -412,7 +410,7 @@ (p/recur nil))))))) (p/fnly (fn [_ _] (when close? - (pt/-close! ch)))))))) + (pt/-close ch)))))))) (defn onto-chan! "Puts the contents of coll into the supplied channel. @@ -433,40 +431,40 @@ values in because multiplexer implements the IWriteChannel protocol. Optionally accepts `close?` argument, that determines if the channel will - be closed when `close!` is called on multiplexer o not." + be closed when `close` is called on multiplexer o not." ([ch] (mult* ch false)) ([ch close?] (let [state (atom {}) mx (reify pt/IChannelMultiplexer - (-tap! [_ ch close?] + (-tap [_ ch close?] (swap! state assoc ch close?)) - (-untap! [_ ch] + (-untap [_ ch] (swap! state dissoc ch)) pt/ICloseable - (-close! [_] - (when close? (pt/-close! ch)) + (-close [_] + (when close? (pt/-close ch)) (->> @state (filter (comp true? peek)) - (run! (comp pt/-close! key)))) + (run! (comp pt/-close key)))) pt/IWriteChannel - (-put! [_ val handler] - (pt/-put! ch val handler)))] + (-put [_ val handler] + (pt/-put ch val handler)))] #?(:clj (go-loop [] (let [v (take! ch)] (if (nil? v) - (pt/-close! mx) + (pt/-close mx) (do - (p/await! + (p/join (p/wait-all* (for [ch (keys @state)] (->> (put ch v) (p/fnly (fn [v _] (when (nil? v) - (pt/-untap! mx ch)))))))) + (pt/-untap mx ch)))))))) (recur))))) :cljs (p/loop [] @@ -474,13 +472,13 @@ (p/mcat (fn [v] (if (nil? v) (do - (pt/-close! mx) + (pt/-close mx) (p/resolved nil)) (->> (p/wait-all* (for [ch (-> @state keys vec)] (->> (put ch v) (p/fnly (fn [v _] (when (nil? v) - (pt/-untap! mx ch))))))) + (pt/-untap mx ch))))))) (p/fmap (fn [_] (p/recur)))))))))) @@ -510,10 +508,10 @@ (defn tap "Copies the multiplexer source onto the provided channel." ([mult ch] - (pt/-tap! mult ch true) + (pt/-tap mult ch true) ch) ([mult ch close?] - (pt/-tap! mult ch close?) + (pt/-tap mult ch close?) ch)) (defn tap! @@ -527,7 +525,7 @@ (defn untap "Disconnects a channel from the multiplexer." [mult ch] - (pt/-untap! mult ch) + (pt/-untap mult ch) ch) (defn untap! @@ -573,7 +571,7 @@ (recur))) (prn \"RES: END\")) - (p/await! (sp/onto-chan! inp [\"1\" \"2\" \"3\" \"4\"] true)) + (p/join (sp/onto-chan! inp [\"1\" \"2\" \"3\" \"4\"] true)) Internally, uses 2 vthreads for pipeline internals processing. @@ -596,12 +594,12 @@ (f val rch) (recur))) (catch Throwable cause - (close! jch) - (close! rch) + (close jch) + (close rch) (let [exh (or exh close-with-exception)] (exh out cause) (when (and close? (not (closed? out))) - (close! out))))))) + (close out))))))) (map (fn [f] (p/thread-call typ f))))] @@ -612,11 +610,11 @@ (if (>! jch [val res-ch]) (if (>! rch res-ch) (recur) - (close! jch)) - (close! rch))) + (close jch)) + (close rch))) (do - (close! rch) - (close! jch)))) + (close rch) + (close jch)))) (go-loop [] (if-let [rch' (! out val) (recur) (do - (close! jch) - (close! rch))))) + (close jch) + (close rch))))) (recur)) (when close? - (close! out)))) + (close out)))) (-> (into #{} xfm (range n)) (p/wait-all*))))) @@ -651,5 +649,5 @@ (do (put! out v) (recur cs)))) - (close! out))) + (close out))) out))) diff --git a/src/promesa/exec/csp/buffers.cljc b/src/promesa/exec/csp/buffers.cljc index 4743c33..56c0d19 100644 --- a/src/promesa/exec/csp/buffers.cljc +++ b/src/promesa/exec/csp/buffers.cljc @@ -23,20 +23,20 @@ pt/IBuffer (-full? [_] (>= (mlist/size buf) n)) - (-poll! [this] - (mlist/remove-last! buf)) - (-offer! [this o] + (-poll [this] + (mlist/remove-last buf)) + (-offer [this o] (if (>= (mlist/size buf) n) false (do - (mlist/add-first! buf o) + (mlist/add-first buf o) true))) (-size [_] (mlist/size buf)) pt/ICloseable (-closed? [this] nil) - (-close! [this] nil)))) + (-close [this] nil)))) (defn expanding "Fixed but with the ability to expand. @@ -54,17 +54,17 @@ pt/IBuffer (-full? [_] (>= (mlist/size buf) n)) - (-poll! [this] - (mlist/remove-last! buf)) - (-offer! [this o] - (mlist/add-first! buf o) + (-poll [this] + (mlist/remove-last buf)) + (-offer [this o] + (mlist/add-first buf o) true) (-size [_] (mlist/size buf)) pt/ICloseable (-closed? [this] nil) - (-close! [this] nil)))) + (-close [this] nil)))) (defn dropping [n] @@ -72,18 +72,18 @@ (reify pt/IBuffer (-full? [_] false) - (-poll! [this] - (mlist/remove-last! buf)) - (-offer! [this o] + (-poll [this] + (mlist/remove-last buf)) + (-offer [this o] (when-not (>= (mlist/size buf) n) - (mlist/add-first! buf o)) + (mlist/add-first buf o)) true) (-size [_] (mlist/size buf)) pt/ICloseable (-closed? [this] nil) - (-close! [this] nil)))) + (-close [this] nil)))) (defn sliding "A buffer that works as sliding window, if max capacity is reached, @@ -93,19 +93,19 @@ (reify pt/IBuffer (-full? [_] false) - (-poll! [this] - (mlist/remove-last! buf)) - (-offer! [this o] + (-poll [this] + (mlist/remove-last buf)) + (-offer [this o] (when (= (mlist/size buf) n) - (mlist/remove-last! buf)) - (mlist/add-first! buf o) + (mlist/remove-last buf)) + (mlist/add-first buf o) true) (-size [_] (mlist/size buf)) pt/ICloseable (-closed? [this] nil) - (-close! [this] nil)))) + (-close [this] nil)))) (def no-val #?(:clj (Object.) @@ -115,13 +115,13 @@ ^:unsynchronized-mutable ^Boolean closed] pt/IBuffer (-full? [_] false) - (-poll! [this] + (-poll [this] (when-not closed value)) - (-offer! [this o] + (-offer [this o] (when (identical? value no-val) - (set! value o) + (set value o) true)) (-size [_] @@ -132,7 +132,7 @@ pt/ICloseable (-closed? [this] closed) - (-close! [this] (compare-and-set! closed false true))) + (-close [this] (compare-and-set! closed false true))) (defn once "Creates a promise like buffer that holds a single value and only diff --git a/src/promesa/exec/csp/channel.cljc b/src/promesa/exec/csp/channel.cljc index 36baeaa..8872e3c 100644 --- a/src/promesa/exec/csp/channel.cljc +++ b/src/promesa/exec/csp/channel.cljc @@ -22,13 +22,13 @@ active? (atom true)] (reify pt/ILock - (-lock! [_] (pt/-lock! lock)) - (-unlock! [_] (pt/-unlock! lock)) + (-lock [_] (pt/-lock lock)) + (-unlock [_] (pt/-unlock lock)) pt/IHandler (-active? [_] @active?) (-blockable? [_] false) - (-commit! [_] + (-commit [_] (and (compare-and-set! active? true false) (fn ([v] @@ -44,43 +44,43 @@ active? (atom true)] (reify pt/ILock - (-lock! [_] (pt/-lock! lock)) - (-unlock! [_] (pt/-unlock! lock)) + (-lock [_] (pt/-lock lock)) + (-unlock [_] (pt/-unlock lock)) pt/IHandler (-active? [_] (and (p/pending? p) (deref active?))) (-blockable? [_] blockable?) - (-commit! [_] + (-commit [_] (and (compare-and-set! active? true false) (fn ([v] (when (p/pending? p) - (p/resolve! p v))) + (p/resolve p v))) ([v c] (when (p/pending? p) (if c - (p/reject! p c) - (p/resolve! p v))))))))))) + (p/reject p c) + (p/resolve p v))))))))))) -(defn commit! +(defn commit "A convenience helper that locks, checks and return commited handler callbale given an instance of handler." {:no-doc true} [handler] (try - (pt/-lock! handler) + (pt/-lock handler) (when (pt/-active? handler) - (pt/-commit! handler)) + (pt/-commit handler)) (finally - (pt/-unlock! handler)))) + (pt/-unlock handler)))) -(defn- commit-and-run! +(defn- commit-and-run "A convenience helper that commits handler and if success, executes the handler immediatelly with the provided value as first argument." [handler rval] - (when-let [handler-fn (commit! handler)] + (when-let [handler-fn (commit handler)] (handler-fn rval) true)) @@ -89,46 +89,46 @@ function searches an active and valid put candidate that will match the take operation." [puts handler] - (letfn [(validate! [[putter val]] + (letfn [(validate [[putter val]] (try - (pt/-lock! putter) + (pt/-lock putter) (when (pt/-active? putter) - [(pt/-commit! putter) val]) + [(pt/-commit putter) val]) (finally - (pt/-unlock! putter))))] + (pt/-unlock putter))))] (try - (pt/-lock! handler) + (pt/-lock handler) (when (pt/-active? handler) (loop [] - (when-let [match (mlist/remove-first! puts)] - (if-let [match (validate! match)] - (conj match (pt/-commit! handler)) + (when-let [match (mlist/remove-first puts)] + (if-let [match (validate match)] + (conj match (pt/-commit handler)) (recur))))) (finally - (pt/-unlock! handler))))) + (pt/-unlock handler))))) (defn- lookup-put-transfer "On unbuffered channels when put operation is requested, this function searches an active and valid take candidate that will match the put operation." [takes handler] - (letfn [(validate! [taker] + (letfn [(validate [taker] (try - (pt/-lock! taker) + (pt/-lock taker) (when (pt/-active? taker) - [(pt/-commit! taker)]) + [(pt/-commit taker)]) (finally - (pt/-unlock! taker))))] + (pt/-unlock taker))))] (try - (pt/-lock! handler) + (pt/-lock handler) (when (pt/-active? handler) (loop [] - (when-let [match (mlist/remove-first! takes)] - (if-let [match (validate! match)] - (conj match (pt/-commit! handler)) + (when-let [match (mlist/remove-first takes)] + (if-let [match (validate match)] + (conj match (pt/-commit handler)) (recur))))) (finally - (pt/-unlock! handler))))) + (pt/-unlock handler))))) (defn- process-pending-puts "This is the loop that processes the pending puts after a succesfull @@ -137,11 +137,11 @@ (loop [done? false] (if (or (pt/-full? buf) done?) done? - (if-let [[putter val] (mlist/remove-first! puts)] + (if-let [[putter val] (mlist/remove-first puts)] (do - (if-let [put-fn (commit! putter)] + (if-let [put-fn (commit putter)] (let [done? (reduced? (add-fn this buf val))] - (px/exec! executor (partial put-fn true)) + (px/exec executor (partial put-fn true)) (recur done?)) (recur done?))) @@ -151,9 +151,9 @@ "Process all pending put handlers and execute them ignoring the values" [executor puts] (loop [] - (when-let [[putter] (mlist/remove-first! puts)] - (when-let [put-fn (commit! putter)] - (px/exec! executor (partial put-fn true))) + (when-let [[putter] (mlist/remove-first puts)] + (when-let [put-fn (commit putter)] + (px/exec executor (partial put-fn true))) (recur)))) (defn- process-pending-takes @@ -162,59 +162,59 @@ [executor takes buf] (loop [] (when (pos? (pt/-size buf)) - (when-let [taker (mlist/remove-first! takes)] - (when-let [take-fn (commit! taker)] - (px/exec! executor (partial take-fn (pt/-poll! buf)))) + (when-let [taker (mlist/remove-first takes)] + (when-let [take-fn (commit taker)] + (px/exec executor (partial take-fn (pt/-poll buf)))) (recur))))) -(defn- process-take-handler! +(defn- process-take-handler "When buffer is full or no buffer, we need to do a common task: if task is blockable, enqueue it and if task is not blockable we just cancel it." [takes handler] (if (pt/-blockable? handler) - (mlist/add-last! takes handler) - (commit-and-run! handler nil)) + (mlist/add-last takes handler) + (commit-and-run handler nil)) nil) -(defn- process-put-handler! +(defn- process-put-handler "When buffer is full or no buffer, if task is blockable, enqueue it and if task is not blockable we just cancel it." [puts handler val] (if (pt/-blockable? handler) - (mlist/add-last! puts [handler val]) - (commit-and-run! handler false)) + (mlist/add-last puts [handler val]) + (commit-and-run handler false)) nil) (defn take {:no-doc true} ([port] (let [d (p/deferred)] - (pt/-take! port (promise->handler d)) + (pt/-take port (promise->handler d)) d)) ([port timeout-duration timeout-value] (let [d (p/deferred) h (promise->handler d) - t (px/schedule! timeout-duration - #(when-let [f (commit! h)] - (f timeout-value)))] - (pt/-take! port h) - (p/finally d (fn [_ _] (p/cancel! t)))))) + t (px/schedule timeout-duration + #(when-let [f (commit h)] + (f timeout-value)))] + (pt/-take port h) + (p/finally d (fn [_ _] (p/cancel t)))))) (defn put {:no-doc true} ([port val] (let [d (p/deferred)] - (pt/-put! port val (promise->handler d)) + (pt/-put port val (promise->handler d)) d)) ([port val timeout-duration timeout-value] (let [d (p/deferred) h (promise->handler d) - t (px/schedule! timeout-duration - #(when-let [f (commit! h)] - (f timeout-value)))] - (pt/-put! port val h) - (p/finally d (fn [_ _] (p/cancel! t)))))) + t (px/schedule timeout-duration + #(when-let [f (commit h)] + (f timeout-value)))] + (pt/-put port val h) + (p/finally d (fn [_ _] (p/cancel t)))))) #?(:clj (defn chan->seq @@ -230,10 +230,10 @@ buf closed lock executor add-fn mdata] pt/ILock - (-lock! [_] - (pt/-lock! lock)) - (-unlock! [_] - (pt/-unlock! lock)) + (-lock [_] + (pt/-lock lock)) + (-unlock [_] + (pt/-unlock lock)) #?@(:bb [] :cljs @@ -253,66 +253,66 @@ (seq [this] (chan->seq this))]) pt/IChannelInternal - (-cleanup! [_] + (-cleanup [_] (loop [result (mlist/create)] - (if-let [taker (mlist/remove-first! takes)] + (if-let [taker (mlist/remove-first takes)] (if (pt/-active? taker) - (recur (mlist/add-last! result taker)) + (recur (mlist/add-last result taker)) (recur result)) (set! takes result))) (loop [result (mlist/create)] - (if-let [[putter val :as item] (mlist/remove-first! puts)] + (if-let [[putter val :as item] (mlist/remove-first puts)] (if (pt/-active? putter) - (recur (mlist/add-last! result item)) + (recur (mlist/add-last result item)) (recur result)) (set! puts result)))) pt/IWriteChannel - (-put! [this val handler] + (-put [this val handler] (when (nil? val) (throw (ex-info "Can't put nil on channel" {}))) - (pt/-lock! this) + (pt/-lock this) (try - (pt/-cleanup! this) + (pt/-cleanup this) (if @closed - (let [put-fn (commit! handler)] + (let [put-fn (commit handler)] (put-fn false) nil) (if buf (if (pt/-full? buf) (do (when (pt/-active? handler) - (process-put-handler! puts handler val)) + (process-put-handler puts handler val)) nil) (do - (when (commit-and-run! handler true) + (when (commit-and-run handler true) (when (reduced? (add-fn this buf val)) - (pt/-close! this)) + (pt/-close this)) (process-pending-takes executor takes buf) nil))) (if-let [[take-fn put-fn] (lookup-put-transfer takes handler)] (do (put-fn true) - (px/exec! executor (partial take-fn val)) + (px/exec executor (partial take-fn val)) nil) (when (pt/-active? handler) - (process-put-handler! puts handler val))))) + (process-put-handler puts handler val))))) (finally - (pt/-unlock! this)))) + (pt/-unlock this)))) pt/IReadChannel - (-take! [this handler] + (-take [this handler] (try - (pt/-lock! this) - (pt/-cleanup! this) + (pt/-lock this) + (pt/-cleanup this) (if (and (not (nil? buf)) (pos? (pt/-size buf))) - (when-let [take-fn (commit! handler)] - (let [val (pt/-poll! buf)] + (when-let [take-fn (commit handler)] + (let [val (pt/-poll buf)] (take-fn val nil)) ;; Proces pending puts @@ -321,7 +321,7 @@ (do (when (pos? (mlist/size puts)) (abort executor puts)) - (pt/-close! this)) + (pt/-close this)) (when (and @closed (zero? (mlist/size puts))) (add-fn this buf))) @@ -331,27 +331,27 @@ (if-let [[put-fn val take-fn] (lookup-take-transfer puts handler)] (do (take-fn val) - (px/exec! executor (partial put-fn true)) + (px/exec executor (partial put-fn true)) nil) (if @closed - (when-let [take-fn (commit! handler)] - (if-let [val (some-> buf pt/-poll!)] + (when-let [take-fn (commit handler)] + (if-let [val (some-> buf pt/-poll)] (take-fn val nil) (take-fn nil error))) (when (pt/-active? handler) - (process-take-handler! takes handler))))) + (process-take-handler takes handler))))) (finally - (pt/-unlock! this)))) + (pt/-unlock this)))) pt/ICloseable (-closed? [this] @closed) - (-close! [this] (pt/-close! this nil)) - (-close! [this cause] - (pt/-lock! this) + (-close [this] (pt/-close this nil)) + (-close [this cause] + (pt/-lock this) (try (when (compare-and-set! closed false true) ;; assign a new cause, only if the `error` field is not set @@ -367,22 +367,22 @@ ;; pending puts (when (pos? (mlist/size takes)) (loop [] - (when-let [taker (mlist/remove-first! takes)] - (when-let [take-fn (commit! taker)] - (if-let [val (some-> buf pt/-poll!)] - (px/exec! executor #(take-fn val nil)) - (px/exec! executor #(take-fn nil error)))) + (when-let [taker (mlist/remove-first takes)] + (when-let [take-fn (commit taker)] + (if-let [val (some-> buf pt/-poll)] + (px/exec executor #(take-fn val nil)) + (px/exec executor #(take-fn nil error)))) (recur))))) (finally - (some-> buf pt/-close!) - (pt/-unlock! this))))) + (some-> buf pt/-close) + (pt/-unlock this))))) (defn- add-fn ([b] b) ([b itm] (assert (not (nil? itm))) - (pt/-offer! b itm) + (pt/-offer b itm) b)) (defn channel? @@ -415,14 +415,14 @@ (defn throw-uncaught {:no-doc true} [_ cause] - #?(:clj (px/throw-uncaught! cause) + #?(:clj (px/throw-uncaught cause) :cljs (js/console.error cause)) nil) (defn close-with-exception "A exception handler that closes the channel with error if an error." [ch cause] - (pt/-close! ch cause) + (pt/-close ch cause) nil) (defn chan @@ -436,13 +436,13 @@ (add-fn buf) (catch #?(:clj Throwable :cljs :default) t (when-let [v (exh ch t)] - (pt/-offer! buf v))))) + (pt/-offer buf v))))) ([ch buf val] (try (add-fn buf val) (catch #?(:clj Throwable :cljs :default) t (when-let [v (exh ch t)] - (pt/-offer! buf v)))))) + (pt/-offer buf v)))))) ] (Channel. (mlist/create) (mlist/create) diff --git a/src/promesa/exec/csp/mutable_list.cljc b/src/promesa/exec/csp/mutable_list.cljc index da2dc82..050c74e 100644 --- a/src/promesa/exec/csp/mutable_list.cljc +++ b/src/promesa/exec/csp/mutable_list.cljc @@ -22,8 +22,6 @@ ^:mutable next ^:mutable prev])) -;; FIXME: simplify (?) - #?(:cljs (deftype LinkedList [^:mutable head ^:mutable tail @@ -35,7 +33,7 @@ (do (set! (.-head this) n) (set! (.-tail this) n)) (do (set! (.-next n) (.-head this)) - (set! (.-prev (.-head this)) n) + (set! (.-prev ^Node (.-head this)) n) (set! (.-head this) n))) (set! (.-size this) (inc (.-size this))) this)) @@ -56,7 +54,7 @@ (let [val (.-value h)] (set! (.-head this) (.-next h)) (if (.-head this) - (set! (.-prev (.-head this)) nil) + (set! (.-prev ^Node (.-head this)) nil) (set! (.-tail this) nil)) (set! (.-size this) (dec (.-size this))) val))) @@ -64,7 +62,7 @@ (removeLast [this] (when-let [t (.-tail this)] (let [val (.-value t)] - (set! (.-tail this) (.-prev t)) + (set! (.-tail this) (.-prev ^Node t)) (if (.-tail this) (set! (.-next (.-tail this)) nil) (set! (.-head this) nil)) @@ -84,18 +82,17 @@ #?(:clj (LinkedList.) :cljs (LinkedList. nil nil 0))) -(defn add-first! +(defn add-first [o v] (.addFirst ^LinkedList o v) o) -(defn add-last! +(defn add-last [o v] - #?(:clj (.add ^LinkedList o v) - :cljs (.addFirst ^LinkedList o v)) + (.addLast ^LinkedList o v) o) -(defn remove-last! +(defn remove-last "Remove the last element from list and return it. If no elements, `nil` is returned." [o] @@ -104,7 +101,7 @@ (catch #?(:clj java.util.NoSuchElementException :cljs :default) _ nil))) -(defn remove-first! +(defn remove-first "Remove the first element from list and return it. If no elements, `nil` is returned." [o] diff --git a/src/promesa/exec/semaphore.clj b/src/promesa/exec/semaphore.clj index d90e00f..c822a9b 100644 --- a/src/promesa/exec/semaphore.clj +++ b/src/promesa/exec/semaphore.clj @@ -17,7 +17,7 @@ (extend-type Semaphore pt/ISemaphore - (-try-acquire! + (-try-acquire ([this] (.tryAcquire ^Semaphore this)) ([this permits] (.tryAcquire ^Semaphore this (int permits))) ([this permits timeout] @@ -28,26 +28,38 @@ (int permits) (long timeout) TimeUnit/MILLISECONDS)))) - (-acquire! + (-acquire ([this] (.acquire ^Semaphore this) true) ([this permits] (.acquire ^Semaphore this (int permits)) true)) - (-release! + (-release ([this] (.release ^Semaphore this)) ([this permits] (.release ^Semaphore this (int permits))))) -(defn acquire! - ([sem] (pt/-acquire! sem)) +(defn acquire + ([sem] (pt/-acquire sem)) ([sem & {:keys [permits timeout blocking] :or {blocking true permits 1}}] (if timeout - (pt/-try-acquire! sem permits timeout) + (pt/-try-acquire sem permits timeout) (if blocking - (pt/-acquire! sem permits) - (pt/-try-acquire! sem permits))))) + (pt/-acquire sem permits) + (pt/-try-acquire sem permits))))) -(defn release! - ([sem] (pt/-release! sem)) +(defn acquire! + {:deprecated "12.0.0" + :no-doc true} + [& params] + (apply acquire params)) + +(defn release + ([sem] (pt/-release sem)) ([sem & {:keys [permits]}] - (pt/-release! sem permits))) + (pt/-release sem permits))) + +(defn release! + {:deprecated true + :no-doc true} + [& params] + (apply release params)) (defn create "Creates a Semaphore instance." diff --git a/src/promesa/protocols.cljc b/src/promesa/protocols.cljc index 74e50a6..5f711ad 100644 --- a/src/promesa/protocols.cljc +++ b/src/promesa/protocols.cljc @@ -58,56 +58,62 @@ "A generic abstraction for scheduler facilities." (-schedule! [it ms func] "Schedule a function to be executed in future.")) +#?(:clj + (defprotocol IJoinable + (-join [it] [it duration-or-ms] "block current thread await termination"))) + +#?(:clj + (defprotocol IInvoke + (-invoke [it f] [it f duration-or-ms] "Call a function f in a context with optional timeout"))) + (defprotocol ISemaphore - "An experimental semaphore protocol, used internally; no public api" - (-try-acquire! [it] [it n] [it n t] "Try acquire n or n permits, non-blocking or optional timeout") - (-acquire! [it] [it n] "Acquire 1 or N permits") - (-release! [it] [it n] "Release 1 or N permits")) + "A semaphore protocol, used internally; no public api" + (^:no-doc -try-acquire [it] [it n] [it n t] "Try acquire n or n permits, non-blocking or optional timeout") + (^:no-doc -acquire [it] [it n] "Acquire 1 or N permits") + (^:no-doc -release [it] [it n] "Release 1 or N permits")) (defprotocol ILock - "An experimental lock protocol, used internally; no public api" - (-lock! [it]) - (-unlock! [it])) + "A lock protocol, used internally; no public api" + (^:no-doc -lock [it]) + (^:no-doc -unlock [it])) (defprotocol IReadChannel - (-take! [it handler])) + "Used internally; no public api" + (^:no-doc -take [it handler])) (defprotocol IWriteChannel - (-put! [it val handler])) + "Used internally; no public api" + (^:no-doc -put [it val handler])) (defprotocol IChannelInternal - (^:no-doc -cleanup! [it])) + "Used internally; no public api" + (^:no-doc -cleanup [it])) (defprotocol IChannelMultiplexer - (^:no-doc -tap! [it ch close?]) - (^:no-doc -untap! [it ch])) + "Used internally; no public api" + (^:no-doc -tap [it ch close?]) + (^:no-doc -untap [it ch])) (defprotocol ICloseable - (-closed? [it]) - (-close! [it] [it reason])) + "Used internally; no public api" + (^:no-doc -closed? [it]) + (^:no-doc -close [it] [it reason])) (defprotocol IBuffer - (-full? [it]) - (-poll! [it]) - (-offer! [it val]) - (-size [it])) + "Used internally; no public api" + (^:no-doc -full? [it]) + (^:no-doc -poll [it]) + (^:no-doc -offer [it val]) + (^:no-doc -size [it])) (defprotocol IHandler - (-active? [it]) - (-commit! [it]) - (-blockable? [it])) + "Used internally; no public api" + (^:no-doc -active? [it]) + (^:no-doc -commit [it]) + (^:no-doc -blockable? [it])) #?(:clj ;; DEPRECATED (defprotocol IAwaitable (-await! [it] [it duration] "block current thread await termination"))) -#?(:clj - (defprotocol IJoinable - (-join [it] [it duration-or-ms] "block current thread await termination"))) - -#?(:clj - (defprotocol IInvoke - (-invoke [it f] [it f duration-or-ms] "Call a function f in a context with optional timeout"))) - - diff --git a/src/promesa/util.cljc b/src/promesa/util.cljc index 418a7f6..d2ba1ce 100644 --- a/src/promesa/util.cljc +++ b/src/promesa/util.cljc @@ -159,13 +159,13 @@ (let [m (ReentrantLock.)] (reify pt/ILock - (-lock! [_] (.lock m)) - (-unlock! [_] (.unlock m)))) + (-lock [_] (.lock m)) + (-unlock [_] (.unlock m)))) :cljs (reify pt/ILock - (-lock! [_]) - (-unlock! [_])))) + (-lock [_]) + (-unlock [_])))) (defn try* @@ -186,9 +186,9 @@ (defn close ([o] - (pt/-close! o)) + (pt/-close o)) ([o reason] - (pt/-close! o reason))) + (pt/-close o reason))) (defn close! {:deprecated "12.0.0"} @@ -200,7 +200,7 @@ java.util.concurrent.ExecutorService (-closed? [it] (.isTerminated it)) - (-close! [it] + (-close [it] (let [interrupted (volatile! false)] (loop [terminated? ^Boolean (.isTerminated it)] (when-not terminated? @@ -222,7 +222,7 @@ java.lang.AutoCloseable (-closed? [_] (throw (IllegalArgumentException. "not implemented"))) - (-close! [it] + (-close [it] (.close ^java.lang.AutoCloseable it)))) (defmacro with-open @@ -242,6 +242,6 @@ (let [target# ~(first bindings)] (if (instance? java.lang.AutoCloseable target#) (.close ^java.lang.AutoCloseable target#) - (pt/-close! target#))))))) + (pt/-close target#))))))) `(do ~@body) (reverse (partition 2 bindings)))) diff --git a/test/promesa/tests/exec_csp_buffers_test.cljc b/test/promesa/tests/exec_csp_buffers_test.cljc index e414eca..8b6732f 100644 --- a/test/promesa/tests/exec_csp_buffers_test.cljc +++ b/test/promesa/tests/exec_csp_buffers_test.cljc @@ -9,17 +9,17 @@ (t/deftest fixed-buffer (let [buf (buffers/fixed 2)] - (t/is (nil? (pt/-poll! buf))) + (t/is (nil? (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) - (t/is (true? (pt/-offer! buf :a))) - (t/is (true? (pt/-offer! buf :b))) - (t/is (false? (pt/-offer! buf :c))) + (t/is (true? (pt/-offer buf :a))) + (t/is (true? (pt/-offer buf :b))) + (t/is (false? (pt/-offer buf :c))) (t/is (true? (pt/-full? buf))) - (t/is (= :a (pt/-poll! buf))) - (t/is (= :b (pt/-poll! buf))) + (t/is (= :a (pt/-poll buf))) + (t/is (= :b (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) @@ -27,18 +27,18 @@ (t/deftest expanding-buffer (let [buf (buffers/expanding 2)] - (t/is (nil? (pt/-poll! buf))) + (t/is (nil? (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) - (t/is (true? (pt/-offer! buf :a))) - (t/is (true? (pt/-offer! buf :b))) - (t/is (true? (pt/-offer! buf :c))) + (t/is (true? (pt/-offer buf :a))) + (t/is (true? (pt/-offer buf :b))) + (t/is (true? (pt/-offer buf :c))) (t/is (true? (pt/-full? buf))) - (t/is (= :a (pt/-poll! buf))) - (t/is (= :b (pt/-poll! buf))) - (t/is (= :c (pt/-poll! buf))) + (t/is (= :a (pt/-poll buf))) + (t/is (= :b (pt/-poll buf))) + (t/is (= :c (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) @@ -46,19 +46,19 @@ (t/deftest dropping-buffer (let [buf (buffers/dropping 2)] - (t/is (nil? (pt/-poll! buf))) + (t/is (nil? (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) - (t/is (true? (pt/-offer! buf :a))) - (t/is (true? (pt/-offer! buf :b))) - (t/is (true? (pt/-offer! buf :c))) + (t/is (true? (pt/-offer buf :a))) + (t/is (true? (pt/-offer buf :b))) + (t/is (true? (pt/-offer buf :c))) (t/is (= 2 (pt/-size buf))) (t/is (false? (pt/-full? buf))) - (t/is (= :a (pt/-poll! buf))) - (t/is (= :b (pt/-poll! buf))) - (t/is (= nil (pt/-poll! buf))) + (t/is (= :a (pt/-poll buf))) + (t/is (= :b (pt/-poll buf))) + (t/is (= nil (pt/-poll buf))) (t/is (false? (pt/-full? buf))) @@ -67,19 +67,19 @@ (t/deftest sliding-buffer (let [buf (buffers/sliding 2)] - (t/is (nil? (pt/-poll! buf))) + (t/is (nil? (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) - (t/is (true? (pt/-offer! buf :a))) - (t/is (true? (pt/-offer! buf :b))) - (t/is (true? (pt/-offer! buf :c))) + (t/is (true? (pt/-offer buf :a))) + (t/is (true? (pt/-offer buf :b))) + (t/is (true? (pt/-offer buf :c))) (t/is (= 2 (pt/-size buf))) (t/is (false? (pt/-full? buf))) - (t/is (= :b (pt/-poll! buf))) - (t/is (= :c (pt/-poll! buf))) - (t/is (= nil (pt/-poll! buf))) + (t/is (= :b (pt/-poll buf))) + (t/is (= :c (pt/-poll buf))) + (t/is (= nil (pt/-poll buf))) (t/is (false? (pt/-full? buf))) (t/is (= 0 (pt/-size buf))) @@ -87,13 +87,13 @@ (t/deftest list-ops (let [o (mlist/create)] - (t/is (some? (mlist/add-first! o :a))) - (t/is (some? (mlist/add-first! o :b))) - (t/is (some? (mlist/add-first! o :c))) + (t/is (some? (mlist/add-first o "a"))) + (t/is (some? (mlist/add-first o "b"))) + (t/is (some? (mlist/add-first o "c"))) (t/is (= 3 (mlist/size o))) - (t/is (= :a (mlist/remove-last! o))) - (t/is (some? (mlist/add-last! o :d))) - (t/is (= :d (mlist/remove-last! o))) + (t/is (= "a" (mlist/remove-last o))) + (t/is (some? (mlist/add-last o "d"))) + (t/is (= "d" (mlist/remove-last o))) (t/is (= 2 (mlist/size o))) ))