diff --git a/Cargo.lock b/Cargo.lock index 0127dc42cc..34eba352a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1471,6 +1471,18 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.18" @@ -2385,6 +2397,7 @@ dependencies = [ "acme-lib", "anyhow", "chirp-workflow", + "cjson", "cloudflare", "cluster", "faker-build", @@ -3313,6 +3326,23 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "clickhouse-inserter" +version = "25.4.2" +dependencies = [ + "anyhow", + "async-channel", + "futures", + "global-error", + "reqwest 0.11.27", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-util 0.7.12", + "tracing", +] + [[package]] name = "clickhouse-rs-cityhash-sys" version = "0.1.2" @@ -12404,6 +12434,7 @@ name = "rivet-guard-core" version = "25.4.2" dependencies = [ "bytes", + "clickhouse-inserter", "cluster", "futures", "futures-util", @@ -12686,6 +12717,7 @@ dependencies = [ "anyhow", "async-nats", "clickhouse", + "clickhouse-inserter", "dirs", "divan", "fdb-util", diff --git a/Cargo.toml b/Cargo.toml index e34833b653..5a790d7138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/logs","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-discovery","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/core/api/actor","packages/core/api/auth","packages/core/api/cf-verification","packages/core/api/cloud","packages/core/api/games","packages/core/api/group","packages/core/api/identity","packages/core/api/intercom","packages/core/api/job","packages/core/api/matchmaker","packages/core/api/monolith-edge","packages/core/api/monolith-public","packages/core/api/portal","packages/core/api/provision","packages/core/api/status","packages/core/api/traefik-provider","packages/core/api/ui","packages/core/infra/legacy/job-runner","packages/core/infra/schema-generator","packages/core/infra/server","packages/core/services/build","packages/core/services/build/ops/create","packages/core/services/build/ops/get","packages/core/services/build/ops/list-for-env","packages/core/services/build/ops/list-for-game","packages/core/services/build/standalone/default-create","packages/core/services/build/util","packages/core/services/captcha/ops/hcaptcha-config-get","packages/core/services/captcha/ops/hcaptcha-verify","packages/core/services/captcha/ops/request","packages/core/services/captcha/ops/turnstile-config-get","packages/core/services/captcha/ops/turnstile-verify","packages/core/services/captcha/ops/verify","packages/core/services/captcha/util","packages/core/services/cdn/ops/namespace-auth-user-remove","packages/core/services/cdn/ops/namespace-auth-user-update","packages/core/services/cdn/ops/namespace-create","packages/core/services/cdn/ops/namespace-domain-create","packages/core/services/cdn/ops/namespace-domain-remove","packages/core/services/cdn/ops/namespace-get","packages/core/services/cdn/ops/namespace-resolve-domain","packages/core/services/cdn/ops/ns-auth-type-set","packages/core/services/cdn/ops/ns-enable-domain-public-auth-set","packages/core/services/cdn/ops/site-create","packages/core/services/cdn/ops/site-get","packages/core/services/cdn/ops/site-list-for-game","packages/core/services/cdn/ops/version-get","packages/core/services/cdn/ops/version-prepare","packages/core/services/cdn/ops/version-publish","packages/core/services/cdn/util","packages/core/services/cdn/worker","packages/core/services/cf-custom-hostname/ops/get","packages/core/services/cf-custom-hostname/ops/list-for-namespace-id","packages/core/services/cf-custom-hostname/ops/resolve-hostname","packages/core/services/cf-custom-hostname/worker","packages/core/services/cloud/ops/device-link-create","packages/core/services/cloud/ops/game-config-create","packages/core/services/cloud/ops/game-config-get","packages/core/services/cloud/ops/game-token-create","packages/core/services/cloud/ops/namespace-create","packages/core/services/cloud/ops/namespace-get","packages/core/services/cloud/ops/namespace-token-development-create","packages/core/services/cloud/ops/namespace-token-public-create","packages/core/services/cloud/ops/version-get","packages/core/services/cloud/ops/version-publish","packages/core/services/cloud/standalone/default-create","packages/core/services/cloud/worker","packages/core/services/cluster","packages/core/services/cluster/standalone/datacenter-tls-renew","packages/core/services/cluster/standalone/default-update","packages/core/services/cluster/standalone/gc","packages/core/services/cluster/standalone/metrics-publish","packages/core/services/custom-user-avatar/ops/list-for-game","packages/core/services/custom-user-avatar/ops/upload-complete","packages/core/services/debug/ops/email-res","packages/core/services/dynamic-config","packages/core/services/email-verification/ops/complete","packages/core/services/email-verification/ops/create","packages/core/services/email/ops/send","packages/core/services/external/ops/request-validate","packages/core/services/external/worker","packages/core/services/faker/ops/build","packages/core/services/faker/ops/cdn-site","packages/core/services/faker/ops/game","packages/core/services/faker/ops/game-namespace","packages/core/services/faker/ops/game-version","packages/core/services/faker/ops/job-run","packages/core/services/faker/ops/job-template","packages/core/services/faker/ops/mm-lobby","packages/core/services/faker/ops/mm-lobby-row","packages/core/services/faker/ops/mm-player","packages/core/services/faker/ops/region","packages/core/services/faker/ops/team","packages/core/services/faker/ops/user","packages/core/services/game/ops/banner-upload-complete","packages/core/services/game/ops/create","packages/core/services/game/ops/get","packages/core/services/game/ops/list-all","packages/core/services/game/ops/list-for-team","packages/core/services/game/ops/logo-upload-complete","packages/core/services/game/ops/namespace-create","packages/core/services/game/ops/namespace-get","packages/core/services/game/ops/namespace-list","packages/core/services/game/ops/namespace-resolve-name-id","packages/core/services/game/ops/namespace-resolve-url","packages/core/services/game/ops/namespace-validate","packages/core/services/game/ops/namespace-version-history-list","packages/core/services/game/ops/namespace-version-set","packages/core/services/game/ops/recommend","packages/core/services/game/ops/resolve-name-id","packages/core/services/game/ops/resolve-namespace-id","packages/core/services/game/ops/token-development-validate","packages/core/services/game/ops/validate","packages/core/services/game/ops/version-create","packages/core/services/game/ops/version-get","packages/core/services/game/ops/version-list","packages/core/services/game/ops/version-validate","packages/core/services/ip/ops/info","packages/core/services/job-log/ops/read","packages/core/services/job-log/worker","packages/core/services/job-run","packages/core/services/job/standalone/gc","packages/core/services/job/util","packages/core/services/linode","packages/core/services/linode/standalone/gc","packages/core/services/load-test/standalone/api-cloud","packages/core/services/load-test/standalone/mm","packages/core/services/load-test/standalone/mm-sustain","packages/core/services/load-test/standalone/sqlx","packages/core/services/load-test/standalone/watch-requests","packages/core/services/mm-config/ops/game-get","packages/core/services/mm-config/ops/game-upsert","packages/core/services/mm-config/ops/lobby-group-get","packages/core/services/mm-config/ops/lobby-group-resolve-name-id","packages/core/services/mm-config/ops/lobby-group-resolve-version","packages/core/services/mm-config/ops/namespace-config-set","packages/core/services/mm-config/ops/namespace-config-validate","packages/core/services/mm-config/ops/namespace-create","packages/core/services/mm-config/ops/namespace-get","packages/core/services/mm-config/ops/version-get","packages/core/services/mm-config/ops/version-prepare","packages/core/services/mm-config/ops/version-publish","packages/core/services/mm/ops/dev-player-token-create","packages/core/services/mm/ops/lobby-find-fail","packages/core/services/mm/ops/lobby-find-lobby-query-list","packages/core/services/mm/ops/lobby-find-try-complete","packages/core/services/mm/ops/lobby-for-run-id","packages/core/services/mm/ops/lobby-get","packages/core/services/mm/ops/lobby-history","packages/core/services/mm/ops/lobby-idle-update","packages/core/services/mm/ops/lobby-list-for-namespace","packages/core/services/mm/ops/lobby-list-for-user-id","packages/core/services/mm/ops/lobby-player-count","packages/core/services/mm/ops/lobby-runtime-aggregate","packages/core/services/mm/ops/lobby-state-get","packages/core/services/mm/ops/player-count-for-namespace","packages/core/services/mm/ops/player-get","packages/core/services/mm/standalone/gc","packages/core/services/mm/util","packages/core/services/mm/worker","packages/core/services/monolith/standalone/worker","packages/core/services/monolith/standalone/workflow-worker","packages/core/services/nomad/standalone/monitor","packages/core/services/region/ops/get","packages/core/services/region/ops/list","packages/core/services/region/ops/list-for-game","packages/core/services/region/ops/recommend","packages/core/services/region/ops/resolve","packages/core/services/region/ops/resolve-for-game","packages/core/services/route","packages/core/services/server-spec","packages/core/services/team-invite/ops/get","packages/core/services/team-invite/worker","packages/core/services/team/ops/avatar-upload-complete","packages/core/services/team/ops/get","packages/core/services/team/ops/join-request-list","packages/core/services/team/ops/member-count","packages/core/services/team/ops/member-get","packages/core/services/team/ops/member-list","packages/core/services/team/ops/member-relationship-get","packages/core/services/team/ops/profile-validate","packages/core/services/team/ops/recommend","packages/core/services/team/ops/resolve-display-name","packages/core/services/team/ops/user-ban-get","packages/core/services/team/ops/user-ban-list","packages/core/services/team/ops/validate","packages/core/services/team/util","packages/core/services/team/worker","packages/core/services/telemetry/standalone/beacon","packages/core/services/tier","packages/core/services/token/ops/create","packages/core/services/token/ops/exchange","packages/core/services/token/ops/get","packages/core/services/token/ops/revoke","packages/core/services/upload/ops/complete","packages/core/services/upload/ops/file-list","packages/core/services/upload/ops/get","packages/core/services/upload/ops/list-for-user","packages/core/services/upload/ops/prepare","packages/core/services/upload/worker","packages/core/services/user","packages/core/services/user-identity/ops/create","packages/core/services/user-identity/ops/delete","packages/core/services/user-identity/ops/get","packages/core/services/user/ops/avatar-upload-complete","packages/core/services/user/ops/get","packages/core/services/user/ops/pending-delete-toggle","packages/core/services/user/ops/profile-validate","packages/core/services/user/ops/resolve-email","packages/core/services/user/ops/team-list","packages/core/services/user/ops/token-create","packages/core/services/user/standalone/delete-pending","packages/core/services/user/worker","packages/edge/api/actor","packages/edge/api/intercom","packages/edge/api/monolith-edge","packages/edge/api/monolith-public","packages/edge/api/traefik-provider","packages/edge/infra/client/actor-kv","packages/edge/infra/client/config","packages/edge/infra/client/container-runner","packages/edge/infra/client/echo","packages/edge/infra/client/isolate-v8-runner","packages/edge/infra/client/manager","packages/edge/infra/edge-server","packages/edge/infra/guard/core","packages/edge/infra/guard/server","packages/edge/services/monolith/standalone/workflow-worker","packages/edge/services/pegboard","packages/edge/services/pegboard/standalone/usage-metrics-publish","packages/edge/services/pegboard/standalone/ws","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] +members = ["packages/common/api-helper/build","packages/common/api-helper/macros","packages/common/cache/build","packages/common/cache/result","packages/common/chirp-workflow/core","packages/common/chirp-workflow/macros","packages/common/chirp/client","packages/common/chirp/metrics","packages/common/chirp/perf","packages/common/chirp/types","packages/common/chirp/worker","packages/common/chirp/worker-attributes","packages/common/claims","packages/common/clickhouse-inserter","packages/common/config","packages/common/connection","packages/common/convert","packages/common/deno-embed","packages/common/env","packages/common/fdb-util","packages/common/formatted-error","packages/common/global-error","packages/common/health-checks","packages/common/hub-embed","packages/common/kv-str","packages/common/logs","packages/common/metrics","packages/common/migrate","packages/common/nomad-util","packages/common/operation/core","packages/common/operation/macros","packages/common/pools","packages/common/redis-util","packages/common/runtime","packages/common/s3-util","packages/common/schemac","packages/common/server-cli","packages/common/service-discovery","packages/common/service-manager","packages/common/smithy-output/api-auth/rust","packages/common/smithy-output/api-auth/rust-server","packages/common/smithy-output/api-cf-verification/rust","packages/common/smithy-output/api-cf-verification/rust-server","packages/common/smithy-output/api-cloud/rust","packages/common/smithy-output/api-cloud/rust-server","packages/common/smithy-output/api-group/rust","packages/common/smithy-output/api-group/rust-server","packages/common/smithy-output/api-identity/rust","packages/common/smithy-output/api-identity/rust-server","packages/common/smithy-output/api-job/rust","packages/common/smithy-output/api-job/rust-server","packages/common/smithy-output/api-kv/rust","packages/common/smithy-output/api-kv/rust-server","packages/common/smithy-output/api-matchmaker/rust","packages/common/smithy-output/api-matchmaker/rust-server","packages/common/smithy-output/api-party/rust","packages/common/smithy-output/api-party/rust-server","packages/common/smithy-output/api-portal/rust","packages/common/smithy-output/api-portal/rust-server","packages/common/smithy-output/api-status/rust","packages/common/smithy-output/api-status/rust-server","packages/common/smithy-output/api-traefik-provider/rust","packages/common/smithy-output/api-traefik-provider/rust-server","packages/common/test","packages/common/test-images","packages/common/types-proto/build","packages/common/types-proto/core","packages/common/util/core","packages/common/util/macros","packages/common/util/search","packages/core/api/actor","packages/core/api/auth","packages/core/api/cf-verification","packages/core/api/cloud","packages/core/api/games","packages/core/api/group","packages/core/api/identity","packages/core/api/intercom","packages/core/api/job","packages/core/api/matchmaker","packages/core/api/monolith-edge","packages/core/api/monolith-public","packages/core/api/portal","packages/core/api/provision","packages/core/api/status","packages/core/api/traefik-provider","packages/core/api/ui","packages/core/infra/legacy/job-runner","packages/core/infra/schema-generator","packages/core/infra/server","packages/core/services/build","packages/core/services/build/ops/create","packages/core/services/build/ops/get","packages/core/services/build/ops/list-for-env","packages/core/services/build/ops/list-for-game","packages/core/services/build/standalone/default-create","packages/core/services/build/util","packages/core/services/captcha/ops/hcaptcha-config-get","packages/core/services/captcha/ops/hcaptcha-verify","packages/core/services/captcha/ops/request","packages/core/services/captcha/ops/turnstile-config-get","packages/core/services/captcha/ops/turnstile-verify","packages/core/services/captcha/ops/verify","packages/core/services/captcha/util","packages/core/services/cdn/ops/namespace-auth-user-remove","packages/core/services/cdn/ops/namespace-auth-user-update","packages/core/services/cdn/ops/namespace-create","packages/core/services/cdn/ops/namespace-domain-create","packages/core/services/cdn/ops/namespace-domain-remove","packages/core/services/cdn/ops/namespace-get","packages/core/services/cdn/ops/namespace-resolve-domain","packages/core/services/cdn/ops/ns-auth-type-set","packages/core/services/cdn/ops/ns-enable-domain-public-auth-set","packages/core/services/cdn/ops/site-create","packages/core/services/cdn/ops/site-get","packages/core/services/cdn/ops/site-list-for-game","packages/core/services/cdn/ops/version-get","packages/core/services/cdn/ops/version-prepare","packages/core/services/cdn/ops/version-publish","packages/core/services/cdn/util","packages/core/services/cdn/worker","packages/core/services/cf-custom-hostname/ops/get","packages/core/services/cf-custom-hostname/ops/list-for-namespace-id","packages/core/services/cf-custom-hostname/ops/resolve-hostname","packages/core/services/cf-custom-hostname/worker","packages/core/services/cloud/ops/device-link-create","packages/core/services/cloud/ops/game-config-create","packages/core/services/cloud/ops/game-config-get","packages/core/services/cloud/ops/game-token-create","packages/core/services/cloud/ops/namespace-create","packages/core/services/cloud/ops/namespace-get","packages/core/services/cloud/ops/namespace-token-development-create","packages/core/services/cloud/ops/namespace-token-public-create","packages/core/services/cloud/ops/version-get","packages/core/services/cloud/ops/version-publish","packages/core/services/cloud/standalone/default-create","packages/core/services/cloud/worker","packages/core/services/cluster","packages/core/services/cluster/standalone/datacenter-tls-renew","packages/core/services/cluster/standalone/default-update","packages/core/services/cluster/standalone/gc","packages/core/services/cluster/standalone/metrics-publish","packages/core/services/custom-user-avatar/ops/list-for-game","packages/core/services/custom-user-avatar/ops/upload-complete","packages/core/services/debug/ops/email-res","packages/core/services/dynamic-config","packages/core/services/email-verification/ops/complete","packages/core/services/email-verification/ops/create","packages/core/services/email/ops/send","packages/core/services/external/ops/request-validate","packages/core/services/external/worker","packages/core/services/faker/ops/build","packages/core/services/faker/ops/cdn-site","packages/core/services/faker/ops/game","packages/core/services/faker/ops/game-namespace","packages/core/services/faker/ops/game-version","packages/core/services/faker/ops/job-run","packages/core/services/faker/ops/job-template","packages/core/services/faker/ops/mm-lobby","packages/core/services/faker/ops/mm-lobby-row","packages/core/services/faker/ops/mm-player","packages/core/services/faker/ops/region","packages/core/services/faker/ops/team","packages/core/services/faker/ops/user","packages/core/services/game/ops/banner-upload-complete","packages/core/services/game/ops/create","packages/core/services/game/ops/get","packages/core/services/game/ops/list-all","packages/core/services/game/ops/list-for-team","packages/core/services/game/ops/logo-upload-complete","packages/core/services/game/ops/namespace-create","packages/core/services/game/ops/namespace-get","packages/core/services/game/ops/namespace-list","packages/core/services/game/ops/namespace-resolve-name-id","packages/core/services/game/ops/namespace-resolve-url","packages/core/services/game/ops/namespace-validate","packages/core/services/game/ops/namespace-version-history-list","packages/core/services/game/ops/namespace-version-set","packages/core/services/game/ops/recommend","packages/core/services/game/ops/resolve-name-id","packages/core/services/game/ops/resolve-namespace-id","packages/core/services/game/ops/token-development-validate","packages/core/services/game/ops/validate","packages/core/services/game/ops/version-create","packages/core/services/game/ops/version-get","packages/core/services/game/ops/version-list","packages/core/services/game/ops/version-validate","packages/core/services/ip/ops/info","packages/core/services/job-log/ops/read","packages/core/services/job-log/worker","packages/core/services/job-run","packages/core/services/job/standalone/gc","packages/core/services/job/util","packages/core/services/linode","packages/core/services/linode/standalone/gc","packages/core/services/load-test/standalone/api-cloud","packages/core/services/load-test/standalone/mm","packages/core/services/load-test/standalone/mm-sustain","packages/core/services/load-test/standalone/sqlx","packages/core/services/load-test/standalone/watch-requests","packages/core/services/mm-config/ops/game-get","packages/core/services/mm-config/ops/game-upsert","packages/core/services/mm-config/ops/lobby-group-get","packages/core/services/mm-config/ops/lobby-group-resolve-name-id","packages/core/services/mm-config/ops/lobby-group-resolve-version","packages/core/services/mm-config/ops/namespace-config-set","packages/core/services/mm-config/ops/namespace-config-validate","packages/core/services/mm-config/ops/namespace-create","packages/core/services/mm-config/ops/namespace-get","packages/core/services/mm-config/ops/version-get","packages/core/services/mm-config/ops/version-prepare","packages/core/services/mm-config/ops/version-publish","packages/core/services/mm/ops/dev-player-token-create","packages/core/services/mm/ops/lobby-find-fail","packages/core/services/mm/ops/lobby-find-lobby-query-list","packages/core/services/mm/ops/lobby-find-try-complete","packages/core/services/mm/ops/lobby-for-run-id","packages/core/services/mm/ops/lobby-get","packages/core/services/mm/ops/lobby-history","packages/core/services/mm/ops/lobby-idle-update","packages/core/services/mm/ops/lobby-list-for-namespace","packages/core/services/mm/ops/lobby-list-for-user-id","packages/core/services/mm/ops/lobby-player-count","packages/core/services/mm/ops/lobby-runtime-aggregate","packages/core/services/mm/ops/lobby-state-get","packages/core/services/mm/ops/player-count-for-namespace","packages/core/services/mm/ops/player-get","packages/core/services/mm/standalone/gc","packages/core/services/mm/util","packages/core/services/mm/worker","packages/core/services/monolith/standalone/worker","packages/core/services/monolith/standalone/workflow-worker","packages/core/services/nomad/standalone/monitor","packages/core/services/region/ops/get","packages/core/services/region/ops/list","packages/core/services/region/ops/list-for-game","packages/core/services/region/ops/recommend","packages/core/services/region/ops/resolve","packages/core/services/region/ops/resolve-for-game","packages/core/services/route","packages/core/services/server-spec","packages/core/services/team-invite/ops/get","packages/core/services/team-invite/worker","packages/core/services/team/ops/avatar-upload-complete","packages/core/services/team/ops/get","packages/core/services/team/ops/join-request-list","packages/core/services/team/ops/member-count","packages/core/services/team/ops/member-get","packages/core/services/team/ops/member-list","packages/core/services/team/ops/member-relationship-get","packages/core/services/team/ops/profile-validate","packages/core/services/team/ops/recommend","packages/core/services/team/ops/resolve-display-name","packages/core/services/team/ops/user-ban-get","packages/core/services/team/ops/user-ban-list","packages/core/services/team/ops/validate","packages/core/services/team/util","packages/core/services/team/worker","packages/core/services/telemetry/standalone/beacon","packages/core/services/tier","packages/core/services/token/ops/create","packages/core/services/token/ops/exchange","packages/core/services/token/ops/get","packages/core/services/token/ops/revoke","packages/core/services/upload/ops/complete","packages/core/services/upload/ops/file-list","packages/core/services/upload/ops/get","packages/core/services/upload/ops/list-for-user","packages/core/services/upload/ops/prepare","packages/core/services/upload/worker","packages/core/services/user","packages/core/services/user-identity/ops/create","packages/core/services/user-identity/ops/delete","packages/core/services/user-identity/ops/get","packages/core/services/user/ops/avatar-upload-complete","packages/core/services/user/ops/get","packages/core/services/user/ops/pending-delete-toggle","packages/core/services/user/ops/profile-validate","packages/core/services/user/ops/resolve-email","packages/core/services/user/ops/team-list","packages/core/services/user/ops/token-create","packages/core/services/user/standalone/delete-pending","packages/core/services/user/worker","packages/edge/api/actor","packages/edge/api/intercom","packages/edge/api/monolith-edge","packages/edge/api/monolith-public","packages/edge/api/traefik-provider","packages/edge/infra/client/actor-kv","packages/edge/infra/client/config","packages/edge/infra/client/container-runner","packages/edge/infra/client/echo","packages/edge/infra/client/isolate-v8-runner","packages/edge/infra/client/manager","packages/edge/infra/edge-server","packages/edge/infra/guard/core","packages/edge/infra/guard/server","packages/edge/services/monolith/standalone/workflow-worker","packages/edge/services/pegboard","packages/edge/services/pegboard/standalone/usage-metrics-publish","packages/edge/services/pegboard/standalone/ws","packages/toolchain/cli","packages/toolchain/js-utils-embed","packages/toolchain/toolchain","sdks/api/full/rust"] [workspace.package] version = "25.4.2" @@ -123,6 +123,9 @@ path = "packages/common/chirp/worker-attributes" [workspace.dependencies.rivet-claims] path = "packages/common/claims" +[workspace.dependencies.clickhouse-inserter] +path = "packages/common/clickhouse-inserter" + [workspace.dependencies.rivet-config] path = "packages/common/config" diff --git a/docker/dev-full/rivet-edge-server/config.jsonc b/docker/dev-full/rivet-edge-server/config.jsonc index 219412b8d6..f441be3844 100644 --- a/docker/dev-full/rivet-edge-server/config.jsonc +++ b/docker/dev-full/rivet-edge-server/config.jsonc @@ -68,6 +68,10 @@ } } }, + "vector_http": { + "host": "vector-client", + "port": 5022 + }, "prometheus": { "url": "http://prometheus:9090" }, diff --git a/docker/dev-full/rivet-guard/config.jsonc b/docker/dev-full/rivet-guard/config.jsonc index a3cc401d55..2c704e7bb5 100644 --- a/docker/dev-full/rivet-guard/config.jsonc +++ b/docker/dev-full/rivet-guard/config.jsonc @@ -80,6 +80,10 @@ } } }, + "vector_http": { + "host": "vector-client", + "port": 5022 + }, "prometheus": { "url": "http://prometheus:9090" }, diff --git a/docker/dev-full/vector-client/vector.yaml b/docker/dev-full/vector-client/vector.yaml index 278ae90592..008feaa39f 100644 --- a/docker/dev-full/vector-client/vector.yaml +++ b/docker/dev-full/vector-client/vector.yaml @@ -11,6 +11,11 @@ sources: - http://rivet-client:6090 scrape_interval_secs: 15 + dynamic_events_http: + type: http_server + address: 0.0.0.0:5022 + encoding: ndjson + pegboard_manager: type: file include: @@ -92,6 +97,7 @@ sinks: type: vector inputs: - metrics_add_meta + - dynamic_events_http - pegboard_manager_add_meta - pegboard_v8_isolate_runner_add_meta - pegboard_container_runner_add_meta diff --git a/docker/dev-full/vector-server/vector.yaml b/docker/dev-full/vector-server/vector.yaml index ed008e89bb..9a1261580e 100644 --- a/docker/dev-full/vector-server/vector.yaml +++ b/docker/dev-full/vector-server/vector.yaml @@ -59,6 +59,35 @@ transforms: condition: type: vrl source: .source == "pegboard_container_runner" + + clickhouse_dynamic_events_filter: + type: filter + inputs: + - vector + condition: + type: vrl + source: .source == "clickhouse" + + clickhouse_dynamic_events_transform: + type: remap + inputs: + - clickhouse_dynamic_events_filter + source: | + # Extract and store metadata + __database = .database + __table = .table + __columns = .columns + + # Create a new object with just the columns data + . = { + "__database": __database, + "__table": __table, + # By default insert namespace column since most tables include this + "namespace": "rivet" + } + + # Merge in the column data that should be inserted + . = merge!(., __columns) sinks: prom_exporter: @@ -82,7 +111,7 @@ sinks: compression: gzip database: db_pegboard_actor_log endpoint: http://clickhouse:8123 - table: actor_logs + table: actor_logs2 auth: strategy: basic user: vector @@ -118,4 +147,19 @@ sinks: path: "/var/log/vector/pegboard_container_runner/%Y-%m-%d.log" encoding: codec: "text" + + clickhouse_dynamic_events: + type: clickhouse + inputs: + - clickhouse_dynamic_events_transform + compression: gzip + endpoint: http://clickhouse:8123 + database: "{{ __database }}" + table: "{{ __table }}" + auth: + strategy: basic + user: vector + password: vector + batch: + timeout_secs: 1.0 diff --git a/docker/monolith/vector-server/vector.yaml b/docker/monolith/vector-server/vector.yaml index 228580ba50..bbf13bd17d 100644 --- a/docker/monolith/vector-server/vector.yaml +++ b/docker/monolith/vector-server/vector.yaml @@ -82,7 +82,7 @@ sinks: compression: gzip endpoint: http://clickhouse:9300 database: db_pegboard_actor_log - table: actor_logs + table: actor_logs2 auth: strategy: basic user: vector diff --git a/packages/common/chirp-workflow/core/src/ctx/activity.rs b/packages/common/chirp-workflow/core/src/ctx/activity.rs index 3ae20fd87d..967f418916 100644 --- a/packages/common/chirp-workflow/core/src/ctx/activity.rs +++ b/packages/common/chirp-workflow/core/src/ctx/activity.rs @@ -238,6 +238,11 @@ impl ActivityCtx { self.conn.clickhouse().await } + #[tracing::instrument(skip_all)] + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } + #[tracing::instrument(skip_all)] pub async fn fdb(&self) -> Result { self.conn.fdb().await diff --git a/packages/common/chirp-workflow/core/src/ctx/api.rs b/packages/common/chirp-workflow/core/src/ctx/api.rs index 8b3b6075b5..d0f505caf5 100644 --- a/packages/common/chirp-workflow/core/src/ctx/api.rs +++ b/packages/common/chirp-workflow/core/src/ctx/api.rs @@ -242,6 +242,11 @@ impl ApiCtx { self.conn.clickhouse().await } + #[tracing::instrument(skip_all)] + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } + #[tracing::instrument(skip_all)] pub async fn fdb(&self) -> Result { self.conn.fdb().await diff --git a/packages/common/chirp-workflow/core/src/ctx/operation.rs b/packages/common/chirp-workflow/core/src/ctx/operation.rs index 58a4b8d9f1..b307675ec2 100644 --- a/packages/common/chirp-workflow/core/src/ctx/operation.rs +++ b/packages/common/chirp-workflow/core/src/ctx/operation.rs @@ -266,6 +266,11 @@ impl OperationCtx { self.conn.clickhouse().await } + #[tracing::instrument(skip_all)] + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } + #[tracing::instrument(skip_all)] pub async fn fdb(&self) -> Result { self.conn.fdb().await diff --git a/packages/common/chirp-workflow/core/src/ctx/standalone.rs b/packages/common/chirp-workflow/core/src/ctx/standalone.rs index 4475a62292..658b08360b 100644 --- a/packages/common/chirp-workflow/core/src/ctx/standalone.rs +++ b/packages/common/chirp-workflow/core/src/ctx/standalone.rs @@ -253,6 +253,11 @@ impl StandaloneCtx { self.conn.clickhouse().await } + #[tracing::instrument(skip_all)] + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } + #[tracing::instrument(skip_all)] pub async fn fdb(&self) -> Result { self.conn.fdb().await diff --git a/packages/common/chirp-workflow/core/src/ctx/test.rs b/packages/common/chirp-workflow/core/src/ctx/test.rs index f57e64ac0f..4993d9d3ce 100644 --- a/packages/common/chirp-workflow/core/src/ctx/test.rs +++ b/packages/common/chirp-workflow/core/src/ctx/test.rs @@ -296,6 +296,11 @@ impl TestCtx { self.conn.clickhouse().await } + #[tracing::instrument(skip_all)] + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } + #[tracing::instrument(skip_all, fields(%workflow_id))] pub async fn sqlite_for_workflow(&self, workflow_id: Uuid) -> GlobalResult { common::sqlite_for_workflow(&self.db, &self.conn, workflow_id, true) diff --git a/packages/common/chirp/worker/src/test.rs b/packages/common/chirp/worker/src/test.rs index 6d5b23a11b..3818c3c008 100644 --- a/packages/common/chirp/worker/src/test.rs +++ b/packages/common/chirp/worker/src/test.rs @@ -84,4 +84,8 @@ impl TestCtx { pub async fn clickhouse(&self) -> GlobalResult { self.op_ctx.clickhouse().await } + + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.op_ctx.clickhouse_inserter().await + } } diff --git a/packages/common/clickhouse-inserter/Cargo.toml b/packages/common/clickhouse-inserter/Cargo.toml new file mode 100644 index 0000000000..f85862dccb --- /dev/null +++ b/packages/common/clickhouse-inserter/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "clickhouse-inserter" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +global-error.workspace = true +tokio.workspace = true +reqwest = { version = "0.11", features = ["json"] } +tracing.workspace = true +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = ["raw_value"] } +thiserror = "1.0" +tokio-util = "0.7" +futures = "0.3" +async-channel = "2.1.1" +anyhow.workspace = true diff --git a/packages/common/clickhouse-inserter/src/error.rs b/packages/common/clickhouse-inserter/src/error.rs new file mode 100644 index 0000000000..9d389ecabb --- /dev/null +++ b/packages/common/clickhouse-inserter/src/error.rs @@ -0,0 +1,16 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("failed to send event to ClickHouse inserter")] + ChannelSendError, + + #[error("serialization error: {0}")] + SerializationError(#[source] serde_json::Error), + + #[error("failed to build reqwest client: {0}")] + ReqwestBuildError(#[source] reqwest::Error), + + #[error("failed to spawn background task")] + TaskSpawnError, +} \ No newline at end of file diff --git a/packages/common/clickhouse-inserter/src/lib.rs b/packages/common/clickhouse-inserter/src/lib.rs new file mode 100644 index 0000000000..38734b5bfe --- /dev/null +++ b/packages/common/clickhouse-inserter/src/lib.rs @@ -0,0 +1,179 @@ +use std::time::Duration; + +use global_error::prelude::*; +use serde::Serialize; +use serde_json::value::RawValue; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +pub mod error; +use error::Error; + +const BATCH_INTERVAL: Duration = Duration::from_millis(500); +const BATCH_SIZE: usize = 10_000; + +/// A handle to the ClickHouse inserter service +#[derive(Clone)] +pub struct ClickHouseInserterHandle { + sender: mpsc::Sender, +} + +#[derive(Serialize)] +struct Event { + source: &'static str, + database: &'static str, + table: &'static str, + columns: Box, +} + +impl ClickHouseInserterHandle { + /// Sends an event to the ClickHouse database with a specific database + pub fn insert( + &self, + database: &'static str, + table: &'static str, + columns: impl serde::Serialize, + ) -> GlobalResult<()> { + // Serialize the columns to a JSON string + let columns = serde_json::value::to_raw_value(&columns).map_err(|e| { + tracing::error!(?e, "failed to serialize columns for ClickHouse"); + Error::SerializationError(e) + })?; + + // Send the event to the background task + self.sender + .try_send(Event { + source: "clickhouse", + database, + table, + columns, + }) + .map_err(|e| { + tracing::error!(?e, "failed to send event to ClickHouse inserter"); + Error::ChannelSendError + })?; + + Ok(()) + } +} + +struct InserterService { + receiver: mpsc::Receiver, + vector_url: String, + client: reqwest::Client, + cancel_token: CancellationToken, +} + +impl InserterService { + async fn run(mut self) -> Result<(), anyhow::Error> { + let mut interval = tokio::time::interval(BATCH_INTERVAL); + let mut events = Vec::new(); + + loop { + tokio::select! { + _ = self.cancel_token.cancelled() => { + tracing::info!("clickhouse inserter service shutting down"); + // Send remaining events before shutting down + if !events.is_empty() { + if let Err(e) = self.send_events(&events).await { + tracing::error!(?e, "failed to send final events to Vector"); + } + } + break; + } + // Receive new events from the channel + Some(event) = self.receiver.recv() => { + events.push(event); + + // Send batch if it's reached the size limit + if events.len() >= BATCH_SIZE { + if let Err(e) = self.send_events(&events).await { + tracing::error!(?e, "failed to send events to Vector"); + } + events.clear(); + } + } + // Timer tick - send any pending events + _ = interval.tick() => { + if !events.is_empty() { + if let Err(e) = self.send_events(&events).await { + tracing::error!(?e, "failed to send events to Vector"); + } + events.clear(); + } + } + } + } + + Ok(()) + } + + async fn send_events(&self, events: &[Event]) -> Result<(), anyhow::Error> { + let mut payload = Vec::new(); + + // Write each event as a separate JSON line (NDJSON format) + for event in events { + serde_json::to_writer(&mut payload, event)?; + payload.push(b'\n'); + } + + tracing::debug!( + event_count = events.len(), + event_bytes = payload.len(), + "sending events to Vector" + ); + + let response = self + .client + .post(&self.vector_url) + .header("Content-Type", "application/x-ndjson") + .body(payload) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + tracing::error!(?status, ?body, "vector http request failed"); + return Err(anyhow::anyhow!( + "Vector HTTP request failed: {} {}", + status, + body + )); + } + + Ok(()) + } +} + +/// Creates a new ClickHouse inserter service +pub fn create_inserter( + vector_host: impl Into, + vector_port: u16, +) -> GlobalResult { + let (sender, receiver) = mpsc::channel(BATCH_SIZE * 2); + let vector_url = format!("http://{}:{}", vector_host.into(), vector_port); + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .map_err(Error::ReqwestBuildError)?; + + let cancel_token = CancellationToken::new(); + + let service = InserterService { + receiver, + vector_url, + client, + cancel_token: cancel_token.clone(), + }; + + // Spawn the background task + let _ = tokio::spawn(async move { + if let Err(e) = service.run().await { + tracing::error!(?e, "clickhouse inserter service failed"); + } + }); + + Ok(ClickHouseInserterHandle { sender }) +} diff --git a/packages/common/config/src/config/server/mod.rs b/packages/common/config/src/config/server/mod.rs index 4f72e7bd57..6a02350a97 100644 --- a/packages/common/config/src/config/server/mod.rs +++ b/packages/common/config/src/config/server/mod.rs @@ -42,6 +42,8 @@ pub struct Server { pub prometheus: Option, #[serde(default)] pub foundationdb: Option, + #[serde(default)] + pub vector_http: Option, // Services #[serde(default)] @@ -488,6 +490,22 @@ impl FoundationDb { } } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct VectorHttp { + pub host: String, + pub port: u16, +} + +impl Default for VectorHttp { + fn default() -> Self { + Self { + host: "127.0.0.1".into(), + port: 5022, + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub enum Addresses { diff --git a/packages/common/connection/src/lib.rs b/packages/common/connection/src/lib.rs index 81ff0caa7a..89b4d420cc 100644 --- a/packages/common/connection/src/lib.rs +++ b/packages/common/connection/src/lib.rs @@ -114,6 +114,10 @@ impl Connection { pub async fn clickhouse(&self) -> GlobalResult { self.pools.clickhouse() } + + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.pools.clickhouse_inserter() + } } impl std::ops::Deref for Connection { diff --git a/packages/common/operation/core/src/lib.rs b/packages/common/operation/core/src/lib.rs index 77058da77f..0ca9db5f8e 100644 --- a/packages/common/operation/core/src/lib.rs +++ b/packages/common/operation/core/src/lib.rs @@ -272,6 +272,10 @@ where pub async fn clickhouse(&self) -> GlobalResult { self.conn.clickhouse().await } + + pub async fn clickhouse_inserter(&self) -> GlobalResult { + self.conn.clickhouse_inserter().await + } } impl std::ops::Deref for OperationContext diff --git a/packages/common/pools/Cargo.toml b/packages/common/pools/Cargo.toml index cd61bbbfb6..b97b6929c7 100644 --- a/packages/common/pools/Cargo.toml +++ b/packages/common/pools/Cargo.toml @@ -10,6 +10,7 @@ dirs = { workspace = true } anyhow = "1.0" async-nats = "0.33" clickhouse = { version = "0.11.2", features = ["uuid"] } +clickhouse-inserter.workspace = true fdb-util.workspace = true foundationdb.workspace = true funty = "=1.1.0" # Fixes issue with sqlx dependency, see https://github.com/bitvecto-rs/bitvec/issues/105#issuecomment-778570981 diff --git a/packages/common/pools/src/db/redis.rs b/packages/common/pools/src/db/redis.rs index 87177a736b..6e377f8c25 100644 --- a/packages/common/pools/src/db/redis.rs +++ b/packages/common/pools/src/db/redis.rs @@ -1,3 +1,4 @@ +use redis::aio::ConnectionManager; use rivet_config::Config; use std::collections::HashMap; use tokio::task::JoinSet; @@ -5,7 +6,7 @@ use tokio::time::Duration; use crate::Error; -pub type RedisPool = redis::aio::ConnectionManager; +pub type RedisPool = ConnectionManager; /// Connection timeout for the first connection. /// @@ -15,7 +16,7 @@ const INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); #[tracing::instrument(skip(config))] pub async fn setup(config: Config) -> Result, Error> { // Create Redis connections - let mut join_set = JoinSet::new(); + let mut join_set: JoinSet> = JoinSet::new(); let redis_types = &config.server().map_err(Error::Global)?.redis; for (key, redis_config) in [ ("ephemeral", redis_types.ephemeral.clone()), @@ -64,4 +65,4 @@ pub async fn setup(config: Config) -> Result, Error> tracing::debug!("redis connected"); Ok(redis) -} +} \ No newline at end of file diff --git a/packages/common/pools/src/error.rs b/packages/common/pools/src/error.rs index 29f2bb088a..7425a3c692 100644 --- a/packages/common/pools/src/error.rs +++ b/packages/common/pools/src/error.rs @@ -12,6 +12,9 @@ pub enum Error { #[error("missing clickhouse pool")] MissingClickHousePool, + #[error("missing clickhouse inserter")] + MissingClickHouseInserter, + #[error("missing fdb pool")] MissingFdbPool, @@ -57,6 +60,9 @@ pub enum Error { #[error("build clickhouse url: {0}")] BuildClickHouseUrl(url::ParseError), + #[error("build clickhouse inserter: {0}")] + BuildClickHouseInserter(global_error::GlobalError), + #[error("io error: {0}")] Io(std::io::Error), @@ -66,3 +72,9 @@ pub enum Error { #[error("lz4: {0}")] Lz4(lz4_flex::frame::Error), } + +impl From for Error { + fn from(err: global_error::GlobalError) -> Self { + Error::Global(err) + } +} diff --git a/packages/common/pools/src/lib.rs b/packages/common/pools/src/lib.rs index 4fc463834b..a02c45483e 100644 --- a/packages/common/pools/src/lib.rs +++ b/packages/common/pools/src/lib.rs @@ -11,6 +11,8 @@ pub use crate::{ db::redis::RedisPool, db::sqlite::SqlitePool, error::Error, pools::Pools, }; +pub use clickhouse_inserter::ClickHouseInserterHandle; + // Re-export for macros #[doc(hidden)] pub use rivet_util as __rivet_util; diff --git a/packages/common/pools/src/pools.rs b/packages/common/pools/src/pools.rs index 8aa362f7e8..bc5623411e 100644 --- a/packages/common/pools/src/pools.rs +++ b/packages/common/pools/src/pools.rs @@ -1,3 +1,4 @@ +use clickhouse_inserter::ClickHouseInserterHandle; use fdb_util::prelude::*; use global_error::{ensure_with, prelude::*, GlobalResult}; use rivet_config::Config; @@ -16,6 +17,7 @@ pub(crate) struct PoolsInner { pub(crate) crdb: Option, pub(crate) redis: HashMap, pub(crate) clickhouse: Option, + pub(crate) clickhouse_inserter: Option, pub(crate) fdb: Option, pub(crate) sqlite: SqlitePoolManagerHandle, clickhouse_enabled: bool, @@ -40,12 +42,24 @@ impl Pools { let clickhouse = crate::db::clickhouse::setup(config.clone())?; let sqlite = SqlitePoolManager::new(fdb.clone()).await?; + // Create the ClickHouse inserter if vector is enabled + let clickhouse_inserter = if let Some(vector_http) = + config.server.as_ref().and_then(|x| x.vector_http.as_ref()) + { + let inserter = clickhouse_inserter::create_inserter(&vector_http.host, vector_http.port) + .map_err(Error::BuildClickHouseInserter)?; + Some(inserter) + } else { + None + }; + let pool = Pools(Arc::new(PoolsInner { _guard: token.clone().drop_guard(), nats: Some(nats), crdb: Some(crdb), redis, clickhouse, + clickhouse_inserter, fdb, sqlite, clickhouse_enabled: config @@ -77,12 +91,14 @@ impl Pools { )?; let sqlite = SqlitePoolManager::new(fdb.clone()).await?; + // Test setup doesn't use ClickHouse inserter let pool = Pools(Arc::new(PoolsInner { _guard: token.clone().drop_guard(), nats: Some(nats), crdb: None, redis, clickhouse: None, + clickhouse_inserter: None, fdb, sqlite, clickhouse_enabled: config @@ -165,6 +181,20 @@ impl Pools { Ok(ch) } + pub fn clickhouse_inserter(&self) -> GlobalResult { + ensure_with!( + self.clickhouse_enabled(), + FEATURE_DISABLED, + feature = "Clickhouse" + ); + + let inserter = unwrap!( + self.0.clickhouse_inserter.clone(), + "missing clickhouse inserter" + ); + Ok(inserter) + } + pub fn fdb(&self) -> Result { self.0.fdb.clone().ok_or(Error::MissingFdbPool) } diff --git a/packages/common/pools/src/prelude.rs b/packages/common/pools/src/prelude.rs index 02035c454b..48d7a39030 100644 --- a/packages/common/pools/src/prelude.rs +++ b/packages/common/pools/src/prelude.rs @@ -4,7 +4,7 @@ pub use redis; pub use sqlx; pub use crate::{ - ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool, __sql_query, + ClickHouseInserterHandle, ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool, __sql_query, __sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all, sql_fetch_many, sql_fetch_one, sql_fetch_optional, }; diff --git a/packages/core/infra/server/src/run_config.rs b/packages/core/infra/server/src/run_config.rs index 1532c93d10..ab5dd4196f 100644 --- a/packages/core/infra/server/src/run_config.rs +++ b/packages/core/infra/server/src/run_config.rs @@ -271,6 +271,18 @@ pub fn config(rivet_config: rivet_config::Config) -> Result { ), db_name: "db_service_log", }, + SqlService { + kind: SqlServiceKind::ClickHouse, + migrations: include_dir!("$CARGO_MANIFEST_DIR/../../../edge/infra/guard/db/analytics"), + db_name: "db_guard_analytics", + }, + SqlService { + kind: SqlServiceKind::ClickHouse, + migrations: include_dir!( + "$CARGO_MANIFEST_DIR/../../../edge/services/pegboard/db/analytics" + ), + db_name: "db_pegboard_analytics", + }, ]; let s3_buckets = vec![ diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs index e493d9d97d..c8b9337e0a 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/vector.rs @@ -7,6 +7,7 @@ use crate::types::PoolType; pub const TUNNEL_VECTOR_PORT: u16 = 5020; pub const TUNNEL_VECTOR_TCP_JSON_PORT: u16 = 5021; +pub const TUNNEL_VECTOR_HTTP_PORT: u16 = 5022; pub fn install() -> String { include_str!("../files/vector_install.sh").to_string() @@ -32,6 +33,13 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa "api": { "enabled": true }, + "sources": { + "http_events": { + "type": "http_server", + "address": format!("127.0.0.1:{}", TUNNEL_VECTOR_HTTP_PORT), + "encoding": "ndjson" + } + }, "transforms": { "filter_metrics": { "type": "filter", @@ -56,7 +64,7 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa "sinks": { "vector_sink": { "type": "vector", - "inputs": ["metrics_add_meta"], + "inputs": ["metrics_add_meta", "http_events"], "address": format!("127.0.0.1:{}", TUNNEL_VECTOR_PORT), "healthcheck": { "enabled": false @@ -227,3 +235,4 @@ pub fn configure(namespace: &str, config: &Config, pool_type: PoolType) -> Globa Ok(include_str!("../files/vector_configure.sh").replace("__VECTOR_CONFIG__", &config_str)) } + diff --git a/packages/edge/api/actor/src/route/actors.rs b/packages/edge/api/actor/src/route/actors.rs index a148f8bfe6..65f82046df 100644 --- a/packages/edge/api/actor/src/route/actors.rs +++ b/packages/edge/api/actor/src/route/actors.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use api_helper::{anchor::WatchIndexQuery, ctx::Ctx}; use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use util::serde::AsHashableExt; use rivet_api::models; use rivet_convert::{ApiInto, ApiTryInto}; use rivet_operation::prelude::*; @@ -185,7 +186,7 @@ pub async fn create( // args: body.runtime.arguments.unwrap_or_default(), args: Vec::new(), network_mode: network.mode.unwrap_or_default().api_into(), - environment: body.runtime.and_then(|r| r.environment).unwrap_or_default(), + environment: body.runtime.and_then(|r| r.environment).unwrap_or_default().as_hashable(), network_ports: network .ports .unwrap_or_default() @@ -231,7 +232,7 @@ pub async fn create( } } ))) - .collect::>>()?, + .collect::>>()?.as_hashable(), endpoint_type, }) .tag("actor_id", actor_id) diff --git a/packages/edge/infra/guard/core/Cargo.toml b/packages/edge/infra/guard/core/Cargo.toml index cf266606e0..b065aff2e3 100644 --- a/packages/edge/infra/guard/core/Cargo.toml +++ b/packages/edge/infra/guard/core/Cargo.toml @@ -33,6 +33,7 @@ regex = "1.10.3" futures-util = "0.3.30" hyper-tungstenite = "0.17.0" tokio-tungstenite = "0.26.1" +clickhouse-inserter.workspace = true [dev-dependencies] futures-util = "0.3.30" diff --git a/packages/edge/infra/guard/core/src/analytics.rs b/packages/edge/infra/guard/core/src/analytics.rs new file mode 100644 index 0000000000..8f92d99fde --- /dev/null +++ b/packages/edge/infra/guard/core/src/analytics.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +// Properties not currently collected but should be added in future iterations: +// - client_ssl_cipher: Requires TLS connection introspection +// - client_ssl_protocol: Requires TLS connection introspection +// - client_tcp_rtt_ms: Requires network-level measurements +// - client_request_bytes: Total request size including headers +// - service_dns_response_time_ms: Requires DNS timing instrumentation +// - service_ssl_protocol: Requires upstream TLS introspection +// - service_tcp_handshake_duration_ms: Requires connection-level timing +// - service_tls_handshake_duration_ms: Requires TLS handshake timing +// - service_request_header_send_duration_ms: Requires granular timing +// - service_response_header_receive_duration_ms: Requires granular timing +// - guard_response_bytes: Total response size including headers +// - guard_time_to_first_byte_ms: Requires granular timing +// - security_rule_id: Requires security/firewall rule integration + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GuardHttpRequest { + pub request_id: Uuid, + pub client_ip: String, + pub client_request_body_bytes: u64, + pub client_request_host: String, + pub client_request_method: String, + pub client_request_path: String, + pub client_request_protocol: String, + pub client_request_referer: String, + pub client_request_scheme: String, + pub client_request_uri: String, + pub client_request_user_agent: String, + pub client_src_port: u16, + pub client_x_requested_with: String, + pub guard_datacenter_id: Uuid, + pub guard_cluster_id: Uuid, + pub guard_server_id: Uuid, + pub guard_end_timestamp: u64, + pub guard_response_body_bytes: u64, + pub guard_response_content_type: String, + pub guard_response_status: u16, + pub guard_start_timestamp: u64, + pub service_ip: String, + pub service_response_duration_ms: u32, + pub service_response_http_expires: String, + pub service_response_http_last_modified: String, + pub service_response_status: u16, + pub service_actor_id: String, + pub service_server_id: Uuid, +} diff --git a/packages/edge/infra/guard/core/src/lib.rs b/packages/edge/infra/guard/core/src/lib.rs index 61dfbce09c..a773ec3796 100644 --- a/packages/edge/infra/guard/core/src/lib.rs +++ b/packages/edge/infra/guard/core/src/lib.rs @@ -1,6 +1,8 @@ +pub mod analytics; pub mod cert_resolver; pub mod metrics; pub mod proxy_service; +pub mod request_context; mod server; pub mod types; pub mod util; diff --git a/packages/edge/infra/guard/core/src/proxy_service.rs b/packages/edge/infra/guard/core/src/proxy_service.rs index 2ce3905cc4..4146cde7db 100644 --- a/packages/edge/infra/guard/core/src/proxy_service.rs +++ b/packages/edge/infra/guard/core/src/proxy_service.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use global_error::*; use http_body_util::Full; -use hyper::body::Incoming as BodyIncoming; +use hyper::body::{Body, Incoming as BodyIncoming}; use hyper::header::HeaderName; use hyper::{Request, Response, StatusCode}; use hyper_tungstenite; @@ -29,6 +29,7 @@ use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::metrics; +use crate::request_context::RequestContext; const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for"); const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes @@ -266,6 +267,7 @@ pub struct ProxyState { rate_limiters: Cache<(Uuid, std::net::IpAddr), Arc>>, in_flight_counters: Cache<(Uuid, std::net::IpAddr), Arc>>, port_type: PortType, + clickhouse_inserter: Option, } impl ProxyState { @@ -274,6 +276,7 @@ impl ProxyState { routing_fn: RoutingFn, middleware_fn: MiddlewareFn, port_type: PortType, + clickhouse_inserter: Option, ) -> Self { Self { _config: config, @@ -289,6 +292,7 @@ impl ProxyState { .time_to_live(PROXY_STATE_CACHE_TTL) .build(), port_type, + clickhouse_inserter, } } @@ -601,6 +605,7 @@ impl ProxyService { async fn handle_request( &self, req: Request, + request_context: &mut RequestContext, ) -> GlobalResult>> { let host = req .headers() @@ -616,6 +621,8 @@ impl ProxyService { let start_time = Instant::now(); + // Set request body size in analytics (will be updated with actual size later) + let target_res = self .state .resolve_route(host, &path, self.state.port_type.clone(), false) @@ -641,6 +648,21 @@ impl ProxyService { let actor_id = target.actor_id; + // Update request context with target info + if let Some(actor_id) = actor_id { + request_context.service_actor_id = Some(actor_id); + } + if let Some(server_id) = target.server_id { + request_context.service_server_id = Some(server_id); + } + + // Set service IP from target + if let Ok(target_ip) = + format!("{}:{}", target.host, target.port).parse::() + { + request_context.service_ip = Some(target_ip.ip()); + } + // Extract IP address from remote_addr let client_ip = self.remote_addr.ip(); @@ -674,10 +696,11 @@ impl ProxyService { // Both paths will handle their own metrics and error handling let res = if hyper_tungstenite::is_upgrade_request(&req) { // WebSocket upgrade - self.handle_websocket_upgrade(req, target).await + self.handle_websocket_upgrade(req, target, request_context) + .await } else { // Regular HTTP request - self.handle_http_request(req, target).await + self.handle_http_request(req, target, request_context).await }; let status = match &res { @@ -690,7 +713,7 @@ impl ProxyService { metrics::PROXY_REQUEST_DURATION .with_label_values(&[&status]) .observe(duration_secs); - + metrics::PROXY_REQUEST_PENDING.dec(); res @@ -702,6 +725,55 @@ impl ProxyService { .inc(); } + // Update request context with response details + match &res { + Ok(resp) => { + request_context.guard_response_status = Some(resp.status().as_u16()); + request_context.service_response_status = Some(resp.status().as_u16()); + + if let Some(content_type) = resp + .headers() + .get(hyper::header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + { + request_context.guard_response_content_type = Some(content_type.to_string()); + } + + if let Some(expires) = resp + .headers() + .get(hyper::header::EXPIRES) + .and_then(|h| h.to_str().ok()) + { + request_context.service_response_http_expires = Some(expires.to_string()); + } + + if let Some(last_modified) = resp + .headers() + .get(hyper::header::LAST_MODIFIED) + .and_then(|h| h.to_str().ok()) + { + request_context.service_response_http_last_modified = + Some(last_modified.to_string()); + } + } + Err(_) => { + request_context.guard_response_status = Some(500); + request_context.service_response_status = Some(500); + } + } + + // Set timing information + request_context.service_response_duration_ms = + Some(start_time.elapsed().as_millis() as u32); + + // Insert analytics event asynchronously + let mut context_clone = request_context.clone(); + tokio::spawn(async move { + if let Err(error) = context_clone.insert_event().await { + tracing::warn!(?error, "failed to insert guard analytics event"); + } + }); + res } @@ -710,6 +782,7 @@ impl ProxyService { &self, req: Request, mut target: RouteTarget, + request_context: &mut RequestContext, ) -> GlobalResult>> { // Get middleware config for this actor if it exists let middleware_config = match &target.actor_id { @@ -758,6 +831,9 @@ impl ProxyService { } }; + // Set actual request body size in analytics + request_context.client_request_body_bytes = Some(req_body.len() as u64); + // Set up retry with backoff from middleware config let max_attempts = middleware_config.retry.max_attempts; let initial_interval = middleware_config.retry.initial_interval; @@ -814,8 +890,11 @@ impl ProxyService { }; // Send the request with timeout + let request_send_start = Instant::now(); match timeout(timeout_duration, self.client.request(proxied_req)).await { Ok(Ok(resp)) => { + let response_receive_time = request_send_start.elapsed(); + // Convert the hyper::body::Incoming to http_body_util::Full let (parts, body) = resp.into_parts(); @@ -825,6 +904,9 @@ impl ProxyService { Err(_) => Bytes::new(), }; + // Set actual response body size in analytics + request_context.guard_response_body_bytes = Some(body_bytes.len() as u64); + let full_body = Full::new(body_bytes); return Ok(Response::from_parts(parts, full_body)); } @@ -951,6 +1033,7 @@ impl ProxyService { &self, req: Request, mut target: RouteTarget, + _request_context: &mut RequestContext, ) -> GlobalResult>> { // Get actor and server IDs for metrics and middleware let actor_id = target.actor_id; @@ -1577,7 +1660,10 @@ impl ProxyService { // Process an individual request #[tracing::instrument(skip_all)] pub async fn process(&self, req: Request) -> GlobalResult>> { - // Extract request information for logging before consuming the request + // Create request context for analytics tracking + let mut request_context = RequestContext::new(self.state.clickhouse_inserter.clone()); + + // Extract request information for logging and analytics before consuming the request let host = req .headers() .get(hyper::header::HOST) @@ -1591,7 +1677,6 @@ impl ProxyService { .map(|x| x.to_string()) .unwrap_or_else(|| req.uri().path().to_string()); let method = req.method().clone(); - let request_id = Uuid::new_v4(); let user_agent = req .headers() @@ -1599,9 +1684,43 @@ impl ProxyService { .and_then(|h| h.to_str().ok()) .map(|s| s.to_string()); + // Populate request context with available data + request_context.client_ip = Some(self.remote_addr.ip()); + request_context.client_request_host = Some(host.clone()); + request_context.client_request_method = Some(method.to_string()); + request_context.client_request_path = Some(req.uri().path().to_string()); + request_context.client_request_protocol = Some(format!("{:?}", req.version())); + request_context.client_request_scheme = + Some(req.uri().scheme_str().unwrap_or("http").to_string()); + request_context.client_request_uri = Some(path.clone()); + request_context.client_src_port = Some(self.remote_addr.port()); + + if let Some(referer) = req + .headers() + .get(hyper::header::REFERER) + .and_then(|h| h.to_str().ok()) + { + request_context.client_request_referer = Some(referer.to_string()); + } + + if let Some(ua) = &user_agent { + request_context.client_request_user_agent = Some(ua.clone()); + } + + if let Some(requested_with) = req + .headers() + .get("x-requested-with") + .and_then(|h| h.to_str().ok()) + { + request_context.client_x_requested_with = Some(requested_with.to_string()); + } + + // TLS information would be set here if available (for HTTPS connections) + // This requires TLS connection introspection and is marked for future enhancement + // Debug log request information with structured fields (Apache-like access log) debug!( - request_id = %request_id, + request_id = %request_context.request_id, method = %method, path = %path, host = %host, @@ -1613,7 +1732,7 @@ impl ProxyService { ); // Process the request - let result = self.handle_request(req).await; + let result = self.handle_request(req, &mut request_context).await; match &result { Ok(response) => { @@ -1627,7 +1746,7 @@ impl ProxyService { // Log information about the completed request debug!( - request_id = %request_id, + request_id = %request_context.request_id, method = %method, path = %path, host = %host, @@ -1640,7 +1759,7 @@ impl ProxyService { Err(e) => { // Log error information debug!( - request_id = %request_id, + request_id = %request_context.request_id, method = %method, path = %path, host = %host, @@ -1676,12 +1795,14 @@ impl ProxyServiceFactory { routing_fn: RoutingFn, middleware_fn: MiddlewareFn, port_type: PortType, + clickhouse_inserter: Option, ) -> Self { let state = Arc::new(ProxyState::new( config, routing_fn, middleware_fn, port_type, + clickhouse_inserter, )); Self { state } } diff --git a/packages/edge/infra/guard/core/src/request_context.rs b/packages/edge/infra/guard/core/src/request_context.rs new file mode 100644 index 0000000000..e35c469922 --- /dev/null +++ b/packages/edge/infra/guard/core/src/request_context.rs @@ -0,0 +1,210 @@ +use crate::analytics::GuardHttpRequest; +use global_error::GlobalResult; +use lazy_static::lazy_static; +use std::{net::IpAddr, time::SystemTime}; +use tracing::warn; +use uuid::Uuid; + +// Properties not currently tracked but should be added in future iterations: +// - client_ssl_cipher: Requires TLS connection introspection +// - client_ssl_protocol: Requires TLS connection introspection +// - client_tcp_rtt_ms: Requires network-level measurements +// - client_request_bytes: Total request size including headers +// - service_dns_response_time_ms: Requires DNS timing instrumentation +// - service_ssl_protocol: Requires upstream TLS introspection +// - service_tcp_handshake_duration_ms: Requires connection-level timing +// - service_tls_handshake_duration_ms: Requires TLS handshake timing +// - service_request_header_send_duration_ms: Requires granular timing +// - service_response_header_receive_duration_ms: Requires granular timing +// - guard_response_bytes: Total response size including headers +// - guard_time_to_first_byte_ms: Requires granular timing +// - security_rule_id: Requires security/firewall rule integration + +lazy_static! { + static ref GUARD_IDS: (Option, Option, Option) = { + let datacenter_id = std::env::var("RIVET_DATACENTER_ID") + .ok() + .and_then(|s| s.parse::().ok()); + let cluster_id = std::env::var("RIVET_CLUSTER_ID") + .ok() + .and_then(|s| s.parse::().ok()); + let server_id = std::env::var("RIVET_SERVER_ID") + .ok() + .and_then(|s| s.parse::().ok()); + (datacenter_id, cluster_id, server_id) + }; +} + +#[derive(Clone)] +pub struct RequestContext { + // Request tracking data + pub request_id: Uuid, + pub client_ip: Option, + pub client_request_body_bytes: Option, + pub client_request_host: Option, + pub client_request_method: Option, + pub client_request_path: Option, + pub client_request_protocol: Option, + pub client_request_referer: Option, + pub client_request_scheme: Option, + pub client_request_uri: Option, + pub client_request_user_agent: Option, + pub client_src_port: Option, + pub client_x_requested_with: Option, + + // Guard tracking data + pub guard_datacenter_id: Option, + pub guard_cluster_id: Option, + pub guard_server_id: Option, + pub guard_end_timestamp: Option, + pub guard_response_body_bytes: Option, + pub guard_response_content_type: Option, + pub guard_response_status: Option, + pub guard_start_timestamp: SystemTime, + + // Service tracking data + pub service_ip: Option, + pub service_response_duration_ms: Option, + pub service_response_http_expires: Option, + pub service_response_http_last_modified: Option, + pub service_response_status: Option, + pub service_actor_id: Option, + pub service_server_id: Option, + + // ClickHouse inserter handle + clickhouse_inserter: Option, +} + +impl RequestContext { + pub fn new(clickhouse_inserter: Option) -> Self { + Self::new_with_request_id(Uuid::new_v4(), clickhouse_inserter) + } + + pub fn new_with_request_id( + request_id: Uuid, + clickhouse_inserter: Option, + ) -> Self { + let (datacenter_id, cluster_id, server_id) = *GUARD_IDS; + + Self { + request_id, + client_ip: None, + client_request_body_bytes: None, + client_request_host: None, + client_request_method: None, + client_request_path: None, + client_request_protocol: None, + client_request_referer: None, + client_request_scheme: None, + client_request_uri: None, + client_request_user_agent: None, + client_src_port: None, + client_x_requested_with: None, + guard_datacenter_id: datacenter_id, + guard_cluster_id: cluster_id, + guard_server_id: server_id, + guard_end_timestamp: None, + guard_response_body_bytes: None, + guard_response_content_type: None, + guard_response_status: None, + guard_start_timestamp: SystemTime::now(), + service_ip: None, + service_response_duration_ms: None, + service_response_http_expires: None, + service_response_http_last_modified: None, + service_response_status: None, + service_actor_id: None, + service_server_id: None, + clickhouse_inserter, + } + } + + // Finalize the request and insert analytics event + pub async fn insert_event(&mut self) -> GlobalResult<()> { + let Some(inserter) = &self.clickhouse_inserter else { + return Ok(()); // No inserter available + }; + + // Set end timestamp + self.guard_end_timestamp = Some(SystemTime::now()); + + // Convert IP addresses to strings for ClickHouse IPv4 type + let client_ip = match self.client_ip { + Some(IpAddr::V4(ip)) => ip.to_string(), + Some(IpAddr::V6(_)) => "0.0.0.0".to_string(), // Fallback for IPv6 addresses + None => "0.0.0.0".to_string(), // Default fallback + }; + + let service_ip = match self.service_ip { + Some(IpAddr::V4(ip)) => ip.to_string(), + Some(IpAddr::V6(_)) => "0.0.0.0".to_string(), // Fallback for IPv6 addresses + None => "127.0.0.1".to_string(), // Default fallback + }; + + // Convert SystemTime to nanoseconds since Unix epoch for ClickHouse DateTime64(9) + let guard_start_timestamp = self + .guard_start_timestamp + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + let guard_end_timestamp = self + .guard_end_timestamp + .unwrap_or_else(SystemTime::now) + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + // Build the analytics event inline with defaults for missing values + let analytics_event = GuardHttpRequest { + request_id: self.request_id, + client_ip, + client_request_body_bytes: self.client_request_body_bytes.unwrap_or_default(), + client_request_host: self.client_request_host.clone().unwrap_or_default(), + client_request_method: self.client_request_method.clone().unwrap_or_default(), + client_request_path: self.client_request_path.clone().unwrap_or_default(), + client_request_protocol: self.client_request_protocol.clone().unwrap_or_default(), + client_request_referer: self.client_request_referer.clone().unwrap_or_default(), + client_request_scheme: self.client_request_scheme.clone().unwrap_or_default(), + client_request_uri: self.client_request_uri.clone().unwrap_or_default(), + client_request_user_agent: self.client_request_user_agent.clone().unwrap_or_default(), + client_src_port: self.client_src_port.unwrap_or_default(), + client_x_requested_with: self.client_x_requested_with.clone().unwrap_or_default(), + guard_datacenter_id: self.guard_datacenter_id.unwrap_or_default(), + guard_cluster_id: self.guard_cluster_id.unwrap_or_default(), + guard_server_id: self.guard_server_id.unwrap_or_default(), + guard_end_timestamp, + guard_response_body_bytes: self.guard_response_body_bytes.unwrap_or_default(), + guard_response_content_type: self + .guard_response_content_type + .clone() + .unwrap_or_default(), + guard_response_status: self.guard_response_status.unwrap_or_default(), + guard_start_timestamp, + service_ip, + service_response_duration_ms: self.service_response_duration_ms.unwrap_or_default(), + service_response_http_expires: self + .service_response_http_expires + .clone() + .unwrap_or_default(), + service_response_http_last_modified: self + .service_response_http_last_modified + .clone() + .unwrap_or_default(), + service_response_status: self.service_response_status.unwrap_or_default(), + service_actor_id: self.service_actor_id.unwrap_or_default().to_string(), + service_server_id: self.service_server_id.unwrap_or_default(), + }; + + // Insert the event asynchronously + inserter.insert("db_guard_analytics", "http_requests", analytics_event)?; + + Ok(()) + } +} + +impl std::fmt::Debug for RequestContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RequestContext").finish_non_exhaustive() + } +} diff --git a/packages/edge/infra/guard/core/src/server.rs b/packages/edge/infra/guard/core/src/server.rs index 2a4414bf82..7b35365ac7 100644 --- a/packages/edge/infra/guard/core/src/server.rs +++ b/packages/edge/infra/guard/core/src/server.rs @@ -34,6 +34,7 @@ pub async fn run_server( routing_fn: RoutingFn, middleware_fn: MiddlewareFn, cert_resolver_fn: Option, + clickhouse_inserter: Option, ) -> GlobalResult<()> { // Configure servers for different ports let guard_config = config.guard()?; @@ -45,6 +46,7 @@ pub async fn run_server( routing_fn.clone(), middleware_fn.clone(), crate::proxy_service::PortType::Http, + clickhouse_inserter.clone(), )); let http_listener = tokio::net::TcpListener::bind(http_addr).await?; @@ -57,6 +59,7 @@ pub async fn run_server( routing_fn.clone(), middleware_fn.clone(), crate::proxy_service::PortType::Https, + clickhouse_inserter.clone(), )); let listener = tokio::net::TcpListener::bind(https_addr).await?; diff --git a/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.down.sql b/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.down.sql new file mode 100644 index 0000000000..fe06a97e43 --- /dev/null +++ b/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS http_requests; \ No newline at end of file diff --git a/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.up.sql b/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.up.sql new file mode 100644 index 0000000000..657744f0c0 --- /dev/null +++ b/packages/edge/infra/guard/db/analytics/migrations/20200101000000_init.up.sql @@ -0,0 +1,37 @@ +CREATE TABLE IF NOT EXISTS http_requests +( + namespace LowCardinality(String), + request_id UUID, + client_ip IPv4, + client_request_body_bytes UInt64, + client_request_host String, + client_request_method LowCardinality(String), + client_request_path String, + client_request_protocol LowCardinality(String), + client_request_referer String, + client_request_scheme LowCardinality(String), + client_request_uri String, + client_request_user_agent String, + client_src_port UInt16, + client_x_requested_with String, + guard_datacenter_id UUID, + guard_cluster_id UUID, + guard_server_id UUID, + guard_end_timestamp DateTime64(9), + guard_response_body_bytes UInt64, + guard_response_content_type String, + guard_response_status UInt16, + guard_start_timestamp DateTime64(9), + service_ip IPv4, + service_response_duration_ms UInt32, + service_response_http_expires String, + service_response_http_last_modified String, + service_response_status UInt16, + service_actor_id String, + service_server_id UUID +) +ENGINE = ReplicatedMergeTree() +PARTITION BY toStartOfHour(guard_start_timestamp) +ORDER BY (namespace, guard_start_timestamp, request_id) +TTL toDate(guard_start_timestamp + toIntervalDay(30)) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/edge/infra/guard/server/src/main.rs b/packages/edge/infra/guard/server/src/main.rs index 9b6d61b8f3..0441c8495c 100644 --- a/packages/edge/infra/guard/server/src/main.rs +++ b/packages/edge/infra/guard/server/src/main.rs @@ -85,10 +85,13 @@ async fn main_inner() -> GlobalResult<()> { tracing::info!("No TLS configuration found, HTTPS will not be enabled"); } + // Get the ClickHouse inserter from pools if available + let clickhouse_inserter = ctx.op_ctx().pools().clickhouse_inserter().ok(); + // Start the server tracing::info!("starting proxy server"); tokio::select! { - res = rivet_guard_core::run_server(config, routing_fn, middleware_fn, cert_resolver) => { + res = rivet_guard_core::run_server(config, routing_fn, middleware_fn, cert_resolver, clickhouse_inserter) => { if let Err(err) = res { tracing::error!(?err, "Server error"); } diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604022411_actor_log2.down.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022411_actor_log2.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604022411_actor_log2.up.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022411_actor_log2.up.sql new file mode 100644 index 0000000000..acf2fa929b --- /dev/null +++ b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022411_actor_log2.up.sql @@ -0,0 +1,17 @@ +CREATE TABLE IF NOT EXISTS actor_logs2 ( + namespace LowCardinality(String), + actor_id String, + ts DateTime64 (9), + stream_type UInt8, -- pegboard::types::LogsStreamType + message String +) ENGINE = ReplicatedMergeTree () +PARTITION BY + toStartOfHour (ts) +ORDER BY ( + namespace, + actor_id, + toUnixTimestamp (ts), + stream_type +) +TTL toDate (ts + toIntervalDay(14)) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604022421_actor_log2_mv.down.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022421_actor_log2_mv.down.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604022421_actor_log2_mv.up.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022421_actor_log2_mv.up.sql new file mode 100644 index 0000000000..3a3f98ce9d --- /dev/null +++ b/packages/edge/services/pegboard/db/actor-log/migrations/20250604022421_actor_log2_mv.up.sql @@ -0,0 +1,36 @@ +CREATE MATERIALIZED VIEW IF NOT EXISTS actor_logs2_with_metadata +( + namespace LowCardinality(String), + actor_id String, + ts DateTime64(9), + stream_type UInt8, -- pegboard::types::LogsStreamType + message String, + project_id UUID, + env_id UUID, + datacenter_id UUID, + tags Map(String, String), + build_id UUID, + client_id UUID, + durable Bool +) +ENGINE = ReplicatedMergeTree() +PARTITION BY (env_id, toStartOfHour(ts)) +ORDER BY (env_id, toUnixTimestamp(ts), actor_id, stream_type) +TTL toDate(ts + toIntervalDay(14)) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1 +AS SELECT + l.namespace, + l.actor_id, + l.ts, + l.stream_type, + l.message, + a.project_id, + a.env_id, + a.datacenter_id, + a.tags, + a.build_id, + a.client_id, + a.durable +FROM actor_logs2 l +LEFT JOIN db_pegboard_analytics.actors a ON l.actor_id = a.actor_id; + diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.down.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.down.sql deleted file mode 100644 index 2a7e2f95da..0000000000 --- a/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE actor_logs - MODIFY TTL toDate(ts + toIntervalDay(3)); diff --git a/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.up.sql b/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.up.sql deleted file mode 100644 index c84a3717c5..0000000000 --- a/packages/edge/services/pegboard/db/actor-log/migrations/20250604123912_increase_ttl.up.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE actor_logs - MODIFY TTL toDate(ts + toIntervalDay(14)); diff --git a/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.down.sql b/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.down.sql new file mode 100644 index 0000000000..e29cbba739 --- /dev/null +++ b/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS actors; \ No newline at end of file diff --git a/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.up.sql new file mode 100644 index 0000000000..b985194c8b --- /dev/null +++ b/packages/edge/services/pegboard/db/analytics/migrations/20200101000000_init.up.sql @@ -0,0 +1,62 @@ +-- row_updated_at = determines the latest version of this row +-- created_at = used in ORDER BY in order mitigate random insertion order +-- ttl_only_drop_parts = 0 means we can't drop by entire partitions bc the destroy ts is not in PARTITION BY +CREATE TABLE IF NOT EXISTS actors +( + namespace LowCardinality(String), + actor_id String, + project_id UUID, + env_id UUID, + datacenter_id UUID, + tags Map(String, String), + build_id UUID, + build_kind UInt8, + build_compression UInt8, + network_mode UInt8, + network_ports Map(String, Tuple( + internal_port UInt16, + routing_guard Bool, + routing_host Bool, + routing_guard_protocol UInt8, + routing_host_protocol UInt8 + )), + network_ports_ingress Map(String, Tuple( + port_number UInt16, + ingress_port_number UInt16, + protocol UInt8 + )), + network_ports_host Map(String, Tuple( + port_number UInt16, + protocol UInt8 + )), + network_ports_proxied Map(String, Tuple( + ip String, + source UInt8 + )), + client_id UUID, + client_wan_hostname String, + selected_cpu_millicores UInt32, + selected_memory_mib UInt32, + root_user_enabled Bool, + env_vars Int64, + env_var_bytes Int64, + args Int64, + args_bytes Int64, + durable Bool, + kill_timeout Int64, + cpu_millicores UInt32, + memory_mib UInt32, + created_at DateTime64(9), + started_at DateTime64(9), + connectable_at DateTime64(9), + finished_at DateTime64(9), + destroyed_at DateTime64(9), + row_updated_at DateTime64(9), + + INDEX idx_actor_id actor_id TYPE bloom_filter GRANULARITY 1 +) +ENGINE = ReplicatedReplacingMergeTree(row_updated_at) +PARTITION BY (namespace, env_id, toStartOfHour(created_at)) +ORDER BY (env_id, created_at, actor_id) +TTL toDate(multiIf(destroyed_at > 0, destroyed_at + toIntervalDay(90), toDateTime64('2099-12-31 23:59:59', 9))) +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 0; diff --git a/packages/edge/services/pegboard/src/ops/actor/log/export.rs b/packages/edge/services/pegboard/src/ops/actor/log/export.rs index aa1fed7edf..cebb0fe3c4 100644 --- a/packages/edge/services/pegboard/src/ops/actor/log/export.rs +++ b/packages/edge/services/pegboard/src/ops/actor/log/export.rs @@ -37,10 +37,21 @@ pub async fn pegboard_actor_log_read(ctx: &OperationCtx, input: &Input) -> Globa actor_id = ? AND stream_type = ? ORDER BY ts ASC + + UNION ALL + + SELECT message + FROM db_pegboard_actor_log.actor_logs2 + WHERE + actor_id = ? AND + stream_type = ? + ORDER BY ts ASC " )) .bind(input.actor_id) .bind(input.stream_type as i8) + .bind(input.actor_id.to_string()) + .bind(input.stream_type as i8) .fetch::()?; let mut lines = 0; diff --git a/packages/edge/services/pegboard/src/ops/actor/log/read.rs b/packages/edge/services/pegboard/src/ops/actor/log/read.rs index 34874598c2..c67e73b5de 100644 --- a/packages/edge/services/pegboard/src/ops/actor/log/read.rs +++ b/packages/edge/services/pegboard/src/ops/actor/log/read.rs @@ -92,11 +92,26 @@ pub async fn pegboard_actor_log_read(ctx: &OperationCtx, input: &Input) -> Globa ts, message, stream_type, - toString(actor_id) as actor_id_str - FROM - db_pegboard_actor_log.actor_logs + actor_id_str + FROM ( + SELECT + ts, + message, + stream_type, + toString(actor_id) as actor_id_str + FROM + db_pegboard_actor_log.actor_logs + UNION ALL + SELECT + ts, + message, + stream_type, + actor_id as actor_id_str + FROM + db_pegboard_actor_log.actor_logs2 + ) WHERE - actor_id IN ? + actor_id_str IN ? AND stream_type IN ? -- Apply timestamp filtering based on query type AND ( @@ -131,10 +146,13 @@ pub async fn pegboard_actor_log_read(ctx: &OperationCtx, input: &Input) -> Globa " ); + // Convert actor IDs to strings for the query + let actor_id_strings: Vec = input.actor_ids.iter().map(|id| id.to_string()).collect(); + // Build query with all parameters and safety restrictions let query_builder = clickhouse .query(&query) - .bind(&input.actor_ids) + .bind(&actor_id_strings) .bind(stream_type_values) // Query type parameters .bind(is_all) diff --git a/packages/edge/services/pegboard/src/workflows/actor/analytics.rs b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs new file mode 100644 index 0000000000..9b42c73697 --- /dev/null +++ b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs @@ -0,0 +1,312 @@ +use chirp_workflow::prelude::*; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct InsertClickHouseInput { + pub actor_id: Uuid, +} + +/// Row to be inserted in to ClickHouse +#[derive(Serialize)] +pub struct ActorClickHouseRow { + actor_id: String, + project_id: Uuid, + env_id: Uuid, + datacenter_id: Uuid, + tags: HashMap, + /// Alias of image_id + build_id: Uuid, + build_kind: i64, + build_compression: i64, + network_mode: i64, + network_ports: HashMap, + network_ports_ingress: HashMap, + network_ports_host: HashMap, + network_ports_proxied: HashMap, + client_id: Uuid, + client_wan_hostname: String, + selected_cpu_millicores: u32, + selected_memory_mib: u32, + root_user_enabled: bool, + env_vars: i64, + env_var_bytes: i64, + args: i64, + args_bytes: i64, + durable: bool, + kill_timeout: i64, + cpu_millicores: i64, + memory_mib: i64, + /// Used in ORDER BY for replacing the key so this must never change. + created_at: i64, + /// This will not be set until after the actor is destroyed because we only insert in to + /// ClickHouse after start & destroy. + /// + /// 0 = not set + started_at: i64, + /// See `started_at`. + connectable_at: i64, + /// See `started_at`. + finished_at: i64, + /// This column is used for configuring the TTL of the actor. + /// + /// 0 = not set + destroyed_at: i64, + row_updated_at: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPort { + /// Will be 0 if not configured + internal_port: u16, + routing_guard: bool, + routing_host: bool, + routing_guard_protocol: i64, + routing_host_protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortIngress { + port_number: u16, + ingress_port_number: u16, + protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortHost { + port_number: u16, + protocol: i64, +} + +#[derive(Serialize)] +pub struct ActorClickHouseRowPortProxied { + ip: String, + source: i64, +} + +/// State row to select from SQLite +#[derive(sqlx::FromRow)] +struct StateRow { + project_id: Uuid, + env_id: Uuid, + tags: sqlx::types::Json>, + resources_cpu_millicores: i64, + resources_memory_mib: i64, + selected_resources_cpu_millicores: Option, + selected_resources_memory_mib: Option, + client_id: Option, + client_workflow_id: Option, + client_wan_hostname: Option, + lifecycle_kill_timeout_ms: i64, + lifecycle_durable: bool, + create_ts: i64, + start_ts: Option, + connectable_ts: Option, + finish_ts: Option, + destroy_ts: Option, + image_id: Uuid, + build_kind: i64, + build_compression: i64, + root_user_enabled: bool, + args: sqlx::types::Json>, + network_mode: i64, + environment: sqlx::types::Json>, +} + +/// This activity is idempotent and will upsert the actor row. If we want to change the data in +/// ClickHouse, we need to use this. This gets inserted in to a ReplacingMergeTree so it's safe to +/// update frequently. +#[activity(InsertClickHouse)] +pub async fn insert_clickhouse( + ctx: &ActivityCtx, + input: &InsertClickHouseInput, +) -> GlobalResult<()> { + let Ok(inserter) = ctx.clickhouse_inserter().await else { + return Ok(()); + }; + + let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; + + // Read extra information + let pool = ctx.sqlite().await?; + + // Read state + let state_row = sql_fetch_one!( + [ctx, StateRow, &pool] + " + SELECT + project_id, + env_id, + json(tags) AS tags, + resources_cpu_millicores, + resources_memory_mib, + selected_resources_cpu_millicores, + selected_resources_memory_mib, + client_id, + client_workflow_id, + client_wan_hostname, + lifecycle_kill_timeout_ms, + lifecycle_durable, + create_ts, + start_ts, + connectable_ts, + finish_ts, + destroy_ts, + image_id, + build_kind, + build_compression, + root_user_enabled, + json(args) AS args, + network_mode, + json(environment) AS environment + FROM state + ", + ) + .await?; + + // Read network ports from SQLite tables + let network_ports_data = sql_fetch_all!( + [ctx, (String, Option, i64, String), &pool] + " + SELECT port_name, port_number, protocol, 'ingress' as routing_type FROM ports_ingress + UNION ALL + SELECT port_name, port_number, protocol, 'host' as routing_type FROM ports_host + ", + ) + .await?; + + let network_ports: HashMap = network_ports_data + .into_iter() + .map(|(name, port_number, protocol, routing_type)| { + let (routing_guard, routing_host, routing_guard_protocol, routing_host_protocol) = + match routing_type.as_str() { + "ingress" => (true, false, protocol as i64, 0), + "host" => (false, true, 0, protocol as i64), + _ => (false, false, 0, 0), + }; + + ( + name, + ActorClickHouseRowPort { + internal_port: port_number.unwrap_or_default() as u16, + routing_guard, + routing_host, + routing_guard_protocol, + routing_host_protocol, + }, + ) + }) + .collect(); + + // Read ingress ports + let ingress_ports = sql_fetch_all!( + [ctx, (String, Option, i64, i64), &pool] + "SELECT port_name, port_number, ingress_port_number, protocol FROM ports_ingress", + ) + .await? + .into_iter() + .map(|(name, port_number, ingress_port_number, protocol)| { + ( + name, + ActorClickHouseRowPortIngress { + port_number: port_number.unwrap_or_default() as u16, + ingress_port_number: ingress_port_number as u16, + protocol: protocol as i64, + }, + ) + }) + .collect::>(); + + // Read host ports + let host_ports = sql_fetch_all!( + [ctx, (String, Option, i64), &pool] + "SELECT port_name, port_number, protocol FROM ports_host", + ) + .await? + .into_iter() + .map(|(name, port_number, protocol)| { + ( + name, + ActorClickHouseRowPortHost { + port_number: port_number.unwrap_or_default() as u16, + protocol: protocol as i64, + }, + ) + }) + .collect::>(); + + // Read proxied ports + let proxied_ports = sql_fetch_all!( + [ctx, (String, String, i64), &pool] + "SELECT port_name, ip, source FROM ports_proxied", + ) + .await? + .into_iter() + .map(|(name, ip, source)| { + ( + name, + ActorClickHouseRowPortProxied { + ip, + source: source as i64, + }, + ) + }) + .collect::>(); + + inserter.insert( + "db_pegboard_analytics", + "actors", + ActorClickHouseRow { + actor_id: input.actor_id.to_string(), + project_id: state_row.project_id, + env_id: state_row.env_id, + datacenter_id: dc_id, + tags: state_row.tags.0, + build_id: state_row.image_id, + build_kind: state_row.build_kind, + build_compression: state_row.build_compression, + network_mode: state_row.network_mode as i64, + network_ports, + network_ports_ingress: ingress_ports, + network_ports_host: host_ports, + network_ports_proxied: proxied_ports, + client_id: state_row.client_id.unwrap_or_default(), + client_wan_hostname: state_row.client_wan_hostname.unwrap_or_default(), + selected_cpu_millicores: state_row + .selected_resources_cpu_millicores + .unwrap_or_default() as u32, + selected_memory_mib: state_row.selected_resources_memory_mib.unwrap_or_default() as u32, + root_user_enabled: state_row.root_user_enabled, + env_vars: state_row.environment.len() as i64, + env_var_bytes: state_row.environment + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() as i64, + args: state_row.args.len() as i64, + args_bytes: state_row.args.iter().map(|arg| arg.len()).sum::() as i64, + durable: state_row.lifecycle_durable, + kill_timeout: state_row.lifecycle_kill_timeout_ms, + cpu_millicores: state_row.resources_cpu_millicores, + memory_mib: state_row.resources_memory_mib, + created_at: state_row.create_ts * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + started_at: state_row + .start_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + connectable_at: state_row + .connectable_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + finished_at: state_row + .finish_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + destroyed_at: state_row + .destroy_ts + .map(|ts| ts * 1_000_000) + .unwrap_or_default(), + row_updated_at: util::timestamp::now() * 1_000_000, // Convert ms to ns for ClickHouse DateTime64(9) + }, + )?; + + Ok(()) +} diff --git a/packages/edge/services/pegboard/src/workflows/actor/destroy.rs b/packages/edge/services/pegboard/src/workflows/actor/destroy.rs index aaa02b9417..07b73a1e1f 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/destroy.rs @@ -4,7 +4,7 @@ use fdb_util::{FormalKey, SERIALIZABLE}; use foundationdb as fdb; use nix::sys::signal::Signal; -use super::{DestroyComplete, DestroyStarted}; +use super::{analytics::InsertClickHouseInput, DestroyComplete, DestroyStarted}; use crate::{keys, protocol, types::GameGuardProtocol}; #[derive(Debug, Serialize, Deserialize)] @@ -57,6 +57,13 @@ pub(crate) async fn pegboard_actor_destroy( } } + // Update ClickHouse analytics with destroyed timestamp + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + ctx.msg(DestroyComplete {}) .tag("actor_id", input.actor_id) .send() diff --git a/packages/edge/services/pegboard/src/workflows/actor/migrations.rs b/packages/edge/services/pegboard/src/workflows/actor/migrations.rs index bf27c3619e..fefbc34d5a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/migrations.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/migrations.rs @@ -3,6 +3,7 @@ use sqlx::Acquire; pub async fn run(ctx: &mut WorkflowCtx) -> GlobalResult<()> { ctx.activity(MigrateInitInput {}).await?; + ctx.v(2).activity(MigrateExtraMetaInput {}).await?; Ok(()) } @@ -12,6 +13,7 @@ struct MigrateInitInput {} #[activity(MigrateInit)] async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalResult<()> { + // Transactions make migrations atomic let pool = ctx.sqlite().await?; let mut conn = pool.conn().await?; let mut tx = conn.begin().await?; @@ -75,3 +77,28 @@ async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalRes Ok(()) } + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct MigrateExtraMetaInput {} + +#[activity(MigrateExtraMeta)] +async fn migrate_extra_meta(ctx: &ActivityCtx, _input: &MigrateExtraMetaInput) -> GlobalResult<()> { + let pool = ctx.sqlite().await?; + let mut conn = pool.conn().await?; + let mut tx = conn.begin().await?; + + sql_execute!( + [ctx, @tx &mut tx] + " + ALTER TABLE state ADD project_id BLOB DEFAULT X'00000000000000000000000000000000'; -- UUID + ALTER TABLE state ADD root_user_enabled INT DEFAULT false; + ALTER TABLE state ADD build_kind INT DEFAULT -1; + ALTER TABLE state ADD build_compression INT DEFAULT -1; + ", + ) + .await?; + + tx.commit().await?; + + Ok(()) +} diff --git a/packages/edge/services/pegboard/src/workflows/actor/mod.rs b/packages/edge/services/pegboard/src/workflows/actor/mod.rs index 4ab6833411..3600e5b942 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/mod.rs @@ -1,15 +1,15 @@ -use std::collections::HashMap; - +use analytics::InsertClickHouseInput; use chirp_workflow::prelude::*; use destroy::KillCtx; use futures_util::FutureExt; -use util::serde::AsHashableExt; +use rivet_util::serde::HashableMap; use crate::{ protocol, types::{ActorLifecycle, ActorResources, EndpointType, NetworkMode, Routing}, }; +mod analytics; pub mod destroy; mod migrations; mod runtime; @@ -30,19 +30,19 @@ const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5); /// backoff to 0. const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10); -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Input { pub actor_id: Uuid, pub env_id: Uuid, - pub tags: HashMap, + pub tags: HashableMap, pub resources: ActorResources, pub lifecycle: ActorLifecycle, pub image_id: Uuid, pub root_user_enabled: bool, pub args: Vec, pub network_mode: NetworkMode, - pub environment: HashMap, - pub network_ports: HashMap, + pub environment: HashableMap, + pub network_ports: HashableMap, pub endpoint_type: Option, } @@ -60,14 +60,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul let validation_res = ctx .activity(setup::ValidateInput { env_id: input.env_id, - tags: input.tags.as_hashable(), + tags: input.tags.clone(), resources: input.resources.clone(), image_id: input.image_id, root_user_enabled: input.root_user_enabled, args: input.args.clone(), network_mode: input.network_mode, - environment: input.environment.as_hashable(), - network_ports: input.network_ports.as_hashable(), + environment: input.environment.clone(), + network_ports: input.network_ports.clone(), }) .await?; @@ -85,7 +85,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul let network_ports = ctx .activity(setup::DisableTlsPortsInput { - network_ports: input.network_ports.as_hashable(), + network_ports: input.network_ports.clone(), }) .await?; @@ -122,6 +122,12 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul } }; + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + ctx.msg(CreateComplete {}) .tag("actor_id", input.actor_id) .send() @@ -158,6 +164,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul runtime::State::new(res.client_id, res.client_workflow_id, input.image_id), |ctx, state| { let input = input.clone(); + let meta = initial_actor_setup.meta.clone(); async move { let sig = if let Some(drain_timeout_ts) = state.drain_timeout_ts { @@ -269,6 +276,12 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul }) .await?; + ctx.v(2) + .activity(InsertClickHouseInput { + actor_id: input.actor_id, + }) + .await?; + if updated { ctx.msg(Ready {}) .tag("actor_id", input.actor_id) @@ -442,7 +455,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul ctx.workflow(destroy::Input { actor_id: input.actor_id, - build_kind: Some(initial_actor_setup.meta.build_kind), + build_kind: Some(initial_actor_setup.meta.build_kind.clone()), kill: state_res.kill, }) .output() diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index 019384efb9..d9d674252a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -684,7 +684,7 @@ pub async fn spawn_actor( }, }, root_user_enabled: input.root_user_enabled, - env: input.environment.as_hashable(), + env: input.environment.clone(), ports: ports_res .ports .iter() @@ -724,7 +724,7 @@ pub async fn spawn_actor( metadata: util::serde::Raw::new(&protocol::ActorMetadata { actor: protocol::ActorMetadataActor { actor_id: input.actor_id, - tags: input.tags.as_hashable(), + tags: input.tags.clone(), create_ts: ctx.ts(), }, network: Some(protocol::ActorMetadataNetwork { diff --git a/packages/edge/services/pegboard/src/workflows/actor/setup.rs b/packages/edge/services/pegboard/src/workflows/actor/setup.rs index 19dfc95656..9037a3889a 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/setup.rs @@ -6,7 +6,7 @@ use foundationdb as fdb; use sqlx::Acquire; use util::serde::AsHashableExt; -use super::{Input, Port}; +use super::{analytics::InsertClickHouseInput, Input, Port}; use crate::{ keys, protocol, types::{ActorLifecycle, ActorResources, GameGuardProtocol, NetworkMode, Routing}, @@ -402,6 +402,36 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult GlobalResult<()> { + let pool = ctx.sqlite().await?; + + sql_execute!( + [ctx, pool] + " + UPDATE state + SET + project_id = ?, + build_kind = ?, + build_compression = ?, + root_user_enabled = ? + ", + input.meta.project_id, + input.meta.build_kind as i64, + input.meta.build_compression as i64, + input.root_user_enabled, + ) + .await?; + + Ok(()) +} + #[derive(Debug, Clone, Serialize, Deserialize, Hash)] struct InsertFdbInput { actor_id: Uuid, @@ -454,9 +484,9 @@ async fn insert_fdb(ctx: &ActivityCtx, input: &InsertFdbInput) -> GlobalResult<( } #[derive(Debug, Serialize, Deserialize, Hash)] -struct GetMetaInput { - env_id: Uuid, - image_id: Uuid, +pub struct GetMetaInput { + pub env_id: Uuid, + pub image_id: Uuid, } #[derive(Clone, Debug, Serialize, Deserialize, Hash)] @@ -474,7 +504,7 @@ pub struct GetMetaOutput { } #[activity(GetMeta)] -async fn get_meta(ctx: &ActivityCtx, input: &GetMetaInput) -> GlobalResult { +pub async fn get_meta(ctx: &ActivityCtx, input: &GetMetaInput) -> GlobalResult { let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; let (env_res, build_res, dc_res) = tokio::try_join!( @@ -539,7 +569,7 @@ pub async fn setup( ) -> GlobalResult { let image_id = match setup { SetupCtx::Init { network_ports } => { - let tags = input.tags.as_hashable(); + let tags = input.tags.clone(); let create_ts = ctx .activity(InsertDbInput { actor_id: input.actor_id, @@ -550,7 +580,7 @@ pub async fn setup( image_id: input.image_id, args: input.args.clone(), network_mode: input.network_mode, - environment: input.environment.as_hashable(), + environment: input.environment.clone(), network_ports, }) .await?; @@ -575,6 +605,13 @@ pub async fn setup( }) .await?; + ctx.v(2) + .activity(InsertMetaInput { + meta: meta.clone(), + root_user_enabled: input.root_user_enabled, + }) + .await?; + let (resources, artifacts_res) = ctx .join(( activity(SelectResourcesInput { diff --git a/packages/toolchain/cli/src/commands/actor/create.rs b/packages/toolchain/cli/src/commands/actor/create.rs index 9fbde0fe48..20fc8bc870 100644 --- a/packages/toolchain/cli/src/commands/actor/create.rs +++ b/packages/toolchain/cli/src/commands/actor/create.rs @@ -200,6 +200,9 @@ impl Opts { filter_tags: None, build_tags: Some(build_tags), version: self.version.clone(), + auto_create_routes: None, + auto_sync_routes: None, + non_interactive: false, }) .await?; diff --git a/packages/toolchain/cli/src/commands/deploy.rs b/packages/toolchain/cli/src/commands/deploy.rs index cf1a95b75f..a10a24f510 100644 --- a/packages/toolchain/cli/src/commands/deploy.rs +++ b/packages/toolchain/cli/src/commands/deploy.rs @@ -17,8 +17,21 @@ pub struct Opts { #[clap(long)] extra_tags: Option, - #[clap(long, help = "Override the automatically generated version name")] + /// Override the automatically generated version name + #[clap(long)] version: Option, + + /// Automatically create routes for functions (non-interactive mode) + #[clap(long)] + auto_create_routes: Option, + + /// Automatically sync existing routes with configuration (non-interactive mode) + #[clap(long)] + auto_sync_routes: Option, + + /// Run in non-interactive mode (no prompts) + #[clap(long)] + non_interactive: bool, } impl Opts { @@ -47,6 +60,9 @@ impl Opts { filter_tags: filter_tags, build_tags: build_tags, version: self.version.clone(), + auto_create_routes: self.auto_create_routes, + auto_sync_routes: self.auto_sync_routes, + non_interactive: self.non_interactive, }) .await?; diff --git a/packages/toolchain/cli/src/commands/mod.rs b/packages/toolchain/cli/src/commands/mod.rs index a2ab0ac95d..19d4c92cb0 100644 --- a/packages/toolchain/cli/src/commands/mod.rs +++ b/packages/toolchain/cli/src/commands/mod.rs @@ -9,6 +9,7 @@ pub mod logout; pub mod metadata; pub mod project; pub mod region; +pub mod route; pub mod shell; use anyhow::*; @@ -58,6 +59,12 @@ pub enum SubCommand { #[clap(subcommand)] subcommand: region::SubCommand, }, + /// Commands for managing routes + #[clap(alias = "r", alias = "endpoint")] + Route { + #[clap(subcommand)] + subcommand: route::SubCommand, + }, /// Commands for managing Rivet configuration Config { #[clap(subcommand)] @@ -95,6 +102,7 @@ impl SubCommand { SubCommand::Actor { subcommand } => subcommand.execute().await, SubCommand::Build { subcommand } => subcommand.execute().await, SubCommand::Region { subcommand } => subcommand.execute().await, + SubCommand::Route { subcommand } => subcommand.execute().await, SubCommand::Config { subcommand } => subcommand.execute().await, SubCommand::Metadata { subcommand } => subcommand.execute().await, SubCommand::Deno(opts) => opts.execute().await, diff --git a/packages/toolchain/cli/src/commands/route/endpoint.rs b/packages/toolchain/cli/src/commands/route/endpoint.rs new file mode 100644 index 0000000000..d941854370 --- /dev/null +++ b/packages/toolchain/cli/src/commands/route/endpoint.rs @@ -0,0 +1,161 @@ +use anyhow::*; +use clap::Parser; +use std::collections::HashMap; +use toolchain::{ + rivet_api::{apis, models}, + ToolchainCtx, +}; + +/// Create or update a route endpoint +#[derive(Parser)] +pub struct Opts { + /// Name/ID of the route + name: String, + + /// Specify the environment to deploy to (will prompt if not specified) + #[clap(long, alias = "env", short = 'e')] + environment: Option, + + /// Hostname for the route + #[clap(long)] + hostname: Option, + + /// Path for the route + #[clap(long)] + path: Option, + + /// Route subpaths to the function (true/false) + #[clap(long)] + route_subpaths: Option, + + /// Strip prefix from the route (true/false) + #[clap(long)] + strip_prefix: Option, + + /// Selector tags in key=value comma-separated format (e.g. type=function,function=my-function) + #[clap(long)] + selector_tags: Option, +} + +impl Opts { + pub async fn execute(&self) -> Result<()> { + let ctx = crate::util::login::load_or_login().await?; + let env = crate::util::env::get_or_select(&ctx, self.environment.as_ref()).await?; + + // Get existing route if it exists + let route = get_route(&ctx, &env, &self.name).await?; + + // Parse selector tags + let selector_tags = self + .selector_tags + .as_ref() + .map(|tags| kv_str::from_str::>(tags)) + .transpose() + .context("Failed to parse selector tags")?; + + // Build route update body + let mut update_route_body = models::RoutesUpdateRouteBody { + hostname: route.as_ref().map(|r| r.hostname.clone()).unwrap_or_else(|| { + // Default hostname is project-env.domain + format!( + "{}-{}.{}", + ctx.project.name_id, + env, + ctx.bootstrap + .domains + .job + .as_ref() + .expect("bootstrap.domains.job") + ) + }), + path: route.as_ref().map(|r| r.path.clone()).unwrap_or_else(|| "/".to_string()), + route_subpaths: route.as_ref().map(|r| r.route_subpaths).unwrap_or(true), + strip_prefix: route.as_ref().map(|r| r.strip_prefix).unwrap_or(true), + target: Box::new(models::RoutesRouteTarget { + actors: Some(Box::new(models::RoutesRouteTargetActors { + selector_tags: route + .as_ref() + .and_then(|r| r.target.actors.as_ref().map(|a| a.selector_tags.clone())) + .unwrap_or_else(|| { + // Default selector tags for functions + let mut tags = HashMap::new(); + tags.insert("type".to_string(), "function".to_string()); + tags.insert("function".to_string(), self.name.clone()); + tags + }), + })), + }), + }; + + // Override with any provided options + if let Some(hostname) = &self.hostname { + update_route_body.hostname = hostname.clone(); + } + + if let Some(path) = &self.path { + update_route_body.path = path.clone(); + } + + if let Some(route_subpaths) = self.route_subpaths { + update_route_body.route_subpaths = route_subpaths; + } + + if let Some(strip_prefix) = self.strip_prefix { + update_route_body.strip_prefix = strip_prefix; + } + + if let Some(tags) = selector_tags { + if let Some(actors) = &mut update_route_body.target.actors { + actors.selector_tags = tags; + } + } + + // Create/update route + let result = apis::routes_api::routes_update( + &ctx.openapi_config_cloud, + &self.name, + update_route_body.clone(), + Some(&ctx.project.name_id.to_string()), + Some(&env), + ) + .await; + + match result { + Result::Ok(_) => { + println!( + "Successfully {} route: {}{}", + if route.is_some() { "updated" } else { "created" }, + update_route_body.hostname, + update_route_body.path + ); + Ok(()) + } + Err(err) => { + eprintln!("Failed to {}: {}", + if route.is_some() { "update route" } else { "create route" }, + err + ); + Err(err.into()) + } + } + } +} + +// Helper function to get route if it exists +async fn get_route(ctx: &ToolchainCtx, env: &str, route_id: &str) -> Result> { + let routes_response = apis::routes_api::routes_list( + &ctx.openapi_config_cloud, + Some(&ctx.project.name_id.to_string()), + Some(env), + ) + .await?; + + // Find route that matches the ID + let matching_route = routes_response + .routes + .iter() + .find(|route| route.id == *route_id) + .cloned(); + + Ok(matching_route) +} \ No newline at end of file diff --git a/packages/toolchain/cli/src/commands/route/list.rs b/packages/toolchain/cli/src/commands/route/list.rs new file mode 100644 index 0000000000..3e3ea0d4f9 --- /dev/null +++ b/packages/toolchain/cli/src/commands/route/list.rs @@ -0,0 +1,42 @@ +use anyhow::*; +use clap::Parser; +use toolchain::rivet_api::apis; + +/// List all routes for an environment +#[derive(Parser)] +pub struct Opts { + /// Specify the environment to list routes for (will prompt if not specified) + #[clap(long, alias = "env", short = 'e')] + environment: Option, +} + +impl Opts { + pub async fn execute(&self) -> Result<()> { + let ctx = crate::util::login::load_or_login().await?; + let env = crate::util::env::get_or_select(&ctx, self.environment.as_ref()).await?; + + // Get routes + let routes_response = apis::routes_api::routes_list( + &ctx.openapi_config_cloud, + Some(&ctx.project.name_id.to_string()), + Some(&env), + ) + .await?; + + if routes_response.routes.is_empty() { + println!("No routes found for environment '{}'", env); + return Ok(()); + } + + println!("Routes for environment '{}':", env); + println!(); + println!("{:<20} {:<40} {:<40}", "ID", "HOSTNAME", "PATH"); + println!("{:<20} {:<40} {:<40}", "----", "--------", "----"); + + for route in routes_response.routes { + println!("{:<20} {:<40} {:<40}", route.id, route.hostname, route.path); + } + + Ok(()) + } +} \ No newline at end of file diff --git a/packages/toolchain/cli/src/commands/route/mod.rs b/packages/toolchain/cli/src/commands/route/mod.rs new file mode 100644 index 0000000000..527d5486e8 --- /dev/null +++ b/packages/toolchain/cli/src/commands/route/mod.rs @@ -0,0 +1,24 @@ +use anyhow::*; +use clap::Parser; + +pub mod endpoint; +pub mod list; + +/// Commands for managing routes +#[derive(Parser)] +pub enum SubCommand { + /// List all routes + List(list::Opts), + /// Create or update an endpoint (route) + #[clap(alias = "ep")] + Endpoint(endpoint::Opts), +} + +impl SubCommand { + pub async fn execute(&self) -> Result<()> { + match self { + SubCommand::List(opts) => opts.execute().await, + SubCommand::Endpoint(opts) => opts.execute().await, + } + } +} \ No newline at end of file diff --git a/packages/toolchain/cli/src/util/deploy.rs b/packages/toolchain/cli/src/util/deploy.rs index f10942b6f6..db376cffa0 100644 --- a/packages/toolchain/cli/src/util/deploy.rs +++ b/packages/toolchain/cli/src/util/deploy.rs @@ -19,6 +19,9 @@ pub struct DeployOpts<'a> { pub filter_tags: Option>, pub build_tags: Option>, pub version: Option, + pub auto_create_routes: Option, + pub auto_sync_routes: Option, + pub non_interactive: bool, } pub async fn deploy(opts: DeployOpts<'_>) -> Result> { @@ -65,7 +68,15 @@ pub async fn deploy(opts: DeployOpts<'_>) -> Result> { .await?; // Setup function routes - setup_function_routes(opts.ctx, environment, &config, &opts.filter_tags).await?; + setup_function_routes( + opts.ctx, + environment, + &config, + &opts.filter_tags, + opts.auto_create_routes, + opts.auto_sync_routes, + opts.non_interactive + ).await?; // Print summary print_summary(opts.ctx, environment); @@ -78,6 +89,9 @@ async fn setup_function_routes( environment: &toolchain::project::environment::TEMPEnvironment, config: &config::Config, filter_tags: &Option>, + auto_create_routes: Option, + auto_sync_routes: Option, + non_interactive: bool, ) -> Result<()> { // Determine default hostname based on project & env let default_hostname = format!( @@ -190,19 +204,38 @@ async fn setup_function_routes( ]; println!(); - let choice = block_in_place(|| { - Select::new( - &format!( - "Route configuration for '{fn_name}' has changed{}", - changes_text - ), - options.to_vec(), - ) - .with_starting_cursor(0) - .prompt() - })?; + + let choice_index = if non_interactive { + // In non-interactive mode, use auto_sync_routes if provided, otherwise sync by default + if let Some(auto_sync) = auto_sync_routes { + if auto_sync { + println!("Auto-syncing route configuration for '{fn_name}' (non-interactive mode)"); + 0 // Sync route with config + } else { + println!("Skipping route sync for '{fn_name}' (non-interactive mode)"); + 1 // Keep existing route + } + } else { + println!("Auto-syncing route configuration for '{fn_name}' (non-interactive mode)"); + 0 // Default to sync in non-interactive mode + } + } else { + // Interactive mode - prompt the user + let choice = block_in_place(|| { + Select::new( + &format!( + "Route configuration for '{fn_name}' has changed{}", + changes_text + ), + options.to_vec(), + ) + .with_starting_cursor(0) + .prompt() + })?; + choice.index + }; - match choice.index { + match choice_index { 0 => { // Update first matching route to match config let mut update_route_body = models::RoutesUpdateRouteBody { @@ -268,17 +301,36 @@ async fn setup_function_routes( ]; println!(); - let choice = block_in_place(|| { - Select::new( - &format!("Set up routing for function '{}':", fn_name), - options.to_vec(), - ) - .with_help_message("Routes can be manually created in the Rivet dashboard") - .with_starting_cursor(0) - .prompt() - })?; - - match choice.index { + + let choice_index = if non_interactive { + // In non-interactive mode, use auto_create_routes if provided, otherwise create by default + if let Some(auto_create) = auto_create_routes { + if auto_create { + println!("Auto-creating route for function '{fn_name}' (non-interactive mode)"); + 0 // Create default route + } else { + println!("Skipping route creation for '{fn_name}' (non-interactive mode)"); + 1 // Skip route creation + } + } else { + println!("Auto-creating route for function '{fn_name}' (non-interactive mode)"); + 0 // Default to create in non-interactive mode + } + } else { + // Interactive mode - prompt the user + let choice = block_in_place(|| { + Select::new( + &format!("Set up routing for function '{}':", fn_name), + options.to_vec(), + ) + .with_help_message("Routes can be manually created in the Rivet dashboard") + .with_starting_cursor(0) + .prompt() + })?; + choice.index + }; + + match choice_index { 0 => { // Create route with default settings create_function_route( @@ -384,4 +436,4 @@ async fn create_function_route( } Ok(()) -} +} \ No newline at end of file diff --git a/site/src/content/docs/self-hosting/server-spec.json b/site/src/content/docs/self-hosting/server-spec.json index 4f872f87cb..280b880e40 100644 --- a/site/src/content/docs/self-hosting/server-spec.json +++ b/site/src/content/docs/self-hosting/server-spec.json @@ -268,6 +268,14 @@ "$ref": "#/definitions/Turnstile" } ] + }, + "vector_http": { + "default": null, + "allOf": [ + { + "$ref": "#/definitions/VectorHttp" + } + ] } }, "additionalProperties": false, @@ -605,15 +613,7 @@ }, "ClusterPoolAts": { "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -643,15 +643,7 @@ }, "ClusterPoolGg": { "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -666,15 +658,7 @@ }, "ClusterPoolGuard": { "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -689,29 +673,11 @@ }, "ClusterPoolJob": { "type": "object", - "required": [ - "autoscale_margin" - ], - "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - } - }, "additionalProperties": false }, "ClusterPoolNats": { "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -727,15 +693,7 @@ "ClusterPoolPegboard": { "description": "These port range values will be pass to the Rivet Clients to choose ports & are used to provision firewalls.", "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -775,15 +733,7 @@ }, "ClusterPoolWorker": { "type": "object", - "required": [ - "autoscale_margin" - ], "properties": { - "autoscale_margin": { - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, "firewall_rules": { "type": "array", "items": { @@ -871,7 +821,7 @@ "format": "uri" }, "pools": { - "description": "Configuration for server pools that use a margin for scaling.", + "description": "Configuration for server pools.", "allOf": [ { "$ref": "#/definitions/ClusterPools" @@ -1988,6 +1938,24 @@ } }, "additionalProperties": false + }, + "VectorHttp": { + "type": "object", + "required": [ + "host", + "port" + ], + "properties": { + "host": { + "type": "string" + }, + "port": { + "type": "integer", + "format": "uint16", + "minimum": 0.0 + } + }, + "additionalProperties": false } } } \ No newline at end of file