diff --git a/data-machine.php b/data-machine.php index bd1faf68..a88f0845 100644 --- a/data-machine.php +++ b/data-machine.php @@ -345,9 +345,11 @@ function datamachine_activate_plugin( $network_wide = false ) { function datamachine_activate_for_site() { $db_pipelines = new \DataMachine\Core\Database\Pipelines\Pipelines(); $db_pipelines->create_table(); + $db_pipelines->migrate_columns(); $db_flows = new \DataMachine\Core\Database\Flows\Flows(); $db_flows->create_table(); + $db_flows->migrate_columns(); $db_jobs = new \DataMachine\Core\Database\Jobs\Jobs(); $db_jobs->create_table(); diff --git a/inc/Core/Database/Flows/Flows.php b/inc/Core/Database/Flows/Flows.php index 1ffc2596..d4c07159 100644 --- a/inc/Core/Database/Flows/Flows.php +++ b/inc/Core/Database/Flows/Flows.php @@ -26,11 +26,13 @@ public static function create_table(): void { $sql = "CREATE TABLE $table_name ( flow_id bigint(20) unsigned NOT NULL AUTO_INCREMENT, pipeline_id bigint(20) unsigned NOT NULL, + user_id bigint(20) unsigned NOT NULL DEFAULT 0, flow_name varchar(255) NOT NULL, flow_config longtext NOT NULL, scheduling_config longtext NOT NULL, PRIMARY KEY (flow_id), - KEY pipeline_id (pipeline_id) + KEY pipeline_id (pipeline_id), + KEY user_id (user_id) ) $charset_collate;"; require_once ABSPATH . 'wp-admin/includes/upgrade.php'; @@ -48,6 +50,57 @@ public static function create_table(): void { ); } + /** + * Migrate existing table columns to current schema. + * + * Handles: + * - user_id column: added for multi-agent support + * + * Safe to run multiple times - only executes if columns need updating. + */ + public function migrate_columns(): void { + // Check if user_id column already exists. + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery + $column = $this->wpdb->get_var( + $this->wpdb->prepare( + "SELECT COLUMN_NAME + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND COLUMN_NAME = 'user_id'", + DB_NAME, + $this->table_name + ) + ); + + if ( null === $column ) { + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.SchemaChange + $result = $this->wpdb->query( + "ALTER TABLE {$this->table_name} + ADD COLUMN user_id bigint(20) unsigned NOT NULL DEFAULT 0 AFTER pipeline_id, + ADD KEY user_id (user_id)" + ); + + if ( false === $result ) { + do_action( + 'datamachine_log', + 'error', + 'Failed to add user_id column to flows table', + array( + 'table_name' => $this->table_name, + 'db_error' => $this->wpdb->last_error, + ) + ); + return; + } + + do_action( + 'datamachine_log', + 'info', + 'Added user_id column to flows table for multi-agent support', + array( 'table_name' => $this->table_name ) + ); + } + } + public function create_flow( array $flow_data ) { // Validate required fields @@ -70,8 +123,11 @@ public function create_flow( array $flow_data ) { $flow_config = wp_json_encode( $flow_data['flow_config'] ); $scheduling_config = wp_json_encode( $flow_data['scheduling_config'] ); + $user_id = isset( $flow_data['user_id'] ) ? absint( $flow_data['user_id'] ) : 0; + $insert_data = array( 'pipeline_id' => intval( $flow_data['pipeline_id'] ), + 'user_id' => $user_id, 'flow_name' => sanitize_text_field( $flow_data['flow_name'] ), 'flow_config' => $flow_config, 'scheduling_config' => $scheduling_config, @@ -79,6 +135,7 @@ public function create_flow( array $flow_data ) { $insert_format = array( '%d', // pipeline_id + '%d', // user_id '%s', // flow_name '%s', // flow_config '%s', // scheduling_config @@ -197,14 +254,23 @@ public function get_flows_for_pipeline( int $pipeline_id ): array { * * Used for global operations like handler-based filtering across the entire system. * + * @param int|null $user_id Optional user ID to filter by. * @return array All flows with decoded configs. */ - public function get_all_flows(): array { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $flows = $this->wpdb->get_results( - $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name ), - ARRAY_A - ); + public function get_all_flows( ?int $user_id = null ): array { + if ( null !== $user_id ) { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $flows = $this->wpdb->get_results( + $this->wpdb->prepare( 'SELECT * FROM %i WHERE user_id = %d ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name, $user_id ), + ARRAY_A + ); + } else { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $flows = $this->wpdb->get_results( + $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name ), + ARRAY_A + ); + } if ( null === $flows ) { return array(); @@ -522,6 +588,9 @@ public function get_flow_scheduling( int $flow_id ): ?array { * Get flows ready for execution based on scheduling. * * Uses jobs table to determine last run time (single source of truth). + * + * Note: No user_id filter here — the scheduler must run ALL users' flows. + * User-scoping happens at the pipeline/flow management level, not execution. */ public function get_flows_ready_for_execution(): array { diff --git a/inc/Core/Database/Jobs/Jobs.php b/inc/Core/Database/Jobs/Jobs.php index 3f458074..6fd075cb 100644 --- a/inc/Core/Database/Jobs/Jobs.php +++ b/inc/Core/Database/Jobs/Jobs.php @@ -127,6 +127,7 @@ public static function create_table() { // status is VARCHAR(255) to support compound statuses with reasons $sql = "CREATE TABLE $table_name ( job_id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + user_id bigint(20) unsigned NOT NULL DEFAULT 0, pipeline_id varchar(20) NOT NULL, flow_id varchar(20) NOT NULL, source varchar(50) NOT NULL DEFAULT 'pipeline', @@ -141,7 +142,8 @@ public static function create_table() { KEY pipeline_id (pipeline_id), KEY flow_id (flow_id), KEY source (source), - KEY parent_job_id (parent_job_id) + KEY parent_job_id (parent_job_id), + KEY user_id (user_id) ) $charset_collate;"; dbDelta( $sql ); @@ -179,7 +181,7 @@ private static function migrate_columns( string $table_name ): void { "SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s - AND COLUMN_NAME IN ('status', 'pipeline_id', 'flow_id', 'source', 'parent_job_id')", + AND COLUMN_NAME IN ('status', 'pipeline_id', 'flow_id', 'source', 'parent_job_id', 'user_id')", DB_NAME, $table_name ), @@ -286,5 +288,24 @@ private static function migrate_columns( string $table_name ): void { ); } } + + // Add user_id column for multi-agent support. + if ( ! isset( $columns['user_id'] ) ) { + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.SchemaChange + $result = $wpdb->query( + "ALTER TABLE {$table_name} + ADD COLUMN user_id bigint(20) unsigned NOT NULL DEFAULT 0 AFTER job_id, + ADD KEY user_id (user_id)" + ); + + if ( false !== $result ) { + do_action( + 'datamachine_log', + 'info', + 'Added user_id column to jobs table for multi-agent support', + array( 'table_name' => $table_name ) + ); + } + } } } diff --git a/inc/Core/Database/Jobs/JobsOperations.php b/inc/Core/Database/Jobs/JobsOperations.php index 99121467..d3767540 100644 --- a/inc/Core/Database/Jobs/JobsOperations.php +++ b/inc/Core/Database/Jobs/JobsOperations.php @@ -62,16 +62,18 @@ public function create_job( array $job_data ): int|false { $label = isset( $job_data['label'] ) ? sanitize_text_field( $job_data['label'] ) : null; $parent_job_id = isset( $job_data['parent_job_id'] ) ? absint( $job_data['parent_job_id'] ) : 0; + $user_id = isset( $job_data['user_id'] ) ? absint( $job_data['user_id'] ) : 0; $data = array( 'pipeline_id' => $pipeline_id, 'flow_id' => $flow_id, + 'user_id' => $user_id, 'source' => $source, 'label' => $label, 'status' => 'pending', ); - $format = array( '%s', '%s', '%s', '%s', '%s' ); + $format = array( '%s', '%s', '%d', '%s', '%s', '%s' ); if ( $parent_job_id > 0 ) { $data['parent_job_id'] = $parent_job_id; @@ -153,6 +155,11 @@ public function get_jobs_count( array $args = array() ): int { $where_values[] = sanitize_text_field( $args['source'] ); } + if ( isset( $args['user_id'] ) ) { + $where_clauses[] = 'user_id = %d'; + $where_values[] = absint( $args['user_id'] ); + } + if ( ! empty( $args['since'] ) ) { $where_clauses[] = 'created_at >= %s'; $where_values[] = sanitize_text_field( $args['since'] ); @@ -244,6 +251,11 @@ public function get_jobs_for_list_table( array $args ): array { $where_values[] = sanitize_text_field( $args['source'] ); } + if ( isset( $args['user_id'] ) ) { + $where_clauses[] = 'j.user_id = %d'; + $where_values[] = absint( $args['user_id'] ); + } + if ( ! empty( $args['since'] ) ) { $where_clauses[] = 'j.created_at >= %s'; $where_values[] = sanitize_text_field( $args['since'] ); diff --git a/inc/Core/Database/Pipelines/Pipelines.php b/inc/Core/Database/Pipelines/Pipelines.php index 9d59bdee..29a52278 100644 --- a/inc/Core/Database/Pipelines/Pipelines.php +++ b/inc/Core/Database/Pipelines/Pipelines.php @@ -39,15 +39,17 @@ public function create_pipeline( array $pipeline_data ): int|false { $pipeline_name = sanitize_text_field( $pipeline_data['pipeline_name'] ); $pipeline_config = $pipeline_data['pipeline_config'] ?? array(); $pipeline_config_json = wp_json_encode( $pipeline_config ); + $user_id = isset( $pipeline_data['user_id'] ) ? absint( $pipeline_data['user_id'] ) : 0; $data = array( + 'user_id' => $user_id, 'pipeline_name' => $pipeline_name, 'pipeline_config' => $pipeline_config_json, 'created_at' => current_time( 'mysql', true ), 'updated_at' => current_time( 'mysql', true ), ); - $format = array( '%s', '%s', '%s', '%s' ); + $format = array( '%d', '%s', '%s', '%s', '%s' ); // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching $inserted = $this->wpdb->insert( $this->table_name, $data, $format ); @@ -104,11 +106,17 @@ public function get_pipeline( int $pipeline_id ): ?array { /** * Get all pipelines from the database. * + * @param int|null $user_id Optional user ID to filter by. * @return array Array of all pipeline records */ - public function get_all_pipelines(): array { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY updated_at DESC', $this->table_name ), ARRAY_A ); + public function get_all_pipelines( ?int $user_id = null ): array { + if ( null !== $user_id ) { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i WHERE user_id = %d ORDER BY updated_at DESC', $this->table_name, $user_id ), ARRAY_A ); + } else { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY updated_at DESC', $this->table_name ), ARRAY_A ); + } foreach ( $results as &$pipeline ) { if ( ! empty( $pipeline['pipeline_config'] ) ) { @@ -121,10 +129,17 @@ public function get_all_pipelines(): array { /** * Get lightweight pipelines list for UI dropdowns. + * + * @param int|null $user_id Optional user ID to filter by. */ - public function get_pipelines_list(): array { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT pipeline_id, pipeline_name FROM %i ORDER BY pipeline_name ASC', $this->table_name ), ARRAY_A ); + public function get_pipelines_list( ?int $user_id = null ): array { + if ( null !== $user_id ) { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT pipeline_id, pipeline_name FROM %i WHERE user_id = %d ORDER BY pipeline_name ASC', $this->table_name, $user_id ), ARRAY_A ); + } else { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT pipeline_id, pipeline_name FROM %i ORDER BY pipeline_name ASC', $this->table_name ), ARRAY_A ); + } return $results ? $results : array(); } @@ -324,10 +339,17 @@ public function get_pipeline_config( int $pipeline_id ): array { /** * Get pipeline count. + * + * @param int|null $user_id Optional user ID to filter by. */ - public function get_pipelines_count(): int { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $count = $this->wpdb->get_var( $this->wpdb->prepare( 'SELECT COUNT(pipeline_id) FROM %i', $this->table_name ) ); + public function get_pipelines_count( ?int $user_id = null ): int { + if ( null !== $user_id ) { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $count = $this->wpdb->get_var( $this->wpdb->prepare( 'SELECT COUNT(pipeline_id) FROM %i WHERE user_id = %d', $this->table_name, $user_id ) ); + } else { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $count = $this->wpdb->get_var( $this->wpdb->prepare( 'SELECT COUNT(pipeline_id) FROM %i', $this->table_name ) ); + } return (int) $count; } @@ -341,22 +363,28 @@ public function get_pipelines_for_list_table( array $args ): array { $order = strtoupper( $args['order'] ?? 'DESC' ); $per_page = (int) ( $args['per_page'] ?? 20 ); $offset = (int) ( $args['offset'] ?? 0 ); + $user_id = isset( $args['user_id'] ) ? absint( $args['user_id'] ) : null; $is_asc = ( 'ASC' === $order ); + $where = ''; + if ( null !== $user_id ) { + $where = $this->wpdb->prepare( ' WHERE user_id = %d', $user_id ); + } + // phpcs:disable WordPress.DB.DirectDatabaseQuery,WordPress.DB.PreparedSQL $results = match ( $orderby ) { 'pipeline_name' => $is_asc - ? $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_name ASC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ) - : $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_name DESC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ), + ? $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY pipeline_name ASC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ) + : $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY pipeline_name DESC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ), 'created_at' => $is_asc - ? $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY created_at ASC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ) - : $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY created_at DESC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ), + ? $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY created_at ASC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ) + : $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY created_at DESC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ), 'updated_at' => $is_asc - ? $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY updated_at ASC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ) - : $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY updated_at DESC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ), + ? $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY updated_at ASC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ) + : $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY updated_at DESC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ), default => $is_asc - ? $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id ASC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ) - : $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id DESC LIMIT %d OFFSET %d', $this->table_name, $per_page, $offset ), ARRAY_A ), + ? $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY pipeline_id ASC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ) + : $this->wpdb->get_results( $this->wpdb->prepare( "SELECT * FROM %i{$where} ORDER BY pipeline_id DESC LIMIT %d OFFSET %d", $this->table_name, $per_page, $offset ), ARRAY_A ), }; // phpcs:enable WordPress.DB.DirectDatabaseQuery,WordPress.DB.PreparedSQL @@ -470,6 +498,57 @@ public function get_pipeline_step_config( string $pipeline_step_id ): array { return $step_config; } + /** + * Migrate existing table columns to current schema. + * + * Handles: + * - user_id column: added for multi-agent support + * + * Safe to run multiple times - only executes if columns need updating. + */ + public function migrate_columns(): void { + // Check if user_id column already exists. + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery + $column = $this->wpdb->get_var( + $this->wpdb->prepare( + "SELECT COLUMN_NAME + FROM information_schema.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND COLUMN_NAME = 'user_id'", + DB_NAME, + $this->table_name + ) + ); + + if ( null === $column ) { + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.SchemaChange + $result = $this->wpdb->query( + "ALTER TABLE {$this->table_name} + ADD COLUMN user_id bigint(20) unsigned NOT NULL DEFAULT 0 AFTER pipeline_id, + ADD KEY user_id (user_id)" + ); + + if ( false === $result ) { + do_action( + 'datamachine_log', + 'error', + 'Failed to add user_id column to pipelines table', + array( + 'table_name' => $this->table_name, + 'db_error' => $this->wpdb->last_error, + ) + ); + return; + } + + do_action( + 'datamachine_log', + 'info', + 'Added user_id column to pipelines table for multi-agent support', + array( 'table_name' => $this->table_name ) + ); + } + } + public static function create_table() { global $wpdb; $table_name = $wpdb->prefix . 'datamachine_pipelines'; @@ -480,11 +559,13 @@ public static function create_table() { $sql = "CREATE TABLE $table_name ( pipeline_id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + user_id bigint(20) unsigned NOT NULL DEFAULT 0, pipeline_name varchar(255) NOT NULL, pipeline_config longtext NULL, created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (pipeline_id), + KEY user_id (user_id), KEY pipeline_name (pipeline_name), KEY created_at (created_at), KEY updated_at (updated_at) diff --git a/inc/Core/FilesRepository/AgentMemory.php b/inc/Core/FilesRepository/AgentMemory.php index 29fe5bf4..e2584efe 100644 --- a/inc/Core/FilesRepository/AgentMemory.php +++ b/inc/Core/FilesRepository/AgentMemory.php @@ -35,9 +35,23 @@ class AgentMemory { */ private string $file_path; - public function __construct() { + /** + * WordPress user ID for per-agent partitioning. 0 = legacy shared directory. + * + * @since 0.37.0 + * @var int + */ + private int $user_id; + + /** + * @since 0.37.0 Added $user_id parameter for multi-agent partitioning. + * + * @param int $user_id WordPress user ID. 0 = legacy shared directory. + */ + public function __construct( int $user_id = 0 ) { + $this->user_id = $user_id; $this->directory_manager = new DirectoryManager(); - $agent_dir = $this->directory_manager->get_agent_directory(); + $agent_dir = $this->directory_manager->get_agent_directory( $user_id ); $this->file_path = "{$agent_dir}/MEMORY.md"; // Self-heal: ensure agent files exist on first use. @@ -375,7 +389,7 @@ private function replace_section_content( string $file_content, array $position, * so a recreated MEMORY.md includes the standard sections. */ private function ensure_file_exists(): void { - $this->directory_manager->ensure_agent_directory_writable(); + $this->directory_manager->ensure_agent_directory_writable( $this->user_id ); if ( ! file_exists( $this->file_path ) ) { $content = "# Agent Memory\n"; diff --git a/inc/Core/FilesRepository/DailyMemory.php b/inc/Core/FilesRepository/DailyMemory.php index 031b313f..aef96e22 100644 --- a/inc/Core/FilesRepository/DailyMemory.php +++ b/inc/Core/FilesRepository/DailyMemory.php @@ -31,9 +31,14 @@ class DailyMemory { */ private string $base_path; - public function __construct() { + /** + * @since 0.37.0 Added $user_id parameter for multi-agent partitioning. + * + * @param int $user_id WordPress user ID. 0 = legacy shared directory. + */ + public function __construct( int $user_id = 0 ) { $this->directory_manager = new DirectoryManager(); - $agent_dir = $this->directory_manager->get_agent_directory(); + $agent_dir = $this->directory_manager->get_agent_directory( $user_id ); $this->base_path = "{$agent_dir}/daily"; } diff --git a/inc/Core/FilesRepository/DirectoryManager.php b/inc/Core/FilesRepository/DirectoryManager.php index ae6fc717..a23ae824 100644 --- a/inc/Core/FilesRepository/DirectoryManager.php +++ b/inc/Core/FilesRepository/DirectoryManager.php @@ -128,16 +128,60 @@ public function get_pipeline_context_directory( int|string $pipeline_id, string } /** - * Get agent directory path + * Get agent directory path. * - * @return string Full path to agent directory + * When a user_id is provided, returns the per-agent subdirectory + * ({base}/agent/{user_id}). When user_id is 0, returns the legacy + * shared directory ({base}/agent) for backward compatibility. + * + * @since 0.37.0 Added $user_id parameter for multi-agent partitioning. + * + * @param int $user_id WordPress user ID. 0 = legacy shared directory. + * @return string Full path to agent directory. */ - public function get_agent_directory(): string { + public function get_agent_directory( int $user_id = 0 ): string { $upload_dir = wp_upload_dir(); $base = trailingslashit( $upload_dir['basedir'] ) . self::REPOSITORY_DIR; + + if ( 0 < $user_id ) { + return "{$base}/agent/{$user_id}"; + } + return "{$base}/agent"; } + /** + * Get the default agent user ID. + * + * For single-agent installs, returns the configured default or the first admin user. + * + * @since 0.37.0 + * @return int Default agent user ID. + */ + public static function get_default_agent_user_id(): int { + if ( defined( 'DATAMACHINE_DEFAULT_AGENT_USER' ) ) { + return absint( DATAMACHINE_DEFAULT_AGENT_USER ); + } + + // Cache in a static to avoid repeated DB queries. + static $default_id = null; + if ( null !== $default_id ) { + return $default_id; + } + + // First admin user. + $admins = get_users( array( + 'role' => 'administrator', + 'number' => 1, + 'orderby' => 'ID', + 'order' => 'ASC', + 'fields' => 'ID', + ) ); + + $default_id = ! empty( $admins ) ? absint( $admins[0] ) : 1; + return $default_id; + } + /** * Get workspace directory path. * @@ -214,10 +258,13 @@ public function ensure_directory_exists( string $directory ): bool { * directory and its parent (datamachine-files/). * * @since 0.32.0 + * @since 0.37.0 Added $user_id parameter for multi-agent partitioning. + * + * @param int $user_id WordPress user ID. 0 = legacy shared directory. * @return bool True if directory exists and permissions were set. */ - public function ensure_agent_directory_writable(): bool { - $agent_dir = $this->get_agent_directory(); + public function ensure_agent_directory_writable( int $user_id = 0 ): bool { + $agent_dir = $this->get_agent_directory( $user_id ); if ( ! $this->ensure_directory_exists( $agent_dir ) ) { return false; diff --git a/inc/Engine/AI/Directives/CoreMemoryFilesDirective.php b/inc/Engine/AI/Directives/CoreMemoryFilesDirective.php index 9e1de329..7e9fd15e 100644 --- a/inc/Engine/AI/Directives/CoreMemoryFilesDirective.php +++ b/inc/Engine/AI/Directives/CoreMemoryFilesDirective.php @@ -49,6 +49,7 @@ public static function get_outputs( string $provider_name, array $tools, ?string DirectoryManager::ensure_agent_files(); $directory_manager = new DirectoryManager(); + // TODO: Multi-agent Phase 2 — resolve user_id from execution context (#565). $agent_dir = $directory_manager->get_agent_directory(); $outputs = array(); diff --git a/inc/Engine/AI/Directives/MemoryFilesReader.php b/inc/Engine/AI/Directives/MemoryFilesReader.php index 47dc4eda..665aab7d 100644 --- a/inc/Engine/AI/Directives/MemoryFilesReader.php +++ b/inc/Engine/AI/Directives/MemoryFilesReader.php @@ -31,6 +31,7 @@ public static function read( array $memory_files, string $scope_label, int $scop } $directory_manager = new DirectoryManager(); + // TODO: Multi-agent Phase 2 — resolve user_id from execution context (#565). $agent_dir = $directory_manager->get_agent_directory(); $outputs = array();