diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 7080dfef009d..bfe023901bd2 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -92,7 +92,8 @@ def counter( @staticmethod def distribution( namespace: Union[Type, str], - name: str) -> 'Metrics.DelegatingDistribution': + name: str, + process_wide: bool = False) -> 'Metrics.DelegatingDistribution': """Obtains or creates a Distribution metric. Distribution metrics are restricted to integer-only distributions. @@ -105,11 +106,14 @@ def distribution( A Distribution object. """ namespace = Metrics.get_namespace(namespace) - return Metrics.DelegatingDistribution(MetricName(namespace, name)) + return Metrics.DelegatingDistribution( + MetricName(namespace, name), process_wide=process_wide) @staticmethod def gauge( - namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingGauge': + namespace: Union[Type, str], + name: str, + process_wide: bool = False) -> 'Metrics.DelegatingGauge': """Obtains or creates a Gauge metric. Gauge metrics are restricted to integer-only values. @@ -117,12 +121,15 @@ def gauge( Args: namespace: A class or string that gives the namespace to a metric name: A string that gives a unique name to a metric + process_wide: Whether or not the metric is specific to the current bundle + or should be calculated for the entire process. Returns: - A Distribution object. + A Gauge object. """ namespace = Metrics.get_namespace(namespace) - return Metrics.DelegatingGauge(MetricName(namespace, name)) + return Metrics.DelegatingGauge( + MetricName(namespace, name), process_wide=process_wide) @staticmethod def string_set( @@ -210,15 +217,20 @@ def __init__( class DelegatingDistribution(Distribution): """Metrics Distribution Delegates functionality to MetricsEnvironment.""" - def __init__(self, metric_name: MetricName) -> None: + def __init__( + self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.update = MetricUpdater(cells.DistributionCell, metric_name) # type: ignore[method-assign] + self.update = MetricUpdater(cells.DistributionCell, metric_name, process_wide=process_wide) # type: ignore[method-assign] class DelegatingGauge(Gauge): """Metrics Gauge that Delegates functionality to MetricsEnvironment.""" - def __init__(self, metric_name: MetricName) -> None: + def __init__( + self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[method-assign] + self.set = MetricUpdater( # type: ignore[method-assign] + cells.GaugeCell, + metric_name, + process_wide=process_wide) class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" diff --git a/sdks/python/apache_beam/ml/inference/model_manager.py b/sdks/python/apache_beam/ml/inference/model_manager.py index 186611984df0..bae18f492351 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager.py +++ b/sdks/python/apache_beam/ml/inference/model_manager.py @@ -45,9 +45,11 @@ import torch from scipy.optimize import nnls +from apache_beam.metrics.metric import Metrics from apache_beam.utils import multi_process_shared logger = logging.getLogger(__name__) +_MODEL_MANAGER_METRICS_NAMESPACE = "BeamML_ModelManager" class GPUMonitor: @@ -205,6 +207,10 @@ def set_initial_estimate(self, model_tag: str, cost: float): with self._lock: self.estimates[model_tag] = cost self.known_models.add(model_tag) + Metrics.distribution( + _MODEL_MANAGER_METRICS_NAMESPACE, + f"memory_estimate_mb_{model_tag}", + process_wide=True).update(int(cost)) self.logging_info("Initial Profile for %s: %s MB", model_tag, cost) def add_observation( @@ -291,6 +297,11 @@ def _solve(self): self.logging_info( "Updated Estimate for %s: %.1f MB", model, self.estimates[model]) + + Metrics.distribution( + _MODEL_MANAGER_METRICS_NAMESPACE, + f"memory_estimate_mb_{model}", + process_wide=True).update(int(self.estimates[model])) self.logging_info("System Bias: %s MB", bias) except Exception as e: @@ -374,6 +385,20 @@ def __init__( self._monitor.start() + def _update_model_count_metric(self): + for tag, instances in self._models.items(): + Metrics.distribution( + _MODEL_MANAGER_METRICS_NAMESPACE, + f"num_loaded_models_{tag}", + process_wide=True).update(len(instances)) + + def _clear_all_model_metrics(self): + for tag in self._models: + Metrics.distribution( + _MODEL_MANAGER_METRICS_NAMESPACE, + f"num_loaded_models_{tag}", + process_wide=True).update(0) + def logging_info(self, message: str, *args): if self._verbose_logging: logger.info(message, *args) @@ -719,6 +744,7 @@ def _perform_eviction(self, key: str, tag: str, instance: Any, score: int): self._monitor.reset_peak() curr, _, _ = self._monitor.get_stats() self.logging_info("Resource Usage After Eviction: %.1f MB", curr) + self._update_model_count_metric() def _spawn_new_model( self, @@ -741,6 +767,7 @@ def _spawn_new_model( self._pending_reservations = max( 0.0, self._pending_reservations - est_cost) self._models[tag].append(instance) + self._update_model_count_metric() return instance except Exception as e: @@ -758,6 +785,7 @@ def _spawn_new_model( raise e def _delete_all_models(self): + self._clear_all_model_metrics() self._idle_lru.clear() for _, instances in self._models.items(): for instance in instances: diff --git a/sdks/python/apache_beam/ml/inference/model_manager_test.py b/sdks/python/apache_beam/ml/inference/model_manager_test.py index 270401857e04..370961eb61dd 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager_test.py +++ b/sdks/python/apache_beam/ml/inference/model_manager_test.py @@ -26,6 +26,8 @@ from apache_beam.utils import multi_process_shared try: + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName from apache_beam.ml.inference.model_manager import GPUMonitor from apache_beam.ml.inference.model_manager import ModelManager from apache_beam.ml.inference.model_manager import ResourceEstimator @@ -335,6 +337,69 @@ def dummy_loader(): instance = self.manager.acquire_model(model_name, lambda: "model_instance") self.manager.release_model(model_name, instance) + def test_model_manager_metrics(self): + """Test that distribution metrics are updated correctly.""" + tag1 = "model1" + tag2 = "model2" + + def _get_count_dist_max(tag): + dist = MetricsEnvironment.process_wide_container().get_distribution( + MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}')) + return dist.get_cumulative().max + + def _get_est_dist_mean(tag): + dist = MetricsEnvironment.process_wide_container().get_distribution( + MetricName('BeamML_ModelManager', f'memory_estimate_mb_{tag}')) + val = dist.get_cumulative() + return int(val.sum / val.count) if val.count > 0 else 0 + + # Verify that initial estimates correctly export int metrics + self.manager._estimator.set_initial_estimate(tag1, 1000.5) + self.assertEqual(_get_est_dist_mean(tag1), 1000) + + self.manager._estimator.set_initial_estimate(tag2, 2000.9) + self.assertEqual(_get_est_dist_mean(tag2), 2000) + + # 1. Acquire a model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_count_dist_max(tag1), 1) + self.assertEqual(_get_est_dist_mean(tag1), 1000) + + # 2. Acquire another instance of same model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_count_dist_max(tag1), 2) + self.assertEqual(_get_est_dist_mean(tag1), 1000) + + # 3. Acquire a different model + self.manager.acquire_model( + tag2, lambda: MockModel(tag2, 2000.0, self.mock_monitor)) + self.assertEqual(_get_count_dist_max(tag2), 1) + self.assertEqual(_get_est_dist_mean(tag2), 2000) + + # tag1 max count should remain 2 + self.assertEqual(_get_count_dist_max(tag1), 2) + self.assertEqual(_get_est_dist_mean(tag1), 1000) + + # 4. Delete all models + self.manager._delete_all_models() + # It retains the highest count it ever saw. + self.assertEqual(_get_count_dist_max(tag1), 2) + self.assertEqual(_get_count_dist_max(tag2), 1) + + self.assertEqual(_get_est_dist_mean(tag1), 1000) + self.assertEqual(_get_est_dist_mean(tag2), 2000) + + # 5. Repopulate and force reset + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + # Max is still 2 from earlier in the test run + self.assertEqual(_get_count_dist_max(tag1), 2) + + self.manager._force_reset() + self.assertEqual(_get_count_dist_max(tag1), 2) + def test_single_model_convergence_with_fluctuations(self): """ Tests that the estimator converges to the true usage with fluctuations.