Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Monitoring_with_zmq
16 changes: 15 additions & 1 deletion metrics/AbstractMetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ def _flush_metrics(self):
def close(self):
self.__end_time = time()
self.__time_in_millis = int((self.__end_time - self.__start_time)*1000)
self.__time = self.__end_time - self.__start_time
self.__block_mutates = True
self._flush_metrics()

def __properties__(self):
properties = self.__properties
properties.update(self.__dates)
properties.update(self.__timing)
properties.update(self.__metrics)
time = {'time_to_execute':self.__time}
properties.update(time)
return properties

def __str__(self):
display_string = []
Expand All @@ -99,6 +108,7 @@ def __str__(self):
''' display all the counters items'''

display_string.append(AbstractMetrics.__EOE)

''' display end of metrics'''
return ''.join(display_string)

Expand Down Expand Up @@ -148,8 +158,12 @@ def __isTimingMetric(self, unit):
class AbstractMetricsFactory(MetricsFactory):
''' An abstract factory which creates some basic attributes of a metric object
'''

# TODO: This classes should not be instantiable.
# To be removed... But we need to solve this problem first.
# metaclass conflict: the metaclass of a derived class must be a (non-strict)
# subclass of the metaclasses of all its bases
__metaclass__ = ABCMeta

'''This prevents instantiation of the class since it is an interface. This
class however cannot be instantiated as it is an abstract class
'''
Expand Down
4 changes: 4 additions & 0 deletions metrics/Metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ def close(self):
class MetricsFactory(object):
""" This is an interface factory method to create an instance of Metrics class
"""
# TODO: This classes should not be instantiable.
# But we need to solve this problem first.
# metaclass conflict: the metaclass of a derived class must be a (non-strict)
# subclass of the metaclasses of all its bases
__metaclass__ = ABCMeta
"""This prevents instantiation of the class since it is an interface
"""
Expand Down
96 changes: 71 additions & 25 deletions metrics/ThreadLocalMetrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'''
Created on Jan 27, 2016

@author: souvik
'''

Expand All @@ -9,34 +8,24 @@
from abc import ABCMeta
from AbstractMetrics import AbstractMetrics, AbstractMetricsFactory
import threading

def create_timed_rotating_log(path):
''' This method describes the logging type of service logs
'''
logger = logging.getLogger("service.log")
# logger.propagate = False
# Uncomment this after thorough validation in Production, that all metrics are in service
# logs. This will remove the service logs from cinder API.
handler = logging.handlers.WatchedFileHandler(path)
# Cinder itself uses watched file handler. LogRotation is handled externally using logrotate.d
logger.addHandler(handler)
return logger

# TODO: This needs to be configurable and passed through in the class.
logger = create_timed_rotating_log("/var/log/cinder/service.log")
import zmq
import json
import configparser

class ThreadLocalMetrics(AbstractMetrics):
''' Number of files to retain in the log folder
'''

__threadLocal = threading.local()

def __init__(self):
def __init__(self, logger, use_zmq):
'''
Constructor
'''
super(ThreadLocalMetrics, self).__init__()
ThreadLocalMetrics.__threadLocal.metrics = self
self.__logger = logger
self.use_zmq = use_zmq

@staticmethod
def get():
Expand All @@ -48,7 +37,12 @@ def _initialize_metrics(self):
def _flush_metrics(self):
''' This just prints out the Metric object
'''
logger.info(self.__str__())
if self.use_zmq != True:
self.__logger.info(self.__str__())
elif self.use_zmq==True:
properties = self.__properties__()
properties = json.dumps(properties)
self.__logger.send_json(properties)

def __str(self):
return super(self).__str__()
Expand All @@ -58,18 +52,70 @@ def close(self):
ThreadLocalMetrics.__threadLocal.__dict__.clear()



class Singleton(type):
def __init__(cls, name, bases, dic):
super(Singleton, cls).__init__(name, bases, dic)
cls.instance = None

def __call__(cls, *args, **kwargs):
if cls.instance is None:
cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls.instance

class TLMFMeta(ABCMeta, Singleton):
pass

class ThreadLocalMetricsFactory(AbstractMetricsFactory):
''' Factory method to create Thread Local Metrics
Example Usage:
metricsFactory = ThreadLocalMetricsFactory("/tmp/service_log").with_account_id("xxxxxxxxxxxxxxx").with_marketplace_id("IDC1").with_program_name("CinderAPI").with_operation_name("CreateVolume");
metricsFactory = ThreadLocalMetricsFactory("/tmp/service_log").with_marketplace_id("IDC1").with_program_name("CinderAPI")
metrics = metricsFactory.create_metrics();
'''

def __init__(self, service_log_path):
super(ThreadLocalMetricsFactory, self).__init__()

__metaclass__ = TLMFMeta

def __init__(self, use_zmq, service_log_path, propagate_to_application_logs = True):
super(ThreadLocalMetricsFactory, self).__init__()
self.use_zmq = use_zmq
if use_zmq==True:
self.__logger = self.connect_zmq()
else:
# This is done so that metrics flowing in cinder do not break. To be removed soon.
service_log_path = service_log_path.replace("service_log", "service.log")
self.__logger = self.create_timed_rotating_log(service_log_path, propagate_to_application_logs)
'''
This method creates a thread local metrics
'''
def create_metrics(self):

metrics = ThreadLocalMetrics()
metrics = ThreadLocalMetrics(self.__logger,self.use_zmq)
self._add_metric_attributes(metrics)
return metrics

def connect_zmq(self):
path = '/home/zmq.conf'
config = configparser.ConfigParser()
config.read(path)
if 'zmq' in config:
port = config['zmq']['port']
add = config['zmq']['add']
protocol = config['zmq']['protocol']
zmqPath = str(protocol)+"://"+str(add)+":"+str(port)
context = zmq.Context()
self.zmq_socket = context.socket(zmq.PUSH)
self.zmq_socket.connect(zmqPath)
return self.zmq_socket

#The default path is given to ensure backward compatibility to cinder. To be removed soon.
def create_timed_rotating_log(self, path = "/var/log/cinder/service.log", propagate_to_application_logs = True):
''' This method describes the logging type of service logs
'''
logger = logging.getLogger("service.log")
logger.propagate = propagate_to_application_logs
# Uncomment this after thorough validation in Production, that all metrics are in service
# logs. This will remove the service logs from cinder API.

handler = logging.handlers.WatchedFileHandler(path)

# Cinder itself uses watched file handler. LogRotation is handled externally using logrotate.d
logger.addHandler(handler)
return logger
100 changes: 100 additions & 0 deletions metrics/metric_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
'''
Created on Jan 29, 2016

@author: souvik
'''
from metrics.ThreadLocalMetrics import ThreadLocalMetrics, ThreadLocalMetricsFactory
from metrics.Metrics import Unit
from oslo_log import log as logging
from time import time
LOG = logging.getLogger(__name__)

'''
This decorator wraps around any method and captures latency around it. If the parameter 'report_error' is set to True
then it also emits metrics on whether the method throws an exception or not
'''
class ReportMetrics(object):
'''
@:param metric_name This variable declares what is the latency of an afforsaid sub component call.
@:param report_error If this is set to True it adds an error counter to 1 if there is an error and 0 if there are no
error
'''
def __init__(self, metric_name, report_error = False):
self.__metric_name = metric_name
self.__report_error = report_error
def __call__(self, function):
def metrics_wrapper(*args, **kwargs):
start_time = time()
error = 0
try:
return function(*args, **kwargs)
except Exception as e:
LOG.error("Exception while executing " + function.__name__)
error = 1
raise e
finally:
end_time = time()
try:
metrics = ThreadLocalMetrics.get()
metric_time = self.__metric_name + "_time"
metrics.add_time(metric_time, int((end_time - start_time)*1000), Unit.MILLIS)
if self.__report_error == True:
metric_error = self.__metric_name + "_error"
metrics.add_count(metric_error, error)
except AttributeError as e:
LOG.exception("No threadlocal metrics object: %s", e)

return metrics_wrapper

class MetricUtil(object):
'''
Metric Utility class to put and fetch request add_timescoped metrics in cinder api
'''
METRICS_OBJECT = "metrics_object"
def __init__(self):
'''
Constructor for Metric Utils.
'''

'''
This method takes in a service log path and program name (like CinderAPI, EC2API layer etc) and initializes
the thread local metrics if it has not already been initiatized before in the same request
'''
def initialize_thread_local_metrics(self, use_zmq, service_log_path, program_name):
try:
metrics = self.fetch_thread_local_metrics()
except AttributeError:
metrics = ThreadLocalMetricsFactory(use_zmq,service_log_path).with_marketplace_id(self.get_marketplace_id())\
.with_program_name(program_name).create_metrics()
return metrics

'''
This method fetches the current thread local metrics from thread local
'''
def fetch_thread_local_metrics(self):
return ThreadLocalMetrics.get()

'''
This method fetches the market place id
'''
def get_marketplace_id(self):
# TODO:Get this from the config when we have multiple zones/regions
return "IDC1"

'''
Closing the metrics emits the metrics in logs
'''
def closeMetrics(self, request):
metrics = self.fetch_thread_local_metrics()
metrics.close()

'''
Add the following metric name to the timing metrics taking the difference between start_time and end_time
'''
def report_timing_metric_utc_time(self, metric_name, end_time, start_time):
metrics = self.fetch_thread_local_metrics()
delta = end_time - start_time
seconds = delta.seconds
microseconds = delta.microseconds
milliseconds = int(microseconds/1000 + seconds*1000)
metrics.add_time(metric_name, milliseconds, Unit.MILLIS)