Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ jobs:

bump-dev:
runs-on: ubuntu-latest
needs: [prepare, publish]
needs: [prepare, bump-version, publish]
steps:
- name: Generate app token
id: app-token
Expand All @@ -336,8 +336,14 @@ jobs:
- uses: actions/checkout@v4
with:
ref: ${{ needs.prepare.outputs.target_branch }}
fetch-depth: 0
token: ${{ steps.app-token.outputs.token }}

- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"

- name: Update to next dev version
env:
NEXT: ${{ needs.prepare.outputs.next_version }}
Expand All @@ -350,6 +356,9 @@ jobs:
next_version = os.environ["NEXT"]
next_pep440 = os.environ["NEXT_PEP440"]

# Strip -dev suffix for changelog heading
release_version = next_version.removesuffix("-dev")

Path("VERSION").write_text(next_version + "\n")

pyproject = Path("adapters/python/pyproject.toml")
Expand All @@ -362,12 +371,74 @@ jobs:
)
pyproject.write_text(content)

# Add empty changelog sections
changelogs = [
"server/CHANGELOG.md",
"cli/CHANGELOG.md",
"adapters/python/CHANGELOG.md",
]
for path in changelogs:
p = Path(path)
content = p.read_text()
p.write_text(f"## {release_version}\n\nNo changes.\n\n{content}")

- name: Commit and push
env:
NEXT: ${{ needs.prepare.outputs.next_version }}
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
git add VERSION adapters/python/pyproject.toml
git add VERSION adapters/python/pyproject.toml server/CHANGELOG.md cli/CHANGELOG.md adapters/python/CHANGELOG.md
git commit -m "Begin $NEXT development"
git push

- name: Create release branch (minor releases only)
if: needs.prepare.outputs.target_branch == 'main'
env:
VERSION: ${{ needs.prepare.outputs.version }}
RELEASE_SHA: ${{ needs.bump-version.outputs.release_sha }}
shell: bash
run: |
# Derive release branch name (e.g., 0.9.0 -> release/0.9)
MINOR="${VERSION%.*}"
BRANCH="release/$MINOR"

# Create release branch from the release commit
git checkout -b "$BRANCH" "$RELEASE_SHA"

# Compute patch dev version (e.g., 0.9.0 -> 0.9.1-dev)
PATCH_DEV="${MINOR}.1-dev"
PATCH_PEP440="${MINOR}.1.dev0"

python3 - "$PATCH_DEV" "$PATCH_PEP440" <<'PYEOF'
import re, sys
from pathlib import Path

next_version, next_pep440 = sys.argv[1], sys.argv[2]
release_version = next_version.removesuffix("-dev")

Path("VERSION").write_text(next_version + "\n")

pyproject = Path("adapters/python/pyproject.toml")
content = pyproject.read_text()
content = re.sub(
r'^version\s*=\s*"[^"]*"',
f'version = "{next_pep440}"',
content,
flags=re.MULTILINE,
)
pyproject.write_text(content)

# Add empty changelog sections
changelogs = [
"server/CHANGELOG.md",
"cli/CHANGELOG.md",
"adapters/python/CHANGELOG.md",
]
for path in changelogs:
p = Path(path)
content = p.read_text()
p.write_text(f"## {release_version}\n\nNo changes.\n\n{content}")
PYEOF

git add VERSION adapters/python/pyproject.toml server/CHANGELOG.md cli/CHANGELOG.md adapters/python/CHANGELOG.md
git commit -m "Begin $PATCH_DEV development"
git push -u origin "$BRANCH"
28 changes: 13 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

## Why Coflux?

- **Plain Python** — Workflows are regular Python functions with decorators. No DSLs, no YAML, no static DAGs.
- **Low latency** — Millisecond task startup using warm executor processes.
- **Real-time observability** — Watch workflows execute live in [Coflux Studio](https://studio.coflux.com), with graph visualisation, logs, and results.
- **Self-hosted** — You run the server; your data stays in your infrastructure.
- **Workspace inheritance** — Branch production into development workspaces and re-run individual steps with real data.
- **Plain Python**: workflows are regular Python functions with decorators - no DSLs, no YAML, no static DAGs.
- **Low latency**: millisecond task startup using warm executor processes.
- **Real-time observability**: watch workflows execute live using the CLI, or in [Coflux Studio](https://studio.coflux.com), with graph visualisation, logs, and results.
- **Self-hosted**: you run the server; your data stays in your infrastructure.
- **Workspace inheritance**: branch production into development workspaces and re-run individual steps with real data.

## Quick example

Expand Down Expand Up @@ -136,7 +136,7 @@ def map_reduce(n: int):
cf.log_info("Processing {count} items for {user}", count=42, user="alice")
```

### And more
### More features

- **Debouncing**: defer execution until a task stops being called (`defer=True`)
- **Recurrence**: automatically re-execute workflows for polling (`recurrent=True`)
Expand All @@ -158,19 +158,16 @@ Or download a binary from the [releases page](https://github.com/bitroot/coflux/
Use the CLI to start the server:

```bash
coflux server --no-auth --project myproject
coflux server --no-auth
```

Or run it with Docker:

```bash
docker run -p 7777:7777 ghcr.io/bitroot/coflux --no-auth --project myproject
```
Or [run it with Docker](https://docs.coflux.com/getting_started/server):

### 3. Create a workflow

```python
# myapp/workflows.py

import coflux as cf

@cf.task()
Expand All @@ -185,11 +182,12 @@ def hello(name: str):
### 4. Start a worker

```bash
pip install coflux
coflux worker --dev myapp.workflows
```

The `--dev` flag watches for code changes and automatically restarts the worker.
The worker attempts to automatically detect your Python environment. If you have [`uv`](https://docs.astral.sh/uv/) installed, the (dependency-less) `coflux` package is installed automatically. Otherwise, install it first with `pip install coflux`.

The `--dev` flag (equivalent to `--watch --register`) watches for code changes and automatically restarts the worker.

### 5. Submit a run

Expand All @@ -201,7 +199,7 @@ coflux submit myapp/hello '"world"'

### 6. Open Studio

Visit [studio.coflux.com](https://studio.coflux.com), enter your server address (`localhost:7777`), and watch your workflow execute in real time.
Visit [studio.coflux.com](https://studio.coflux.com) and create a project with your server address (`localhost:7777`) - a Studio account isn't required. Submit workflows runs and watch them execute in real time.

## License

Expand Down
5 changes: 4 additions & 1 deletion cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## 0.9.1

No changes.
Enhancements:

- Updates `server` command to set default project ("default"), and improve Docker lifecycle handling.
- Updates `worker` command to infer adapter (to avoid running `setup`).

## 0.9.0

Expand Down
65 changes: 52 additions & 13 deletions cli/cmd/coflux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/bitroot/coflux/cli/internal/version"
"github.com/spf13/cobra"
Expand All @@ -27,9 +29,8 @@ COFLUX_SERVER_* environment variables.

Examples:
coflux server
coflux server --port 8080
coflux server --data-dir ./my-data
coflux server --super-token mytoken --no-auth`,
coflux server --no-auth --project myproject
coflux server --super-token mytoken --public-host %.localhost:7777`,
RunE: runServer,
}

Expand All @@ -49,20 +50,30 @@ func init() {
serverCmd.Flags().StringVar(&serverSuperToken, "super-token", "", "Super token (will be hashed)")
serverCmd.Flags().StringVar(&serverSuperTokenHash, "super-token-hash", "", "Pre-hashed super token (SHA-256 hex)")
serverCmd.Flags().String("secret", "", "Server secret for signing service tokens")
serverCmd.Flags().StringSlice("studio-teams", nil, "Team IDs allowed for Studio auth")
serverCmd.Flags().StringSlice("launcher-types", nil, "Allowed launcher types (docker, process)")
serverCmd.Flags().StringSlice("team", nil, "Team IDs allowed for Studio auth")
serverCmd.Flags().StringSlice("launcher", nil, "Allowed launcher types (docker, process)")
serverCmd.Flags().String("studio-url", "", "Studio URL")
serverCmd.Flags().StringSlice("allow-origin", nil, "Allowed CORS origins")

serverCmd.Flags().MarkHidden("studio-url")
serverCmd.Flags().MarkHidden("allow-origin")

serverCmd.MarkFlagsMutuallyExclusive("super-token", "super-token-hash")

// Bind flags to viper under the server.* namespace
// Note: viper keys use plural forms (e.g., studio_teams) to match
// config file keys and COFLUX_SERVER_* environment variables,
// while CLI flags use singular forms (e.g., --team).
viper.BindPFlag("server.port", serverCmd.Flags().Lookup("port"))
viper.BindPFlag("server.data_dir", serverCmd.Flags().Lookup("data-dir"))
viper.BindPFlag("server.image", serverCmd.Flags().Lookup("image"))
viper.BindPFlag("server.project", serverCmd.Flags().Lookup("project"))
viper.BindPFlag("server.public_host", serverCmd.Flags().Lookup("public-host"))
viper.BindPFlag("server.secret", serverCmd.Flags().Lookup("secret"))
viper.BindPFlag("server.studio_teams", serverCmd.Flags().Lookup("studio-teams"))
viper.BindPFlag("server.launcher_types", serverCmd.Flags().Lookup("launcher-types"))
viper.BindPFlag("server.studio_teams", serverCmd.Flags().Lookup("team"))
viper.BindPFlag("server.launcher_types", serverCmd.Flags().Lookup("launcher"))
viper.BindPFlag("server.studio_url", serverCmd.Flags().Lookup("studio-url"))
viper.BindPFlag("server.allow_origins", serverCmd.Flags().Lookup("allow-origin"))
}

// getDefaultImage returns the default Docker image name.
Expand Down Expand Up @@ -107,16 +118,25 @@ func runServer(cmd *cobra.Command, args []string) error {
// Build docker command
dockerArgs := []string{
"run",
"--rm",
"--pull", pullPolicy,
"--publish", fmt.Sprintf("%d:7777", port),
"--volume", fmt.Sprintf("%s:/data", absDataDir),
// Disable Erlang's interactive break handler (Ctrl+C menu).
"--env", "ERL_FLAGS=+Bd",
}

// Add environment variables for server configuration
if project := viper.GetString("server.project"); project != "" {
project := viper.GetString("server.project")
publicHost := viper.GetString("server.public_host")
if project == "" && !strings.HasPrefix(publicHost, "%") {
project = "default"
fmt.Printf("No project specified, using %q\n", project)
}
if project != "" {
dockerArgs = append(dockerArgs, "--env", "COFLUX_PROJECT="+project)
}
if publicHost := viper.GetString("server.public_host"); publicHost != "" {
if publicHost != "" {
dockerArgs = append(dockerArgs, "--env", "COFLUX_PUBLIC_HOST="+publicHost)
}
if serverNoAuth {
Expand Down Expand Up @@ -148,6 +168,12 @@ func runServer(cmd *cobra.Command, args []string) error {
if types := viper.GetStringSlice("server.launcher_types"); len(types) > 0 {
dockerArgs = append(dockerArgs, "--env", "COFLUX_LAUNCHER_TYPES="+strings.Join(types, ","))
}
if studioURL := viper.GetString("server.studio_url"); studioURL != "" {
dockerArgs = append(dockerArgs, "--env", "COFLUX_STUDIO_URL="+studioURL)
}
if origins := viper.GetStringSlice("server.allow_origins"); len(origins) > 0 {
dockerArgs = append(dockerArgs, "--env", "COFLUX_ALLOW_ORIGINS="+strings.Join(origins, ","))
}

// Check config-level auth setting (--no-auth flag handled above)
if !serverNoAuth {
Expand All @@ -163,17 +189,30 @@ func runServer(cmd *cobra.Command, args []string) error {
fmt.Printf("Starting Coflux server on port %d...\n", port)
fmt.Printf("Data directory: %s\n", absDataDir)

// Run docker
// Run docker in its own process group so that Ctrl+C doesn't go
// directly to it. Instead, Go catches the signal and sends SIGTERM
// to docker, which proxies it to the container for a clean shutdown.
dockerCmd := exec.Command("docker", dockerArgs...)
dockerCmd.Stdout = os.Stdout
dockerCmd.Stderr = os.Stderr
dockerCmd.Stdin = os.Stdin
dockerCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

if err := dockerCmd.Start(); err != nil {
return fmt.Errorf("failed to start docker: %w", err)
}

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go func() {
<-sigCh
dockerCmd.Process.Signal(syscall.SIGTERM)
}()

if err := dockerCmd.Run(); err != nil {
if err := dockerCmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
os.Exit(exitErr.ExitCode())
}
return fmt.Errorf("failed to run docker: %w", err)
return fmt.Errorf("docker exited with error: %w", err)
}

return nil
Expand Down
25 changes: 25 additions & 0 deletions cli/cmd/coflux/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"

Expand Down Expand Up @@ -88,6 +89,30 @@ func detectAdapters() []AdapterDetection {
})
}

// Fallback: uv available on PATH (auto-installs coflux via --with)
if _, err := exec.LookPath("uv"); err == nil {
detections = append(detections, AdapterDetection{
Name: "Python (uv --with coflux)",
Command: []string{"uv", "run", "--with", "coflux", "python", "-m", "coflux"},
Confidence: 30,
})
}

// Fallback: python available on PATH
if _, err := exec.LookPath("python3"); err == nil {
detections = append(detections, AdapterDetection{
Name: "Python (system)",
Command: []string{"python3", "-m", "coflux"},
Confidence: 20,
})
} else if _, err := exec.LookPath("python"); err == nil {
detections = append(detections, AdapterDetection{
Name: "Python (system)",
Command: []string{"python", "-m", "coflux"},
Confidence: 20,
})
}

return detections
}

Expand Down
21 changes: 16 additions & 5 deletions cli/cmd/coflux/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,25 @@ func runWorker(cmd *cobra.Command, args []string) error {
cfg.Worker.Adapter = workerAdapter
}

// Check adapter is configured
if len(cfg.Worker.Adapter) == 0 {
return fmt.Errorf("no adapter configured; run 'coflux setup' or add 'worker.adapter' to coflux.toml")
}

// Setup logging
logger := getLogger()

// Auto-detect adapter if not configured
if len(cfg.Worker.Adapter) == 0 {
detections := detectAdapters()
if len(detections) == 0 {
return fmt.Errorf("no adapter configured; run 'coflux setup' or add 'worker.adapter' to coflux.toml")
}
best := detections[0]
for _, d := range detections[1:] {
if d.Confidence > best.Confidence {
best = d
}
}
cfg.Worker.Adapter = best.Command
logger.Info("auto-detected adapter", "name", best.Name, "command", strings.Join(best.Command, " "))
}

// Create adapter from config
cmdAdapter := adapter.NewCommandAdapter(cfg.Worker.Adapter)

Expand Down
Loading
Loading