-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore_affinity_queue.py
More file actions
371 lines (295 loc) · 12.4 KB
/
core_affinity_queue.py
File metadata and controls
371 lines (295 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# -*- coding: utf-8 -*-
# core_affinity_queue.py
"""
Core-affinity policy and reporting for token-managed routing.
This module defines task-weight categories, per-weight core preference rules,
and lightweight affinity metrics used to describe how work is distributed
across physical cores.
It does not own mailbox routing or execution. Actual token placement is
performed by the pinned worker queue layer.
"""
import threading
import time
from dataclasses import dataclass
from enum import Enum
from queue import Queue
from typing import List
from .token_system import TaskToken
class TaskWeight(Enum):
    """Weight classes a task can be assigned for affinity routing.

    The member values are the lowercase strings used as weight tags and as
    the keys of the per-core affinity counters elsewhere in this module.
    The core ranges noted below are the ones built by ``CoreAffinityPolicy``.
    """

    # Demanding work: may run on any core, core 1 is tried first.
    HEAVY = "heavy"
    # Balanced work: starts at core 2 when more than one core exists.
    MEDIUM = "medium"
    # Simple work: starts at core 3 when at least three cores exist.
    LIGHT = "light"
@dataclass
class CorePreference:
    """Core eligibility for a single weight class.

    Attributes:
        allowed_cores: Core ids this weight class may be placed on, listed
            in preference order.
        preferred_core: The core id to try first.
    """

    # Every core id the weight class may run on, most preferred first.
    allowed_cores: List[int]
    # First-choice core id.
    preferred_core: int
class CoreAffinityPolicy:
    """Builds and exposes per-weight core eligibility rules.

    Cores are numbered from 1. For a machine with ``num_cores`` cores the
    policy assigns:

    * HEAVY  -> cores 1..N, preferring core 1.
    * MEDIUM -> cores 2..N; on a single-core machine it falls back to core 1.
    * LIGHT  -> cores 3..N; with fewer than three cores it uses the same
      core ids as MEDIUM.

    The resulting rules are stored in ``self.preferences`` keyed by
    ``TaskWeight``.
    """

    def __init__(self, num_cores: int):
        """Create the policy for a machine with ``num_cores`` cores.

        Args:
            num_cores: Number of cores to build chains for; must be >= 1.

        Raises:
            ValueError: If ``num_cores`` is less than 1. Without this check
                the heavy chain would be empty and building the preferences
                would fail with an unhelpful ``IndexError``.
        """
        if num_cores < 1:
            raise ValueError(
                f"num_cores must be at least 1, got {num_cores!r}"
            )
        self.num_cores = num_cores
        self._build_preferences()

    def _build_preferences(self):
        """Construct per-weight core preference chains from the core count.

        Populates ``self.preferences`` and prints a summary of the chains.
        """
        last_core = self.num_cores

        # Heavy can use ALL cores and prefers core 1.
        heavy_cores = list(range(1, last_core + 1))

        # Medium starts at core 2; the range is empty only on a 1-core
        # system, where core 1 is the sole option.
        medium_cores = list(range(2, last_core + 1)) or [1]

        # Light starts at core 3; with fewer than three cores it borrows
        # medium's core ids. Copy the list so the two chains never share one
        # list object (mutating one must not silently change the other).
        light_cores = list(range(3, last_core + 1)) or list(medium_cores)

        chains = {
            TaskWeight.HEAVY: heavy_cores,
            TaskWeight.MEDIUM: medium_cores,
            TaskWeight.LIGHT: light_cores,
        }
        self.preferences = {
            weight: CorePreference(allowed_cores=cores, preferred_core=cores[0])
            for weight, cores in chains.items()
        }

        # Every chain is non-empty at this point, so indexing [0] is safe.
        print(f"[AFFINITY] Policy for {self.num_cores} cores:")
        print(f" Heavy: {heavy_cores} (preferred: {heavy_cores[0]})")
        print(f" Medium: {medium_cores} (preferred: {medium_cores[0]})")
        print(f" Light: {light_cores} (preferred: {light_cores[0]})")

    def get_preference_chain(self, weight: "TaskWeight") -> List[int]:
        """Return allowed cores for the given weight in preference order.

        The returned list is the policy's own list, not a copy; callers
        should treat it as read-only.

        Raises:
            KeyError: If ``weight`` is not a key of ``self.preferences``.
        """
        return self.preferences[weight].allowed_cores

    def can_use_core(self, weight: "TaskWeight", core_id: int) -> bool:
        """Return whether ``core_id`` is in the allowed set for ``weight``.

        Raises:
            KeyError: If ``weight`` is not a key of ``self.preferences``.
        """
        return core_id in self.preferences[weight].allowed_cores
class CoreQueue:
    """Per-core queue and affinity counter container.

    Holds the token queue, a semaphore sized to the number of worker slots,
    per-weight task counters, and the active-worker pattern for one core.
    It is separate from the policy-only affinity manager and may be used by
    execution-layer components that need concrete per-core queue objects.
    """

    def __init__(self, core_id: int, workers_per_core: int = 4):
        """Initialise empty state for one core.

        Args:
            core_id: Identifier of the core this object represents.
            workers_per_core: Number of worker slots; sizes the semaphore
                and is the initial active-worker pattern.
        """
        self.core_id = core_id
        self.workers_per_core = workers_per_core

        # Queue for tokens
        self.queue = Queue()

        # Slot management: one permit per worker slot
        self.semaphore = threading.Semaphore(workers_per_core)

        # Affinity tracking (guarded by _affinity_lock)
        self.heavy_count = 0
        self.medium_count = 0
        self.light_count = 0
        self._affinity_lock = threading.Lock()

        # Dynamic workers; the list starts empty and is filled in elsewhere
        self.workers = []
        self.current_pattern = workers_per_core  # Default: all workers active

    def has_capacity(self) -> bool:
        """Return whether the core currently has at least one free worker slot.

        The answer is a point-in-time snapshot; another thread may take the
        slot immediately afterwards.
        """
        # NOTE(review): reads Semaphore._value, a private attribute of the
        # stdlib implementation. The public API offers no non-mutating way
        # to inspect the permit count, so this is kept as-is.
        return self.semaphore._value > 0

    def record_task(self, weight: "TaskWeight"):
        """Increment the observed usage counter for the given weight.

        Any value that is neither HEAVY nor MEDIUM is counted as light.
        """
        with self._affinity_lock:
            if weight == TaskWeight.HEAVY:
                self.heavy_count += 1
            elif weight == TaskWeight.MEDIUM:
                self.medium_count += 1
            else:
                self.light_count += 1

    def get_affinity_stats(self) -> dict:
        """Return percentage and total task distribution for this core.

        Returns:
            A dict with float percentages under ``'heavy'``, ``'medium'``
            and ``'light'`` plus the integer ``'total_tasks'``. The same
            four keys are present whether or not any task was recorded, so
            callers can index ``'total_tasks'`` on an idle core.
        """
        with self._affinity_lock:
            heavy = self.heavy_count
            medium = self.medium_count
            light = self.light_count

        total = heavy + medium + light
        if total == 0:
            # Avoid dividing by zero; report an all-zero distribution.
            return {'heavy': 0.0, 'medium': 0.0, 'light': 0.0, 'total_tasks': 0}

        return {
            'heavy': (heavy / total) * 100,
            'medium': (medium / total) * 100,
            'light': (light / total) * 100,
            'total_tasks': total,
        }

    def set_pattern(self, pattern: int):
        """Update the number of active workers associated with this core.

        Workers at index < ``pattern`` are activated and the rest are
        deactivated. Workers without a ``set_active`` method are skipped.
        Does nothing when ``pattern`` equals the current pattern.
        """
        if pattern == self.current_pattern:
            return

        self.current_pattern = pattern

        # Dynamic workers handle this via set_active()
        for index, worker in enumerate(self.workers):
            if hasattr(worker, 'set_active'):
                worker.set_active(index < pattern)
class CoreAffinityQueue:
    """Policy and reporting layer for weight-based core affinity.

    This class classifies tokens, exposes the allowed core chain for each
    weight class, and records routing outcomes reported by the execution
    queue layer.

    It does not place tokens into mailboxes directly.
    """

    def __init__(self, topology, workers_per_core: int = 4):
        """Build the affinity policy and zeroed counters.

        Args:
            topology: Object exposing a ``physical_cores`` attribute, which
                is used as the number of cores.
            workers_per_core: Stored and reported by ``get_stats``; not
                otherwise used by this class.
        """
        self.topology = topology
        self.workers_per_core = workers_per_core
        self.num_cores = topology.physical_cores

        # Build affinity policy
        self.policy = CoreAffinityPolicy(self.num_cores)

        # Simple counters (no CoreQueue objects!), keyed by 1-based core id
        self._affinity_counts = {
            core_id: {'heavy': 0, 'medium': 0, 'light': 0}
            for core_id in range(1, self.num_cores + 1)
        }

        # Metrics
        self.total_routed = 0
        self.routing_failures = 0
        self._routing_lock = threading.Lock()

    def classify_token_weight(self, token: "TaskToken") -> "TaskWeight":
        """Infer routing weight from token tags or operation type.

        A ``'weight'`` tag takes precedence: ``'heavy'`` and ``'light'``
        (case-insensitive) map to their classes and any other tag value maps
        to MEDIUM. Without a weight tag, the lowercased operation type is
        searched for the substrings ``'heavy'`` then ``'light'``. MEDIUM is
        the fallback.
        """
        # Check tags first. `or {}` tolerates metadata whose tags are None
        # (assumption: tags is otherwise a mapping -- confirm against
        # TokenMetadata).
        tags = token.metadata.tags or {}
        if 'weight' in tags:
            tagged = tags['weight'].lower()
            if tagged == 'heavy':
                return TaskWeight.HEAVY
            if tagged == 'light':
                return TaskWeight.LIGHT
            return TaskWeight.MEDIUM

        # Substring match; this also covers the '_heavy' / '_light' suffixes.
        op_type = token.metadata.operation_type.lower()
        if 'heavy' in op_type:
            return TaskWeight.HEAVY
        if 'light' in op_type:
            return TaskWeight.LIGHT

        # Default to medium
        return TaskWeight.MEDIUM

    def get_valid_cores_for_weight(self, weight: "TaskWeight") -> List[int]:
        """Return the allowed core chain for the given weight."""
        return self.policy.get_preference_chain(weight)

    def record_task_routed(self, core_id: int, weight: "TaskWeight"):
        """Record one completed routing decision reported by the queue layer.

        Raises:
            KeyError: If ``core_id`` (or ``weight.value``) has no counter.
                No counter is modified in that case.
        """
        with self._routing_lock:
            # Update the per-core counter first: if the lookup fails,
            # total_routed must not have been bumped already.
            self._affinity_counts[core_id][weight.value] += 1
            self.total_routed += 1

    def get_affinity_report(self) -> dict:
        """Return per-core weight distribution percentages and totals.

        Returns:
            A dict keyed ``'core_<id>'``; each value holds float percentages
            under ``'heavy'``, ``'medium'``, ``'light'`` and the integer
            ``'total_tasks'``.
        """
        # Copy the counters under the writer's lock so the percentages are
        # computed from one consistent snapshot.
        with self._routing_lock:
            snapshot = {
                core_id: dict(counts)
                for core_id, counts in self._affinity_counts.items()
            }

        report = {}
        for core_id, counts in snapshot.items():
            total = sum(counts.values())
            if total > 0:
                # Calculate percentages
                entry = {
                    'heavy': (counts['heavy'] / total) * 100,
                    'medium': (counts['medium'] / total) * 100,
                    'light': (counts['light'] / total) * 100,
                    'total_tasks': total,
                }
            else:
                # No tasks yet
                entry = {
                    'heavy': 0.0,
                    'medium': 0.0,
                    'light': 0.0,
                    'total_tasks': 0,
                }
            report[f'core_{core_id}'] = entry
        return report

    def get_stats(self) -> dict:
        """Return a composite snapshot of affinity configuration and routing totals."""
        # The lock is released before get_affinity_report() is called,
        # because that method acquires the same non-reentrant lock.
        with self._routing_lock:
            total_routed = self.total_routed
            routing_failures = self.routing_failures

        return {
            'num_cores': self.num_cores,
            'workers_per_core': self.workers_per_core,
            'total_routed': total_routed,
            'routing_failures': routing_failures,
            'affinity_distribution': self.get_affinity_report(),
        }

    def print_affinity_report(self):
        """Print a human-readable per-core affinity distribution report."""
        divider = "=" * 70
        print()
        print(divider)
        print("CORE AFFINITY REPORT")
        print(divider)

        # Get report dict
        report = self.get_affinity_report()

        # Print each core's stats in core-id order
        for core_id in range(1, self.num_cores + 1):
            stats = report.get(f'core_{core_id}')
            if stats is None:
                continue
            print(f"\nCore {core_id}:")
            print(f" Heavy: {stats['heavy']:>5.1f}%")
            print(f" Medium: {stats['medium']:>5.1f}%")
            print(f" Light: {stats['light']:>5.1f}%")
            print(f" Total: {stats['total_tasks']} tasks")

        print(divider)
# ============================================================================
# TESTING
# ============================================================================
if __name__ == '__main__':
    # Manual smoke test. The relative imports below need package context,
    # so run this as a module (python -m <package>.core_affinity_queue)
    # rather than as a bare script path.
    from .topology_detector import TopologyDetector
    from .token_system import TaskToken, TokenMetadata

    print("=" * 70)
    print("CORE AFFINITY QUEUE TEST")
    print("=" * 70)
    print()

    # Detect topology
    detector = TopologyDetector()
    topology = detector.detect()
    print(f"System: {topology.physical_cores} physical cores")
    print()

    # Create affinity queue
    affinity_queue = CoreAffinityQueue(topology, workers_per_core=4)
    print()
    print("-" * 70)
    print("TEST: Route tokens to cores")
    print("-" * 70)
    print()

    # Create test tokens with different weights
    def dummy_func():
        return "test"

    # (weight tag, operation type) for each group of three test tokens:
    # heavy should prefer Core 1, medium Core 2+, light Core 3+.
    token_specs = (
        ('heavy', 'compute_heavy'),
        ('medium', 'process_medium'),
        ('light', 'io_light'),
    )
    tokens = []
    for label, operation_type in token_specs:
        for i in range(3):
            meta = TokenMetadata(
                operation_type=operation_type,
                created_at=time.time(),
                tags={'weight': label}
            )
            token = TaskToken(f'{label}_{i}', dummy_func, (), {}, meta)
            tokens.append((label, token))

    # This module does not place tokens itself, so simulate the execution
    # layer: classify each token, take the first core of its preference
    # chain, and report the decision back. Without this step the report
    # below would always be empty.
    for label, token in tokens:
        weight = affinity_queue.classify_token_weight(token)
        core_id = affinity_queue.get_valid_cores_for_weight(weight)[0]
        affinity_queue.record_task_routed(core_id, weight)
        print(f" {label} token classified as {weight.value} -> Core {core_id}")

    # Print report
    affinity_queue.print_affinity_report()
    print()
    print("Stats:")
    stats = affinity_queue.get_stats()
    print(f" Total routed: {stats['total_routed']}")
    print(f" Failures: {stats['routing_failures']}")
    print()
    print("Test complete!")