diff --git a/README.md b/README.md new file mode 100644 index 0000000..6b6e12f --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Monitoring_with_zmq diff --git a/metrics/AbstractMetrics.py b/metrics/AbstractMetrics.py index 959da7b..1006811 100644 --- a/metrics/AbstractMetrics.py +++ b/metrics/AbstractMetrics.py @@ -72,9 +72,18 @@ def _flush_metrics(self): def close(self): self.__end_time = time() self.__time_in_millis = int((self.__end_time - self.__start_time)*1000) + self.__time = self.__end_time - self.__start_time self.__block_mutates = True self._flush_metrics() + def __properties__(self): + properties = self.__properties + properties.update(self.__dates) + properties.update(self.__timing) + properties.update(self.__metrics) + time = {'time_to_execute':self.__time} + properties.update(time) + return properties def __str__(self): display_string = [] @@ -99,6 +108,7 @@ def __str__(self): ''' display all the counters items''' display_string.append(AbstractMetrics.__EOE) + ''' display end of metrics''' return ''.join(display_string) @@ -148,8 +158,12 @@ def __isTimingMetric(self, unit): class AbstractMetricsFactory(MetricsFactory): ''' An abstract factory which creates some basic attributes of a metric object ''' - + # TODO: This classes should not be instantiable. + # To be removed... But we need to solve this problem first. + # metaclass conflict: the metaclass of a derived class must be a (non-strict) + # subclass of the metaclasses of all its bases __metaclass__ = ABCMeta + '''This prevents instantiation of the class since it is an interface. This class however cannot be instantiated as it is an abstract class ''' diff --git a/metrics/Metrics.py b/metrics/Metrics.py index 1c0c84c..d020ddf 100644 --- a/metrics/Metrics.py +++ b/metrics/Metrics.py @@ -158,6 +158,10 @@ def close(self): class MetricsFactory(object): """ This is an interface factory method to create an instance of Metrics class """ + # TODO: This classes should not be instantiable. + # But we need to solve this problem first. + # metaclass conflict: the metaclass of a derived class must be a (non-strict) + # subclass of the metaclasses of all its bases __metaclass__ = ABCMeta """This prevents instantiation of the class since it is an interface """ diff --git a/metrics/ThreadLocalMetrics.py b/metrics/ThreadLocalMetrics.py index ca9f898..a647259 100644 --- a/metrics/ThreadLocalMetrics.py +++ b/metrics/ThreadLocalMetrics.py @@ -1,6 +1,5 @@ ''' Created on Jan 27, 2016 - @author: souvik ''' @@ -9,21 +8,9 @@ from abc import ABCMeta from AbstractMetrics import AbstractMetrics, AbstractMetricsFactory import threading - -def create_timed_rotating_log(path): - ''' This method describes the logging type of service logs - ''' - logger = logging.getLogger("service.log") - # logger.propagate = False - # Uncomment this after thorough validation in Production, that all metrics are in service - # logs. This will remove the service logs from cinder API. - handler = logging.handlers.WatchedFileHandler(path) - # Cinder itself uses watched file handler. LogRotation is handled externally using logrotate.d - logger.addHandler(handler) - return logger - -# TODO: This needs to be configurable and passed through in the class. -logger = create_timed_rotating_log("/var/log/cinder/service.log") +import zmq +import json +import configparser class ThreadLocalMetrics(AbstractMetrics): ''' Number of files to retain in the log folder @@ -31,12 +18,14 @@ class ThreadLocalMetrics(AbstractMetrics): __threadLocal = threading.local() - def __init__(self): + def __init__(self, logger, use_zmq): ''' Constructor ''' super(ThreadLocalMetrics, self).__init__() ThreadLocalMetrics.__threadLocal.metrics = self + self.__logger = logger + self.use_zmq = use_zmq @staticmethod def get(): @@ -48,7 +37,12 @@ def _initialize_metrics(self): def _flush_metrics(self): ''' This just prints out the Metric object ''' - logger.info(self.__str__()) + if self.use_zmq != True: + self.__logger.info(self.__str__()) + elif self.use_zmq==True: + properties = self.__properties__() + properties = json.dumps(properties) + self.__logger.send_json(properties) def __str(self): return super(self).__str__() @@ -58,18 +52,70 @@ def close(self): ThreadLocalMetrics.__threadLocal.__dict__.clear() + +class Singleton(type): + def __init__(cls, name, bases, dic): + super(Singleton, cls).__init__(name, bases, dic) + cls.instance = None + + def __call__(cls, *args, **kwargs): + if cls.instance is None: + cls.instance = super(Singleton, cls).__call__(*args, **kwargs) + return cls.instance + +class TLMFMeta(ABCMeta, Singleton): + pass + class ThreadLocalMetricsFactory(AbstractMetricsFactory): ''' Factory method to create Thread Local Metrics Example Usage: - metricsFactory = ThreadLocalMetricsFactory("/tmp/service_log").with_account_id("xxxxxxxxxxxxxxx").with_marketplace_id("IDC1").with_program_name("CinderAPI").with_operation_name("CreateVolume"); + metricsFactory = ThreadLocalMetricsFactory("/tmp/service_log").with_marketplace_id("IDC1").with_program_name("CinderAPI") metrics = metricsFactory.create_metrics(); ''' - - def __init__(self, service_log_path): - super(ThreadLocalMetricsFactory, self).__init__() - + __metaclass__ = TLMFMeta + + def __init__(self, use_zmq, service_log_path, propagate_to_application_logs = True): + super(ThreadLocalMetricsFactory, self).__init__() + self.use_zmq = use_zmq + if use_zmq==True: + self.__logger = self.connect_zmq() + else: + # This is done so that metrics flowing in cinder do not break. To be removed soon. + service_log_path = service_log_path.replace("service_log", "service.log") + self.__logger = self.create_timed_rotating_log(service_log_path, propagate_to_application_logs) + ''' + This method creates a thread local metrics + ''' def create_metrics(self): - - metrics = ThreadLocalMetrics() + metrics = ThreadLocalMetrics(self.__logger,self.use_zmq) self._add_metric_attributes(metrics) return metrics + + def connect_zmq(self): + path = '/home/zmq.conf' + config = configparser.ConfigParser() + config.read(path) + if 'zmq' in config: + port = config['zmq']['port'] + add = config['zmq']['add'] + protocol = config['zmq']['protocol'] + zmqPath = str(protocol)+"://"+str(add)+":"+str(port) + context = zmq.Context() + self.zmq_socket = context.socket(zmq.PUSH) + self.zmq_socket.connect(zmqPath) + return self.zmq_socket + + #The default path is given to ensure backward compatibility to cinder. To be removed soon. + def create_timed_rotating_log(self, path = "/var/log/cinder/service.log", propagate_to_application_logs = True): + ''' This method describes the logging type of service logs + ''' + logger = logging.getLogger("service.log") + logger.propagate = propagate_to_application_logs + # Uncomment this after thorough validation in Production, that all metrics are in service + # logs. This will remove the service logs from cinder API. + + handler = logging.handlers.WatchedFileHandler(path) + + # Cinder itself uses watched file handler. LogRotation is handled externally using logrotate.d + logger.addHandler(handler) + return logger diff --git a/metrics/metric_util.py b/metrics/metric_util.py new file mode 100644 index 0000000..90000e6 --- /dev/null +++ b/metrics/metric_util.py @@ -0,0 +1,100 @@ +''' +Created on Jan 29, 2016 + +@author: souvik +''' +from metrics.ThreadLocalMetrics import ThreadLocalMetrics, ThreadLocalMetricsFactory +from metrics.Metrics import Unit +from oslo_log import log as logging +from time import time +LOG = logging.getLogger(__name__) + +''' +This decorator wraps around any method and captures latency around it. If the parameter 'report_error' is set to True +then it also emits metrics on whether the method throws an exception or not +''' +class ReportMetrics(object): + ''' + @:param metric_name This variable declares what is the latency of an afforsaid sub component call. + @:param report_error If this is set to True it adds an error counter to 1 if there is an error and 0 if there are no + error + ''' + def __init__(self, metric_name, report_error = False): + self.__metric_name = metric_name + self.__report_error = report_error + def __call__(self, function): + def metrics_wrapper(*args, **kwargs): + start_time = time() + error = 0 + try: + return function(*args, **kwargs) + except Exception as e: + LOG.error("Exception while executing " + function.__name__) + error = 1 + raise e + finally: + end_time = time() + try: + metrics = ThreadLocalMetrics.get() + metric_time = self.__metric_name + "_time" + metrics.add_time(metric_time, int((end_time - start_time)*1000), Unit.MILLIS) + if self.__report_error == True: + metric_error = self.__metric_name + "_error" + metrics.add_count(metric_error, error) + except AttributeError as e: + LOG.exception("No threadlocal metrics object: %s", e) + + return metrics_wrapper + +class MetricUtil(object): + ''' + Metric Utility class to put and fetch request add_timescoped metrics in cinder api + ''' + METRICS_OBJECT = "metrics_object" + def __init__(self): + ''' + Constructor for Metric Utils. + ''' + + ''' + This method takes in a service log path and program name (like CinderAPI, EC2API layer etc) and initializes + the thread local metrics if it has not already been initiatized before in the same request + ''' + def initialize_thread_local_metrics(self, use_zmq, service_log_path, program_name): + try: + metrics = self.fetch_thread_local_metrics() + except AttributeError: + metrics = ThreadLocalMetricsFactory(use_zmq,service_log_path).with_marketplace_id(self.get_marketplace_id())\ + .with_program_name(program_name).create_metrics() + return metrics + + ''' + This method fetches the current thread local metrics from thread local + ''' + def fetch_thread_local_metrics(self): + return ThreadLocalMetrics.get() + + ''' + This method fetches the market place id + ''' + def get_marketplace_id(self): + # TODO:Get this from the config when we have multiple zones/regions + return "IDC1" + + ''' + Closing the metrics emits the metrics in logs + ''' + def closeMetrics(self, request): + metrics = self.fetch_thread_local_metrics() + metrics.close() + + ''' + Add the following metric name to the timing metrics taking the difference between start_time and end_time + ''' + def report_timing_metric_utc_time(self, metric_name, end_time, start_time): + metrics = self.fetch_thread_local_metrics() + delta = end_time - start_time + seconds = delta.seconds + microseconds = delta.microseconds + milliseconds = int(microseconds/1000 + seconds*1000) + metrics.add_time(metric_name, milliseconds, Unit.MILLIS)