diff --git a/backend/backend/core/routers/connection/views.py b/backend/backend/core/routers/connection/views.py index 0de8ddc..296abb7 100644 --- a/backend/backend/core/routers/connection/views.py +++ b/backend/backend/core/routers/connection/views.py @@ -10,6 +10,8 @@ from backend.core.utils import handle_http_request from backend.utils.constants import HTTPMethods from rbac.factory import handle_permission +from visitran.events.functions import fire_event +from visitran.events.types import ConnectionCreated, ConnectionTested, ConnectionDeletedEvt, ConnectionDeleteFailedEvt RESOURCE_NAME = "connectiondetails" @@ -42,6 +44,10 @@ def create_connection(request: Request) -> Response: connection_data = con_context.create_connection( connection_details=request_payload, force_create=bool(force_create) ) + fire_event(ConnectionCreated( + connection_name=request_payload.get("name", ""), + datasource=request_payload.get("datasource_name", ""), + )) response_data = {"status": "success", "data": connection_data} return Response(data=response_data, status=status.HTTP_200_OK) @@ -114,11 +120,27 @@ def connection_usage(request: Request, connection_id: str) -> Response: @handle_http_request @handle_permission def delete_connection(request: Request, connection_id: str) -> Response: + from backend.errors.validation_exceptions import ConnectionDeleteFailed + from backend.errors.visitran_backend_base_exceptions import VisitranBackendBaseException + con_context = ConnectionContext() - con_context.delete_connection(connection_id=connection_id) + conn_name = connection_id + try: + conn_data = con_context.get_connection(connection_id=connection_id) + conn_name = conn_data.get("name", connection_id) if conn_data else connection_id + con_context.delete_connection(connection_id=connection_id) + except VisitranBackendBaseException: + raise + except Exception as e: + fire_event(ConnectionDeleteFailedEvt(connection_name=conn_name, reason=str(e))) + raise ConnectionDeleteFailed( + connection_name=conn_name, + reason=str(e), + ) + fire_event(ConnectionDeletedEvt(connection_name=conn_name)) response_data = { "status": "success", - "data": f"{connection_id} is deleted successfully.", + "data": f"{conn_name} is deleted successfully.", } return Response(data=response_data, status=status.HTTP_200_OK) @@ -158,4 +180,5 @@ def test_connection(request: Request) -> Response: ) connection_id: str = cast(str, request_data.get("connection_id", "")) or None con_context.test_connection(datasource=datasource, connection_data=connection_data, connection_id=connection_id) + fire_event(ConnectionTested(datasource=datasource, result="success")) return Response(data={"status": "success"}, status=status.HTTP_200_OK) diff --git a/backend/backend/core/routers/environment/views.py b/backend/backend/core/routers/environment/views.py index 87f6c16..baa9a3f 100644 --- a/backend/backend/core/routers/environment/views.py +++ b/backend/backend/core/routers/environment/views.py @@ -11,6 +11,8 @@ from backend.core.utils import handle_http_request from backend.utils.constants import HTTPMethods from rbac.factory import handle_permission +from visitran.events.functions import fire_event +from visitran.events.types import EnvironmentCreated, EnvironmentDeleted RESOURCE_NAME = "environmentmodels" @@ -50,6 +52,9 @@ def create_environment(request) -> Response: env_data: dict[str, Any] = env_context.create_environment( environment_details=request_payload ) + fire_event(EnvironmentCreated( + environment_name=request_payload.get("name", ""), + )) response_data = {"status": "success", "data": env_data} return Response(data=response_data, status=status.HTTP_201_CREATED) @@ -71,34 +76,33 @@ def update_environment(request, environment_id: str) -> Response: @handle_http_request @handle_permission def delete_environment(request: Request, environment_id: str): + from backend.core.models.environment_models import EnvironmentModels + from backend.errors.validation_exceptions import EnvironmentInUse + env_context = EnvironmentContext() + env_name = environment_id + try: + env_obj = EnvironmentModels.objects.get(environment_id=environment_id) + env_name = env_obj.environment_name or environment_id + except EnvironmentModels.DoesNotExist: + pass + try: env_context.delete_environment(environment_id=environment_id) - response_data = {"status": "success"} - return Response(data=response_data, status=status.HTTP_200_OK) except ProtectedError as e: - protected_objects = e.protected_objects - blocked_apps = set() - blocked_data = {} - for obj in protected_objects: - app_name = obj._meta.label.split(".")[ - 0 - ] # Extracts "appname. model_name can also be extracted like _meta.model_name" - if app_name == "job_scheduler": - key = "Deploy" - if key not in blocked_data: - blocked_data[key] = [] - blocked_data[key] = obj.task_name - blocked_apps.add(app_name) - error_details = [] - for model, ids in blocked_data.items(): - error_details.append(f"{ids} from '{model}'") - error_message = f"Cannot delete this environment record because it is referenced by: {', '.join(error_details)}." - data = { - "message": error_message, - "status": "failed", - } - return Response(data=data, status=status.HTTP_400_BAD_REQUEST) + job_names = [ + obj.task_name + for obj in e.protected_objects + if obj._meta.label.split(".")[0] == "job_scheduler" + ] + raise EnvironmentInUse( + environment_name=env_name, + job_names=", ".join(job_names) if job_names else "unknown", + ) + + fire_event(EnvironmentDeleted(environment_name=env_name)) + response_data = {"status": "success"} + return Response(data=response_data, status=status.HTTP_200_OK) @api_view([HTTPMethods.GET]) diff --git a/backend/backend/core/routers/explorer/views.py b/backend/backend/core/routers/explorer/views.py index 1a05e8e..f8869af 100644 --- a/backend/backend/core/routers/explorer/views.py +++ b/backend/backend/core/routers/explorer/views.py @@ -10,6 +10,8 @@ from backend.core.utils import handle_http_request from backend.utils.cache_service.decorators.cache_decorator import clear_cache from backend.utils.constants import HTTPMethods +from visitran.events.functions import fire_event +from visitran.events.types import ModelCreated, FileDeleted, FileRenamed @api_view([HTTPMethods.GET]) @@ -52,6 +54,7 @@ def create_model_explorer(request: Request, project_id: str) -> Response: model_name = request_data.get("model_name", "").replace(" ", "_").strip() app = ApplicationContext(project_id=project_id) app.create_a_model(model_name=model_name, is_generate_ai_request=False) + fire_event(ModelCreated(model_name=model_name)) return Response(data={"status": "success"}, status=status.HTTP_200_OK) except FileExistsError: return Response( @@ -76,6 +79,7 @@ def delete_a_file_or_folder(request: Request, project_id: str) -> Response: app = ApplicationContext(project_id=project_id) if wipe_all_enabled: app.cleanup_no_code_model(table_delete_enabled=table_delete_enabled) + fire_event(FileDeleted(file_names="all models")) response_json = {"status": "success", "message": f"successfully deleted all model files"} else: # Build set of model names being deleted in this batch so that @@ -95,6 +99,7 @@ def delete_a_file_or_folder(request: Request, project_id: str) -> Response: ) deleted_files.append(file_name) + fire_event(FileDeleted(file_names=", ".join(deleted_files))) response_json = {"status": "success", "message": f"successfully deleted files {deleted_files}"} return Response(data=response_json) @@ -109,6 +114,7 @@ def rename_a_file_or_folder(request: Request, project_id: str) -> Response: rename: str = request_data["rename"] app = ApplicationContext(project_id=project_id) refactored_models = app.rename_a_file_or_folder(file_path=file_name, rename=rename) + fire_event(FileRenamed(old_name=file_name, new_name=rename)) response_json = {"status": "success", "refactored_models": refactored_models} return Response(data=response_json) diff --git a/backend/backend/core/routers/transformation/views.py b/backend/backend/core/routers/transformation/views.py index bf8b3f8..b0787de 100644 --- a/backend/backend/core/routers/transformation/views.py +++ b/backend/backend/core/routers/transformation/views.py @@ -10,6 +10,8 @@ from backend.utils.cache_service.decorators.cache_decorator import clear_cache from backend.utils.constants import HTTPMethods from rbac.factory import handle_permission +from visitran.events.functions import fire_event +from visitran.events.types import TransformationApplied, TransformationDeleted, ModelConfigured RESOURCE_NAME = "configmodels" @@ -98,6 +100,14 @@ def set_model_config_and_reference( request_data, model_name=file_name ) response_json["status"] = "success" + model_config = request_data.get("model_config", {}) + src = model_config.get("source", {}) + dest = model_config.get("model", {}) + fire_event(ModelConfigured( + model_name=file_name, + source=f"{src.get('schema_name', '')}.{src.get('table_name', '')}", + destination=f"{dest.get('schema_name', '')}.{dest.get('table_name', '')}", + )) return Response(data=response_json) @@ -116,6 +126,12 @@ def set_model_transformation( request_data, model_name=file_name ) response_json["status"] = "success" + step_config = request_data.get("step_config", {}) + transformation_type = step_config.get("type", "unknown") if isinstance(step_config, dict) else "unknown" + fire_event(TransformationApplied( + model_name=file_name, + transformation_type=transformation_type, + )) return Response(data=response_json) @@ -138,6 +154,10 @@ def delete_model_transformation( is_clear_all=is_clear_all, ) response_json["status"] = "success" + fire_event(TransformationDeleted( + model_name=file_name, + transformation_type="all" if is_clear_all else (transformation_id or "unknown"), + )) return Response(data=response_json) diff --git a/backend/backend/core/scheduler/views.py b/backend/backend/core/scheduler/views.py index c0c341e..aa09f6f 100644 --- a/backend/backend/core/scheduler/views.py +++ b/backend/backend/core/scheduler/views.py @@ -16,6 +16,8 @@ from backend.core.scheduler.serializer import TaskRunHistorySerializer from backend.core.scheduler.task_constant import Task, TaskStatus, TaskType from backend.utils.tenant_context import get_organization +from visitran.events.functions import fire_event +from visitran.events.types import JobCreated, JobUpdated, JobDeleted, JobTriggered logger = logging.getLogger(__name__) @@ -343,6 +345,10 @@ def create_periodic_task(request, project_id): ) periodic_task.save() + fire_event(JobCreated( + job_name=task_name, + environment_name=getattr(environment, "environment_name", ""), + )) return Response( {"status": "Task created successfully"}, status=status.HTTP_201_CREATED, @@ -530,6 +536,7 @@ def update_periodic_task(request, project_id, user_task_id): ) periodic_task.save(update_fields=["kwargs"]) + fire_event(JobUpdated(job_name=user_task.task_name)) return Response( {"status": "Task updated successfully"}, status=status.HTTP_200_OK, @@ -554,11 +561,13 @@ def delete_periodic_task(request, project_id, task_id): user_task = UserTaskDetails.objects.select_related( "periodic_task" ).get(periodic_task_id=task_id, project__project_uuid=project_id) + task_name = user_task.task_name periodic_task = user_task.periodic_task user_task.delete() if periodic_task: periodic_task.delete() + fire_event(JobDeleted(job_name=task_name)) return Response( {"status": "Task deleted successfully"}, status=status.HTTP_200_OK, @@ -630,6 +639,7 @@ def _dispatch_task_run(task, user_id, models_override=None): scheduler path hits ``trigger_scheduled_run`` without this dispatch wrapper, and it keeps the default ``trigger="scheduled"``. """ + scope = models_override[0] if models_override and len(models_override) == 1 else "job" run_kwargs = { "user_task_id": task.id, "user_id": user_id, @@ -649,6 +659,7 @@ def _dispatch_task_run(task, user_id, models_override=None): task.task_run_time = timezone.now() task.save(update_fields=["status", "task_run_time"]) + fire_event(JobTriggered(job_name=task.task_name, scope=scope)) return Response( {"success": True, "data": "Job submitted to Celery broker."}, status=status.HTTP_200_OK, @@ -661,6 +672,7 @@ def _dispatch_task_run(task, user_id, models_override=None): trigger_scheduled_run(**run_kwargs) + fire_event(JobTriggered(job_name=task.task_name, scope=scope)) return Response( {"success": True, "data": "Job executed synchronously (no broker)."}, status=status.HTTP_200_OK, diff --git a/backend/backend/errors/error_codes.py b/backend/backend/errors/error_codes.py index a5075ca..b61f5a1 100644 --- a/backend/backend/errors/error_codes.py +++ b/backend/backend/errors/error_codes.py @@ -87,6 +87,19 @@ class BackendErrorMessages(BaseConstant): "\nPlease delete the projects or **ask for a feature to modify the connections in projects** and retry." ) + CONNECTION_DELETE_FAILED = ( + '### **Connection Delete Failed!**\n' + 'Unable to delete connection **"{connection_name}"**.\n\n' + 'Reason: {reason}' + ) + + ENVIRONMENT_IN_USE = ( + '### **Environment In Use!**\n' + 'Environment **"{environment_name}"** cannot be deleted because it is ' + 'referenced by the following job(s): **{job_names}**.\n\n' + 'Please remove the environment from these jobs first, then delete.' + ) + MODEL_ALREADY_EXISTS = ( '**Model Exists!**\nModel "{model_name}" already created at {created_at}. ' "Choose a unique name or delete the existing one." diff --git a/backend/backend/errors/validation_exceptions.py b/backend/backend/errors/validation_exceptions.py index 2935cec..0dbe17b 100644 --- a/backend/backend/errors/validation_exceptions.py +++ b/backend/backend/errors/validation_exceptions.py @@ -138,3 +138,39 @@ def __init__(self, prohibited_action: str, prohibited_actions: list[str]) -> Non prohibited_action=prohibited_action, prohibited_actions=prohibited_actions, ) + + +class EnvironmentInUse(VisitranBackendBaseException): + """ + Raised when trying to delete an environment that is referenced by scheduled jobs. + """ + + def __init__(self, environment_name: str, job_names: str) -> None: + super().__init__( + error_code=BackendErrorMessages.ENVIRONMENT_IN_USE, + http_status_code=status.HTTP_400_BAD_REQUEST, + environment_name=environment_name, + job_names=job_names, + ) + + @property + def severity(self) -> str: + return "Warning" + + +class ConnectionDeleteFailed(VisitranBackendBaseException): + """ + Raised when a connection cannot be deleted. + """ + + def __init__(self, connection_name: str, reason: str) -> None: + super().__init__( + error_code=BackendErrorMessages.CONNECTION_DELETE_FAILED, + http_status_code=status.HTTP_400_BAD_REQUEST, + connection_name=connection_name, + reason=reason, + ) + + @property + def severity(self) -> str: + return "Warning" diff --git a/backend/visitran/events/base_types.py b/backend/visitran/events/base_types.py index 8dee9eb..dff2c96 100644 --- a/backend/visitran/events/base_types.py +++ b/backend/visitran/events/base_types.py @@ -28,6 +28,9 @@ class BaseEvent: def level_tag(self) -> EventLevel: return EventLevel.DEBUG + def audience(self) -> str: + return "developer" + def message(self) -> str: raise NotImplementedError("message() not implemented for event") @@ -87,6 +90,29 @@ def level_tag(self) -> EventLevel: return EventLevel.ERROR +@dataclass +class UserLevel(BaseEvent): + """User-facing events shown in the activity log (not developer noise).""" + + def level_tag(self) -> EventLevel: + return EventLevel.INFO + + def audience(self) -> str: + return "user" + + def title(self) -> str: + """Clean, short title for the activity feed.""" + return self.message() + + def subtitle(self) -> str: + """Contextual metadata shown below the title.""" + return "" + + def event_status(self) -> str: + """One of: running, success, error, warning, info.""" + return "success" + + class NoFile: """Prevents an event from going to the file.""" diff --git a/backend/visitran/events/eventmgr.py b/backend/visitran/events/eventmgr.py index 52b00d5..3b73fc8 100644 --- a/backend/visitran/events/eventmgr.py +++ b/backend/visitran/events/eventmgr.py @@ -126,10 +126,26 @@ def create_line(self, msg: EventMsg) -> str: def write_line(self, msg: EventMsg) -> None: line = self.create_line(msg) if self._python_logger is not None and self.name == "file_log": - # send_to_logger(self._python_logger, msg.info.level, line) #send logs to file - # Using shared memory to write the logs - # LogHelper.publish(LogHelper.log(line, msg.info.level)) - LogHelper.publish_log(StateStore.get("log_events_id"), LogHelper.log(line, msg.info.level)) + audience = getattr(msg.data, "audience", lambda: "developer")() + if audience == "user": + ts = datetime.utcnow().strftime("%H:%M:%S") + title = getattr(msg.data, "title", lambda: msg.info.msg)() + subtitle = getattr(msg.data, "subtitle", lambda: "")() + event_status = getattr(msg.data, "event_status", lambda: "info")() + event_code = getattr(msg.data, "code", lambda: "")() + payload = { + "level": msg.info.level, + "audience": audience, + "title": title, + "subtitle": subtitle, + "status": event_status, + "code": event_code, + "timestamp": ts, + "message": msg.info.msg, + } + LogHelper.publish_log(StateStore.get("log_events_id"), payload) + else: + LogHelper.publish_log(StateStore.get("log_events_id"), LogHelper.log(line, msg.info.level, audience)) if self._stream is not None and self.name == "stdout_log": self._stream.write(line + "\n") @@ -164,7 +180,7 @@ def create_debug_line(self, msg: EventMsg) -> str: color_ts = self._get_color_tag(msg=msg, invoker="ts") color_msg = self._get_color_tag(msg=msg, invoker="msg") color_level = self._get_color_tag(msg=msg, invoker="level") - log_line += f"{color_ts}{ts} {color_level}[{level:<5}]{color_msg}{self._get_thread_name()} {msg.info.msg}" + log_line += f"{color_ts}{ts} {color_level}[{level:<5}]{color_msg} {msg.info.msg}" return log_line def _get_color_tag(self, msg: EventMsg, invoker: str) -> Any: diff --git a/backend/visitran/events/log_helper.py b/backend/visitran/events/log_helper.py index eff16eb..0af2083 100644 --- a/backend/visitran/events/log_helper.py +++ b/backend/visitran/events/log_helper.py @@ -41,10 +41,12 @@ class LogHelper: def log( message: str, level: str = "INFO", + audience: str = "developer", ) -> dict[str, str]: return { "level": level, "message": message, + "audience": audience, } @staticmethod diff --git a/backend/visitran/events/proto_types.py b/backend/visitran/events/proto_types.py index 81c313d..0935f53 100644 --- a/backend/visitran/events/proto_types.py +++ b/backend/visitran/events/proto_types.py @@ -1033,3 +1033,201 @@ class FailedScheduleJob(betterproto.Message): class FailedScheduleJobMsg(betterproto.Message): info: "EventInfo" = betterproto.message_field(1) data: "FailedScheduleJob" = betterproto.message_field(2) + + +@dataclass +class ModelRunStarted(betterproto.Message): + model_name: str = betterproto.string_field(1) + source_table: str = betterproto.string_field(2) + destination_table: str = betterproto.string_field(3) + materialization: str = betterproto.string_field(4) + +@dataclass +class ModelRunStartedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ModelRunStarted" = betterproto.message_field(2) + +@dataclass +class ModelRunSucceeded(betterproto.Message): + model_name: str = betterproto.string_field(1) + duration_seconds: float = betterproto.float_field(2) + +@dataclass +class ModelRunSucceededMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ModelRunSucceeded" = betterproto.message_field(2) + +@dataclass +class ModelRunFailed(betterproto.Message): + model_name: str = betterproto.string_field(1) + error: str = betterproto.string_field(2) + +@dataclass +class ModelRunFailedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ModelRunFailed" = betterproto.message_field(2) + +@dataclass +class TransformationApplied(betterproto.Message): + model_name: str = betterproto.string_field(1) + transformation_type: str = betterproto.string_field(2) + +@dataclass +class TransformationAppliedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "TransformationApplied" = betterproto.message_field(2) + +@dataclass +class TransformationDeleted(betterproto.Message): + model_name: str = betterproto.string_field(1) + transformation_type: str = betterproto.string_field(2) + +@dataclass +class TransformationDeletedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "TransformationDeleted" = betterproto.message_field(2) + +@dataclass +class ModelConfigured(betterproto.Message): + model_name: str = betterproto.string_field(1) + source: str = betterproto.string_field(2) + destination: str = betterproto.string_field(3) + +@dataclass +class ModelConfiguredMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ModelConfigured" = betterproto.message_field(2) + +@dataclass +class SeedCompleted(betterproto.Message): + seed_name: str = betterproto.string_field(1) + schema_name: str = betterproto.string_field(2) + status: str = betterproto.string_field(3) + +@dataclass +class SeedCompletedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "SeedCompleted" = betterproto.message_field(2) + +@dataclass +class JobCreated(betterproto.Message): + job_name: str = betterproto.string_field(1) + environment_name: str = betterproto.string_field(2) + +@dataclass +class JobCreatedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "JobCreated" = betterproto.message_field(2) + +@dataclass +class JobUpdated(betterproto.Message): + job_name: str = betterproto.string_field(1) + +@dataclass +class JobUpdatedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "JobUpdated" = betterproto.message_field(2) + +@dataclass +class JobDeleted(betterproto.Message): + job_name: str = betterproto.string_field(1) + +@dataclass +class JobDeletedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "JobDeleted" = betterproto.message_field(2) + +@dataclass +class JobTriggered(betterproto.Message): + job_name: str = betterproto.string_field(1) + scope: str = betterproto.string_field(2) + +@dataclass +class JobTriggeredMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "JobTriggered" = betterproto.message_field(2) + +@dataclass +class ModelCreated(betterproto.Message): + model_name: str = betterproto.string_field(1) + +@dataclass +class ModelCreatedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ModelCreated" = betterproto.message_field(2) + +@dataclass +class FileDeleted(betterproto.Message): + file_names: str = betterproto.string_field(1) + +@dataclass +class FileDeletedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "FileDeleted" = betterproto.message_field(2) + +@dataclass +class FileRenamed(betterproto.Message): + old_name: str = betterproto.string_field(1) + new_name: str = betterproto.string_field(2) + +@dataclass +class FileRenamedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "FileRenamed" = betterproto.message_field(2) + +@dataclass +class ConnectionCreated(betterproto.Message): + connection_name: str = betterproto.string_field(1) + datasource: str = betterproto.string_field(2) + +@dataclass +class ConnectionCreatedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ConnectionCreated" = betterproto.message_field(2) + +@dataclass +class ConnectionTested(betterproto.Message): + datasource: str = betterproto.string_field(1) + result: str = betterproto.string_field(2) + +@dataclass +class ConnectionTestedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ConnectionTested" = betterproto.message_field(2) + +@dataclass +class ConnectionDeletedEvt(betterproto.Message): + connection_name: str = betterproto.string_field(1) + +@dataclass +class ConnectionDeletedEvtMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ConnectionDeletedEvt" = betterproto.message_field(2) + +@dataclass +class EnvironmentCreated(betterproto.Message): + environment_name: str = betterproto.string_field(1) + +@dataclass +class EnvironmentCreatedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "EnvironmentCreated" = betterproto.message_field(2) + +@dataclass +class EnvironmentDeleted(betterproto.Message): + environment_name: str = betterproto.string_field(1) + +@dataclass +class EnvironmentDeletedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "EnvironmentDeleted" = betterproto.message_field(2) + +@dataclass +class ConnectionDeleteFailed(betterproto.Message): + connection_name: str = betterproto.string_field(1) + reason: str = betterproto.string_field(2) + +@dataclass +class ConnectionDeleteFailedMsg(betterproto.Message): + info: "EventInfo" = betterproto.message_field(1) + data: "ConnectionDeleteFailed" = betterproto.message_field(2) diff --git a/backend/visitran/events/types.py b/backend/visitran/events/types.py index 104dd9a..edabc96 100644 --- a/backend/visitran/events/types.py +++ b/backend/visitran/events/types.py @@ -3,7 +3,7 @@ from dataclasses import dataclass import visitran.events.proto_types as proto_type -from visitran.events.base_types import DebugLevel, ErrorLevel, InfoLevel, WarnLevel +from visitran.events.base_types import DebugLevel, ErrorLevel, EventLevel, InfoLevel, UserLevel, WarnLevel # # | Code | Description | @@ -820,3 +820,333 @@ def code(self) -> str: def message(self) -> str: return f"{self.task_type} Task update failed, task_ID: {self.task_id}. task details: {self.cron_data}" + + +# ─── User-facing activity events ───────────────────────────────────── +# These inherit from UserLevel so they carry audience="user" and are +# shown in the frontend's "User activity" log view by default. + +@dataclass +class ModelRunStarted(UserLevel, proto_type.ModelRunStarted): + def code(self) -> str: + return "U001" + + def message(self) -> str: + mat = f" as {self.materialization}" if self.materialization else "" + return f'Building model "{self.model_name}" → {self.destination_table}{mat} from "{self.source_table}"' + + def title(self) -> str: + return f'Building model "{self.model_name}"' + + def subtitle(self) -> str: + mat = self.materialization.upper() if self.materialization else "" + parts = [] + if mat: + parts.append(mat) + parts.append(f"{self.source_table} → {self.destination_table}") + return " · ".join(parts) + + def event_status(self) -> str: + return "running" + + +@dataclass +class ModelRunSucceeded(UserLevel, proto_type.ModelRunSucceeded): + def code(self) -> str: + return "U002" + + def message(self) -> str: + dur = f" in {self.duration_seconds:.1f}s" if self.duration_seconds else "" + return f'Model "{self.model_name}" built successfully{dur}' + + def title(self) -> str: + return f'Model "{self.model_name}" built successfully' + + def subtitle(self) -> str: + return f"{self.duration_seconds:.1f}s elapsed" if self.duration_seconds else "" + + +@dataclass +class ModelRunFailed(UserLevel, proto_type.ModelRunFailed): + def code(self) -> str: + return "U003" + + def message(self) -> str: + short_err = (self.error[:120] + "…") if len(self.error) > 120 else self.error + return f'Model "{self.model_name}" failed: {short_err}' + + def title(self) -> str: + return f'Model "{self.model_name}" failed' + + def subtitle(self) -> str: + return (self.error[:150] + "…") if len(self.error) > 150 else self.error + + def event_status(self) -> str: + return "error" + + def level_tag(self) -> EventLevel: + return EventLevel.ERROR + + +@dataclass +class TransformationApplied(UserLevel, proto_type.TransformationApplied): + def code(self) -> str: + return "U004" + + def message(self) -> str: + return f'Applied {self.transformation_type} transformation on "{self.model_name}"' + + def title(self) -> str: + return f'Applied {self.transformation_type} transformation on "{self.model_name}"' + + +@dataclass +class TransformationDeleted(UserLevel, proto_type.TransformationDeleted): + def code(self) -> str: + return "U005" + + def message(self) -> str: + return f'Removed {self.transformation_type} transformation from "{self.model_name}"' + + def title(self) -> str: + return f'Removed {self.transformation_type} transformation from "{self.model_name}"' + + +@dataclass +class ModelConfigured(UserLevel, proto_type.ModelConfigured): + def code(self) -> str: + return "U006" + + def message(self) -> str: + return f'Configured "{self.model_name}" — source: {self.source}, destination: {self.destination}' + + def title(self) -> str: + return f'Configured model "{self.model_name}"' + + def subtitle(self) -> str: + return f"{self.source} → {self.destination}" + + +@dataclass +class SeedCompleted(UserLevel, proto_type.SeedCompleted): + def code(self) -> str: + return "U007" + + def message(self) -> str: + if self.status == "Success": + return f'Seed "{self.seed_name}" loaded into "{self.schema_name}"' + return f'Seed "{self.seed_name}" failed in "{self.schema_name}"' + + def title(self) -> str: + if self.status == "Success": + return f'Seed "{self.seed_name}" loaded successfully' + return f'Seed "{self.seed_name}" failed' + + def subtitle(self) -> str: + return f"Schema: {self.schema_name}" + + def event_status(self) -> str: + return "success" if self.status == "Success" else "error" + + +@dataclass +class JobCreated(UserLevel, proto_type.JobCreated): + def code(self) -> str: + return "U008" + + def message(self) -> str: + return f'Job "{self.job_name}" created for environment "{self.environment_name}"' + + def title(self) -> str: + return f'Job "{self.job_name}" created' + + def subtitle(self) -> str: + return f"Environment: {self.environment_name}" + + +@dataclass +class JobUpdated(UserLevel, proto_type.JobUpdated): + def code(self) -> str: + return "U009" + + def message(self) -> str: + return f'Job "{self.job_name}" updated' + + def title(self) -> str: + return f'Job "{self.job_name}" updated' + + +@dataclass +class JobDeleted(UserLevel, proto_type.JobDeleted): + def code(self) -> str: + return "U010" + + def message(self) -> str: + return f'Job "{self.job_name}" deleted' + + def title(self) -> str: + return f'Job "{self.job_name}" deleted' + + def event_status(self) -> str: + return "warning" + + +@dataclass +class JobTriggered(UserLevel, proto_type.JobTriggered): + def code(self) -> str: + return "U011" + + def message(self) -> str: + scope_desc = "all models" if self.scope == "job" else f"model {self.scope}" + return f'Job "{self.job_name}" triggered manually — running {scope_desc}' + + def title(self) -> str: + return f'Job "{self.job_name}" triggered' + + def subtitle(self) -> str: + scope_desc = "All models" if self.scope == "job" else f"Model: {self.scope}" + return f"{scope_desc} · Manual trigger" + + def event_status(self) -> str: + return "running" + + +# ─── P3: Model/project CRUD ────────────────────────────────────────── + +@dataclass +class ModelCreated(UserLevel, proto_type.ModelCreated): + def code(self) -> str: + return "U012" + + def message(self) -> str: + return f'Model "{self.model_name}" created' + + def title(self) -> str: + return f'Model "{self.model_name}" created' + + +@dataclass +class FileDeleted(UserLevel, proto_type.FileDeleted): + def code(self) -> str: + return "U013" + + def message(self) -> str: + return f'Deleted: {self.file_names}' + + def title(self) -> str: + return f'Deleted: {self.file_names}' + + def event_status(self) -> str: + return "warning" + + +@dataclass +class FileRenamed(UserLevel, proto_type.FileRenamed): + def code(self) -> str: + return "U014" + + def message(self) -> str: + return f'Renamed "{self.old_name}" → "{self.new_name}"' + + def title(self) -> str: + return f'File renamed' + + def subtitle(self) -> str: + return f'"{self.old_name}" → "{self.new_name}"' + + +# ─── P4: Connection & environment ──────────────────────────────────── + +@dataclass +class ConnectionCreated(UserLevel, proto_type.ConnectionCreated): + def code(self) -> str: + return "U015" + + def message(self) -> str: + return f'Connection created ({self.datasource})' + + def title(self) -> str: + return "Connection created" + + def subtitle(self) -> str: + return f"Datasource: {self.datasource}" + + +@dataclass +class ConnectionTested(UserLevel, proto_type.ConnectionTested): + def code(self) -> str: + return "U016" + + def message(self) -> str: + return f'Connection test: {self.result} ({self.datasource})' + + def title(self) -> str: + status = "passed" if self.result == "success" else self.result + return f"Connection test {status}" + + def subtitle(self) -> str: + return f"Datasource: {self.datasource}" + + +@dataclass +class ConnectionDeletedEvt(UserLevel, proto_type.ConnectionDeletedEvt): + def code(self) -> str: + return "U017" + + def message(self) -> str: + return f'Connection "{self.connection_name}" deleted' + + def title(self) -> str: + return f'Connection "{self.connection_name}" deleted' + + def event_status(self) -> str: + return "warning" + + +@dataclass +class ConnectionDeleteFailedEvt(UserLevel, proto_type.ConnectionDeleteFailed): + def code(self) -> str: + return "U020" + + def level_tag(self) -> EventLevel: + return EventLevel.ERROR + + def message(self) -> str: + short = (self.reason[:120] + "…") if len(self.reason) > 120 else self.reason + return f'Failed to delete connection "{self.connection_name}": {short}' + + def title(self) -> str: + return f'Failed to delete connection "{self.connection_name}"' + + def subtitle(self) -> str: + return (self.reason[:150] + "…") if len(self.reason) > 150 else self.reason + + def event_status(self) -> str: + return "error" + + +@dataclass +class EnvironmentCreated(UserLevel, proto_type.EnvironmentCreated): + def code(self) -> str: + return "U018" + + def message(self) -> str: + return f'Environment "{self.environment_name}" created' + + def title(self) -> str: + return f'Environment "{self.environment_name}" created' + + +@dataclass +class EnvironmentDeleted(UserLevel, proto_type.EnvironmentDeleted): + def code(self) -> str: + return "U019" + + def message(self) -> str: + return f'Environment "{self.environment_name}" deleted' + + def title(self) -> str: + return f'Environment "{self.environment_name}" deleted' + + def event_status(self) -> str: + return "warning" diff --git a/backend/visitran/visitran.py b/backend/visitran/visitran.py index a5e49bb..4ccee4c 100644 --- a/backend/visitran/visitran.py +++ b/backend/visitran/visitran.py @@ -75,6 +75,10 @@ def now(): SortedDAGNodes, TestExecutionCompleted, TestExecutionFailed, + ModelRunStarted, + ModelRunSucceeded, + ModelRunFailed, + SeedCompleted, ) from visitran.materialization import Materialization from visitran.singleton import Singleton @@ -313,6 +317,16 @@ def execute_graph(self) -> None: if not is_executable: continue + _model_display = getattr(node, "destination_table_name", "") or str(node_name) + _mat = node.materialization.name if hasattr(node.materialization, "name") else str(node.materialization) + fire_event( + ModelRunStarted( + model_name=_model_display, + source_table=f"{node.source_schema_name}.{node.source_table_name}", + destination_table=f"{node.destination_schema_name}.{node.destination_table_name}", + materialization=_mat, + ) + ) fire_event( ExecutingModelNode( database=node.database, @@ -333,10 +347,17 @@ def execute_graph(self) -> None: self.db_adapter.db_connection.create_schema(node.destination_schema_name) # create if not exists self.db_adapter.run_model(visitran_model=node) + _elapsed = time.monotonic() - start_time + fire_event( + ModelRunSucceeded( + model_name=_model_display, + duration_seconds=round(_elapsed, 2), + ) + ) self._update_model_status( str(node_name), ConfigModels.RunStatus.SUCCESS, - run_duration=time.monotonic() - start_time, + run_duration=_elapsed, ) base_result = BaseResult( @@ -351,6 +372,12 @@ def execute_graph(self) -> None: sequence_number += 1 BASE_RESULT.append(base_result) except VisitranBaseExceptions as visitran_err: + fire_event( + ModelRunFailed( + model_name=getattr(node, "destination_table_name", "") or str(node_name), + error=str(visitran_err), + ) + ) self._update_model_status( str(node_name), ConfigModels.RunStatus.FAILED, @@ -362,7 +389,12 @@ def execute_graph(self) -> None: dest_table = node.destination_table_name sch_name = node.destination_schema_name err_trace = repr(err) - + fire_event( + ModelRunFailed( + model_name=dest_table or str(node_name), + error=err_trace, + ) + ) self._update_model_status( str(node_name), ConfigModels.RunStatus.FAILED, @@ -820,6 +852,9 @@ def validate_and_run_seed(self, seed_file, session_id: str) -> dict[str, str]: ) SEED_RESULT.append(seed_result) parse_and_fire_seed_report() + fire_event(SeedCompleted( + seed_name=file_name, schema_name=schema, status="Success", + )) return { "file_name": file_name, "status": "Success", @@ -829,6 +864,11 @@ def validate_and_run_seed(self, seed_file, session_id: str) -> dict[str, str]: except Exception as err: # Catches all kind of errors and raises with custom exceptions for seeds fire_event(SeedExecutionError(file_name, str(err))) + fire_event(SeedCompleted( + seed_name=file_name, + schema_name=getattr(self.context, "schema_name", "") or "", + status="Failed", + )) logging.exception(f"validate and run seed failed with exception {err}") raise RunSeedFailedException(file_name=file_name, error_message=str(err)) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 422ca4b..720ee86 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -54,7 +54,7 @@ services: container_name: visitran-celery-worker restart: unless-stopped entrypoint: .venv/bin/celery - command: "-A backend worker --loglevel=info -Q celery --concurrency=2" + command: "-A backend worker --loglevel=info -Q celery,celery_log_task_queue --concurrency=2" env_file: - ../backend/.env volumes: diff --git a/frontend/src/base/components/environment/CreateConnection.jsx b/frontend/src/base/components/environment/CreateConnection.jsx index 81ff920..7fecee9 100644 --- a/frontend/src/base/components/environment/CreateConnection.jsx +++ b/frontend/src/base/components/environment/CreateConnection.jsx @@ -125,6 +125,14 @@ const CreateConnection = ({ getConnectionFields(); }, [getConnectionFields]); + const hasDetailsChanged = useMemo(() => { + if (!connectionId || !originalDbSelectionInfo) return false; + return ( + formName !== originalDbSelectionInfo.name || + formDescription !== originalDbSelectionInfo.description + ); + }, [connectionId, formName, formDescription, originalDbSelectionInfo]); + const handleCreateOrUpdate = useCallback(async () => { setIsCreateOrUpdateLoading(true); try { @@ -407,15 +415,6 @@ const CreateConnection = ({ [dbSelectionInfo.datasource_name, selectedOrgId, csrfToken, connType] ); - // Detect if connection name or description changed (metadata-only changes) - const hasDetailsChanged = useMemo(() => { - if (!connectionId || !originalDbSelectionInfo) return false; - return ( - formName !== originalDbSelectionInfo.name || - formDescription !== originalDbSelectionInfo.description - ); - }, [connectionId, formName, formDescription, originalDbSelectionInfo]); - const mappedDataSources = useMemo( () => dataSources.map((dSource) => ({ diff --git a/frontend/src/ide/editor/no-code-model/no-code-model.jsx b/frontend/src/ide/editor/no-code-model/no-code-model.jsx index fe7aa9e..061c519 100644 --- a/frontend/src/ide/editor/no-code-model/no-code-model.jsx +++ b/frontend/src/ide/editor/no-code-model/no-code-model.jsx @@ -230,7 +230,7 @@ function NoCodeModel({ nodeData }) { const [reactFlowInstance, setReactFlowInstance] = useState(null); const [logsInfo, setLogsInfo] = useState([]); const [logsLevel, setLogsLevel] = useState( - () => localStorage.getItem("visitran.logsLevel") || "info" + () => localStorage.getItem("visitran.logsLevel") || "user" ); const [reveal, setReveal] = useState(false); const [seqOrder, setSeqOrder] = useState({}); @@ -415,10 +415,12 @@ function NoCodeModel({ nodeData }) { const filteredLogs = useMemo( () => - logsInfo.filter( - (entry) => + logsInfo.filter((entry) => { + if (logsLevel === "user") return entry.audience === "user"; + return ( (LOG_LEVEL_RANK[entry.level] ?? 1) >= (LOG_LEVEL_RANK[logsLevel] ?? 1) - ), + ); + }), [logsInfo, logsLevel] ); const handleLogsLevelChange = (value) => { @@ -716,8 +718,9 @@ function NoCodeModel({ nodeData }) { size="small" value={logsLevel} onChange={handleLogsLevelChange} - style={{ width: 140 }} + style={{ width: 160 }} options={[ + { value: "user", label: "User activity" }, { value: "debug", label: "All logs" }, { value: "info", label: "Info & above" }, { value: "warn", label: "Warnings & errors" }, @@ -731,13 +734,91 @@ function NoCodeModel({ nodeData }) { height: `calc(${bottomSectionRef.current.height} - 100px)`, }} > - {filteredLogs.map((el, index) => ( -
- ))} + {filteredLogs.map((el, index) => + el.audience === "user" ? ( +