diff --git a/docs/api.rst b/docs/api.rst index 6676b1db..1fd1f7bc 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -130,6 +130,19 @@ ocs.client_t :undoc-members: :show-inheritance: +ocs.common +---------- + +The ``common/`` directory contains driver code shared among agents. + +ocs.common.influxdb_drivers +``````````````````````````` + +.. automodule:: ocs.common.influxdb_drivers + :members: + :undoc-members: + :show-inheritance: + .. _ocs_agent_api: ocs.ocs_agent diff --git a/docs/developer/feeds.rst b/docs/developer/feeds.rst index ddf4e4ff..1c409c7c 100644 --- a/docs/developer/feeds.rst +++ b/docs/developer/feeds.rst @@ -179,6 +179,36 @@ Attempting to publish an invalid field name should raise an error by the agent. However, if invalid field names somehow make it to the aggregator, the aggregator will attempt to correct them before writing to disk. +Using InfluxDB Tags +''''''''''''''''''' +ocs v0.12.1 introduced improved InfluxDB tagging. This changes the structure +with which the InfluxDB Publisher agent writes to InfluxDB. In order to take +advantage of this improved tagging, agents must supply tag information when +publishing data to their feeds. + +This data extends the existing message structure using the 'influxdb_tags' +key:: + + message = { + 'block_name': block_name, + 'timestamps': [ctime1, ctime2, ... ], + 'influxdb_tags': { + 'field_name_1': {'tag1': tag1_1, 'tag2': tag1_2, '_field': 'value'}, + 'field_name_2': {'tag1': tag2_1, 'tag2': tag2_2, '_field': 'value'} + }, + 'data': { + 'field_name_1': [data1_1, data1_2, ...], + 'field_name_2': [data2_1, data2_2, ...] + } + } + +The field names within the 'data' and 'influxdb_tags' dicts must match, and +there must be a set of tags for every field. Each set of tags must also contain +the special '_field' key, which determines the field name within InfluxDB. 
In +this example it is just the word 'value', but a more practical example would +be 'temperature', 'resistance', or 'voltage' for a device measuring +temperatures in a cryostat. + Subscribing to a Feed --------------------- diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index 3e247e0d..566dc067 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -151,7 +151,14 @@ def acq(self, session, params): next_timestamp += n_data / self.sample_rate # self.log.info('Sending %i data on %i channels.' % (len(t), len(T))) - session.app.publish_to_feed('false_temperatures', block.encoded()) + tags = {} + for channel in self.channel_names: + _channel_tag = {channel: {'channel': int(channel.split('_')[1]), + '_field': 'temperature'}} + tags.update(_channel_tag) + publish_block = block.encoded() + publish_block.update(influxdb_tags=tags) + session.app.publish_to_feed('false_temperatures', publish_block) # Update session.data data_cache = {"fields": {}, "timestamp": None} diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index 864c4ac2..bde8fcfb 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -7,9 +7,9 @@ def timestamp2influxtime(time, protocol): Args: time: - ctime timestamp + Unix 'ctime' timestamp, e.g. ``1775500953.5108523`` protocol: - 'json' or line' + InfluxDB protocol to format timestamp for. Either 'json' or 'line'. """ if protocol == 'json': @@ -35,6 +35,23 @@ def _format_field_line(field_key, field_value): return line +@dataclass +class InfluxTags: + """Stores tags to apply to a set of data within an InfluxBlock. + + Examples: + >>> tags = InfluxTags(shared_tags={'feed': 'example_feed'}, + ... field_tags={'key1': {'key': 1, '_field': 'value'}}) + + + """ + #: Tags to apply to all data points. + shared_tags: dict + + #: Tags to apply per field, along with '_field' value to use. 
+ field_tags: dict = None + + @dataclass class InfluxBlock: """Holds and can convert the data and feed information into a format @@ -54,7 +71,7 @@ class InfluxBlock: measurement: str #: Tags to apply to the measurements. - tags: dict + tags: InfluxTags def _group_data(self): """Takes the block structured data and groups each data point in a set @@ -135,8 +152,23 @@ def _encode_line(self, fields, timestamp): """ # Convert json format tags to line format tag_list = [] - for k, v in self.tags.items(): - tag_list.append(f"{k}={v}") + for k, v in self.tags.shared_tags.items(): + tag_list.append(f'{k}={v}') + + # Add unique field tags to the list and overwrite the field key + if self.tags.field_tags: + field_name = fields.split('=')[0] + tags_to_add = self.tags.field_tags.get(field_name) + for k, v in tags_to_add.items(): + if k == '_field': + continue + tag_list.append(f'{k}={v}') + + # Overwrite field name with _field from tags_to_add (influxdb_tags) + new_field_key = tags_to_add.get('_field') + field_value = fields.split('=')[1] + fields = f'{new_field_key}={field_value}' + tags = ','.join(tag_list) try: @@ -170,6 +202,24 @@ def _encode_json(self, fields, timestamp): 'tags': {'feed': 'false_temperatures'}} """ + # Add unique field tags to the list and overwrite the field key + if self.tags.field_tags: + (field_name, field_value), = fields.items() # Unpack single (k, v) + tags_to_add = self.tags.field_tags.get(field_name) + tags = {} + for k, v in tags_to_add.items(): + if k == '_field': + continue + tags[k] = v + + tags.update(self.tags.shared_tags) + + # Overwrite field name with _field from tags_to_add (influxdb_tags) + new_field_key = tags_to_add.get('_field') + fields = {new_field_key: field_value} + else: + tags = self.tags.shared_tags + try: t_influx = timestamp2influxtime(timestamp, protocol='json') except OverflowError: @@ -181,7 +231,7 @@ def _encode_json(self, fields, timestamp): "measurement": self.measurement, "time": t_influx, "fields": fields, - "tags": 
self.tags, + "tags": tags, } return json @@ -201,18 +251,42 @@ def encode(self, protocol='line'): """ encoded_list = [] if protocol == 'line': - fields_lines = self._group_fields_lines() - for fields, time_ in zip(fields_lines, self.timestamps): - line = self._encode_line(fields, time_) - if line is not None: - encoded_list.append(line) + # If we don't have unique field tags, group the fields together + if self.tags.field_tags is None: + fields_lines = self._group_fields_lines() + for fields, time_ in zip(fields_lines, self.timestamps): + line = self._encode_line(fields, time_) + if line is not None: + encoded_list.append(line) + + # If we do have field_tags, encode each line separately + else: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + for (field, value) in fields.items(): + f_line = _format_field_line(field, value) + line = self._encode_line(f_line, time_) + if line is not None: + encoded_list.append(line) elif protocol == 'json': - grouped_data = self._group_data() - for fields, time_ in zip(grouped_data, self.timestamps): - text = self._encode_json(fields, time_) - if text is not None: - encoded_list.append(text) + # If we don't have unique field tags, group the fields together + if self.tags.field_tags is None: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + text = self._encode_json(fields, time_) + if text is not None: + encoded_list.append(text) + + # If we do have field_tags, encode each line separately + else: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + for (field, value) in fields.items(): + single_field_dict = {field: value} + text = self._encode_json(single_field_dict, time_) + if text is not None: + encoded_list.append(text) else: print(f"Protocol '{protocol}' not supported.") @@ -243,38 +317,66 @@ def _convert_single_to_group(message): def format_data(data, feed, protocol): """Format the data from 
an OCS feed for publishing to InfluxDB. - The scheme here is as follows: - - agent_address is the "measurement" (conceptually like an SQL - table) - - feed names are an indexed "tag" on the data structure - (effectively a table column) - - keys within an OCS block's 'data' dictionary are the field names - (effectively a table column) + The scheme used depends on whether 'influxdb_tags' are published to the Feed. + + Without 'influxdb_tags' the measurement consists of the agent address, i.e. + ``address_root.instance_id``, there is a single tag for the feed name, and + each data field from the OCS feed is used directly as the field name in + InfluxDB. This structure, however, is not ideal for InfluxDB query + performance. + + When 'influxdb_tags' are provided by the agent then the measurement becomes + the agent class, and the address root and instance-id are added as tags. + The 'influxdb_tags' are also used to add additional tags and provide a + simple field name. See the examples below. Args: data (dict): - data from the OCS Feed subscription + Data from the OCS Feed subscription. feed (dict): - feed from the OCS Feed subscription, contains feed information - used to structure our influxdb query + Feed from the OCS Feed subscription, contains feed information + used to structure our influxdb query. protocol (str): Protocol for writing data. Either 'line' or 'json'. Returns: list: Data ready to publish to influxdb, in the specified protocol. 
+ Examples: + >>> # without 'influxdb_tags' + >>> format_data(data, feed, protocol='line') + ['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088', + 'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088', + 'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088', + 'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088'] + + >>> # with 'influxdb_tags' + >>> format_data(data, feed, protocol='line') + ['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088'] + """ # Load data into InfluxBlock objects. 
blocks = [] for _, bv in data.items(): - tags = {'feed': feed['feed_name']} + shared_tags = {'feed': feed['feed_name']} + measurement = feed.get('agent_address') + if bv.get('influxdb_tags'): + measurement = feed.get('agent_class') + shared_tags['address_root'] = feed['agent_address'].split('.')[0] + shared_tags['instance_id'] = feed['agent_address'].split('.')[1] + tags = InfluxTags( + shared_tags=shared_tags, + field_tags=bv.get('influxdb_tags')) if 'timestamp' in bv: bv = _convert_single_to_group(bv) block = InfluxBlock( block_name=bv['block_name'], data=bv['data'], timestamps=bv['timestamps'], - measurement=feed['agent_address'], + measurement=measurement, tags=tags) blocks.append(block) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index d127e3af..57f38be8 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -13,6 +13,7 @@ def __init__(self, name, keys): """ self.name = name self.timestamps = [] + self.tags = None self.data = { k: [] for k in keys } @@ -37,6 +38,9 @@ def append(self, d): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.append(d['timestamp']) + tags = d.get('influxdb_tags') + if tags is not None: + self.tags = d['influxdb_tags'] for k in self.data: self.data[k].append(d['data'][k]) @@ -49,6 +53,9 @@ def extend(self, block): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.extend(block['timestamps']) + tags = block.get('influxdb_tags') + if tags is not None: + self.tags = block['influxdb_tags'] for k in self.data: self.data[k].extend(block['data'][k]) @@ -58,6 +65,7 @@ def encoded(self): return { 'block_name': self.name, 'data': {k: self.data[k] for k in self.data.keys()}, + 'influxdb_tags': self.tags, 'timestamps': self.timestamps, } @@ -213,6 +221,9 @@ def publish_message(self, message, timestamp=None): Feed.verify_data_field_string(k) Feed.verify_message_data_type(v) + # check influxdb_tags + Feed.verify_influxdb_tags(message) + # Data is stored in Block objects 
block_name = message['block_name'] try: @@ -244,6 +255,50 @@ def publish_message(self, message, timestamp=None): self.agent.log.error('Could not publish to Feed. TransportLost. ' + 'crossbar server likely unreachable.') + @staticmethod + def verify_influxdb_tags(message): + """Check the 'influxdb_tags' to make sure all the needed information is + provided. + + This checks to make sure each tag has a '_field' provided, and that + each field has a corresponding tag. + + Note: + 'influxdb_tags' can be ``None``. This will be the case for any + agents that do not implement tagging. + + Args: + message (dict): + 'message' dictionary value published (see Feed.publish_message for details). + + Raises: + ValueError: If the 'influxdb_tags' provided in the message do not + meet the required format. + + """ + tags = message.get('influxdb_tags') + if tags is None: + return + + # check '_field' supplied with each tag + for v in tags.values(): + if '_field' not in v: + error_msg = f"'_field' not supplied with 'influxdb_tags' for tag set {v}" + raise ValueError(error_msg) + + # check that all fields have a corresponding tag + tag_fields = tags.keys() + for k in message['data'].keys(): + if k not in tag_fields: + error_msg = f"'influxdb_tags' does not contain tags for '{k}'" + raise ValueError(error_msg) + + # check for extra tags + for k in tag_fields: + if k not in message['data'].keys(): + error_msg = f"'influxdb_tags' contains extra tag '{k}'" + raise ValueError(error_msg) + @staticmethod def verify_message_data_type(value): """Aggregated Feeds can only store certain types of data. 
Here we check diff --git a/tests/test_influxdb_publisher.py b/tests/test_influxdb_publisher.py index a62a345d..3a045913 100644 --- a/tests/test_influxdb_publisher.py +++ b/tests/test_influxdb_publisher.py @@ -84,6 +84,55 @@ def test_format_data_json(): assert format_data(data, feed, 'json')[0] == expected +def test_format_data_w_tags(): + """Test passing int, float, string to InfluxDB line protocol, with new tags.""" + + # Not a real feed, but this is all we need for influxdb_drivers.format_data + feed = {'agent_address': 'test_address.test_instance', + 'feed_name': 'test_feed', + 'agent_class': 'TestAgent'} + data = {'test': {'block_name': 'test', + 'timestamps': [1615394417.3590388], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}, + 'key3': {'key': 3, '_field': 'value'}}, + 'data': {'key1': [1], + 'key2': [2.3], + 'key3': ["test"]}, + } + } + + # test 'line' protocol + expected = ['TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=1 value=1i 1615394417359038720', + 'TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=2 value=2.3 1615394417359038720', + 'TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=3 value="test" 1615394417359038720'] + assert format_data(data, feed, 'line') == expected + + # test 'json' protocol + expected = [{'fields': {'value': 1}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 1}, + 'time': '2021-03-10T16:40:17.359039'}, + {'fields': {'value': 2.3}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 2}, + 'time': '2021-03-10T16:40:17.359039'}, + {'fields': {'value': 'test'}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 3}, + 
'time': '2021-03-10T16:40:17.359039'}] + assert format_data(data, feed, 'json') == expected + + def test_format_data_inf_time(): """Test passing unrealistically large time.""" diff --git a/tests/test_ocs_feed.py b/tests/test_ocs_feed.py index e26268e6..1cdec95e 100644 --- a/tests/test_ocs_feed.py +++ b/tests/test_ocs_feed.py @@ -290,6 +290,90 @@ def test_empty_field_name(self): with pytest.raises(ValueError): test_feed.publish_message(test_message) + def test_valid_multi_sample_input_w_tags(self): + """We should be able to pass lists of ints and floats to a feed, with + the new influxdb_tags. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + test_feed.publish_message(test_message) + + def test_valid_multi_sample_input_w_invalid_tags_missing(self): + """Check if we don't supply influxdb_tags for all fields. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + + def test_valid_multi_sample_input_w_extra_tags(self): + """Check if we supply extra influxdb_tags. 
+ + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}, + 'key3': {'key': 3, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + + def test_valid_multi_sample_input_w_invalid_tags_field(self): + """Check if we don't supply an influxdb_tags '_field' value. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + # ocs_feed.Block @@ -313,3 +397,66 @@ def test_block_append(): assert test_block.data['key1'][0] == data_samples assert test_block.timestamps[0] == time_samples + + +def test_block_append_influxdb_tags(): + """Test adding some data to a Block with changing influxdb tags. The tags + should not change when no tags are provided in the second append(). 
+ + """ + test_block = ocs_feed.Block('test_block', ['key1']) + + time_samples = [1558044482.2398098, 1558044483.2398098, + 1558044484.2398098] + data_samples = [1, 2, 3] + + tags = {'key1': {'key': 1, '_field': 'value'}}, + data = {'timestamp': time_samples, + 'influxdb_tags': tags, + 'data': {'key1': data_samples}} + test_block.append(data) + + assert test_block.data['key1'][0] == data_samples + assert test_block.timestamps[0] == time_samples + + # Add new samples without tags + time_sample = 1558044485.2398098 + data_sample = 4 + + data = {'timestamp': time_sample, + 'data': {'key1': data_sample}} + test_block.append(data) + + assert test_block.tags == tags + + +def test_block_extend_influxdb_tags(): + """Test extending data in a Block with changing influxdb tags. The tags + should not change when no tags are provided in the second extend(). + + """ + test_block = ocs_feed.Block('test_block', ['key1']) + + time_samples = [1558044482.2398098, 1558044483.2398098, + 1558044484.2398098] + data_samples = [1, 2, 3] + + tags = {'key1': {'key': 1, '_field': 'value'}}, + data = {'timestamps': time_samples, + 'influxdb_tags': tags, + 'data': {'key1': data_samples}} + test_block.extend(data) + + assert test_block.data['key1'] == data_samples + assert test_block.timestamps == time_samples + + # Add new samples without tags + time_samples = [1558044485.2398098, 1558044486.2398098, + 1558044487.2398098] + data_samples = [4, 5, 6] + + data = {'timestamps': time_samples, + 'data': {'key1': data_samples}} + test_block.extend(data) + + assert test_block.tags == tags