Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data-machine.php
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,11 @@ function datamachine_activate_plugin( $network_wide = false ) {
function datamachine_activate_for_site() {
$db_pipelines = new \DataMachine\Core\Database\Pipelines\Pipelines();
$db_pipelines->create_table();
$db_pipelines->migrate_columns();

$db_flows = new \DataMachine\Core\Database\Flows\Flows();
$db_flows->create_table();
$db_flows->migrate_columns();

$db_jobs = new \DataMachine\Core\Database\Jobs\Jobs();
$db_jobs->create_table();
Expand Down
83 changes: 76 additions & 7 deletions inc/Core/Database/Flows/Flows.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ public static function create_table(): void {
$sql = "CREATE TABLE $table_name (
flow_id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
pipeline_id bigint(20) unsigned NOT NULL,
user_id bigint(20) unsigned NOT NULL DEFAULT 0,
flow_name varchar(255) NOT NULL,
flow_config longtext NOT NULL,
scheduling_config longtext NOT NULL,
PRIMARY KEY (flow_id),
KEY pipeline_id (pipeline_id)
KEY pipeline_id (pipeline_id),
KEY user_id (user_id)
) $charset_collate;";

require_once ABSPATH . 'wp-admin/includes/upgrade.php';
Expand All @@ -48,6 +50,57 @@ public static function create_table(): void {
);
}

/**
* Migrate existing table columns to current schema.
*
* Handles:
* - user_id column: added for multi-agent support
*
* Safe to run multiple times - only executes if columns need updating.
*/
public function migrate_columns(): void {
// Check if user_id column already exists.
// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery
$column = $this->wpdb->get_var(
$this->wpdb->prepare(
"SELECT COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND COLUMN_NAME = 'user_id'",
DB_NAME,
$this->table_name
)
);

if ( null === $column ) {
// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.SchemaChange
$result = $this->wpdb->query(
"ALTER TABLE {$this->table_name}
ADD COLUMN user_id bigint(20) unsigned NOT NULL DEFAULT 0 AFTER pipeline_id,
ADD KEY user_id (user_id)"
);

if ( false === $result ) {
do_action(
'datamachine_log',
'error',
'Failed to add user_id column to flows table',
array(
'table_name' => $this->table_name,
'db_error' => $this->wpdb->last_error,
)
);
return;
}

do_action(
'datamachine_log',
'info',
'Added user_id column to flows table for multi-agent support',
array( 'table_name' => $this->table_name )
);
}
}

public function create_flow( array $flow_data ) {

// Validate required fields
Expand All @@ -70,15 +123,19 @@ public function create_flow( array $flow_data ) {
$flow_config = wp_json_encode( $flow_data['flow_config'] );
$scheduling_config = wp_json_encode( $flow_data['scheduling_config'] );

$user_id = isset( $flow_data['user_id'] ) ? absint( $flow_data['user_id'] ) : 0;

$insert_data = array(
'pipeline_id' => intval( $flow_data['pipeline_id'] ),
'user_id' => $user_id,
'flow_name' => sanitize_text_field( $flow_data['flow_name'] ),
'flow_config' => $flow_config,
'scheduling_config' => $scheduling_config,
);

$insert_format = array(
'%d', // pipeline_id
'%d', // user_id
'%s', // flow_name
'%s', // flow_config
'%s', // scheduling_config
Expand Down Expand Up @@ -197,14 +254,23 @@ public function get_flows_for_pipeline( int $pipeline_id ): array {
*
* Used for global operations like handler-based filtering across the entire system.
*
* @param int|null $user_id Optional user ID to filter by.
* @return array All flows with decoded configs.
*/
public function get_all_flows(): array {
// phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
$flows = $this->wpdb->get_results(
$this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name ),
ARRAY_A
);
public function get_all_flows( ?int $user_id = null ): array {
if ( null !== $user_id ) {
// phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
$flows = $this->wpdb->get_results(
$this->wpdb->prepare( 'SELECT * FROM %i WHERE user_id = %d ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name, $user_id ),
ARRAY_A
);
} else {
// phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
$flows = $this->wpdb->get_results(
$this->wpdb->prepare( 'SELECT * FROM %i ORDER BY pipeline_id ASC, flow_id ASC', $this->table_name ),
ARRAY_A
);
}

if ( null === $flows ) {
return array();
Expand Down Expand Up @@ -522,6 +588,9 @@ public function get_flow_scheduling( int $flow_id ): ?array {
* Get flows ready for execution based on scheduling.
*
* Uses jobs table to determine last run time (single source of truth).
*
* Note: No user_id filter here — the scheduler must run ALL users' flows.
* User-scoping happens at the pipeline/flow management level, not execution.
*/
public function get_flows_ready_for_execution(): array {

Expand Down
25 changes: 23 additions & 2 deletions inc/Core/Database/Jobs/Jobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public static function create_table() {
// status is VARCHAR(255) to support compound statuses with reasons
$sql = "CREATE TABLE $table_name (
job_id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
user_id bigint(20) unsigned NOT NULL DEFAULT 0,
pipeline_id varchar(20) NOT NULL,
flow_id varchar(20) NOT NULL,
source varchar(50) NOT NULL DEFAULT 'pipeline',
Expand All @@ -141,7 +142,8 @@ public static function create_table() {
KEY pipeline_id (pipeline_id),
KEY flow_id (flow_id),
KEY source (source),
KEY parent_job_id (parent_job_id)
KEY parent_job_id (parent_job_id),
KEY user_id (user_id)
) $charset_collate;";

dbDelta( $sql );
Expand Down Expand Up @@ -179,7 +181,7 @@ private static function migrate_columns( string $table_name ): void {
"SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
AND COLUMN_NAME IN ('status', 'pipeline_id', 'flow_id', 'source', 'parent_job_id')",
AND COLUMN_NAME IN ('status', 'pipeline_id', 'flow_id', 'source', 'parent_job_id', 'user_id')",
DB_NAME,
$table_name
),
Expand Down Expand Up @@ -286,5 +288,24 @@ private static function migrate_columns( string $table_name ): void {
);
}
}

// Add user_id column for multi-agent support.
if ( ! isset( $columns['user_id'] ) ) {
// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.SchemaChange
$result = $wpdb->query(
"ALTER TABLE {$table_name}
ADD COLUMN user_id bigint(20) unsigned NOT NULL DEFAULT 0 AFTER job_id,
ADD KEY user_id (user_id)"
);

if ( false !== $result ) {
do_action(
'datamachine_log',
'info',
'Added user_id column to jobs table for multi-agent support',
array( 'table_name' => $table_name )
);
}
}
}
}
14 changes: 13 additions & 1 deletion inc/Core/Database/Jobs/JobsOperations.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,18 @@ public function create_job( array $job_data ): int|false {
$label = isset( $job_data['label'] ) ? sanitize_text_field( $job_data['label'] ) : null;

$parent_job_id = isset( $job_data['parent_job_id'] ) ? absint( $job_data['parent_job_id'] ) : 0;
$user_id = isset( $job_data['user_id'] ) ? absint( $job_data['user_id'] ) : 0;

$data = array(
'pipeline_id' => $pipeline_id,
'flow_id' => $flow_id,
'user_id' => $user_id,
'source' => $source,
'label' => $label,
'status' => 'pending',
);

$format = array( '%s', '%s', '%s', '%s', '%s' );
$format = array( '%s', '%s', '%d', '%s', '%s', '%s' );

if ( $parent_job_id > 0 ) {
$data['parent_job_id'] = $parent_job_id;
Expand Down Expand Up @@ -153,6 +155,11 @@ public function get_jobs_count( array $args = array() ): int {
$where_values[] = sanitize_text_field( $args['source'] );
}

if ( isset( $args['user_id'] ) ) {
$where_clauses[] = 'user_id = %d';
$where_values[] = absint( $args['user_id'] );
}

if ( ! empty( $args['since'] ) ) {
$where_clauses[] = 'created_at >= %s';
$where_values[] = sanitize_text_field( $args['since'] );
Expand Down Expand Up @@ -244,6 +251,11 @@ public function get_jobs_for_list_table( array $args ): array {
$where_values[] = sanitize_text_field( $args['source'] );
}

if ( isset( $args['user_id'] ) ) {
$where_clauses[] = 'j.user_id = %d';
$where_values[] = absint( $args['user_id'] );
}

if ( ! empty( $args['since'] ) ) {
$where_clauses[] = 'j.created_at >= %s';
$where_values[] = sanitize_text_field( $args['since'] );
Expand Down
Loading
Loading