From 7ce7d62996fd3f7a78fc8c74005e1dbca7a74ed1 Mon Sep 17 00:00:00 2001 From: Tim Whiting Date: Sat, 24 Jan 2026 19:51:37 -0700 Subject: [PATCH 1/6] Improve refcount for continuations --- lib/std/core/hnd.kk | 52 ++++++++++++++++++-------------- lib/std/core/inline/hnd.c | 44 +++++++++++++++++++-------- test/cgen/simple-refcount.kk | 22 ++++++++++++++ test/cgen/simple-refcount.kk.out | 2 ++ 4 files changed, 84 insertions(+), 36 deletions(-) create mode 100644 test/cgen/simple-refcount.kk create mode 100644 test/cgen/simple-refcount.kk.out diff --git a/lib/std/core/hnd.kk b/lib/std/core/hnd.kk index 1fc25ab2c..2213adace 100644 --- a/lib/std/core/hnd.kk +++ b/lib/std/core/hnd.kk @@ -603,31 +603,37 @@ fun get( ref: ref) : ,div> a inline extern unsafe-st(f : () -> |e> a ) : (() -> e a) inline "#1" -fun protect-prompt( resumed : ref, k : resume-result -> e r, res : r ) : e r - val did-resume : bool = (unsafe-st{ !resumed })() - if did-resume then - // if resumed, we no longer need to protect - res - elif !yielding() then - // otherwise, if we are not yielding, resume k with finalization (to run all finally clauses) - k(Finalize(res)) - elif yielding-non-final() then - // if we yield non-final to an operation, extend the continuation with this prompt (so we keep protecting after being resumed) - yield-cont( fn(cont,x) protect-prompt(pretend-decreasing(resumed),k,cont(x)) ) - else - // if we are in a final yield, capture it, resume k with finalization, and reyield - val yld = yield-capture() - k(Finalize(res)) - if yielding() return yield-extend( fn(_x) unsafe-reyield(yld) ) // yikes, a finally clause is itself yielding... - unsafe-reyield(yld) +type protect-state + NeedsFinalization(k : resume-result -> e r) + NoFinalization + +fun protect-prompt( resumed : ref>, res : r) : e r + val hdiv=@Hnodiv // Needed since resumed is in global state. + val did-resume : some protect-state = (unsafe-st{ !resumed })() + match did-resume + NoFinalization -> // resumed already, no need to protect + res + NeedsFinalization(k) -> + if !yielding() then + // otherwise, if we are not yielding, resume k with finalization (to run all finally clauses) + k(Finalize(res)) + elif yielding-non-final() then + // if we yield non-final to an operation, extend the continuation with this prompt (so we keep protecting after being resumed) + yield-cont( fn(cont,x) protect-prompt(pretend-decreasing(resumed),cont(x)) ) + else + // if we are in a final yield, capture it, resume k with finalization, and reyield + val yld = yield-capture() + k(Finalize(res)) + if yielding() return yield-extend( fn(_x) unsafe-reyield(yld) ) // yikes, a finally clause is itself yielding... + unsafe-reyield(yld) fun protect( x : a, clause : (x:a, k: b -> e r) -> e r, k : resume-result -> e r ) : e r - val resumed = (unsafe-st{ref(False)})() + val resumed = (unsafe-st{ref(NeedsFinalization(k))})() fun kprotect(ret) - (unsafe-st{resumed := True})() + (unsafe-st{resumed := NoFinalization})() k(Deep(ret)) val res = clause(x,kprotect) - protect-prompt(resumed,k,res) + protect-prompt(resumed,res) /* pub fun clause-control1( clause : (x:a, k: b -> e r) -> e r ) : clause1 @@ -715,12 +721,12 @@ fun under2( ev : ev, op : (a1,a2) -> e b, x1 : a1, x2 : a2 ) : e b z fun protect2( x1 : a1, x2:a2, clause : (x:a1,x:a2, k: b -> e r) -> e r, k : resume-result -> e r ) : e r - val resumed = (unsafe-st{ref(False)})() + val resumed = (unsafe-st{ref(NeedsFinalization(k))})() fun kprotect(ret) - (unsafe-st{ resumed := True })() + (unsafe-st{ resumed := NoFinalization })() k(Deep(ret)) val res = clause(x1,x2,kprotect) - protect-prompt(resumed,k,res) + protect-prompt(resumed,res) pub fun clause-control2( clause : (x1:a1, x2:a2, k: b -> e r) -> e r ) : clause2 Clause2(fn(m,_ev,x1,x2){ yield-to(m, fn(k){ protect2(x1,x2,clause,k) }) }) diff --git a/lib/std/core/inline/hnd.c b/lib/std/core/inline/hnd.c index c6336872b..4fb98aebb 100644 --- a/lib/std/core/inline/hnd.c +++ b/lib/std/core/inline/hnd.c @@ -268,22 +268,40 @@ static kk_box_t kcompose( kk_function_t fself, kk_box_t x, kk_context_t* ctx) { kk_intx_t count = kk_intf_unbox(self->count); kk_function_t* conts = &self->conts[0]; // call each continuation in order - for(kk_intx_t i = 0; i < count; i++) { - // todo: take uniqueness of fself into account to avoid dup_function - kk_function_t f = kk_function_dup(conts[i],ctx); - x = kk_function_call(kk_box_t, (kk_function_t, kk_box_t, kk_context_t*), f, (f, x, ctx), ctx); - if (kk_yielding(ctx)) { - // if yielding, `yield_next` all continuations that still need to be done - while(++i < count) { - // todo: if fself is unique, we could copy without dup? - kk_yield_extend(kk_function_dup(conts[i],ctx),ctx); + if kk_likely(kk_datatype_ptr_is_unique(fself, ctx)) { + // Special handling for unique continuation function to avoid dup/drop overhead for the continuation and captured variables. + for(kk_intx_t i = 0; i < count; i++) { + kk_function_t f = conts[i]; + x = kk_function_call(kk_box_t, (kk_function_t, kk_box_t, kk_context_t*), f, (f, x, ctx), ctx); + if (kk_yielding(ctx)) { + // if yielding, `yield_next` all continuations that still need to be done + while(++i < count) { + kk_yield_extend(conts[i],ctx); // just move the continuation (no dup needed since it's parent is unique and being dropped) + } + kk_free((void*)self, ctx); + kk_box_drop(x,ctx); // still drop even though we yield as it may release a boxed value type? + return kk_box_any(ctx); // return yielding } - kk_function_drop(fself,ctx); - kk_box_drop(x,ctx); // still drop even though we yield as it may release a boxed value type? - return kk_box_any(ctx); // return yielding } + kk_free((void*)self, ctx); + // kk_function_drop(self,ctx); Can't do this, since all of it's child functions are dropped! + } else { + for(kk_intx_t i = 0; i < count; i++) { + // todo: take uniqueness of fself into account to avoid dup_function + kk_function_t f = kk_function_dup(conts[i],ctx); + x = kk_function_call(kk_box_t, (kk_function_t, kk_box_t, kk_context_t*), f, (f, x, ctx), ctx); + if (kk_yielding(ctx)) { + // if yielding, `yield_next` all continuations that still need to be done + while(++i < count) { + kk_yield_extend(kk_function_dup(conts[i],ctx),ctx); + } + kk_function_drop(fself,ctx); + kk_box_drop(x,ctx); // still drop even though we yield as it may release a boxed value type? + return kk_box_any(ctx); // return yielding + } + } + kk_function_drop(fself,ctx); } - kk_function_drop(fself,ctx); return x; } diff --git a/test/cgen/simple-refcount.kk b/test/cgen/simple-refcount.kk new file mode 100644 index 000000000..437033af3 --- /dev/null +++ b/test/cgen/simple-refcount.kk @@ -0,0 +1,22 @@ +import std/num/int32 + +extern create-box(): io-noexn any + c inline "kk_cptr_raw_box(&kk_free_fun, kk_malloc(sizeof(int), _ctx), _ctx)" + +extern get-refcount(^box: any): io-noexn int32 + c inline "kk_block_refcount(kk_box_to_ptr(box, _ctx))" + +effect ctl catch(): () + +fun main() + with handler ctl catch() + resume(()) + resume(()) + val box = create-box() + val count0 = box.get-refcount() + catch() + val count1 = box.get-refcount() + println((count0.int, count1.int).show) + // Expected output: + // (1,1) + // (1,0) \ No newline at end of file diff --git a/test/cgen/simple-refcount.kk.out b/test/cgen/simple-refcount.kk.out new file mode 100644 index 000000000..c124f7a23 --- /dev/null +++ b/test/cgen/simple-refcount.kk.out @@ -0,0 +1,2 @@ +(1,1) +(1,0) \ No newline at end of file From 998975c55eeaf7226c8f5a1129e0a84cb7c763e9 Mon Sep 17 00:00:00 2001 From: Tim Whiting Date: Mon, 15 Dec 2025 19:42:25 -0700 Subject: [PATCH 2/6] basics for libuv --- lib/std/async.kk | 3 + lib/std/async/async.kk | 671 ++++++++++++++++++++++++++++++++++ lib/std/async/inline/timer.js | 26 ++ lib/std/async/null.kk | 38 ++ lib/uv/event-loop.kk | 73 ++++ lib/uv/inline/event-loop.c | 123 +++++++ lib/uv/inline/timer.c | 226 ++++++++++++ lib/uv/inline/timer.h | 22 ++ lib/uv/inline/timer.js | 26 ++ lib/uv/inline/utils.c | 195 ++++++++++ lib/uv/inline/utils.h | 131 +++++++ lib/uv/timer.kk | 89 +++++ lib/uv/utils.kk | 203 ++++++++++ samples/async/timer.kk | 19 + src/Compile/CodeGen.hs | 16 +- src/Compile/Options.hs | 1 + 16 files changed, 1855 insertions(+), 7 deletions(-) create mode 100644 lib/std/async.kk create mode 100644 lib/std/async/async.kk create mode 100644 lib/std/async/inline/timer.js create mode 100644 lib/std/async/null.kk create mode 100644 lib/uv/event-loop.kk create mode 100644 lib/uv/inline/event-loop.c create mode 100644 lib/uv/inline/timer.c create mode 100644 lib/uv/inline/timer.h create mode 100644 lib/uv/inline/timer.js create mode 100644 lib/uv/inline/utils.c create mode 100644 lib/uv/inline/utils.h create mode 100644 lib/uv/timer.kk create mode 100644 lib/uv/utils.kk create mode 100644 samples/async/timer.kk diff --git a/lib/std/async.kk b/lib/std/async.kk new file mode 100644 index 000000000..cd386a181 --- /dev/null +++ b/lib/std/async.kk @@ -0,0 +1,3 @@ +pub import std/async/async +pub import uv/timer +pub import uv/event-loop \ No newline at end of file diff --git a/lib/std/async/async.kk b/lib/std/async/async.kk new file mode 100644 index 000000000..127ca2be4 --- /dev/null +++ b/lib/std/async/async.kk @@ -0,0 +1,671 @@ +/*--------------------------------------------------------------------------- + Copyright 2012-2021, Microsoft Research, Daan Leijen. + + This is free software; you can redistribute it and/or modify it under the + terms of the Apache License, Version 2.0. A copy of the License can be + found in the LICENSE file at the root of this distribution. +---------------------------------------------------------------------------*/ + +/* Asynchronous primitives + +This module is based closely on [@Leijen:async] and aims to have robust and composable asynchronous +primitives. In particular, any outstanding asynchronous operation can be canceled (through `cancel`) +within a certain scope which allows for composable primitives like `timeout` and `firstof`. + + +## References {- +~ Bibliography { caption:"0"~~ BibItem { #Leijen:async; bibitem-label:"[1]"; searchterm:"Leijen+Daan+Structured+Asynchrony+with+Algebraic+Effects"Daan Leijen. +_Structured Asynchrony with Algebraic Effects_. +Microsoft Research technical report MSR-TR-2017-21, May 2017. +[pdf](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/05/asynceffects-msr-tr-2017-21.pdf) +~~ +~ +\/ +*/ +module std/async/async + +import std/data/dict +import std/data/array +import std/num/int32 +import std/num/ddouble // for C# backend +import std/time/duration +import std/async/null +import std/core/unsafe +pub import std/num/ddouble +pub import std/num/float64 +import std/time/timestamp + + // js is just using primitives + +// A type alias for asynchronous operations that can raise exceptions non-deterministically. +// This is common for almost all `:async` operations since `cancel` and `timeout` can +// cancel operations non-deterministically which raises the `Cancel` exception and cancels +// outstanding asynchronous requests. +pub alias asyncx = + +pub alias async-exn = + +// ---------------------------------------------------------------------------- +// Promises +// ---------------------------------------------------------------------------- + +// A _promise_ that carries a value of type `:a`. A promise is initially empty +// but can be `await`ed asynchronously until it gets `resolve`d unblocking any +// `await` operations. After that a promise stays resolved and any `await` will +// return immediately. It is an error to try to resolve a promise more than once. +abstract struct promise + state : ref> + + +abstract value type promise-state + Resolved( value : a ) + Awaiting( listeners : list io ()> ) + +// Create a new promise. +pub fun promise() : async promise + async-iox { Promise(ref(Awaiting([]))) } + +// Await a promise; returns immediately if the promise was already resolved and otherwise +// waits asynchronously. +pub fun promise/await( p : promise ) : asyncx a + fun setup(cb : _ -> io-noexn ()) + val r = p.state + match (!r) + Awaiting(listeners) -> r := Awaiting(Cons(cb,listeners)) + Resolved(value) -> io-noexn1(cb,value) // resume right away; should not happen due to try-await + match p.try-await + Just(v) -> v + Nothing -> await1(setup) + +// Returns immediately if the promise was already resolved and otherwise return `Nothing`. +pub fun try-await( p : promise ) : maybe + async-io-noexn + val r = p.state + match !r + Resolved(value) -> Just(value) + _ -> Nothing + +// Resolve a promise to `value`. Raises an exception if the promise was already resolved. +pub fun resolve( p : promise, value : a ) : asyncx () + async-io + if !try-resolve-io(p, value) then + throw("Promise was already resolved") + +// Resolve a promise to `value`. Returns False if the promise was already resolved. +// TODO: make this io-noexn +pub fun try-resolve-io( p : promise, value : a ) : io bool + val r = p.state + match !r + Awaiting(listeners) -> + r := Resolved(value) + listeners.foreach fn(cbx) // todo: through set-immediate? + cbx(value) // set-immediate1( cbx, value ) + True + _ -> False + +// ---------------------------------------------------------------------------- +// Channels +// ---------------------------------------------------------------------------- + +// A _channel_ of values of type `:a`. Values can be asynchronously `emit`ed into +// a channel, and asynchronously `receive`d. +abstract value struct channel( + chid : int, + state : ref> +) + +// todo: use queue data type for the values and listeners for better complexity +abstract type channel-state + Empty + Values( value : a, values : list = [] ) + Waiting( listener : a -> io-noexn (), listeners : list io-noexn ()> = [] ) + +fun from-values(values : list ) : channel-state + match values + Nil -> Empty + Cons(v,vs) -> Values(v,vs) + +fun from-waiting(listeners : list io-noexn ()>) : channel-state + match listeners + Nil -> Empty + Cons(l,ls) -> Waiting(l,ls) + +// Create a new asynchronous channel. +pub fun channel() : async channel + async-iox + Channel(unique(), ref(Empty)) + +// Receive (and remove) a value from the channel: returns immediately if a value is available and otherwise +// waits asynchronously until a value becomes available. +pub fun receive( ch : channel ) : asyncx a + ch.receivex.untry + +fun receivex( ch : channel, cancelable : bool = True ) : error + fun setup( cb : (_,_) -> io-noexn () ) + fun cbr(x) cb(Ok(x),True) + val r = ch.state + match !r + Empty -> r := Waiting(cbr,[]) + Waiting(l,ls) -> r := Waiting(l,ls ++ [cbr]) + Values(v,vs)-> // this case should not happen due to `try-receive` + r := from-values(vs) + cbr(v) + Nothing + + match ch.try-receive + Just(v) -> Ok(v) + Nothing -> do-await(setup,empty-scope,cancelable) + +// Return immediately if a value is available on the channel and otherwise returns `Nothing`. +pub fun try-receive( ch : channel ) : maybe + async-io-noexn + val r = ch.state + match (!r) + Values(v, vs) -> + r := from-values(vs) + Just(v) + _ -> Nothing + +fun emit-io( ch : channel, value : a ) : io-noexn () + val r = ch.state + match !r + Empty -> r := Values(value, []) + Values(v,vs) -> r := Values(v,vs ++ [value]) + Waiting(l,ls) -> + r := from-waiting(ls) + l(value) + +// Emit a value asynchronously into a channel. +pub fun emit( ch : channel, value : a ) : asyncx () + async-io + emit-io(ch,value) + +fun trace-channel( msg : string, ch : channel ) : () + async-io-noexn + trace-channel-io( msg, ch ) + +fun trace-channel-io( msg : string, ch : channel ) : io-noexn () + val msgx = msg ++ ": id=" ++ ch.chid.show + val r = ch.state + match !r + Empty -> trace(msgx ++ ", empty") + Values(v,vs) -> trace-any(msgx ++ ", full: " ++ (1 + vs.length).show ++ ": ", v ) + Waiting(_,ls) -> trace(msgx ++ ", listeners: " ++ (1 + ls.length).show) + +fun trace-anyx( s : string, x : a ) : async () + trace-any(s,x) + +// ---------------------------------------------------------------------------- +// Asynchronous timeout and waiting +// ---------------------------------------------------------------------------- + +// Execute `action` but if it is not finished within `secs` seconds duration +// `cancel` it (and return `Nothing`). Due to the generality of `cancel`, this `timeout` +// abstraction can reliably time out over any composition of asynchronous operations +// and is therefore quite expressive. + +pub fun timeout( secs : duration, action : () -> a, ?set-timeout: (unit-cb, int32) -> io-noexn any, ?clear-timeout: (any) -> io-noexn () ) : maybe + firstof { duration/wait(secs); Nothing} { Just(action()) } + +// Execute `a` and `b` interleaved. As soon as one of them finishes, +// `cancel` the other one and return the result of the first. +pub fun firstof( a : () -> a, b : () -> a ) : a + cancelable + val (ra,rb) = interleavedx { val x = mask behind{ a() }; cancel(); x } + { val x = mask behind{ b() }; cancel(); x } + match ra + Error(exn) | exn.is-cancel -> rb.untry + _ -> ra.untry + + + +// Wait (asynchronously) for `secs` seconds as a `:double`. +// Use `yield()` to yield to other asynchronous operations. +pub fun float/wait( secs : float64, ?set-timeout: (unit-cb, int32) -> io-noexn any, ?clear-timeout: (any) -> io-noexn () ) : asyncx () + wait(secs.duration) + +// Wait (asynchronously) for optional `secs` seconds `:duration` (`= 0.seconds`). +// Use `yield()` to yield generally to other asynchronous operations. +pub fun duration/wait( secs : duration = zero, ?set-timeout: (unit-cb, int32) -> io-noexn any, ?clear-timeout: (any) -> io-noexn () ) : asyncx () + if secs <= duration/zero then return yield() + val msecs = max(zero:int32,secs.milli-seconds.int32) + await fn(cb) + val tid = async/set-timeout( fn(){ cb(Ok(())) }, msecs ) + Just( { async/clear-timeout(tid) } ) + +// Yield to other asynchronous operations. Same as `wait(0)`. +pub fun yield(?set-timeout: (unit-cb, int32) -> io-noexn any) : asyncx () + await0 fn(cb) + async/set-timeout( cb, int32/zero ) + () + +// abstract wid for timeout handlers +abstract struct timeout-id( + timer : any +) + +alias unit-cb = () -> io-noexn () + +fun async/set-timeout( cb : unit-cb, ms : int32, ?set-timeout: (unit-cb, int32) -> io-noexn any) : io-noexn timeout-id + Timeout-id(?set-timeout(cb,max(ms,zero))) + + +fun async/clear-timeout( tid : timeout-id , ?clear-timeout: (any) -> io-noexn ()) : io-noexn () + ?clear-timeout(tid.timer) + +// ---------------------------------------------------------------------------- +// Interleaved strands of execution +// ---------------------------------------------------------------------------- + +// Interleave two actions around their asynchronous operations. +pub fun two/interleaved( action1 : () -> a, action2 : () -> b ) : (a,b) + val (ra,rb) = interleavedx( {mask behind(action1)}, {mask behind(action2)} ) + [ra.maybe-exn,rb.maybe-exn].ordered_throw + (ra.untry,rb.untry) + +// Interleave a list of actions around their asynchronous operations. +pub fun list/interleaved( xs : list<() -> a> ) : list + val ress = xs.map( fn(f) { return { mask behind(f) } } ).interleavedx + //ress.map(maybe).ordered_throw + ress.map(untry) + +fun maybe-exn( err : error ) : maybe + match err + Error(exn) -> Just(exn) + _ -> Nothing + +fun ordered_throw( xs : list> ) : exn () + var mexn := Nothing + xs.foreach fn(x) + match x + Nothing -> () + Just(exn) -> match mexn + Nothing -> mexn := x + Just(exnx) -> + if ((exn.is-finalize && !exnx.is-finalize) || (exnx.is-cancel && !exn.is-cancel)) + then mexn := x + match mexn + Just(exn) -> rethrow(exn) + Nothing -> () + +// Interleave two actions around their asynchronous operations and explicitly returning either +// their result or their exception. +pub fun interleavedx( action1 : () -> a, action2 : () -> b ) : (error,error) + fun act1() Left(action1()) + fun act2() Right(action2()) + match interleavedx([act1,act2]) + Cons(x,Cons(y)) -> (x.unleft,y.unright) + _ -> + // this will never happen.. + val exn = Exception("invalid interleaved result",ExnInternal("std/async/interleavedx(action1,action2)")) + (Error(exn),Error(exn)) + +fun unleft( x : error> ) : error + match x + Ok(Left(l)) -> Ok(l) + Error(exn) -> Error(exn) + _ -> Error(Exception("invalid left interleaved result",ExnInternal("std/async/interleavedx(action1,action2)"))) +fun unright( x : error> ) : error + match x + Ok(Right(r)) -> Ok(r) + Error(exn) -> Error(exn) + _ -> Error(Exception("invalid right interleaved result",ExnInternal("std/async/interleavedx(action1,action2)"))) + +// Private effect to keep track of when a strand in an interleaving is done. +// Preferred over using built-in state as this works well if there is an outer handler +// over the state that resumes more than once -- redoing part of the interleaving. +// See `test/algeff/async5.js` +effect strands + // Are there still strands that need to be resumed? + fun strands-are-busy() : bool + // Call this when a strand is done. + fun strand-done(idx : int, result : error) : () + +// Insert in order with an accumulating list. +fun insert-acc( xs : list<(int,a)>, idx : int, value : a, acc : list<(int,a)> ) : list<(int,a)> + match xs + Cons(x,xx) | x.fst < idx -> insert-acc(xx, idx, value, Cons(x,acc)) + _ -> reverse-append( acc, Cons((idx,value),xs) ) + +// Insert in order +fun insert( xs : list<(int,a)>, idx : int, value : a, n : int = 0 ) : list<(int,a)> + if n > 100 + then insert-acc( xs, idx, value, [] ) + else match xs + Cons(x,xx) | x.fst < idx -> Cons(x, insert(xx, idx, value, n + 1)) + _ -> Cons((idx,value),xs) + + +// Interleave a list actions around their asynchronous operations and explicitly returning either +// either their result or their exception. +pub fun list/interleavedx( xs : list<() -> a> ) : list> + val n = xs.length + if n==0 then [] + elif n==1 then xs.map(unsafe-try-all) + else interleavedn(n,xs) + +fun interleavedn( n : int, xs : list<() -> a> ) : list> + unsafe-no-ndet-div + var cr : some (int,list<(int,error)>) := (n,[]) + with handler + return(x) + cr.snd.map( snd ) + fun strands-are-busy() + cr.fst > 0 + fun strand-done(idx,res) + cr := (cr.fst - 1, cr.snd.insert(idx,res)) + mask + interleaved-div(xs) + + +inline extern unsafe-no-ndet-div-cast : forall (() -> a) -> (() -> e a) + inline "#1" + +fun unsafe-no-ndet-div( action : () -> a ) : e a + unsafe-no-ndet-div-cast(action)() + +inline extern inject-effects : forall (() -> e a) -> total (() -> ,ndet,div|e> a) + inline "#1" + +fun error/is-finalize( t : error ) : bool + match t + Error(exn) -> exn.is-finalize + _ -> False + +fun error/is-cancel( t : error ) : bool + match t + Error(exn) -> exn.is-cancel + _ -> False + +fun interleaved-div( xs : list<() -> a> ) : |e> () + val strands = xs.map-indexed fn(i,action) + return fn() + val res = unsafe-try-all(inject-effects(action)) + strand-done(i,res) + if res.is-finalize then cancel() // cancel others if finalization happens + val ch : some channel<() -> a> = channel() + val handle-strand = handler + raw ctl do-await(setup,scope,c) + no-await( setup, scope, c) fn(res) + // emit a resumption of this strand into the channel + ch.emit-io( /* no-cps */ { rcontext.resume(res) } ) + () // stop the strand at this point + // redirect all other operations + fun no-await(setup,scope,c,f) + no-await(setup,scope,c,f) + fun async-iox(f) + async-iox(f) + fun cancel(scope) + cancel(scope) + + strands.foreach fn(strand) + handle-strand{ mask behind(strand) } + + while { strands-are-busy() } // while there are resumptions on the strands.. + // the only way receive can throw is through a cancelation -- but in that case + // we should not cancel but instead await all canceled strands; so keep listening on the channel. + match(ch.receivex(False)) + Error(_exn) -> () // ignore cancelation on receive + Ok(strand-resume) -> strand-resume() + () + + + +// ---------------------------------------------------------------------------- +// Await wrappers +// ---------------------------------------------------------------------------- + +// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception. +pub fun await-exn0( setup : (cb : (null) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx () + await fn(cb) + setup( fn(nexn) cb(nexn.unnull(())) ) + +// Convenience function for awaiting a NodeJS style callback where the first argument is a possible exception +// and the second argument the possible result value. +pub fun await-exn1( setup : (cb : (null,a) -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx a + await fn(cb) + setup( fn(nexn,x) cb(nexn.unnull(x)) ) + +fun unnull( nexn : null, x : a ) : error + match nexn.maybe + Nothing -> Ok(x) + Just(exn) -> Error(exn) + +// Convenience function for awaiting a zero argument callback. +pub fun await0( setup : (cb : () -> io-noexn () ) -> io () ) : asyncx () + await fn(cb) + setup( fn() cb(Ok(())) ) + Nothing + +// Convenience function for awaiting a single argument callback. +pub fun await1( setup : (cb : (a) -> io-noexn () ) -> io () ) : asyncx a + await fn(cb) + setup( fn(x) cb(Ok(x)) ) + Nothing + +// Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: +// it takes either an exception or a result `a`. Usually `setup` returns `Nothing` but you can return a `Just(cleanup)` +// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`). +// The callback should be invoked exactly once -- when that happens `await` is resumed with the result using `untry` +// either raise an exception or return the plain result. +pub fun setup/await( setup : (cb : error -> io-noexn () ) -> io maybe<() -> io-noexn ()> ) : asyncx a + await-exn(setup).untry + + + + +// ---------------------------------------------------------------------------- +// Async effect +// ---------------------------------------------------------------------------- +alias await-result = error +alias await-setup = (cb : (error,bool) -> io-noexn ()) -> io-noexn (maybe<() -> io-noexn ()>) + +// Asynchronous operations have the `:async` effect. +pub effect async + ctl do-await( setup : await-setup, scope : scope, cancelable : bool ) : error + ctl no-await( setup : await-setup, scope : scope, cancelable : bool, f : error -> io-noexn () ) : () + ctl async-iox( action : () -> io-noexn a ) : a + ctl cancel( scope : scope ) : () + + +// The `cancel` operations cancels any outstanding asynchronous operation under the innermost +// `cancelable` handler by returning the `Cancel` exception. The `cancel` operation itself returns normally +// without raising a `Cancel` exception. +pub fun noscope/cancel() : async () + cancel(empty-scope) + +// Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: +// it takes either an exception or a result `a`. Usually `setup` returns `Nothing` but you can return a `Just(cleanup)` +// value where the `cleanup` functions is invoked on cancellation to dispose of any resources (see the implementation of `wait`). +// The callback should be invoked exactly once -- when that happens `await-exn` is resumed with the result. +pub fun await-exn( setup : (cb : (error) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>) ) : async error + do-await(fn(cb) + match (try{ setup(fn(res) cb(res,True) ) }) + Ok(mcleanup) -> mcleanup + Error(exn) -> + cb(Error(exn),True) + Nothing + , empty-scope, True) + +// Primitive: Execute `setup` to set up an asynchronous callback with the host platform. Invoke `cb` as the callback: it takes either +// an exception or a result value, together with boolean parameter whether the callback is done. +// The callback `cb` will eventually emit the result into the given channel `ch` after applying the transformation `f` to the result.\ +// Note: once you exit the `cancelable` scope where `await-to-channel` was called, the callback is invoked with a `Cancel` exception. +// The channel should always be awaited within the same `cancelable` scope as the `await-to-channel` invokation. +pub fun await-to-channel( setup : (cb : (error,bool) -> io-noexn ()) -> io (maybe<() -> io-noexn ()>), ch : channel, f : error -> b ) : async channel + no-await(fn(cb) + match(try{setup(cb)}) + Ok(mcleanup) -> mcleanup + Error(exn) -> + cb(Error(exn),True) + Nothing + , empty-scope,True, fn(res) + ch.emit-io( f(res) ) + ) + ch + +fun async-io-noexn( f : () -> io-noexn a ) : a + async-iox(f) + +// Perform an I/O operation at the outer level; exceptions are propagated back. +fun async-io( f : () -> io a ) : asyncx a + async-io-noexn( { try(f) } ).untry + + +// ---------------------------------------------------------------------------- +// Async handlers: cancelable +// ---------------------------------------------------------------------------- + +inline extern interject-async( action : () -> a) : total ( () -> a) + inline "#1" + +// Execute `action` in a cancelable scope. If `cancel` is called within `action`, +// any outstanding asynchronous operations started in the cancelable scope are canceled. +// (Outstanding operations outside the cancelable scope are not canceled). +pub fun cancelable( action : () -> a ) : a + val cid = async-iox{ unique() } + fun extend(scope : scope ) + parent-scope(cid,scope) + + handle ({mask behind(action)}) + return(x) -> + // cancel any outstanding operations still in our scope. + // this might be needed for `no-await` operations. + cancel(empty-scope.extend) + x + fun do-await(setup,scope,c) -> do-await(setup,scope.extend,c) + fun no-await(setup,scope,c,f) -> no-await(setup,scope.extend,c,f) + fun cancel(scope) -> cancel(scope.extend) + fun async-iox(f) -> async-iox(f) + +// ---------------------------------------------------------------------------- +// Async handle +// ---------------------------------------------------------------------------- + +pub fun @default-async(action) + async/handle(action) + +fun nodispose() : io-noexn () + () + +// The outer `:async` effect handler. This is automatically applied by the compiler +// around the `main` function if it has an `:async` effect. +pub fun async/handle(action : () -> () ) : io-noexn () + val callbacks : ref io-noexn ())>> = unsafe-total{ref([])} + fun handle-await( setup : await-setup, scope : scope, f : error -> io-noexn (), cancelable : bool) : io-noexn () + val cscope = child-scope(unique(),scope) + val dispose = ref(nodispose) + fun cb( res : error<_>, is-done : bool ) : io-noexn () + if ((!callbacks).contains(cscope)) then + if is-done then + callbacks := (!callbacks).remove(cscope) + if res.is-error then try(!dispose).default(()) + f(res) + + // trace("register: " + cscope.show) + callbacks := Cons((cscope, if cancelable then fn(){ cb(Error(Exception("cancel",Cancel)),True) } else nodispose), !callbacks) + try { + // setup the callback which returns a possible dispose function + match(setup(cb)) + Just(d) -> dispose := d + Nothing -> () + } fn(exn) + // if setup fails, immediately resume with the exception + cb(Error(exn),True) + + fun handle-cancel( scope : scope ) : io-noexn () + (!callbacks).foreach fn(entry) + val (cscope,cb) = entry + if (cscope.in-scope-of(scope)) then cb() + + handle(action) + raw ctl do-await( setup, scope, c ) + handle-await(setup,scope, fn(x) rcontext.resume(x), c) // returns to outer event loop + fun no-await( setup, scope, c, f ) + handle-await(setup,scope,f,c) + fun cancel( scope ) + handle-cancel(scope) + fun async-iox( f ) + f() + +fun io-noexn( f : () -> io-noexn a ) : io a + f() + +fun io-noexn1( f : (a1) -> io-noexn a, x1 : a1 ) : io a + f(x1) + +// ---------------------------------------------------------------------------- +// Scope identifiers +// ---------------------------------------------------------------------------- + +abstract struct scope( : list ) + +val empty-scope = Scope([]) + +fun parent-scope( cid : int, scope : scope ) : scope + match scope + Scope(cids) -> Scope(Cons(cid,cids)) + +fun child-scope( id : int, scope : scope ) : scope + match scope + Scope(cids) -> Scope(cids ++ [id]) + +fun ids/in-scope-of( child : list, parent : list ) : bool + match parent + Nil -> True + Cons(p,ps) -> match child + Cons(c,cs) -> (c == p && in-scope-of(cs,ps)) + Nil -> False + +fun in-scope-of( child : scope, parent : scope ) : bool + match parent + Scope(pids) -> match child + Scope(cids) -> in-scope-of(cids,pids) + +fun scope/(==)(scope1 : scope, scope2 : scope ) : bool + match scope1 + Scope(ids1) -> match scope2 + Scope(ids2) -> ids1==ids2 + +// Convenience functions for scope maps +fun remove( xs : list<(scope,a)>, scope : scope ) : list<(scope,a)> + xs.remove( fn(x:(scope,_)) { x.fst == scope }) + +fun lookup( xs : list<(scope,a)>, scope : scope ) : maybe + xs.lookup( fn(x:scope) { x == scope }) + +fun contains( xs : list<(scope,a)>, scope : scope ) : bool + xs.lookup(scope).bool + +fun show( s : scope ) : string + match s + Scope(ids) -> ids.map(show).join("-") + + + +abstract extend type exception-info + pub con Cancel + pub con Finalize(yld:yield-info) + +// Was this a cancelation exception? +fun exn/is-cancel( exn : exception ) : bool + match exn.info + Cancel -> True + _ -> False + +// Was this a finalization exception? +fun exn/is-finalize(exn : exception) : bool + match exn.info + Finalize -> True + _ -> False + +fun unsafe-try-all( action : () -> a ) : e error + val fin = unsafe-try-finalize{ try(action) } + match fin + Right(t) -> t + Left(yld) -> Error(Exception("finalize",Finalize(yld))) + +fun rethrow( exn : exception ) : exn a + match exn.info + Finalize(yld) -> unsafe-reyield(yld) + _ -> throw-exn(exn) diff --git a/lib/std/async/inline/timer.js b/lib/std/async/inline/timer.js new file mode 100644 index 000000000..9420375f8 --- /dev/null +++ b/lib/std/async/inline/timer.js @@ -0,0 +1,26 @@ + +function _init_timer(){ + return {}; +} + +function _start_timer(timer, ms, repeat, fcn){ + const rp = Number(repeat) + const msx = Number(ms) + if (rp != 0) { + timer.id = setInterval(fcn, rp); + timer.repeat = rp; + } else { + timer.id = setTimeout(fcn, msx); + } +} + +function _stop_timer(timer){ + if (timer.id) { + if (timer.repeat != 0) { + clearInterval(timer.id); + } else { + clearTimeout(timer.id); + } + timer.id = null; + } +} \ No newline at end of file diff --git a/lib/std/async/null.kk b/lib/std/async/null.kk new file mode 100644 index 000000000..75694801a --- /dev/null +++ b/lib/std/async/null.kk @@ -0,0 +1,38 @@ +// Used for external interfaces +module std/async/null + +// Abstract type used for passing `null` values to external functions +pub type null + +// Unsafe: transform any type to a `null` type; used internally by the compiler. +pub extern @null-any(x : a) : null + c inline "(kk_box_is_null(#1) ? kk_datatype_null() : kk_datatype_unbox(#1))" + cs inline "#1" + js inline "(#1==null ? null : #1)" // undefined -> null + +// Transform a `:maybe` type to a `:null` type (using `null` for `Nothing`). +pub extern maybe/null(x : maybe) : null + c inline "(kk_std_core_types__is_Nothing(#1,kk_context()) ? kk_datatype_null() : kk_datatype_unbox((#1)._cons.Just.value) /* kk_datatype_unbox(kk_datatype_unjust(#1,kk_context())) */ )" + cs inline "(#1.tag_ == __std_core._maybe_Tag.Nothing ? default(##1) : #1.@value)" + js inline "(#1==null ? null : #1.value)" + +// Transform a `:null` type to a `:maybe` type. Note that it is not +// always the case that `id(x) == maybe(null(x))` (e.g. when `x = Just(Nothing)`). +pub extern null/maybe( n : null ) : maybe + c inline "(kk_datatype_is_null(#1) ? kk_std_core_types__new_Nothing(kk_context()) : kk_std_core_types__new_Just(kk_datatype_box(#1),kk_context()))" + cs inline "(EqualityComparer<##1>.Default.Equals(#1,default(##1)) ? __std_core._maybe<##1>.Nothing_ : new __std_core._maybe<##1>(#1))" + js inline "(#1==null ? $std_core_types.Nothing : $std_core_types.Just(#1))" + +// Cast a integer that is zero to a null +pub fun int/null( i : int ) : null + i.maybe.null + +// Cast an empty string a null +pub fun string/null( s : string ) : null + s.maybe.null + +// Cast a boolean `False` to null +pub fun bool/null( b : bool ) : null<()> + if b then Just(()).null else Nothing.null + +// val null-const : forall null = null(Nothing) diff --git a/lib/uv/event-loop.kk b/lib/uv/event-loop.kk new file mode 100644 index 000000000..0f3dbaa69 --- /dev/null +++ b/lib/uv/event-loop.kk @@ -0,0 +1,73 @@ + +module uv/event-loop + +pub import std/time/duration +pub import std/time/timestamp +pub import uv/timer +pub import std/async/async +import std/num/int32 +import uv/utils + +extern import + c { conan="libuv[>=1.47.0]"; vcpkg="libuv"; library="uv" } + +extern import + c file "inline/event-loop.c" + +// Sets a timeout for libuv / javascript / C# event loops +pub extern set-timeout( cb : () -> io-noexn (), ms : int32 ) : io-noexn any + js "setTimeout" + c "kk_set_timeout" + +// Clears a timeout for libuv / javascript / C# event loops +pub extern clear-timeout( tid : any) : io-noexn () + js "clearTimeout" + c "kk_clear_timeout" + +// Runs an async action on the default uv event loop, and exceptions at the top level exit the async loop. +pub fun default-async-uv(action: () -> a): io a + val result = ref(Error(Exception("Unreachable", ExnInternal("Unreachable")))) + val _ = + with default-event-loop + with @default-async + result := try(action) + (!result).untry + +// Handles a uv loop +pub fun default-event-loop(action) + if host() == "libc" then + handle-loop(action) + else // TODO: Support event loop on other platforms + action() + +// Configures uv with Koka's allocators when importing this file +val @initialize = init-uv-alloc() + +// Runs a UV loop +fun handle-loop(action) + init-loop() + val res = action() + run-loop() + close-loop() + res + +// Runs a uv loop (or the emscripten loop on wasm) +extern run-loop(): io-noexn () + c "kk_async_loop_run" + js inline "" + +// Initializes a uv loop +extern init-loop(): io-noexn () + c "kk_async_loop_init" + js inline "" + +// initializes only the allocators (not an event loop) +// needed for some file operations which alloc - kk_uv_fs_mkdtemp, kk_uv_fs_mkstemp +pub extern init-uv-alloc(): () + c "kk_async_alloc_init" + js inline "" + +// Closes a uv loop +extern close-loop(): io-noexn () + c "kk_async_loop_close" + js inline "" diff --git a/lib/uv/inline/event-loop.c b/lib/uv/inline/event-loop.c new file mode 100644 index 000000000..9d2bab390 --- /dev/null +++ b/lib/uv/inline/event-loop.c @@ -0,0 +1,123 @@ +#ifdef __EMSCRIPTEN__ +#include +#include +////////////////////////////////////////////////////// +// Event Loop for Emscripten +////////////////////////////////////////////////////// + +void one_iter() { + // Can do a render loop to the screen here, etc. (this is the tick..) + // puts("one iteration"); + return; +} +void kk_emscripten_loop_run(kk_context_t* _ctx){ + emscripten_set_main_loop(one_iter, 0, true); +} +#else + +////////////////////////////////////////////////////// +// Allocators +////////////////////////////////////////////////////// +static inline void* kk_malloc_ctx(size_t size) { + return kk_malloc(size, kk_get_context()); +} + +static inline void* kk_realloc_ctx(void* p, size_t size) { + return kk_realloc(p, size, kk_get_context()); +} + +static inline void* kk_calloc_ctx(size_t count, size_t size) { + void* p = kk_malloc(count*size, kk_get_context()); + kk_memset(p, 0, count*size); + return p; +} + +static inline void kk_free_ctx(void* p) { + kk_free(p, kk_get_context()); +} + +static inline void kk_uv_alloc_init(kk_context_t* _ctx){ + uv_replace_allocator(kk_malloc_ctx, kk_realloc_ctx, kk_calloc_ctx, kk_free_ctx); +} + +////////////////////////////////////////////////////// +// UV Event Loop +////////////////////////////////////////////////////// +static void kk_uv_loop_init(kk_context_t* _ctx) { + uv_loop_t* loop = kk_malloc(sizeof(uv_loop_t), kk_context()); + kk_set_uv_loop(loop); // Set thread local loop + uv_loop_init(loop); +} + +void kk_uv_loop_run(kk_context_t* _ctx){ + // Run the event loop after the initial startup of the program + int ret = uv_run(uvloop(), UV_RUN_DEFAULT); + if (ret != 0){ + kk_warning_message("Event loop closed with status %s\n", uv_err_name(ret)); + } +} + +static void kk_uv_loop_close(kk_context_t* _ctx) { + int ret = uv_loop_close(uvloop()); + if (ret != 0) { + kk_warning_message("Event loop closed %s\n", uv_err_name(ret)); + } + kk_free(uvloop(), _ctx); +} + +#endif + +////////////////////////////////////////////////////// +// Generic APIs for both emscripten and UV +////////////////////////////////////////////////////// +static inline void kk_async_alloc_init(kk_context_t* _ctx){ + #if __EMSCRIPTEN__ + return; + #else + return kk_uv_alloc_init(_ctx); + #endif +} + +static void kk_async_loop_init(kk_context_t* _ctx) { + #if __EMSCRIPTEN__ + return; + #else + return kk_uv_loop_init(_ctx); + #endif +} + +void kk_async_loop_run(kk_context_t* _ctx){ + // Run the event loop after the initial startup of the program + #if __EMSCRIPTEN__ + return kk_emscripten_loop_run(_ctx); + #else + return kk_uv_loop_run(_ctx); + #endif + +} + +static void kk_async_loop_close(kk_context_t* _ctx) { + #if __EMSCRIPTEN__ + return; + #else + return kk_uv_loop_close(_ctx); + #endif +} + +////////////////////////////////////////////////////// +// Event Scheduling APIs +////////////////////////////////////////////////////// +kk_box_t kk_set_timeout(kk_function_t cb, int64_t time, kk_context_t* _ctx) { + kk_uv_timer__timer t = kk_uv_timer_timer_init(_ctx); + if (!kk_std_core_exn__is_Ok(kk_uv_timer_start(t, time, 0, cb, _ctx), _ctx)) { + kk_fatal_error(EFAULT, "Failed to start timeout timer"); + } + return kk_uv_timer__timer_box(t, _ctx); +} + +kk_unit_t kk_clear_timeout(kk_box_t boxed_timer, kk_context_t* _ctx) { + kk_uv_timer__timer timer = kk_uv_timer__timer_unbox(boxed_timer, KK_OWNED, _ctx); + kk_uv_timer_stop(timer, _ctx); + kk_uv_timer_release_callback(timer, _ctx); + return kk_Unit; +} diff --git a/lib/uv/inline/timer.c b/lib/uv/inline/timer.c new file mode 100644 index 000000000..73933da38 --- /dev/null +++ b/lib/uv/inline/timer.c @@ -0,0 +1,226 @@ + +#if __EMSCRIPTEN__ + +void kk_handle_free(void *p, kk_block_t *block, kk_context_t *_ctx) { + kk_wasm_timer_t* hndcb = (kk_wasm_timer_t*)p; + kk_free(hndcb, kk_context()); // Free the memory used for the callback and box + kk_free(block, kk_context()); // Free the block memory +} + +#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(wasm_timer, hnd) + +EMSCRIPTEN_KEEPALIVE void wasm_timer_callback(kk_wasm_timer_t* timer_info){ + kk_context_t* _ctx = kk_get_context(); + kk_function_t callback = timer_info->callback; + if (timer_info->repeat_ms == 0) { + kk_unit_t res = kk_unit_callback(callback, kk_context()); + return; + } else { + callback = kk_function_dup(callback, kk_context()); + kk_unit_t res = kk_unit_callback(callback, kk_context()); + return; + } +} + +EM_JS(int, start_timer, (kk_wasm_timer_t* timer_info, int64_t timeout, int64_t repeat), { + function wasm_callback() { + _wasm_timer_callback(timer_info); + } + const n_repeat = Number(repeat); + const n_timeout = Number(timeout); + if (n_repeat != 0) { + return setInterval(wasm_callback, n_repeat); + } else { + return setTimeout(wasm_callback, n_timeout); + } +}); + +EM_JS(void, stop_timer, (int timer, bool repeating), { + if (timer) { + if (repeating) { + clearInterval(timer); + } else { + clearTimeout(timer); + } + } +}); + +kk_uv_timer__timer kk_wasm_timer_init(kk_context_t* _ctx) { + kk_wasm_timer_t* timer_info = kk_malloc(sizeof(kk_wasm_timer_t), kk_context()); + kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(timer_info, kk_handle_free, timer, Timer); + timer_info->callback = kk_function_null(kk_context()); + return t; +} + +kk_unit_t kk_wasm_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { + kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + if (kk_likely(!kk_function_is_null(timer_info->callback, kk_context()))) { + kk_function_drop(timer_info->callback, kk_context()); + } + kk_uv_timer__timer_drop(timer, kk_context()); + return kk_Unit; +} + +kk_unit_t kk_wasm_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { + kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + if (kk_likely(timer_info->timer != 0)) { + stop_timer(timer_info->timer, timer_info->repeat_ms != 0); + } + return kk_Unit; +} + +kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { + kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + timer_info->callback = callback; + timer_info->repeat_ms = repeat; + timer_info->timer = start_timer(timer_info, timeout, repeat); + return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context()); +} + +#else + +#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(timer, hnd) + +// Initialize the timer handle +kk_uv_timer__timer kk_libuv_timer_init(kk_context_t* _ctx) { + kk_timer_t* handle = kk_malloc(sizeof(kk_timer_t), kk_context()); + handle->callback = kk_function_null(kk_context()); + // Wrap the uv / kk struct in a reference counted box value type + kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(handle, kk_timer_free, timer, Timer); + uv_timer_init(uvloop(), (uv_timer_t*)handle); // Timer initialization never fails + return t; +} + +// Stop / pause the timer (doesn't clean up) - the timer can be restarted with the same callback with kk_libuv_timer_again +kk_unit_t kk_libuv_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { + uv_timer_t* uv_timer = (uv_timer_t*)kk_tm_to_uv(timer); + uv_timer_stop(uv_timer); + return kk_Unit; +} + +// Actually clean up the timer +// This drops the callback first in case it is holding onto the timer - as it does for uv/timer/timer() +kk_unit_t kk_libuv_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { + kk_timer_t* kk_timer = kk_tm_to_uv(timer); + if (kk_likely(!kk_function_is_null(kk_timer->callback, kk_context()))) { + kk_function_drop(kk_timer->callback, kk_context()); + } + kk_uv_timer__timer_drop(timer, kk_context()); + return kk_Unit; +} + +// The uv callback for the timer +void kk_uv_timer_unit_callback(uv_timer_t* uv_timer) { + kk_context_t* _ctx = kk_get_context(); + kk_timer_t* kk_timer = (kk_timer_t*)uv_timer; + kk_function_t callback = kk_timer->callback; // Get the callback + if (uv_timer_get_repeat(uv_timer) == 0) { // If this is a one-shot timer, just call the callback + kk_unit_callback(callback, kk_context()); + return; + } else { // Otherwise, we need to dup the callback, as it will be called again + callback = kk_function_dup(callback, kk_context()); + kk_unit_callback(callback, kk_context()); + return; + } +} + +kk_std_core_exn__error kk_libuv_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { + kk_timer_t* uv_timer = kk_tm_to_uv(timer); + // TODO: Drop previous callback if any? + uv_timer->callback = callback; + int status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat); + // On error, report, and drop callback + kk_uv_check_err_drops(status, { + uv_timer->callback = kk_function_null(kk_context()); + kk_function_drop(callback, kk_context()); + }) +} + +kk_std_core_exn__error kk_libuv_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) { + int status = uv_timer_again((uv_timer_t*)kk_tm_to_uv(timer)); + kk_uv_check(status) +} + +kk_unit_t kk_libuv_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) { + uv_timer_set_repeat((uv_timer_t*)kk_tm_to_uv(timer), repeat); + return kk_Unit; +} + +int64_t kk_libuv_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) { + uint64_t repeat = uv_timer_get_repeat((uv_timer_t*)kk_tm_to_uv(timer)); + return repeat; +} + +int64_t kk_libuv_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) { + uint64_t due_in = uv_timer_get_due_in((uv_timer_t*)kk_tm_to_uv(timer)); + return due_in; +} +#endif + +////////////////////////////////////////////////////// +// Commonalities between emscripten / UV +// Patch over the differences in a few cases +////////////////////////////////////////////////////// +kk_uv_timer__timer kk_timer_init(kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_wasm_timer_init(kk_context()); + #else + return kk_libuv_timer_init(kk_context()); + #endif +} + +kk_unit_t kk_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_wasm_timer_stop(timer, kk_context()); + #else + return kk_libuv_timer_stop(timer, kk_context()); + #endif +} + +kk_unit_t kk_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_wasm_timer_finish(timer, kk_context()); + #else + return kk_libuv_timer_finish(timer, kk_context()); + #endif +} + +kk_std_core_exn__error kk_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_wasm_timer_start(timer, timeout, repeat, callback, kk_context()); + #else + return kk_libuv_timer_start(timer, timeout, repeat, callback, kk_context()); + #endif +} + +kk_std_core_exn__error kk_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context()); + #else + return kk_libuv_timer_again(timer, kk_context()); + #endif +} + +kk_unit_t kk_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return kk_Unit; + #else + return kk_libuv_timer_set_repeat(timer, repeat, kk_context()); + #endif +} + +int64_t kk_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return -1; + #else + return kk_libuv_timer_get_repeat(timer, kk_context()); + #endif +} + +int64_t kk_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) { + #ifdef __EMSCRIPTEN__ + return -1; + #else + return kk_libuv_timer_get_due_in(timer, kk_context()); + #endif +} \ No newline at end of file diff --git a/lib/uv/inline/timer.h b/lib/uv/inline/timer.h new file mode 100644 index 000000000..8753ecad5 --- /dev/null +++ b/lib/uv/inline/timer.h @@ -0,0 +1,22 @@ +#ifdef __EMSCRIPTEN__ +#include + +////////////////////////////////////////////////////// +// Create the wrapper type +// For wasm, we need to keep extra information +// such as repeat time and the timer id from JS +////////////////////////////////////////////////////// +typedef struct kk_wasm_timer_s { + kk_function_t callback; + int64_t repeat_ms; + int timer; +} kk_wasm_timer_t; + +EMSCRIPTEN_KEEPALIVE void wasm_timer_callback(kk_wasm_timer_t* timer_info); +#else +#include +////////////////////////////////////////////////////// +// Create the UV wrapper type (see utils.h) +////////////////////////////////////////////////////// +kk_uv_handle(timer); +#endif diff --git a/lib/uv/inline/timer.js b/lib/uv/inline/timer.js new file mode 100644 index 000000000..4829707f5 --- /dev/null +++ b/lib/uv/inline/timer.js @@ -0,0 +1,26 @@ +function _init_timer(){ + return {}; +} +// Start the timer, either as a one-shot or repeating timer +function _start_timer(timer, ms, repeat, fcn){ + const rp = Number(repeat) + const msx = Number(ms) + if (rp != 0) { + timer.id = setInterval(fcn, rp); + timer.repeat = rp; + } else { + timer.id = setTimeout(fcn, msx); + } +} + +// Stop the timer +function _stop_timer(timer){ + if (timer.id) { + if (timer.repeat != 0) { + clearInterval(timer.id); + } else { + clearTimeout(timer.id); + } + timer.id = null; + } +} \ No newline at end of file diff --git a/lib/uv/inline/utils.c b/lib/uv/inline/utils.c new file mode 100644 index 000000000..903f49472 --- /dev/null +++ b/lib/uv/inline/utils.c @@ -0,0 +1,195 @@ +#ifdef __EMSCRIPTEN__ +// In the emscripten case, we just return OK for all status codes UV error codes don't apply +static kk_uv_utils__uv_status_code kk_uv_status_to_status_code(int32_t status, kk_context_t *_ctx) { + return kk_uv_utils_UV__OK; +} +#else + +// Set the thread local uv loop +void kk_set_uv_loop(uv_loop_t* loop) { + kk_uv_loop_default = loop; +} +// Get the thread local uv loop +uv_loop_t* uvloop() { + return kk_uv_loop_default; +} + +// Map a libuv status code to a Koka enum +static kk_uv_utils__uv_status_code kk_uv_status_to_status_code(int32_t status, kk_context_t *_ctx) { + switch (status) { + case 0: + return kk_uv_utils_UV__OK; + case UV_E2BIG: + return kk_uv_utils_UV__E2BIG; + case UV_EACCES: + return kk_uv_utils_UV__EACCES; + case UV_EADDRINUSE: + return kk_uv_utils_UV__EADDRINUSE; + case UV_EADDRNOTAVAIL: + return kk_uv_utils_UV__EADDRNOTAVAIL; + case UV_EAFNOSUPPORT: + return kk_uv_utils_UV__EAFNOSUPPORT; + case UV_EAGAIN: + return kk_uv_utils_UV__EAGAIN; + case UV_EAI_ADDRFAMILY: + return kk_uv_utils_UV__EAI__ADDRFAMILY; + case UV_EAI_AGAIN: + return kk_uv_utils_UV__EAI__AGAIN; + case UV_EAI_BADFLAGS: + return kk_uv_utils_UV__EAI__BADFLAGS; + case UV_EAI_BADHINTS: + return kk_uv_utils_UV__EAI__BADHINTS; + case UV_EAI_CANCELED: + return kk_uv_utils_UV__EAI__CANCELED; + case UV_EAI_FAIL: + return kk_uv_utils_UV__EAI__FAIL; + case UV_EAI_FAMILY: + return kk_uv_utils_UV__EAI__FAMILY; + case UV_EAI_MEMORY: + return kk_uv_utils_UV__EAI__MEMORY; + case UV_EAI_NODATA: + return kk_uv_utils_UV__EAI__NODATA; + case UV_EAI_NONAME: + return kk_uv_utils_UV__EAI__NONAME; + case UV_EAI_OVERFLOW: + return kk_uv_utils_UV__EAI__OVERFLOW; + case UV_EAI_PROTOCOL: + return kk_uv_utils_UV__EAI__PROTOCOL; + case UV_EAI_SERVICE: + return kk_uv_utils_UV__EAI__SERVICE; + case UV_EAI_SOCKTYPE: + return kk_uv_utils_UV__EAI__SOCKTYPE; + case UV_EALREADY: + return kk_uv_utils_UV__EALREADY; + case UV_EBADF: + return kk_uv_utils_UV__EBADF; + case UV_EBUSY: + return kk_uv_utils_UV__EBUSY; + case UV_ECANCELED: + return kk_uv_utils_UV__ECANCELED; + case UV_ECHARSET: + return kk_uv_utils_UV__ECHARSET; + case UV_ECONNABORTED: + return kk_uv_utils_UV__ECONNABORTED; + case UV_ECONNREFUSED: + return kk_uv_utils_UV__ECONNREFUSED; + case UV_ECONNRESET: + return kk_uv_utils_UV__ECONNRESET; + case UV_EDESTADDRREQ: + return kk_uv_utils_UV__EDESTADDRREQ; + case UV_EEXIST: + return kk_uv_utils_UV__EEXIST; + case UV_EFAULT: + return kk_uv_utils_UV__EFAULT; + case UV_EFBIG: + return kk_uv_utils_UV__EFBIG; + case UV_EHOSTUNREACH: + return kk_uv_utils_UV__EHOSTUNREACH; + case UV_EINTR: + return kk_uv_utils_UV__EINTR; + case UV_EINVAL: + return kk_uv_utils_UV__EINVAL; + case UV_EIO: + return kk_uv_utils_UV__EIO; + case UV_EISCONN: + return kk_uv_utils_UV__EISCONN; + case UV_EISDIR: + return kk_uv_utils_UV__EISDIR; + case UV_ELOOP: + return kk_uv_utils_UV__ELOOP; + case UV_EMFILE: + return kk_uv_utils_UV__EMFILE; + case UV_EMSGSIZE: + return kk_uv_utils_UV__EMSGSIZE; + case UV_ENAMETOOLONG: + return kk_uv_utils_UV__ENAMETOOLONG; + case UV_ENETDOWN: + return kk_uv_utils_UV__ENETDOWN; + case UV_ENETUNREACH: + return kk_uv_utils_UV__ENETUNREACH; + case UV_ENFILE: + return kk_uv_utils_UV__ENFILE; + case UV_ENOBUFS: + return kk_uv_utils_UV__ENOBUFS; + case UV_ENODEV: + return kk_uv_utils_UV__ENODEV; + case UV_ENOENT: + return kk_uv_utils_UV__ENOENT; + case UV_ENOMEM: + return kk_uv_utils_UV__ENOMEM; + case UV_ENONET: + return kk_uv_utils_UV__ENONET; + case UV_ENOPROTOOPT: + return kk_uv_utils_UV__ENOPROTOOPT; + case UV_ENOSPC: + return kk_uv_utils_UV__ENOSPC; + case UV_ENOSYS: + return kk_uv_utils_UV__ENOSYS; + case UV_ENOTCONN: + return kk_uv_utils_UV__ENOTCONN; + case UV_ENOTDIR: + return kk_uv_utils_UV__ENOTDIR; + case UV_ENOTEMPTY: + return kk_uv_utils_UV__ENOTEMPTY; + case UV_ENOTSOCK: + return kk_uv_utils_UV__ENOTSOCK; + case UV_ENOTSUP: + return kk_uv_utils_UV__ENOTSUP; + case UV_EOVERFLOW: + return kk_uv_utils_UV__EOVERFLOW; + case UV_EPERM: + return kk_uv_utils_UV__EPERM; + case UV_EPIPE: + return kk_uv_utils_UV__EPIPE; + case UV_EPROTO: + return kk_uv_utils_UV__EPROTO; + case UV_EPROTONOSUPPORT: + return kk_uv_utils_UV__EPROTONOSUPPORT; + case UV_EPROTOTYPE: + return kk_uv_utils_UV__EPROTOTYPE; + case UV_ERANGE: + return kk_uv_utils_UV__ERANGE; + case UV_EROFS: + return kk_uv_utils_UV__EROFS; + case UV_ESHUTDOWN: + return kk_uv_utils_UV__ESHUTDOWN; + case UV_ESPIPE: + return kk_uv_utils_UV__ESPIPE; + case UV_ESRCH: + return kk_uv_utils_UV__ESRCH; + case UV_ETIMEDOUT: + return kk_uv_utils_UV__ETIMEDOUT; + case UV_ETXTBSY: + return kk_uv_utils_UV__ETXTBSY; + case UV_EXDEV: + return kk_uv_utils_UV__EXDEV; + case UV_UNKNOWN: + return kk_uv_utils_UV__UNKNOWN; + case UV_EOF: + return kk_uv_utils_UV__EOF; + case UV_ENXIO: + return kk_uv_utils_UV__ENXIO; + case UV_EMLINK: + return kk_uv_utils_UV__EMLINK; + case UV_ENOTTY: + return kk_uv_utils_UV__ENOTTY; + case UV_EFTYPE: + return kk_uv_utils_UV__EFTYPE; + case UV_EILSEQ: + return kk_uv_utils_UV__EILSEQ; + case UV_ESOCKTNOSUPPORT: + return kk_uv_utils_UV__ESOCKTNOSUPPORT; + default: + return kk_uv_utils_UV__UNKNOWN; + // case UV_EUNACH: + // return kk_uv_utils_UV__EUNATCH; + } +} + +// Map a libuv status code to a Koka Error value with uv status code enum +kk_std_core_exn__error kk_uv_async_error_from_errno( int err, kk_context_t* _ctx ) { + kk_uv_utils__uv_status_code code = kk_uv_status_to_status_code(err, _ctx); + kk_string_t msg = kk_uv_utils_message(code, _ctx); + return kk_std_core_exn__new_Error( kk_std_core_exn__new_Exception( msg, kk_uv_utils__new_AsyncExn(kk_reuse_null, 0, code, _ctx), _ctx), _ctx ); +} +#endif diff --git a/lib/uv/inline/utils.h b/lib/uv/inline/utils.h new file mode 100644 index 000000000..a9a4ebb31 --- /dev/null +++ b/lib/uv/inline/utils.h @@ -0,0 +1,131 @@ +// Derive a handle type from a uv type name +#define kk_uv_handle_tp(uv_tp) kk_##uv_tp##_t + +// Convert to/from Koka boxed wrapper types, where the `internal` field is an `any` boxed & reference counted C pointer +#define kk_owned_handle_to_uv_handle(uv_tp, hndl) \ + ((kk_uv_handle_tp(uv_tp)*)kk_cptr_unbox_borrowed(hndl.internal, kk_context())) +// `mod` is the module name, `uv_tp` is the uv handle type +// `free_fn` is the function to handle freeing the uv handle. +#define uv_handle_to_owned_kk_handle(hndl, free_fn, mod, Kk_tp) \ + kk_uv_##mod##__new_##Kk_tp(kk_cptr_raw_box(&free_fn, (void*)hndl, kk_context()), kk_context()) +// Since the wrapper type is a value struct, we sometimes need to `box/unbox` them as well. +#define uv_handle_to_owned_kk_handle_box(hndl, free_fn, mod, kk_tp, Kk_tp) \ + kk_uv_##mod##__##kk_tp##_box(uv_handle_to_owned_kk_handle(hndl,free_fn,mod,Kk_tp), kk_context()) + +// Call a unit callback +static inline kk_unit_t kk_unit_callback(kk_function_t callback, kk_context_t* _ctx) { + return kk_function_call(kk_unit_t, (kk_function_t, kk_context_t*), callback, (callback, kk_context()), kk_context()); +} + +#ifdef __EMSCRIPTEN__ +#include +#else +#include + +// Thread local uv loop variable +static kk_decl_thread uv_loop_t* kk_uv_loop_default; +void kk_set_uv_loop(uv_loop_t* loop); +uv_loop_t* uvloop(); + +// UV Okay status code +#define UV_OK 0 +// Map a libuv status code to a Koka Error value with uv status code enum +kk_std_core_exn__error kk_uv_async_error_from_errno( int err, kk_context_t* ctx ); +// Call a callback with a uv status code wrapped in an Error or Ok (can't be a function due to the type of result not being known) +#define kk_uv_exn_callback(callback, result) \ + kk_function_call(kk_unit_t, (kk_function_t, kk_std_core_exn__error, kk_context_t*), callback, (callback, result, kk_context()), kk_context()); +// Call a callback with a uv ok wrapped in an Ok (can't be function due to the type of result not being known) +#define kk_uv_okay_callback(callback, result) \ + kk_uv_exn_callback(callback, kk_std_core_exn__new_Ok(result, kk_context())) +// Call a callback with a uv status code enum value (can't be static inline function due to kk_uv_utils__uv_status_code not being defined yet) +#define kk_uv_status_code_callback(callback, status) \ + kk_function_call(kk_unit_t, (kk_function_t, kk_uv_utils__uv_status_code, kk_context_t*), callback, (callback, kk_uv_utils_int_fs_status_code(status, _ctx), kk_context()), kk_context()); +// Call a callback with a uv error wrapped in an Error +static inline void kk_uv_error_callback(kk_function_t callback, int result, kk_context_t* _ctx) { + kk_uv_exn_callback(callback, kk_uv_async_error_from_errno(result, kk_context())); +} + +// Invariant: handles only have a single outstanding callback +// multiple concurrent calls using the same handle not supported +// TODO: Relax this restriction? +#define kk_uv_handle(uv_hnd_tp) \ + typedef struct { \ + /* The uv handle struct (embedded as first member) */ \ + uv_##uv_hnd_tp##_t handle; \ + /* + The Koka callback function + Needs to be dupped every time it is called, so that it always can be called again by libuv + */ \ + kk_function_t callback; \ + } kk_uv_handle_tp(uv_hnd_tp); \ + /* Handles freeing a kk_uv_handle struct (see definition below) */ \ + static inline void kk_##uv_hnd_tp##_free(void *p, kk_block_t *block, kk_context_t *_ctx) { \ + uv_handle_t *handle = (uv_handle_t *)p; \ + kk_uv_handle_tp(uv_hnd_tp)* hndcb = (kk_uv_handle_tp(uv_hnd_tp)*)handle; \ + /* the callback should have been cleaned up prior to this point */ \ + kk_assert_internal(kk_function_is_null(hndcb->callback, kk_context())); \ + /* block will be freed by kk_uv_handle_close_callback after uv has cleaned up its state */ \ + handle->data = block; \ + uv_close(handle, &kk_uv_handle_close_callback); \ + } // define the free function + + +// This handles actually freeing the memory when the uv_handle is closed +static inline void kk_uv_handle_close_callback(uv_handle_t* handle) { + kk_context_t* _ctx = kk_get_context(); + if (kk_likely(handle->data != NULL)) { + kk_free(handle->data, kk_context()); // Free the box memory + } + kk_free(handle, kk_context()); // Free the struct memory +} + +// TODO: Change all apis to return status code or return error, not a mix +// Decide which to use for sync errors versus callbacks + +// If the status is not OK, drop before returning the status code +#define kk_uv_check_status_drops(status, drops) \ + if (kk_unlikely(status < UV_OK)) { \ + do drops while (0); \ + } \ + return kk_uv_utils_int_fs_status_code(status, kk_context()); \ + +// Sometimes the return value is a file descriptor which is why this is a < UV_OK check instead of == UV_OK +#define kk_uv_check_return(err, result) \ + if (kk_unlikely(err < UV_OK)) { \ + return kk_uv_async_error_from_errno(err, kk_context()); \ + } else { \ + return kk_std_core_exn__new_Ok(result, kk_context()); \ + } + +// Typically used to clean up when an error occurs +#define kk_uv_check_return_err_drops(err, result, drops) \ + if (kk_unlikely(err < UV_OK)) { \ + do drops while (0); \ + return kk_uv_async_error_from_errno(err, kk_context()); \ + } else { \ + return kk_std_core_exn__new_Ok(result, kk_context()); \ + } + +// Typically used when cleaning up a handle +#define kk_uv_check_return_ok_drops(err, result, drops) \ + if (kk_unlikely(err < UV_OK)) { \ + return kk_uv_async_error_from_errno(err, kk_context()); \ + } else { \ + do drops while (0); \ + return kk_std_core_exn__new_Ok(result, kk_context()); \ + } + +// Check the uv status code and return a kk_std_core_exn__error Ok or Error +#define kk_uv_check(err) kk_uv_check_return(err, kk_unit_box(kk_Unit)) + +// Check the uv status code and return a kk_std_core_exn__error Ok or Error +// Dropping the references if it was an error +#define kk_uv_check_err_drops(err, drops) \ + kk_uv_check_return_err_drops(err, kk_unit_box(kk_Unit), drops) + +// Check the uv status code and return a kk_std_core_exn__error Ok or Error +// Dropping the references if the result is Okay +#define kk_uv_check_ok_drops(err, drops) \ + kk_uv_check_return_ok_drops(err, kk_unit_box(kk_Unit), drops) + +#endif diff --git a/lib/uv/timer.kk b/lib/uv/timer.kk new file mode 100644 index 000000000..9247fe1bc --- /dev/null +++ b/lib/uv/timer.kk @@ -0,0 +1,89 @@ + +module uv/timer +pub import std/time/duration +pub import std/time/timestamp +import std/num/ddouble +import std/num/int64 +import uv/utils + +extern import + c file "inline/timer" + js file "inline/timer.js" + +abstract struct timer + internal: any + +// Initialize a timer handle +pub extern timer-init(): io-noexn timer + c inline "kk_timer_init(kk_context())" + js inline "_init_timer()" + +// Start the timer. +// +// The parameters `timeout` and `repeat` are in milliseconds. +// If `timeout` is zero, the callback fires on the next event loop iteration. +// If `repeat` is non-zero +// The C backend calls the callback first after `timeout` milliseconds and then repeatedly after `repeat` milliseconds. +// JavaScript / WASM backend calls the callback repeatedly every `repeat` milliseconds +pub extern start(^t: timer, timeout: int64, repeat: int64, cb: () -> io-noexn ()): io-noexn error<()> + c "kk_timer_start" + js inline "_start_timer(#1,#2,#3,#4)" + +// Stop the timer. +pub extern stop(^t: timer): io-noexn () + c "kk_timer_stop" + js inline "_stop_timer(#1)" + +// Release the callback, which if it references the timer will allow it to be garbage collected. +pub extern release-callback(t: timer): io-noexn () + c "kk_timer_finish" + js inline "" + +// Stop the timer, and if it is repeating restart it using the repeat value as the timeout. +// If the timer has never been started before it returns UV_EINVAL +extern again(^t: timer): io-noexn error<()> + c "kk_timer_again" + js inline "" + +// Set the repeat interval value in milliseconds. +// +// The timer will be scheduled to run on the given interval, +// regardless of the callback execution duration, and will follow +// normal timer semantics in the case of a time-slice overrun. +// +// For example, if a 50ms repeating timer first runs for 17ms, +// it will be scheduled to run again 33ms later. If other tasks +// consume more than the 33ms following the first timer callback, +// then the next timer callback will run as soon as possible. +// +// NOTE: If the repeat value is set from a timer callback it does not immediately take effect. +// If the timer was non-repeating before, it will have been stopped. If it was repeating, +// then the old repeat value will have been used to schedule the next timeout +extern set-repeat(^t: timer, repeat: int64): io-noexn () + c "kk_timer_set_repeat" + js inline "" + +extern get-repeat(^t: timer): io-noexn int64 + c "kk_timer_get_repeat" + js inline "" + +// Get the timer due value or 0 if it has expired. -1 is returned on unsupported platforms +// The time is relative to uv_now() +extern get-due-in(^t: timer): io-noexn int64 + c "kk_timer_get_due_in" + js inline "" + +// Creates a timer that repeats every `d` duration and calls `f` with the timer as argument. +// +// The timer stops repeating when `f` returns `False`. +pub fun timer(d: duration, f: (timer) -> io-noexn bool): io timer + val ms = d.milli-seconds.int64 + val t = timer-init() + val res = t.start(ms, ms) fn() + if !f(t) then + t.stop() + t.release-callback() + () + match res + Ok() -> t + Error(exn) -> exn.throw-exn diff --git a/lib/uv/utils.kk b/lib/uv/utils.kk new file mode 100644 index 000000000..27f145276 --- /dev/null +++ b/lib/uv/utils.kk @@ -0,0 +1,203 @@ + +module uv/utils +import std/num/int32 + +extern import + c { conan="libuv[>=1.47.0]"; vcpkg="libuv"; library="uv" } + +extern import + c file "inline/utils" + +// The base uv handle type +pub value struct uv-handle { internal: any } + +// Exception info for uv errors +abstract extend type exception-info + pub con AsyncExn( status-code : uv-status-code ) + +// Throw a uv error if the status code is not uv_OK otherwise return `result` +pub inline fun result/untry(code: uv-status-code, result: a): exn a + if code.is-uv_OK then result + else throw-exn(Exception(code.message, AsyncExn(code))) // TODO: Errno refactoring + +// Throw a uv error if the status code is not uv_OK +pub inline fun unit/untry(code: uv-status-code): exn () + if code.is-uv_OK then () + else throw-exn(Exception(code.message, AsyncExn(code))) + +// Convert the integer representation of a status code to a uv-status-code +pub extern int/status-code(code: int32): uv-status-code + c "kk_uv_status_to_status_code" + cs inline "" + js inline "" + +// UV status codes +pub type uv-status-code + UV_OK + UV_E2BIG + UV_EACCES + UV_EADDRINUSE + UV_EADDRNOTAVAIL + UV_EAFNOSUPPORT + UV_EAGAIN + UV_EAI_ADDRFAMILY + UV_EAI_AGAIN + UV_EAI_BADFLAGS + UV_EAI_BADHINTS + UV_EAI_CANCELED + UV_EAI_FAIL + UV_EAI_FAMILY + UV_EAI_MEMORY + UV_EAI_NODATA + UV_EAI_NONAME + UV_EAI_OVERFLOW + UV_EAI_PROTOCOL + UV_EAI_SERVICE + UV_EAI_SOCKTYPE + UV_EALREADY + UV_EBADF + UV_EBUSY + UV_ECANCELED + UV_ECHARSET + UV_ECONNABORTED + UV_ECONNREFUSED + UV_ECONNRESET + UV_EDESTADDRREQ + UV_EEXIST + UV_EFAULT + UV_EFBIG + UV_EHOSTUNREACH + UV_EINTR + UV_EINVAL + UV_EIO + UV_EISCONN + UV_EISDIR + UV_ELOOP + UV_EMFILE + UV_EMSGSIZE + UV_ENAMETOOLONG + UV_ENETDOWN + UV_ENETUNREACH + UV_ENFILE + UV_ENOBUFS + UV_ENODEV + UV_ENOENT + UV_ENOMEM + UV_ENONET + UV_ENOPROTOOPT + UV_ENOSPC + UV_ENOSYS + UV_ENOTCONN + UV_ENOTDIR + UV_ENOTEMPTY + UV_ENOTSOCK + UV_ENOTSUP + UV_EOVERFLOW + UV_EPERM + UV_EPIPE + UV_EPROTO + UV_EPROTONOSUPPORT + UV_EPROTOTYPE + UV_ERANGE + UV_EROFS + UV_ESHUTDOWN + UV_ESPIPE + UV_ESRCH + UV_ETIMEDOUT + UV_ETXTBSY + UV_EXDEV + UV_UNKNOWN + UV_EOF + UV_ENXIO + UV_EMLINK + UV_ENOTTY + UV_EFTYPE + UV_EILSEQ + UV_ESOCKTNOSUPPORT + UV_EUNATCH + +// UV status code messages +pub fun message(code: uv-status-code): string + match code + UV_OK -> "no error" + UV_E2BIG -> "argument list too long" + UV_EACCES -> "permission denied" + UV_EADDRINUSE -> "address already in use" + UV_EADDRNOTAVAIL -> "address not available" + UV_EAFNOSUPPORT -> "address family not supported" + UV_EAGAIN -> "resource temporarily unavailable" + UV_EAI_ADDRFAMILY -> "address family not supported" + UV_EAI_AGAIN -> "temporary failure" + UV_EAI_BADFLAGS -> "bad ai_flags value" + UV_EAI_BADHINTS -> "invalid value for hints" + UV_EAI_CANCELED -> "request canceled" + UV_EAI_FAIL -> "permanent failure" + UV_EAI_FAMILY -> "ai_family not supported" + UV_EAI_MEMORY -> "out of memory" + UV_EAI_NODATA -> "no address" + UV_EAI_NONAME -> "unknown node or service" + UV_EAI_OVERFLOW -> "argument buffer overflow" + UV_EAI_PROTOCOL -> "resolved protocol is unknown" + UV_EAI_SERVICE -> "service not available for socket type" + UV_EAI_SOCKTYPE -> "socket type not supported" + UV_EALREADY -> "connection already in progress" + UV_EBADF -> "bad file descriptor" + UV_EBUSY -> "resource busy or locked" + UV_ECANCELED -> "operation canceled" + UV_ECHARSET -> "invalid Unicode character" + UV_ECONNABORTED -> "software caused connection abort" + UV_ECONNREFUSED -> "connection refused" + UV_ECONNRESET -> "connection reset by peer" + UV_EDESTADDRREQ -> "destination address required" + UV_EEXIST -> "file already exists" + UV_EFAULT -> "bad address in system call argument" + UV_EFBIG -> "file too large" + UV_EHOSTUNREACH -> "host is unreachable" + UV_EINTR -> "interrupted system call" + UV_EINVAL -> "invalid argument" + UV_EIO -> "i/o error" + UV_EISCONN -> "socket is already connected" + UV_EISDIR -> "illegal operation on a directory" + UV_ELOOP -> "too many symbolic links encountered" + UV_EMFILE -> "too many open files" + UV_EMSGSIZE -> "message too long" + UV_ENAMETOOLONG -> "name too long" + UV_ENETDOWN -> "network is down" + UV_ENETUNREACH -> "network is unreachable" + UV_ENFILE -> "file table overflow" + UV_ENOBUFS -> "no buffer space available" + UV_ENODEV -> "no such device" + UV_ENOENT -> "no such file or directory" + UV_ENOMEM -> "not enough memory" + UV_ENONET -> "machine is not on the network" + UV_ENOPROTOOPT -> "protocol not available" + UV_ENOSPC -> "no space left on device" + UV_ENOSYS -> "function not implemented" + UV_ENOTCONN -> "socket is not connected" + UV_ENOTDIR -> "not a directory" + UV_ENOTEMPTY -> "directory not empty" + UV_ENOTSOCK -> "socket operation on non-socket" + UV_ENOTSUP -> "operation not supported on socket" + UV_EOVERFLOW -> "value too large to be stored in data type" + UV_EPERM -> "operation not permitted" + UV_EPIPE -> "broken pipe" + UV_EPROTO -> "protocol error" + UV_EPROTONOSUPPORT -> "protocol not supported" + UV_EPROTOTYPE -> "protocol wrong type for socket" + UV_ERANGE -> "result too large" + UV_EROFS -> "read-only file system" + UV_ESHUTDOWN -> "cannot send after transport endpoint shutdown" + UV_ESPIPE -> "invalid seek" + UV_ESRCH -> "no such process" + UV_ETIMEDOUT -> "connection timed out" + UV_ETXTBSY -> "text file is busy" + UV_EXDEV -> "cross-device link not permitted" + UV_UNKNOWN -> "unknown error" + UV_EOF -> "end of file" + UV_ENXIO -> "no such device or address" + UV_EMLINK -> "too many links" + UV_ENOTTY -> "inappropriate ioctl for device" + UV_EFTYPE -> "inappropriate file type or format" + UV_EILSEQ -> "illegal byte sequence" + UV_ESOCKTNOSUPPORT -> "socket type not supported" + UV_EUNATCH -> "protocol driver not attached" diff --git a/samples/async/timer.kk b/samples/async/timer.kk new file mode 100644 index 000000000..a8c6493f4 --- /dev/null +++ b/samples/async/timer.kk @@ -0,0 +1,19 @@ +import std/async +import uv/event-loop +import std/time/duration +import std/time/timestamp + +fun main() + with default-async-uv + val res = try + val x = ref(0) + timer(1000.milli-seconds) fn(t) + "Hello From Timer".println + x := !x + 1 + if !x == 3 then False else True + wait(1.seconds) + "Before timeout".println + wait(5.seconds) + "Hello After Timeout".println + println("Done!") + \ No newline at end of file diff --git a/src/Compile/CodeGen.hs b/src/Compile/CodeGen.hs index a8031f24d..eb08aa536 100644 --- a/src/Compile/CodeGen.hs +++ b/src/Compile/CodeGen.hs @@ -149,7 +149,7 @@ codeGen term flags sequential newtypes borrowed kgamma gamma entry imported mod -- imported modules. newtypesAll = foldr1 newtypesCompose (map (extractNewtypes . modCore) (loadedModule loaded : loadedModules loaded)) in -} - codeGenC (modSourcePath mod) newtypes borrowed 0 {-unique-} + codeGenC (modSourcePath mod) newtypes borrowed imported 0 {-unique-} {--------------------------------------------------------------- @@ -269,10 +269,10 @@ codeGenJS term flags sequential entry outBase core C backend ---------------------------------------------------------------} -codeGenC :: FilePath -> Newtypes -> Borrowed -> Int +codeGenC :: FilePath -> Newtypes -> Borrowed -> [Module] -> Int -> Terminal -> Flags -> (IO () -> IO ()) -> Maybe (Name,Type) ->FilePath -> Core.Core -> IO Link -codeGenC sourceFile newtypes borrowed0 unique0 term flags sequential entry outBase core0 +codeGenC sourceFile newtypes borrowed0 imported unique0 term flags sequential entry outBase core0 = do let outC = outBase ++ ".c" outH = outBase ++ ".h" sourceDir = dirname sourceFile @@ -304,9 +304,11 @@ codeGenC sourceFile newtypes borrowed0 unique0 term flags sequential entry outBa when (showAsmC flags) (termInfo term (hdoc cdoc)) -- copy libraries - let cc = ccomp flags - eimports = externalImportsFromCore (target flags) bcore - clibs = clibsFromCore flags bcore + let importcores = map (fromJust . modCore) imported + cores = bcore:importcores + cc = ccomp flags + eimports = concatMap (externalImportsFromCore (target flags)) cores + clibs = concatMap (clibsFromCore flags) cores extraIncDirs <- concat <$> mapM (copyCLibrary term flags sequential cc (dirname outBase)) eimports -- return the C compilation and final link as a separate IO action to increase concurrency @@ -628,7 +630,7 @@ vcpkgCLibrary term flags sequential cc eimport clib pkg install rootDir libDir vcpkgCmd = do let packageDir = joinPaths [rootDir,"packages",pkg ++ "_" ++ vcpkgTriplet flags] pkgExist <- doesDirectoryExist packageDir - when (pkgExist) $ + when pkgExist $ termWarning term flags $ text "vcpkg" <+> clrSource (text pkg) <+> text "is installed but the library" <+> clrSource (text clib) <+> diff --git a/src/Compile/Options.hs b/src/Compile/Options.hs index 62e21107b..e19430248 100644 --- a/src/Compile/Options.hs +++ b/src/Compile/Options.hs @@ -1269,6 +1269,7 @@ ccFromPath flags path } emcc = (ccGcc name path False) { ccFlagsCompile = ccFlagsCompile gcc ++ ["-D__wasi__"], + ccFlagsLink = ccFlagsLink gcc ++ ["-sWASM_BIGINT=1"], ccFlagStack = (\stksize -> if stksize == 0 then [] else ["-s","TOTAL_STACK=" ++ show stksize]), ccFlagHeap = (\hpsize -> if hpsize == 0 then [] else ["-s","TOTAL_MEMORY=" ++ show hpsize]), ccTargetExe = (\out -> ["-o", out ++ targetExeExtension (target flags)]), From 3ca91262d8127d7e85c0c52a18700523e6cd4bd2 Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Sat, 28 Feb 2026 15:28:36 +1100 Subject: [PATCH 3/6] uv timer: reduce API surface and simplify callback management --- lib/uv/inline/event-loop.c | 1 - lib/uv/inline/timer.c | 114 +++++++++---------------------------- lib/uv/timer.kk | 40 ------------- 3 files changed, 27 insertions(+), 128 deletions(-) diff --git a/lib/uv/inline/event-loop.c b/lib/uv/inline/event-loop.c index 9d2bab390..8d60a8e51 100644 --- a/lib/uv/inline/event-loop.c +++ b/lib/uv/inline/event-loop.c @@ -118,6 +118,5 @@ kk_box_t kk_set_timeout(kk_function_t cb, int64_t time, kk_context_t* _ctx) { kk_unit_t kk_clear_timeout(kk_box_t boxed_timer, kk_context_t* _ctx) { kk_uv_timer__timer timer = kk_uv_timer__timer_unbox(boxed_timer, KK_OWNED, _ctx); kk_uv_timer_stop(timer, _ctx); - kk_uv_timer_release_callback(timer, _ctx); return kk_Unit; } diff --git a/lib/uv/inline/timer.c b/lib/uv/inline/timer.c index 73933da38..f095a4328 100644 --- a/lib/uv/inline/timer.c +++ b/lib/uv/inline/timer.c @@ -13,6 +13,7 @@ EMSCRIPTEN_KEEPALIVE void wasm_timer_callback(kk_wasm_timer_t* timer_info){ kk_context_t* _ctx = kk_get_context(); kk_function_t callback = timer_info->callback; if (timer_info->repeat_ms == 0) { + timer_info->callback = kk_function_null(kk_context()); kk_unit_t res = kk_unit_callback(callback, kk_context()); return; } else { @@ -52,25 +53,25 @@ kk_uv_timer__timer kk_wasm_timer_init(kk_context_t* _ctx) { return t; } -kk_unit_t kk_wasm_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { - kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); - if (kk_likely(!kk_function_is_null(timer_info->callback, kk_context()))) { - kk_function_drop(timer_info->callback, kk_context()); - } - kk_uv_timer__timer_drop(timer, kk_context()); - return kk_Unit; -} - kk_unit_t kk_wasm_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); if (kk_likely(timer_info->timer != 0)) { stop_timer(timer_info->timer, timer_info->repeat_ms != 0); } + if (kk_likely(!kk_function_is_null(timer_info->callback, kk_context()))) { + kk_function_drop(timer_info->callback, kk_context()); + timer_info->callback = kk_function_null(kk_context()); + } return kk_Unit; } kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + if (kk_unlikely(!kk_function_is_null(timer_info->callback, kk_context()))) { + // If there's already a callback, the timer is still busy on a previous request + kk_function_drop(callback, kk_context()); + return kk_uv_error_from_errno(UV_EBUSY, kk_context()); + } timer_info->callback = callback; timer_info->repeat_ms = repeat; timer_info->timer = start_timer(timer_info, timeout, repeat); @@ -91,21 +92,15 @@ kk_uv_timer__timer kk_libuv_timer_init(kk_context_t* _ctx) { return t; } -// Stop / pause the timer (doesn't clean up) - the timer can be restarted with the same callback with kk_libuv_timer_again +// Stop timer and remove callback - the timer can be reused with kk_libuv_timer_start kk_unit_t kk_libuv_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { - uv_timer_t* uv_timer = (uv_timer_t*)kk_tm_to_uv(timer); - uv_timer_stop(uv_timer); - return kk_Unit; -} - -// Actually clean up the timer -// This drops the callback first in case it is holding onto the timer - as it does for uv/timer/timer() -kk_unit_t kk_libuv_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { kk_timer_t* kk_timer = kk_tm_to_uv(timer); + uv_timer_t* uv_timer = (uv_timer_t*)kk_timer; if (kk_likely(!kk_function_is_null(kk_timer->callback, kk_context()))) { kk_function_drop(kk_timer->callback, kk_context()); + kk_timer->callback = kk_function_null(kk_context()); } - kk_uv_timer__timer_drop(timer, kk_context()); + uv_timer_stop(uv_timer); return kk_Unit; } @@ -114,47 +109,32 @@ void kk_uv_timer_unit_callback(uv_timer_t* uv_timer) { kk_context_t* _ctx = kk_get_context(); kk_timer_t* kk_timer = (kk_timer_t*)uv_timer; kk_function_t callback = kk_timer->callback; // Get the callback - if (uv_timer_get_repeat(uv_timer) == 0) { // If this is a one-shot timer, just call the callback - kk_unit_callback(callback, kk_context()); - return; + if (uv_timer_get_repeat(uv_timer) == 0) { // If this is a one-shot timer, remove it + kk_timer->callback = kk_function_null(kk_context()); } else { // Otherwise, we need to dup the callback, as it will be called again callback = kk_function_dup(callback, kk_context()); - kk_unit_callback(callback, kk_context()); - return; } + kk_unit_callback(callback, kk_context()); } kk_std_core_exn__error kk_libuv_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { kk_timer_t* uv_timer = kk_tm_to_uv(timer); - // TODO: Drop previous callback if any? - uv_timer->callback = callback; - int status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat); - // On error, report, and drop callback + int status = UV_OK; + + if (kk_unlikely(!kk_function_is_null(uv_timer->callback, kk_context()))) { + // If there's already a callback, the timer is still busy with a previous request + status = UV_EBUSY; + } else { + uv_timer->callback = callback; + status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat); + } + kk_uv_check_err_drops(status, { uv_timer->callback = kk_function_null(kk_context()); kk_function_drop(callback, kk_context()); }) } -kk_std_core_exn__error kk_libuv_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) { - int status = uv_timer_again((uv_timer_t*)kk_tm_to_uv(timer)); - kk_uv_check(status) -} - -kk_unit_t kk_libuv_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) { - uv_timer_set_repeat((uv_timer_t*)kk_tm_to_uv(timer), repeat); - return kk_Unit; -} - -int64_t kk_libuv_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) { - uint64_t repeat = uv_timer_get_repeat((uv_timer_t*)kk_tm_to_uv(timer)); - return repeat; -} - -int64_t kk_libuv_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) { - uint64_t due_in = uv_timer_get_due_in((uv_timer_t*)kk_tm_to_uv(timer)); - return due_in; -} #endif ////////////////////////////////////////////////////// @@ -177,14 +157,6 @@ kk_unit_t kk_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { #endif } -kk_unit_t kk_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) { - #ifdef __EMSCRIPTEN__ - return kk_wasm_timer_finish(timer, kk_context()); - #else - return kk_libuv_timer_finish(timer, kk_context()); - #endif -} - kk_std_core_exn__error kk_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { #ifdef __EMSCRIPTEN__ return kk_wasm_timer_start(timer, timeout, repeat, callback, kk_context()); @@ -192,35 +164,3 @@ kk_std_core_exn__error kk_timer_start(kk_uv_timer__timer timer, int64_t timeout, return kk_libuv_timer_start(timer, timeout, repeat, callback, kk_context()); #endif } - -kk_std_core_exn__error kk_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) { - #ifdef __EMSCRIPTEN__ - return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context()); - #else - return kk_libuv_timer_again(timer, kk_context()); - #endif -} - -kk_unit_t kk_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) { - #ifdef __EMSCRIPTEN__ - return kk_Unit; - #else - return kk_libuv_timer_set_repeat(timer, repeat, kk_context()); - #endif -} - -int64_t kk_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) { - #ifdef __EMSCRIPTEN__ - return -1; - #else - return kk_libuv_timer_get_repeat(timer, kk_context()); - #endif -} - -int64_t kk_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) { - #ifdef __EMSCRIPTEN__ - return -1; - #else - return kk_libuv_timer_get_due_in(timer, kk_context()); - #endif -} \ No newline at end of file diff --git a/lib/uv/timer.kk b/lib/uv/timer.kk index 9247fe1bc..21846a641 100644 --- a/lib/uv/timer.kk +++ b/lib/uv/timer.kk @@ -34,45 +34,6 @@ pub extern stop(^t: timer): io-noexn () c "kk_timer_stop" js inline "_stop_timer(#1)" -// Release the callback, which if it references the timer will allow it to be garbage collected. -pub extern release-callback(t: timer): io-noexn () - c "kk_timer_finish" - js inline "" - -// Stop the timer, and if it is repeating restart it using the repeat value as the timeout. -// If the timer has never been started before it returns UV_EINVAL -extern again(^t: timer): io-noexn error<()> - c "kk_timer_again" - js inline "" - -// Set the repeat interval value in milliseconds. -// -// The timer will be scheduled to run on the given interval, -// regardless of the callback execution duration, and will follow -// normal timer semantics in the case of a time-slice overrun. -// -// For example, if a 50ms repeating timer first runs for 17ms, -// it will be scheduled to run again 33ms later. If other tasks -// consume more than the 33ms following the first timer callback, -// then the next timer callback will run as soon as possible. -// -// NOTE: If the repeat value is set from a timer callback it does not immediately take effect. -// If the timer was non-repeating before, it will have been stopped. If it was repeating, -// then the old repeat value will have been used to schedule the next timeout -extern set-repeat(^t: timer, repeat: int64): io-noexn () - c "kk_timer_set_repeat" - js inline "" - -extern get-repeat(^t: timer): io-noexn int64 - c "kk_timer_get_repeat" - js inline "" - -// Get the timer due value or 0 if it has expired. -1 is returned on unsupported platforms -// The time is relative to uv_now() -extern get-due-in(^t: timer): io-noexn int64 - c "kk_timer_get_due_in" - js inline "" - // Creates a timer that repeats every `d` duration and calls `f` with the timer as argument. // // The timer stops repeating when `f` returns `False`. @@ -82,7 +43,6 @@ pub fun timer(d: duration, f: (timer) -> io-noexn bool): io timer val res = t.start(ms, ms) fn() if !f(t) then t.stop() - t.release-callback() () match res Ok() -> t From 6fe77b6039e81b12835d047c3747c58ec9b90539 Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Sat, 28 Feb 2026 15:36:23 +1100 Subject: [PATCH 4/6] cleanup uv/utils --- lib/uv/inline/timer.c | 29 +++++++----- lib/uv/inline/utils.c | 2 +- lib/uv/inline/utils.h | 107 +++++++++--------------------------------- 3 files changed, 39 insertions(+), 99 deletions(-) diff --git a/lib/uv/inline/timer.c b/lib/uv/inline/timer.c index f095a4328..69d322295 100644 --- a/lib/uv/inline/timer.c +++ b/lib/uv/inline/timer.c @@ -7,7 +7,7 @@ void kk_handle_free(void *p, kk_block_t *block, kk_context_t *_ctx) { kk_free(block, kk_context()); // Free the block memory } -#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(wasm_timer, hnd) +#define kk_tm_borrow_internal(hnd) kk_borrow_internal_as(wasm_timer, hnd) EMSCRIPTEN_KEEPALIVE void wasm_timer_callback(kk_wasm_timer_t* timer_info){ kk_context_t* _ctx = kk_get_context(); @@ -48,13 +48,14 @@ EM_JS(void, stop_timer, (int timer, bool repeating), { kk_uv_timer__timer kk_wasm_timer_init(kk_context_t* _ctx) { kk_wasm_timer_t* timer_info = kk_malloc(sizeof(kk_wasm_timer_t), kk_context()); - kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(timer_info, kk_handle_free, timer, Timer); + kk_box_t timer_box = kk_cptr_raw_box(&kk_handle_free, (void*)timer_info, kk_context()); + kk_uv_timer__timer t = kk_uv_timer__new_Timer(timer_box, kk_context()); timer_info->callback = kk_function_null(kk_context()); return t; } kk_unit_t kk_wasm_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { - kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + kk_wasm_timer_t* timer_info = kk_tm_borrow_internal(timer); if (kk_likely(timer_info->timer != 0)) { stop_timer(timer_info->timer, timer_info->repeat_ms != 0); } @@ -66,7 +67,7 @@ kk_unit_t kk_wasm_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { } kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { - kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer); + kk_wasm_timer_t* timer_info = kk_tm_borrow_internal(timer); if (kk_unlikely(!kk_function_is_null(timer_info->callback, kk_context()))) { // If there's already a callback, the timer is still busy on a previous request kk_function_drop(callback, kk_context()); @@ -80,27 +81,26 @@ kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t tim #else -#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(timer, hnd) +#define kk_tm_borrow_internal(hnd) kk_borrow_internal_as(timer, hnd) // Initialize the timer handle kk_uv_timer__timer kk_libuv_timer_init(kk_context_t* _ctx) { kk_timer_t* handle = kk_malloc(sizeof(kk_timer_t), kk_context()); handle->callback = kk_function_null(kk_context()); // Wrap the uv / kk struct in a reference counted box value type - kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(handle, kk_timer_free, timer, Timer); - uv_timer_init(uvloop(), (uv_timer_t*)handle); // Timer initialization never fails + kk_uv_timer__timer t = kk_uv_timer__new_Timer(kk_timer_box(handle, _ctx), _ctx); + uv_timer_init(uvloop(), &handle->uv); // Timer initialization never fails return t; } // Stop timer and remove callback - the timer can be reused with kk_libuv_timer_start kk_unit_t kk_libuv_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) { - kk_timer_t* kk_timer = kk_tm_to_uv(timer); - uv_timer_t* uv_timer = (uv_timer_t*)kk_timer; + kk_timer_t* kk_timer = kk_tm_borrow_internal(timer); if (kk_likely(!kk_function_is_null(kk_timer->callback, kk_context()))) { kk_function_drop(kk_timer->callback, kk_context()); kk_timer->callback = kk_function_null(kk_context()); } - uv_timer_stop(uv_timer); + uv_timer_stop(&kk_timer->uv); return kk_Unit; } @@ -118,7 +118,7 @@ void kk_uv_timer_unit_callback(uv_timer_t* uv_timer) { } kk_std_core_exn__error kk_libuv_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) { - kk_timer_t* uv_timer = kk_tm_to_uv(timer); + kk_timer_t* uv_timer = kk_tm_borrow_internal(timer); int status = UV_OK; if (kk_unlikely(!kk_function_is_null(uv_timer->callback, kk_context()))) { @@ -129,10 +129,13 @@ kk_std_core_exn__error kk_libuv_timer_start(kk_uv_timer__timer timer, int64_t ti status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat); } - kk_uv_check_err_drops(status, { + if (status != UV_OK) { uv_timer->callback = kk_function_null(kk_context()); kk_function_drop(callback, kk_context()); - }) + return kk_uv_error_from_errno(status, kk_context()); + } else { + return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context()); + } } #endif diff --git a/lib/uv/inline/utils.c b/lib/uv/inline/utils.c index 903f49472..b08a86c8f 100644 --- a/lib/uv/inline/utils.c +++ b/lib/uv/inline/utils.c @@ -187,7 +187,7 @@ static kk_uv_utils__uv_status_code kk_uv_status_to_status_code(int32_t status, k } // Map a libuv status code to a Koka Error value with uv status code enum -kk_std_core_exn__error kk_uv_async_error_from_errno( int err, kk_context_t* _ctx ) { +kk_std_core_exn__error kk_uv_error_from_errno( int err, kk_context_t* _ctx ) { kk_uv_utils__uv_status_code code = kk_uv_status_to_status_code(err, _ctx); kk_string_t msg = kk_uv_utils_message(code, _ctx); return kk_std_core_exn__new_Error( kk_std_core_exn__new_Exception( msg, kk_uv_utils__new_AsyncExn(kk_reuse_null, 0, code, _ctx), _ctx), _ctx ); diff --git a/lib/uv/inline/utils.h b/lib/uv/inline/utils.h index a9a4ebb31..4f46ec25e 100644 --- a/lib/uv/inline/utils.h +++ b/lib/uv/inline/utils.h @@ -1,16 +1,7 @@ -// Derive a handle type from a uv type name -#define kk_uv_handle_tp(uv_tp) kk_##uv_tp##_t - -// Convert to/from Koka boxed wrapper types, where the `internal` field is an `any` boxed & reference counted C pointer -#define kk_owned_handle_to_uv_handle(uv_tp, hndl) \ - ((kk_uv_handle_tp(uv_tp)*)kk_cptr_unbox_borrowed(hndl.internal, kk_context())) -// `mod` is the module name, `uv_tp` is the uv handle type -// `free_fn` is the function to handle freeing the uv handle. -#define uv_handle_to_owned_kk_handle(hndl, free_fn, mod, Kk_tp) \ - kk_uv_##mod##__new_##Kk_tp(kk_cptr_raw_box(&free_fn, (void*)hndl, kk_context()), kk_context()) -// Since the wrapper type is a value struct, we sometimes need to `box/unbox` them as well. -#define uv_handle_to_owned_kk_handle_box(hndl, free_fn, mod, kk_tp, Kk_tp) \ - kk_uv_##mod##__##kk_tp##_box(uv_handle_to_owned_kk_handle(hndl,free_fn,mod,Kk_tp), kk_context()) +// Borrow the `internal` struct of a koka wrapper, +// which should be a pointer to a kk_uv_* wrapper struct +#define kk_borrow_internal_as(uv_tp, hndl) \ + uv_##uv_tp##_as_kk((uv_##uv_tp##_t*)kk_cptr_unbox_borrowed(hndl.internal, kk_context()), kk_context()) // Call a unit callback static inline kk_unit_t kk_unit_callback(kk_function_t callback, kk_context_t* _ctx) { @@ -30,44 +21,39 @@ uv_loop_t* uvloop(); // UV Okay status code #define UV_OK 0 // Map a libuv status code to a Koka Error value with uv status code enum -kk_std_core_exn__error kk_uv_async_error_from_errno( int err, kk_context_t* ctx ); -// Call a callback with a uv status code wrapped in an Error or Ok (can't be a function due to the type of result not being known) -#define kk_uv_exn_callback(callback, result) \ - kk_function_call(kk_unit_t, (kk_function_t, kk_std_core_exn__error, kk_context_t*), callback, (callback, result, kk_context()), kk_context()); -// Call a callback with a uv ok wrapped in an Ok (can't be function due to the type of result not being known) -#define kk_uv_okay_callback(callback, result) \ - kk_uv_exn_callback(callback, kk_std_core_exn__new_Ok(result, kk_context())) -// Call a callback with a uv status code enum value (can't be static inline function due to kk_uv_utils__uv_status_code not being defined yet) -#define kk_uv_status_code_callback(callback, status) \ - kk_function_call(kk_unit_t, (kk_function_t, kk_uv_utils__uv_status_code, kk_context_t*), callback, (callback, kk_uv_utils_int_fs_status_code(status, _ctx), kk_context()), kk_context()); -// Call a callback with a uv error wrapped in an Error -static inline void kk_uv_error_callback(kk_function_t callback, int result, kk_context_t* _ctx) { - kk_uv_exn_callback(callback, kk_uv_async_error_from_errno(result, kk_context())); -} +kk_std_core_exn__error kk_uv_error_from_errno( int err, kk_context_t* ctx ); // Invariant: handles only have a single outstanding callback // multiple concurrent calls using the same handle not supported // TODO: Relax this restriction? #define kk_uv_handle(uv_hnd_tp) \ - typedef struct { \ + typedef struct kk_##uv_hnd_tp##_s { \ /* The uv handle struct (embedded as first member) */ \ - uv_##uv_hnd_tp##_t handle; \ + uv_##uv_hnd_tp##_t uv; \ /* The Koka callback function Needs to be dupped every time it is called, so that it always can be called again by libuv */ \ kk_function_t callback; \ - } kk_uv_handle_tp(uv_hnd_tp); \ + } kk_##uv_hnd_tp##_t; \ + /* get a pointer to the kk_* wrapper containing the given uv struct */ \ + static inline kk_##uv_hnd_tp##_t* uv_##uv_hnd_tp##_as_kk(void *p, kk_context_t *_ctx) { \ + return (kk_##uv_hnd_tp##_t *) (((char*)p) - offsetof(kk_##uv_hnd_tp##_t, uv)); \ + } \ /* Handles freeing a kk_uv_handle struct (see definition below) */ \ static inline void kk_##uv_hnd_tp##_free(void *p, kk_block_t *block, kk_context_t *_ctx) { \ - uv_handle_t *handle = (uv_handle_t *)p; \ - kk_uv_handle_tp(uv_hnd_tp)* hndcb = (kk_uv_handle_tp(uv_hnd_tp)*)handle; \ + kk_##uv_hnd_tp##_t* kk_handle = (kk_##uv_hnd_tp##_t*)p; \ + uv_handle_t *uv_handle = (uv_handle_t *)(&kk_handle->uv); \ /* the callback should have been cleaned up prior to this point */ \ - kk_assert_internal(kk_function_is_null(hndcb->callback, kk_context())); \ + kk_assert_internal(kk_function_is_null(kk_handle->callback, kk_context())); \ /* block will be freed by kk_uv_handle_close_callback after uv has cleaned up its state */ \ - handle->data = block; \ - uv_close(handle, &kk_uv_handle_close_callback); \ - } // define the free function + uv_handle->data = block; \ + uv_close(uv_handle, &kk_uv_handle_close_callback); \ + } \ + /* box a C struct into an `any` koka type */ \ + static inline kk_box_t kk_##uv_hnd_tp##_box(kk_##uv_hnd_tp##_t * hnd, kk_context_t *_ctx) { \ + return kk_cptr_raw_box(&kk_##uv_hnd_tp##_free, (void*)hnd, _ctx); \ + } // This handles actually freeing the memory when the uv_handle is closed @@ -79,53 +65,4 @@ static inline void kk_uv_handle_close_callback(uv_handle_t* handle) { kk_free(handle, kk_context()); // Free the struct memory } -// TODO: Change all apis to return status code or return error, not a mix -// Decide which to use for sync errors versus callbacks - -// If the status is not OK, drop before returning the status code -#define kk_uv_check_status_drops(status, drops) \ - if (kk_unlikely(status < UV_OK)) { \ - do drops while (0); \ - } \ - return kk_uv_utils_int_fs_status_code(status, kk_context()); \ - -// Sometimes the return value is a file descriptor which is why this is a < UV_OK check instead of == UV_OK -#define kk_uv_check_return(err, result) \ - if (kk_unlikely(err < UV_OK)) { \ - return kk_uv_async_error_from_errno(err, kk_context()); \ - } else { \ - return kk_std_core_exn__new_Ok(result, kk_context()); \ - } - -// Typically used to clean up when an error occurs -#define kk_uv_check_return_err_drops(err, result, drops) \ - if (kk_unlikely(err < UV_OK)) { \ - do drops while (0); \ - return kk_uv_async_error_from_errno(err, kk_context()); \ - } else { \ - return kk_std_core_exn__new_Ok(result, kk_context()); \ - } - -// Typically used when cleaning up a handle -#define kk_uv_check_return_ok_drops(err, result, drops) \ - if (kk_unlikely(err < UV_OK)) { \ - return kk_uv_async_error_from_errno(err, kk_context()); \ - } else { \ - do drops while (0); \ - return kk_std_core_exn__new_Ok(result, kk_context()); \ - } - -// Check the uv status code and return a kk_std_core_exn__error Ok or Error -#define kk_uv_check(err) kk_uv_check_return(err, kk_unit_box(kk_Unit)) - -// Check the uv status code and return a kk_std_core_exn__error Ok or Error -// Dropping the references if it was an error -#define kk_uv_check_err_drops(err, drops) \ - kk_uv_check_return_err_drops(err, kk_unit_box(kk_Unit), drops) - -// Check the uv status code and return a kk_std_core_exn__error Ok or Error -// Dropping the references if the result is Okay -#define kk_uv_check_ok_drops(err, drops) \ - kk_uv_check_return_ok_drops(err, kk_unit_box(kk_Unit), drops) - #endif From 79efdd4c945391f03c0ef3e0352f5d2222ee83a1 Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Sat, 28 Feb 2026 21:27:04 +1100 Subject: [PATCH 5/6] add uv-status-code/show impl --- lib/uv/utils.kk | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/lib/uv/utils.kk b/lib/uv/utils.kk index 27f145276..fe968ab3e 100644 --- a/lib/uv/utils.kk +++ b/lib/uv/utils.kk @@ -201,3 +201,88 @@ pub fun message(code: uv-status-code): string UV_EILSEQ -> "illegal byte sequence" UV_ESOCKTNOSUPPORT -> "socket type not supported" UV_EUNATCH -> "protocol driver not attached" + +pub fun show(code: uv-status-code): string + match code + UV_OK -> "UV_OK" + UV_E2BIG -> "UV_E2BIG" + UV_EACCES -> "UV_EACCES" + UV_EADDRINUSE -> "UV_EADDRINUSE" + UV_EADDRNOTAVAIL -> "UV_EADDRNOTAVAIL" + UV_EAFNOSUPPORT -> "UV_EAFNOSUPPORT" + UV_EAGAIN -> "UV_EAGAIN" + UV_EAI_ADDRFAMILY -> "UV_EAI_ADDRFAMILY" + UV_EAI_AGAIN -> "UV_EAI_AGAIN" + UV_EAI_BADFLAGS -> "UV_EAI_BADFLAGS" + UV_EAI_BADHINTS -> "UV_EAI_BADHINTS" + UV_EAI_CANCELED -> "UV_EAI_CANCELED" + UV_EAI_FAIL -> "UV_EAI_FAIL" + UV_EAI_FAMILY -> "UV_EAI_FAMILY" + UV_EAI_MEMORY -> "UV_EAI_MEMORY" + UV_EAI_NODATA -> "UV_EAI_NODATA" + UV_EAI_NONAME -> "UV_EAI_NONAME" + UV_EAI_OVERFLOW -> "UV_EAI_OVERFLOW" + UV_EAI_PROTOCOL -> "UV_EAI_PROTOCOL" + UV_EAI_SERVICE -> "UV_EAI_SERVICE" + UV_EAI_SOCKTYPE -> "UV_EAI_SOCKTYPE" + UV_EALREADY -> "UV_EALREADY" + UV_EBADF -> "UV_EBADF" + UV_EBUSY -> "UV_EBUSY" + UV_ECANCELED -> "UV_ECANCELED" + UV_ECHARSET -> "UV_ECHARSET" + UV_ECONNABORTED -> "UV_ECONNABORTED" + UV_ECONNREFUSED -> "UV_ECONNREFUSED" + UV_ECONNRESET -> "UV_ECONNRESET" + UV_EDESTADDRREQ -> "UV_EDESTADDRREQ" + UV_EEXIST -> "UV_EEXIST" + UV_EFAULT -> "UV_EFAULT" + UV_EFBIG -> "UV_EFBIG" + UV_EHOSTUNREACH -> "UV_EHOSTUNREACH" + UV_EINTR -> "UV_EINTR" + UV_EINVAL -> "UV_EINVAL" + UV_EIO -> "UV_EIO" + UV_EISCONN -> "UV_EISCONN" + UV_EISDIR -> "UV_EISDIR" + UV_ELOOP -> "UV_ELOOP" + UV_EMFILE -> "UV_EMFILE" + UV_EMSGSIZE -> "UV_EMSGSIZE" + UV_ENAMETOOLONG -> "UV_ENAMETOOLONG" + UV_ENETDOWN -> "UV_ENETDOWN" + UV_ENETUNREACH -> "UV_ENETUNREACH" + UV_ENFILE -> "UV_ENFILE" + UV_ENOBUFS -> "UV_ENOBUFS" + UV_ENODEV -> "UV_ENODEV" + UV_ENOENT -> "UV_ENOENT" + UV_ENOMEM -> "UV_ENOMEM" + UV_ENONET -> "UV_ENONET" + UV_ENOPROTOOPT -> "UV_ENOPROTOOPT" + UV_ENOSPC -> "UV_ENOSPC" + UV_ENOSYS -> "UV_ENOSYS" + UV_ENOTCONN -> "UV_ENOTCONN" + UV_ENOTDIR -> "UV_ENOTDIR" + UV_ENOTEMPTY -> "UV_ENOTEMPTY" + UV_ENOTSOCK -> "UV_ENOTSOCK" + UV_ENOTSUP -> "UV_ENOTSUP" + UV_EOVERFLOW -> "UV_EOVERFLOW" + UV_EPERM -> "UV_EPERM" + UV_EPIPE -> "UV_EPIPE" + UV_EPROTO -> "UV_EPROTO" + UV_EPROTONOSUPPORT -> "UV_EPROTONOSUPPORT" + UV_EPROTOTYPE -> "UV_EPROTOTYPE" + UV_ERANGE -> "UV_ERANGE" + UV_EROFS -> "UV_EROFS" + UV_ESHUTDOWN -> "UV_ESHUTDOWN" + UV_ESPIPE -> "UV_ESPIPE" + UV_ESRCH -> "UV_ESRCH" + UV_ETIMEDOUT -> "UV_ETIMEDOUT" + UV_ETXTBSY -> "UV_ETXTBSY" + UV_EXDEV -> "UV_EXDEV" + UV_UNKNOWN -> "UV_UNKNOWN" + UV_EOF -> "UV_EOF" + UV_ENXIO -> "UV_ENXIO" + UV_EMLINK -> "UV_EMLINK" + UV_ENOTTY -> "UV_ENOTTY" + UV_EFTYPE -> "UV_EFTYPE" + UV_EILSEQ -> "UV_EILSEQ" + UV_ESOCKTNOSUPPORT -> "UV_ESOCKTNOSUPPORT" + UV_EUNATCH -> "UV_EUNATCH" From df0c3ef4da47746626f98aa5d2b9cda7a08e5752 Mon Sep 17 00:00:00 2001 From: Tim Cuthbertson Date: Sun, 8 Mar 2026 13:07:59 +1100 Subject: [PATCH 6/6] report open handle counts and types when event-loop close reports UV_EBUSY --- lib/uv/event-loop.kk | 4 +++- lib/uv/inline/event-loop.c | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/lib/uv/event-loop.kk b/lib/uv/event-loop.kk index 0f3dbaa69..9aba80f1e 100644 --- a/lib/uv/event-loop.kk +++ b/lib/uv/event-loop.kk @@ -26,7 +26,9 @@ pub extern clear-timeout( tid : any) : io-noexn () // Runs an async action on the default uv event loop, and exceptions at the top level exit the async loop. pub fun default-async-uv(action: () -> a): io a - val result = ref(Error(Exception("Unreachable", ExnInternal("Unreachable")))) + val result = ref(Error(Exception( + "uv event loop returned without fully executing koka code (missing callback or deadlock?)" + , ExnInternal("Unreachable")))) val _ = with default-event-loop with @default-async diff --git a/lib/uv/inline/event-loop.c b/lib/uv/inline/event-loop.c index 8d60a8e51..6bd988575 100644 --- a/lib/uv/inline/event-loop.c +++ b/lib/uv/inline/event-loop.c @@ -57,10 +57,45 @@ void kk_uv_loop_run(kk_context_t* _ctx){ } } +static char* kk_uv_handle_type_str(uv_handle_t* handle) { + switch (handle->type) { + case UV_UNKNOWN_HANDLE: return "UNKNOWN"; + case UV_ASYNC: return "ASYNC"; + case UV_CHECK: return "CHECK"; + case UV_FS_EVENT: return "FS_EVENT"; + case UV_FS_POLL: return "FS_POLL"; + case UV_HANDLE: return "HANDLE"; + case UV_IDLE: return "IDLE"; + case UV_NAMED_PIPE: return "NAMED_PIPE"; + case UV_POLL: return "POLL"; + case UV_PREPARE: return "PREPARE"; + case UV_PROCESS: return "PROCESS"; + case UV_STREAM: return "STREAM"; + case UV_TCP: return "TCP"; + case UV_TIMER: return "TIMER"; + case UV_TTY: return "TTY"; + case UV_UDP: return "UDP"; + case UV_SIGNAL: return "SIGNAL"; + case UV_FILE: return "FILE"; + default: return "INVALID"; + } +} + +static void kk_uv_loop_walk_cb(uv_handle_t* handle, void* arg) { + const char* closing_msg = uv_is_closing(handle) ? " [CLOSING]" : ""; + const char* active_msg = uv_is_active(handle) ? " [ACTIVE]" : ""; + kk_warning_message(" - %s handle%s%s\n", kk_uv_handle_type_str(handle), active_msg, closing_msg); +} + static void kk_uv_loop_close(kk_context_t* _ctx) { int ret = uv_loop_close(uvloop()); if (ret != 0) { - kk_warning_message("Event loop closed %s\n", uv_err_name(ret)); + if (ret == UV_EBUSY) { + kk_warning_message("Event loop closed with open child handles:\n"); + uv_walk(uvloop(), kk_uv_loop_walk_cb, NULL); + } else { + kk_warning_message("Event loop close returned error: %s\n", uv_err_name(ret)); + } } kk_free(uvloop(), _ctx); }