diff --git a/app/api/v1/endpoints/artifacts.py b/app/api/v1/endpoints/artifacts.py index 71ea07a..171da84 100644 --- a/app/api/v1/endpoints/artifacts.py +++ b/app/api/v1/endpoints/artifacts.py @@ -7,7 +7,7 @@ from app.core.grpc_clients import get_artifact_storage_client from app.grpc.clients.artifact_storage_client import ArtifactStorageClient from app.api import deps -from app.models.user import User +from app.models.base import User router = APIRouter() diff --git a/app/core/config.py b/app/core/config.py index 6f14923..a122364 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -51,9 +51,9 @@ class Settings(BaseSettings): refresh_token_expire_days: int = Field(default=7, alias="REFRESH_TOKEN_EXPIRE_DAYS") # CORS - # Comma-separated list of allowed origins + # Handles both JSON lists and comma-separated strings cors_origins: Any = Field( - default=["http://localhost:3000", "http://localhost:8000"], + default="http://localhost:3000,http://localhost:8000", alias="CORS_ORIGINS", ) cors_allow_credentials: bool = Field(default=True, alias="CORS_ALLOW_CREDENTIALS") @@ -64,7 +64,7 @@ class Settings(BaseSettings): # NATS Message Bus nats_servers: Any = Field( - default=["nats://localhost:4222"], + default="nats://localhost:4222", alias="NATS_SERVERS", ) nats_cluster_id: str = Field(default="xether-cluster", alias="NATS_CLUSTER_ID") @@ -74,23 +74,38 @@ class Settings(BaseSettings): artifact_storage_grpc_port: int = Field(default=50051, alias="ARTIFACT_STORAGE_GRPC_PORT") artifact_storage_http_port: int = Field(default=8080, alias="ARTIFACT_STORAGE_HTTP_PORT") - @field_validator("cors_origins", mode="before") + @field_validator("cors_origins", mode="plain") @classmethod - def parse_cors_origins(cls, v: Any) -> list[str]: - """Parse CORS origins from comma-separated string or list.""" + def parse_cors_origins(cls, v: Any) -> Any: + """Parse CORS origins with robustness against pydantic-settings parsing.""" if isinstance(v, str): - return [origin.strip() for origin in v.split(",")] - elif isinstance(v, list): + # Clean up potential shell quotes + v = v.strip("'").strip('"') + if v.startswith("[") and v.endswith("]"): + try: + import json + return json.loads(v) + except Exception: + pass + return [origin.strip() for origin in v.split(",") if origin.strip()] + if isinstance(v, list): return v return ["http://localhost:3000", "http://localhost:8000"] - @field_validator("nats_servers", mode="before") + @field_validator("nats_servers", mode="plain") @classmethod - def parse_nats_servers(cls, v: Any) -> list[str]: - """Parse NATS servers from comma-separated string or list.""" + def parse_nats_servers(cls, v: Any) -> Any: + """Parse NATS servers with robustness.""" if isinstance(v, str): - return [server.strip() for server in v.split(",")] - elif isinstance(v, list): + v = v.strip("'").strip('"') + if v.startswith("[") and v.endswith("]"): + try: + import json + return json.loads(v) + except Exception: + pass + return [server.strip() for server in v.split(",") if server.strip()] + if isinstance(v, list): return v return ["nats://localhost:4222"] @@ -108,4 +123,5 @@ def redis_url_str(self) -> str: @lru_cache def get_settings() -> Settings: """Get cached settings instance.""" + # print("DEBUG: Loading settings...") return Settings() # type: ignore[call-arg] diff --git a/app/grpc/generated/artifact_pb2.py b/app/grpc/generated/artifact_pb2.py new file mode 100644 index 0000000..dfe36d4 --- /dev/null +++ b/app/grpc/generated/artifact_pb2.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: artifact.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'artifact.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0e\x61rtifact.proto\x12\x0b\x61rtifact.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\x84\x03\n\x08\x41rtifact\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06\x62ucket\x18\x03 \x01(\t\x12\x0b\n\x03key\x18\x04 \x01(\t\x12\x0c\n\x04size\x18\x05 \x01(\x03\x12\x14\n\x0c\x63ontent_type\x18\x06 \x01(\t\x12\x17\n\x0f\x63hecksum_sha256\x18\x07 \x01(\t\x12\x17\n\nversion_id\x18\x08 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0bpipeline_id\x18\t \x01(\tH\x01\x88\x01\x01\x12\x19\n\x0c\x65xecution_id\x18\n \x01(\tH\x02\x88\x01\x01\x12\x17\n\nproject_id\x18\x0b \x01(\tH\x03\x88\x01\x01\x12.\n\ncreated_at\x18\x0c \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12.\n\nupdated_at\x18\r \x01(\x0b\x32\x1a.google.protobuf.TimestampB\r\n\x0b_version_idB\x0e\n\x0c_pipeline_idB\x0f\n\r_execution_idB\r\n\x0b_project_id\"\xed\x01\n\x10UploadURLRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06\x62ucket\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\x12\x14\n\x0c\x63ontent_type\x18\x04 \x01(\t\x12\x1a\n\x12\x65xpires_in_seconds\x18\x05 \x01(\x05\x12\x18\n\x0bpipeline_id\x18\x06 \x01(\tH\x00\x88\x01\x01\x12\x19\n\x0c\x65xecution_id\x18\x07 \x01(\tH\x01\x88\x01\x01\x12\x17\n\nproject_id\x18\x08 \x01(\tH\x02\x88\x01\x01\x42\x0e\n\x0c_pipeline_idB\x0f\n\r_execution_idB\r\n\x0b_project_id\"<\n\x11UploadURLResponse\x12\x13\n\x0b\x61rtifact_id\x18\x01 \x01(\t\x12\x12\n\nupload_url\x18\x02 \x01(\t\"`\n\x15\x43ompleteUploadRequest\x12\x13\n\x0b\x61rtifact_id\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x10\n\x08\x63hecksum\x18\x03 \x01(\t\x12\x12\n\nversion_id\x18\x04 \x01(\t\"(\n\x16\x43ompleteUploadResponse\x12\x0e\n\x06status\x18\x01 \x01(\t\"E\n\x12\x44ownloadURLRequest\x12\x13\n\x0b\x61rtifact_id\x18\x01 \x01(\t\x12\x1a\n\x12\x65xpires_in_seconds\x18\x02 \x01(\x05\"+\n\x13\x44ownloadURLResponse\x12\x14\n\x0c\x64ownload_url\x18\x01 \x01(\t\")\n\x12GetMetadataRequest\x12\x13\n\x0b\x61rtifact_id\x18\x01 \x01(\t\"h\n\x14ListArtifactsRequest\x12\x17\n\nproject_id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x18\n\x0bpipeline_id\x18\x02 \x01(\tH\x01\x88\x01\x01\x42\r\n\x0b_project_idB\x0e\n\x0c_pipeline_id\"A\n\x15ListArtifactsResponse\x12(\n\tartifacts\x18\x01 \x03(\x0b\x32\x15.artifact.v1.Artifact2\xaf\x03\n\x0f\x41rtifactService\x12M\n\x0cGetUploadURL\x12\x1d.artifact.v1.UploadURLRequest\x1a\x1e.artifact.v1.UploadURLResponse\x12Y\n\x0e\x43ompleteUpload\x12\".artifact.v1.CompleteUploadRequest\x1a#.artifact.v1.CompleteUploadResponse\x12S\n\x0eGetDownloadURL\x12\x1f.artifact.v1.DownloadURLRequest\x1a .artifact.v1.DownloadURLResponse\x12\x45\n\x0bGetMetadata\x12\x1f.artifact.v1.GetMetadataRequest\x1a\x15.artifact.v1.Artifact\x12V\n\rListArtifacts\x12!.artifact.v1.ListArtifactsRequest\x1a\".artifact.v1.ListArtifactsResponseB?Z=github.com/xether-ai/artifact-storage/api/proto/v1;artifactv1b\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'artifact_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'Z=github.com/xether-ai/artifact-storage/api/proto/v1;artifactv1' + _globals['_ARTIFACT']._serialized_start=65 + _globals['_ARTIFACT']._serialized_end=453 + _globals['_UPLOADURLREQUEST']._serialized_start=456 + _globals['_UPLOADURLREQUEST']._serialized_end=693 + _globals['_UPLOADURLRESPONSE']._serialized_start=695 + _globals['_UPLOADURLRESPONSE']._serialized_end=755 + _globals['_COMPLETEUPLOADREQUEST']._serialized_start=757 + _globals['_COMPLETEUPLOADREQUEST']._serialized_end=853 + _globals['_COMPLETEUPLOADRESPONSE']._serialized_start=855 + _globals['_COMPLETEUPLOADRESPONSE']._serialized_end=895 + _globals['_DOWNLOADURLREQUEST']._serialized_start=897 + _globals['_DOWNLOADURLREQUEST']._serialized_end=966 + _globals['_DOWNLOADURLRESPONSE']._serialized_start=968 + _globals['_DOWNLOADURLRESPONSE']._serialized_end=1011 + _globals['_GETMETADATAREQUEST']._serialized_start=1013 + _globals['_GETMETADATAREQUEST']._serialized_end=1054 + _globals['_LISTARTIFACTSREQUEST']._serialized_start=1056 + _globals['_LISTARTIFACTSREQUEST']._serialized_end=1160 + _globals['_LISTARTIFACTSRESPONSE']._serialized_start=1162 + _globals['_LISTARTIFACTSRESPONSE']._serialized_end=1227 + _globals['_ARTIFACTSERVICE']._serialized_start=1230 + _globals['_ARTIFACTSERVICE']._serialized_end=1661 +# @@protoc_insertion_point(module_scope) diff --git a/app/grpc/generated/artifact_pb2_grpc.py b/app/grpc/generated/artifact_pb2_grpc.py new file mode 100644 index 0000000..8179a7e --- /dev/null +++ b/app/grpc/generated/artifact_pb2_grpc.py @@ -0,0 +1,269 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from . import artifact_pb2 as artifact__pb2 + +GRPC_GENERATED_VERSION = '1.78.1' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + ' but the generated code in artifact_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class ArtifactServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.GetUploadURL = channel.unary_unary( + '/artifact.v1.ArtifactService/GetUploadURL', + request_serializer=artifact__pb2.UploadURLRequest.SerializeToString, + response_deserializer=artifact__pb2.UploadURLResponse.FromString, + _registered_method=True) + self.CompleteUpload = channel.unary_unary( + '/artifact.v1.ArtifactService/CompleteUpload', + request_serializer=artifact__pb2.CompleteUploadRequest.SerializeToString, + response_deserializer=artifact__pb2.CompleteUploadResponse.FromString, + _registered_method=True) + self.GetDownloadURL = channel.unary_unary( + '/artifact.v1.ArtifactService/GetDownloadURL', + request_serializer=artifact__pb2.DownloadURLRequest.SerializeToString, + response_deserializer=artifact__pb2.DownloadURLResponse.FromString, + _registered_method=True) + self.GetMetadata = channel.unary_unary( + '/artifact.v1.ArtifactService/GetMetadata', + request_serializer=artifact__pb2.GetMetadataRequest.SerializeToString, + response_deserializer=artifact__pb2.Artifact.FromString, + _registered_method=True) + self.ListArtifacts = channel.unary_unary( + '/artifact.v1.ArtifactService/ListArtifacts', + request_serializer=artifact__pb2.ListArtifactsRequest.SerializeToString, + response_deserializer=artifact__pb2.ListArtifactsResponse.FromString, + _registered_method=True) + + +class ArtifactServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def GetUploadURL(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CompleteUpload(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetDownloadURL(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetMetadata(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ListArtifacts(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ArtifactServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'GetUploadURL': grpc.unary_unary_rpc_method_handler( + servicer.GetUploadURL, + request_deserializer=artifact__pb2.UploadURLRequest.FromString, + response_serializer=artifact__pb2.UploadURLResponse.SerializeToString, + ), + 'CompleteUpload': grpc.unary_unary_rpc_method_handler( + servicer.CompleteUpload, + request_deserializer=artifact__pb2.CompleteUploadRequest.FromString, + response_serializer=artifact__pb2.CompleteUploadResponse.SerializeToString, + ), + 'GetDownloadURL': grpc.unary_unary_rpc_method_handler( + servicer.GetDownloadURL, + request_deserializer=artifact__pb2.DownloadURLRequest.FromString, + response_serializer=artifact__pb2.DownloadURLResponse.SerializeToString, + ), + 'GetMetadata': grpc.unary_unary_rpc_method_handler( + servicer.GetMetadata, + request_deserializer=artifact__pb2.GetMetadataRequest.FromString, + response_serializer=artifact__pb2.Artifact.SerializeToString, + ), + 'ListArtifacts': grpc.unary_unary_rpc_method_handler( + servicer.ListArtifacts, + request_deserializer=artifact__pb2.ListArtifactsRequest.FromString, + response_serializer=artifact__pb2.ListArtifactsResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'artifact.v1.ArtifactService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('artifact.v1.ArtifactService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class ArtifactService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def GetUploadURL(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/artifact.v1.ArtifactService/GetUploadURL', + artifact__pb2.UploadURLRequest.SerializeToString, + artifact__pb2.UploadURLResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def CompleteUpload(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/artifact.v1.ArtifactService/CompleteUpload', + artifact__pb2.CompleteUploadRequest.SerializeToString, + artifact__pb2.CompleteUploadResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetDownloadURL(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/artifact.v1.ArtifactService/GetDownloadURL', + artifact__pb2.DownloadURLRequest.SerializeToString, + artifact__pb2.DownloadURLResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetMetadata(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/artifact.v1.ArtifactService/GetMetadata', + artifact__pb2.GetMetadataRequest.SerializeToString, + artifact__pb2.Artifact.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def ListArtifacts(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/artifact.v1.ArtifactService/ListArtifacts', + artifact__pb2.ListArtifactsRequest.SerializeToString, + artifact__pb2.ListArtifactsResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/app/services/pipeline_consumer.py b/app/services/pipeline_consumer.py index 0f64558..5ba5e19 100644 --- a/app/services/pipeline_consumer.py +++ b/app/services/pipeline_consumer.py @@ -8,7 +8,7 @@ from app.core.events import get_event_bus from app.db.session import AsyncSessionLocal -from app.models.pipeline import PipelineExecution +from app.models.base import PipelineExecution logger = logging.getLogger(__name__) diff --git a/proto/artifact.proto b/proto/artifact.proto new file mode 100644 index 0000000..b49986e --- /dev/null +++ b/proto/artifact.proto @@ -0,0 +1,80 @@ +syntax = "proto3"; + +package artifact.v1; + +option go_package = "github.com/xether-ai/artifact-storage/api/proto/v1;artifactv1"; + +import "google/protobuf/timestamp.proto"; + +service ArtifactService { + rpc GetUploadURL(UploadURLRequest) returns (UploadURLResponse); + rpc CompleteUpload(CompleteUploadRequest) returns (CompleteUploadResponse); + rpc GetDownloadURL(DownloadURLRequest) returns (DownloadURLResponse); + rpc GetMetadata(GetMetadataRequest) returns (Artifact); + rpc ListArtifacts(ListArtifactsRequest) returns (ListArtifactsResponse); +} + +message Artifact { + string id = 1; + string name = 2; + string bucket = 3; + string key = 4; + int64 size = 5; + string content_type = 6; + string checksum_sha256 = 7; + optional string version_id = 8; + optional string pipeline_id = 9; + optional string execution_id = 10; + optional string project_id = 11; + google.protobuf.Timestamp created_at = 12; + google.protobuf.Timestamp updated_at = 13; +} + +message UploadURLRequest { + string name = 1; + string bucket = 2; + string key = 3; + string content_type = 4; + int32 expires_in_seconds = 5; + optional string pipeline_id = 6; + optional string execution_id = 7; + optional string project_id = 8; +} + +message UploadURLResponse { + string artifact_id = 1; + string upload_url = 2; +} + +message CompleteUploadRequest { + string artifact_id = 1; + int64 size = 2; + string checksum = 3; + string version_id = 4; +} + +message CompleteUploadResponse { + string status = 1; +} + +message DownloadURLRequest { + string artifact_id = 1; + int32 expires_in_seconds = 2; +} + +message DownloadURLResponse { + string download_url = 1; +} + +message GetMetadataRequest { + string artifact_id = 1; +} + +message ListArtifactsRequest { + optional string project_id = 1; + optional string pipeline_id = 2; +} + +message ListArtifactsResponse { + repeated Artifact artifacts = 1; +}