Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"@biomejs/biome": "^1.9.4",
"@ianvs/prettier-plugin-sort-imports": "^4.7.0",
"@types/node": "^25.3.3",
"poku": "^4.1.0",
"poku": "4.2.0",
"prettier": "^3.8.1",
"tsx": "^4.21.0",
"typescript": "^5.9.3"
Expand Down
9 changes: 9 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { SharedResourcesConfig } from './types.js';
import {
configureCodecs,
globalRegistry,
resetSharedResourcesRuntime,
setExecutionMode,
setupSharedResourceIPC,
} from './shared-resources.js';

Expand All @@ -15,11 +17,18 @@ export const sharedResources = (config?: SharedResourcesConfig): PokuPlugin => {
onTestProcess(child) {
setupSharedResourceIPC(child);
},
setup(context) {
const { isolation } = context.configs;

setExecutionMode(isolation === 'none' ? 'in-process' : 'process');
},
async teardown() {
const entries = Object.values(globalRegistry);

for (const entry of entries)
if (entry.onDestroy) await entry.onDestroy(entry.state);

resetSharedResourcesRuntime();
},
};
};
Expand Down
58 changes: 56 additions & 2 deletions src/shared-resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
ResourceContext,
SendIPCMessageOptions,
SharedResourceEntry,
SharedResourceExecutionMode,
} from './types.js';
import process from 'node:process';
import { pathToFileURL } from 'node:url';
Expand All @@ -29,6 +30,7 @@ const isWindows = process.platform === 'win32';

const resourceRegistry = new ResourceRegistry<SharedResourceEntry>();
const moduleCounters = new Map<string, number>();
let executionMode: SharedResourceExecutionMode = 'process';

export const SHARED_RESOURCE_MESSAGE_TYPES = {
REQUEST_RESOURCE: 'shared_resources_requestResource',
Expand All @@ -39,6 +41,19 @@ export const SHARED_RESOURCE_MESSAGE_TYPES = {

export const globalRegistry = resourceRegistry.getRegistry();

export const setExecutionMode = (mode: SharedResourceExecutionMode) => {
executionMode = mode;
};

export const getExecutionMode = () => executionMode;

export const resetSharedResourcesRuntime = () => {
resourceRegistry.clear();
resourceRegistry.setIsRegistering(false);
moduleCounters.clear();
executionMode = 'process';
};

const create = <T>(
factory: () => T,
options?: {
Expand Down Expand Up @@ -72,10 +87,19 @@ const use = async <T>(
): Promise<MethodsToRPC<T>> => {
const { name } = context;

// Parent Process (Host)
if (!process.send || resourceRegistry.getIsRegistering()) {
// In-process execution always resolves resources directly from the local registry.
if (
executionMode === 'in-process' ||
!process.send ||
resourceRegistry.getIsRegistering()
) {
const existing = resourceRegistry.get(name);
if (existing) {
if (executionMode === 'in-process')
return constructInProcessResource(
existing.state as Record<string, unknown>
) as MethodsToRPC<T>;

return existing.state as MethodsToRPC<T>;
}

Expand All @@ -87,6 +111,11 @@ const use = async <T>(
| undefined,
});

if (executionMode === 'in-process')
return constructInProcessResource(
state as Record<string, unknown>
) as MethodsToRPC<T>;

return state as MethodsToRPC<T>;
}

Expand Down Expand Up @@ -148,6 +177,12 @@ export const sendIPCMessage = <TResponse>(
};

const requestResource = async (name: string, module: string) => {
if (executionMode === 'in-process') {
throw new Error(
'Cannot request shared resources through IPC while running in in-process mode.'
);
}

const requestId = `${name}-${Date.now()}-${Math.random()}`;

const response = await sendIPCMessage<IPCResourceResultMessage>({
Expand Down Expand Up @@ -183,6 +218,12 @@ const remoteProcedureCall = async (
method: string,
args: unknown[]
) => {
if (executionMode === 'in-process') {
throw new Error(
'Cannot run shared resource RPCs through IPC while running in in-process mode.'
);
}

const requestId = `${name}-${method}-${Date.now()}-${Math.random()}`;

const response = await sendIPCMessage<IPCRemoteProcedureCallResultMessage>({
Expand Down Expand Up @@ -332,6 +373,8 @@ export const setupSharedResourceIPC = (
child: IPCEventEmitter | ChildProcess,
registry: Record<string, SharedResourceEntry> = globalRegistry
): void => {
if (executionMode === 'in-process') return;

child.on('message', async (message: IPCMessage) => {
if (message.type === SHARED_RESOURCE_MESSAGE_TYPES.REQUEST_RESOURCE)
await handleRequestResource(message, registry, child);
Expand Down Expand Up @@ -497,6 +540,17 @@ const constructSharedResourceWithRPCs = (
});
};

const constructInProcessResource = (target: Record<string, unknown>) =>
new Proxy(target, {
get(target, prop, receiver) {
const value = Reflect.get(target, prop, receiver);

if (typeof value !== 'function') return value;

return async (...args: unknown[]) => value.apply(target, args);
},
});

export const resource = {
create,
use,
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,4 @@ export type SharedResourcesConfig = {
// biome-ignore lint/suspicious/noExplicitAny: see ArgCodec
codecs?: ArgCodec<any>[];
};
export type SharedResourceExecutionMode = 'process' | 'in-process';
13 changes: 13 additions & 0 deletions test/integration/shared-resources.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ describe('Shared Resources', async () => {
noExit: true,
plugins: [sharedResources()],
concurrency: 0,
timeout: 10000,
});

assert.strictEqual(code, 0, 'Exit Code needs to be 0');
});

await it('Runs on isolation: none', async () => {
const code = await poku('test/__fixtures__/parallel', {
noExit: true,
plugins: [sharedResources()],
isolation: 'none',
concurrency: 0,
timeout: 10000,
});

assert.strictEqual(code, 0, 'Exit Code needs to be 0');
Expand Down
56 changes: 56 additions & 0 deletions test/unit/execution-mode.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { assert, test } from 'poku';
import {
getExecutionMode,
resetSharedResourcesRuntime,
setExecutionMode,
setupSharedResourceIPC,
} from '../../src/shared-resources.js';

test('Execution mode', async () => {
await test('should switch between process and in-process mode', () => {
setExecutionMode('in-process');
assert.strictEqual(
getExecutionMode(),
'in-process',
'Mode should be in-process'
);

setExecutionMode('process');
assert.strictEqual(getExecutionMode(), 'process', 'Mode should be process');
});

await test('should skip IPC setup when running in-process', () => {
setExecutionMode('in-process');

let listenerCalls = 0;
const child = {
on() {
listenerCalls++;
},
send() {
return true;
},
};

setupSharedResourceIPC(child as never);

assert.strictEqual(
listenerCalls,
0,
'No message listener should be attached in in-process mode'
);

setExecutionMode('process');
});

await test('should reset execution mode to process', () => {
setExecutionMode('in-process');
resetSharedResourcesRuntime();

assert.strictEqual(
getExecutionMode(),
'process',
'Reset should restore process mode'
);
});
});
Loading