From ed46dd37f7eb952577a0b998b18bdd99d9f7b639 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 16:41:21 -0400 Subject: [PATCH 01/10] Add influxdb_tags to feed Blocks --- ocs/ocs_feed.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index d127e3af..5bf2998a 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -13,6 +13,7 @@ def __init__(self, name, keys): """ self.name = name self.timestamps = [] + self.tags = None self.data = { k: [] for k in keys } @@ -37,6 +38,7 @@ def append(self, d): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.append(d['timestamp']) + self.tags = d.get('influxdb_tags') for k in self.data: self.data[k].append(d['data'][k]) @@ -49,6 +51,7 @@ def extend(self, block): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.extend(block['timestamps']) + self.tags = block.get('influxdb_tags') for k in self.data: self.data[k].extend(block['data'][k]) @@ -58,6 +61,7 @@ def encoded(self): return { 'block_name': self.name, 'data': {k: self.data[k] for k in self.data.keys()}, + 'influxdb_tags': self.tags, 'timestamps': self.timestamps, } From 578277dba3b85e5c72e0ca997fec351ccf3dbaf7 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 16:41:41 -0400 Subject: [PATCH 02/10] Add influxdb_tags to FakeDataAgent data --- ocs/agents/fake_data/agent.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index 3e247e0d..566dc067 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -151,7 +151,14 @@ def acq(self, session, params): next_timestamp += n_data / self.sample_rate # self.log.info('Sending %i data on %i channels.' % (len(t), len(T))) - session.app.publish_to_feed('false_temperatures', block.encoded()) + tags = {} + for channel in self.channel_names: + _channel_tag = {channel: {'channel': int(channel.split('_')[1]), + '_field': 'temperature'}} + tags.update(_channel_tag) + publish_block = block.encoded() + publish_block.update(influxdb_tags=tags) + session.app.publish_to_feed('false_temperatures', publish_block) # Update session.data data_cache = {"fields": {}, "timestamp": None} From ac640eecf3eca2681f28fc694f0ed32b261e6e91 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 16:27:42 -0400 Subject: [PATCH 03/10] Create InfluxTags to manage shared and unique tags --- ocs/common/influxdb_drivers.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index 864c4ac2..e1286fe7 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -35,6 +35,16 @@ def _format_field_line(field_key, field_value): return line +@dataclass +class InfluxTags: + """Stores tags to apply to a set of data within an InfluxBlock.""" + #: Tags to apply to all data points + shared_tags: dict + + #: Tags to apply per field + field_tags: dict = None + + @dataclass class InfluxBlock: """Holds and can convert the data and feed information into a format @@ -54,7 +64,7 @@ class InfluxBlock: measurement: str #: Tags to apply to the measurements. - tags: dict + tags: InfluxTags def _group_data(self): """Takes the block structured data and groups each data point in a set @@ -135,7 +145,7 @@ def _encode_line(self, fields, timestamp): """ # Convert json format tags to line format tag_list = [] - for k, v in self.tags.items(): + for k, v in self.tags.shared_tags.items(): tag_list.append(f"{k}={v}") tags = ','.join(tag_list) @@ -181,7 +191,7 @@ def _encode_json(self, fields, timestamp): "measurement": self.measurement, "time": t_influx, "fields": fields, - "tags": self.tags, + "tags": self.tags.shared_tags, } return json @@ -267,7 +277,8 @@ def format_data(data, feed, protocol): # Load data into InfluxBlock objects. blocks = [] for _, bv in data.items(): - tags = {'feed': feed['feed_name']} + feed_tag = {'feed': feed['feed_name']} + tags = InfluxTags(shared_tags=feed_tag) if 'timestamp' in bv: bv = _convert_single_to_group(bv) block = InfluxBlock( From 25d0022dc3fcc2a3c6fa2ff0f6ee74edc97fb22a Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 3 Apr 2026 17:45:51 -0400 Subject: [PATCH 04/10] Handle new influxdb_tags when formatting data --- ocs/common/influxdb_drivers.py | 83 +++++++++++++++++++++++++++----- tests/test_influxdb_publisher.py | 39 +++++++++++++++ 2 files changed, 109 insertions(+), 13 deletions(-) diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index e1286fe7..e78a33cf 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -146,7 +146,22 @@ def _encode_line(self, fields, timestamp): # Convert json format tags to line format tag_list = [] for k, v in self.tags.shared_tags.items(): - tag_list.append(f"{k}={v}") + tag_list.append(f'{k}={v}') + + # Add unique field tags to the list and overwrite the field key + if self.tags.field_tags: + field_name = fields.split('=')[0] + tags_to_add = self.tags.field_tags.get(field_name) + for k, v in tags_to_add.items(): + if k == '_field': + continue + tag_list.append(f'{k}={v}') + + # Overwrite field name with _field from tags_to_add (influxdb_tags) + new_field_key = tags_to_add.get('_field') + field_value = fields.split('=')[1] + fields = f'{new_field_key}={field_value}' + tags = ','.join(tag_list) try: @@ -180,6 +195,24 @@ def _encode_json(self, fields, timestamp): 'tags': {'feed': 'false_temperatures'}} """ + # Add unique field tags to the list and overwrite the field key + if self.tags.field_tags: + (field_name, field_value), = fields.items() # Unpack single (k, v) + tags_to_add = self.tags.field_tags.get(field_name) + tags = {} + for k, v in tags_to_add.items(): + if k == '_field': + continue + tags[k] = v + + tags.update(self.tags.shared_tags) + + # Overwrite field name with _field from tags_to_add (influxdb_tags) + new_field_key = tags_to_add.get('_field') + fields = {new_field_key: field_value} + else: + tags = self.tags.shared_tags + try: t_influx = timestamp2influxtime(timestamp, protocol='json') except OverflowError: @@ -191,7 +224,7 @@ def _encode_json(self, fields, timestamp): "measurement": self.measurement, "time": t_influx, "fields": fields, - "tags": self.tags.shared_tags, + "tags": tags, } return json @@ -211,18 +244,42 @@ def encode(self, protocol='line'): """ encoded_list = [] if protocol == 'line': - fields_lines = self._group_fields_lines() - for fields, time_ in zip(fields_lines, self.timestamps): - line = self._encode_line(fields, time_) - if line is not None: - encoded_list.append(line) + # If we don't have unique field tags, group the fields together + if self.tags.field_tags is None: + fields_lines = self._group_fields_lines() + for fields, time_ in zip(fields_lines, self.timestamps): + line = self._encode_line(fields, time_) + if line is not None: + encoded_list.append(line) + + # If we do have field_tags, encode each line separately + else: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + for (field, value) in fields.items(): + f_line = _format_field_line(field, value) + line = self._encode_line(f_line, time_) + if line is not None: + encoded_list.append(line) elif protocol == 'json': - grouped_data = self._group_data() - for fields, time_ in zip(grouped_data, self.timestamps): - text = self._encode_json(fields, time_) - if text is not None: - encoded_list.append(text) + # If we don't have unique field tags, group the fields together + if self.tags.field_tags is None: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + text = self._encode_json(fields, time_) + if text is not None: + encoded_list.append(text) + + # If we do have field_tags, encode each line separately + else: + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + for (field, value) in fields.items(): + single_field_dict = {field: value} + text = self._encode_json(single_field_dict, time_) + if text is not None: + encoded_list.append(text) else: print(f"Protocol '{protocol}' not supported.") @@ -278,7 +335,7 @@ def format_data(data, feed, protocol): blocks = [] for _, bv in data.items(): feed_tag = {'feed': feed['feed_name']} - tags = InfluxTags(shared_tags=feed_tag) + tags = InfluxTags(shared_tags=feed_tag, field_tags=bv.get('influxdb_tags')) if 'timestamp' in bv: bv = _convert_single_to_group(bv) block = InfluxBlock( diff --git a/tests/test_influxdb_publisher.py b/tests/test_influxdb_publisher.py index a62a345d..ad1f0d55 100644 --- a/tests/test_influxdb_publisher.py +++ b/tests/test_influxdb_publisher.py @@ -84,6 +84,45 @@ def test_format_data_json(): assert format_data(data, feed, 'json')[0] == expected +def test_format_data_w_tags(): + """Test passing int, float, string to InfluxDB line protocol, with new tags.""" + + # Not a real feed, but this is all we need for influxdb_drivers.format_data + feed = {'agent_address': 'test_address', + 'feed_name': 'test_feed'} + data = {'test': {'block_name': 'test', + 'timestamps': [1615394417.3590388], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}, + 'key3': {'key': 3, '_field': 'value'}}, + 'data': {'key1': [1], + 'key2': [2.3], + 'key3': ["test"]}, + } + } + + # test 'line' protocol + expected = ['test_address,feed=test_feed,key=1 value=1i 1615394417359038720', + 'test_address,feed=test_feed,key=2 value=2.3 1615394417359038720', + 'test_address,feed=test_feed,key=3 value="test" 1615394417359038720'] + assert format_data(data, feed, 'line') == expected + + # test 'json' protocol + expected = [{'fields': {'value': 1}, + 'measurement': 'test_address', + 'tags': {'feed': 'test_feed', 'key': 1}, + 'time': '2021-03-10T16:40:17.359039'}, + {'fields': {'value': 2.3}, + 'measurement': 'test_address', + 'tags': {'feed': 'test_feed', 'key': 2}, + 'time': '2021-03-10T16:40:17.359039'}, + {'fields': {'value': 'test'}, + 'measurement': 'test_address', + 'tags': {'feed': 'test_feed', 'key': 3}, + 'time': '2021-03-10T16:40:17.359039'}] + assert format_data(data, feed, 'json') == expected + + def test_format_data_inf_time(): """Test passing unrealistically large time.""" From fc7414e628e5aa128ca7fafebbdaacedfce7fe98 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 13:55:02 -0400 Subject: [PATCH 05/10] Decouple data from 'measurement' name --- ocs/common/influxdb_drivers.py | 13 ++++++++++--- tests/test_influxdb_publisher.py | 32 +++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index e78a33cf..dd5cd828 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -334,15 +334,22 @@ def format_data(data, feed, protocol): # Load data into InfluxBlock objects. blocks = [] for _, bv in data.items(): - feed_tag = {'feed': feed['feed_name']} - tags = InfluxTags(shared_tags=feed_tag, field_tags=bv.get('influxdb_tags')) + shared_tags = {'feed': feed['feed_name']} + measurement = feed.get('agent_address') + if bv.get('influxdb_tags'): + measurement = feed.get('agent_class') + shared_tags['address_root'] = feed['agent_address'].split('.')[0] + shared_tags['instance_id'] = feed['agent_address'].split('.')[1] + tags = InfluxTags( + shared_tags=shared_tags, + field_tags=bv.get('influxdb_tags')) if 'timestamp' in bv: bv = _convert_single_to_group(bv) block = InfluxBlock( block_name=bv['block_name'], data=bv['data'], timestamps=bv['timestamps'], - measurement=feed['agent_address'], + measurement=measurement, tags=tags) blocks.append(block) diff --git a/tests/test_influxdb_publisher.py b/tests/test_influxdb_publisher.py index ad1f0d55..3a045913 100644 --- a/tests/test_influxdb_publisher.py +++ b/tests/test_influxdb_publisher.py @@ -88,8 +88,9 @@ def test_format_data_w_tags(): """Test passing int, float, string to InfluxDB line protocol, with new tags.""" # Not a real feed, but this is all we need for influxdb_drivers.format_data - feed = {'agent_address': 'test_address', - 'feed_name': 'test_feed'} + feed = {'agent_address': 'test_address.test_instance', + 'feed_name': 'test_feed', + 'agent_class': 'TestAgent'} data = {'test': {'block_name': 'test', 'timestamps': [1615394417.3590388], 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, @@ -102,23 +103,32 @@ def test_format_data_w_tags(): } # test 'line' protocol - expected = ['test_address,feed=test_feed,key=1 value=1i 1615394417359038720', - 'test_address,feed=test_feed,key=2 value=2.3 1615394417359038720', - 'test_address,feed=test_feed,key=3 value="test" 1615394417359038720'] + expected = ['TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=1 value=1i 1615394417359038720', + 'TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=2 value=2.3 1615394417359038720', + 'TestAgent,feed=test_feed,address_root=test_address,instance_id=test_instance,key=3 value="test" 1615394417359038720'] assert format_data(data, feed, 'line') == expected # test 'json' protocol expected = [{'fields': {'value': 1}, - 'measurement': 'test_address', - 'tags': {'feed': 'test_feed', 'key': 1}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 1}, 'time': '2021-03-10T16:40:17.359039'}, {'fields': {'value': 2.3}, - 'measurement': 'test_address', - 'tags': {'feed': 'test_feed', 'key': 2}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 2}, 'time': '2021-03-10T16:40:17.359039'}, {'fields': {'value': 'test'}, - 'measurement': 'test_address', - 'tags': {'feed': 'test_feed', 'key': 3}, + 'measurement': 'TestAgent', + 'tags': {'feed': 'test_feed', + 'instance_id': 'test_instance', + 'address_root': 'test_address', + 'key': 3}, 'time': '2021-03-10T16:40:17.359039'}] assert format_data(data, feed, 'json') == expected From 516dfd532fe96339b2c65e0d05cbd149ac5ecce4 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 14:06:30 -0400 Subject: [PATCH 06/10] Add checks within Feed on influxdb_tags --- ocs/ocs_feed.py | 37 +++++++++++++++++++++++++ tests/test_ocs_feed.py | 62 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index 5bf2998a..0f6b4fe4 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -217,6 +217,9 @@ def publish_message(self, message, timestamp=None): Feed.verify_data_field_string(k) Feed.verify_message_data_type(v) + # check influxdb_tags + Feed.verify_influxdb_tags(message) + # Data is stored in Block objects block_name = message['block_name'] try: @@ -248,6 +251,40 @@ def publish_message(self, message, timestamp=None): self.agent.log.error('Could not publish to Feed. TransportLost. ' + 'crossbar server likely unreachable.') + @staticmethod + def verify_influxdb_tags(message): + """Check the 'influxdb_tags' to make sure all the needed information is + provided. + + This checks to make sure each tag has a '_field' provided, and that + each field has a corresponding tag. + + Args: + message (dict): + 'message' dictionary value published (see Feed.publish_message for details). + + Raises: + ValueError: If the 'influxdb_tags' provided in the message do not + meet the required format. + + """ + tags = message.get('influxdb_tags') + if tags is None: + return + + # check '_field' supplied with each tag + for v in tags.values(): + if '_field' not in v: + error_msg = f"'_field' not supplied with 'influxdb_tags' for tag set {v}" + raise ValueError(error_msg) + + # check that all fields have a corresponding tag + tag_fields = tags.keys() + for k in message['data'].keys(): + if k not in tag_fields: + error_msg = f"'influxdb_tags' does not contain tags for '{k}'" + raise ValueError(error_msg) + @staticmethod def verify_message_data_type(value): """Aggregated Feeds can only store certain types of data. Here we check diff --git a/tests/test_ocs_feed.py b/tests/test_ocs_feed.py index e26268e6..f58d9779 100644 --- a/tests/test_ocs_feed.py +++ b/tests/test_ocs_feed.py @@ -290,6 +290,68 @@ def test_empty_field_name(self): with pytest.raises(ValueError): test_feed.publish_message(test_message) + def test_valid_multi_sample_input_w_tags(self): + """We should be able to pass lists of ints and floats to a feed, with + the new influxdb_tags. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + test_feed.publish_message(test_message) + + def test_valid_multi_sample_input_w_invalid_tags_missing(self): + """Check if we don't supply influxdb_tags for all fields. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + + def test_valid_multi_sample_input_w_invalid_tags_field(self): + """Check if we don't supply an influxdb_tags '_field' value. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + # ocs_feed.Block From ac623f4fa4988a0b2767647fa593c97894c2c8f4 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 Apr 2026 15:49:01 -0400 Subject: [PATCH 07/10] Update data access docs with tagging details --- docs/api.rst | 13 ++++++++ docs/developer/feeds.rst | 30 ++++++++++++++++++ ocs/common/influxdb_drivers.py | 57 +++++++++++++++++++++++++--------- 3 files changed, 85 insertions(+), 15 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 6676b1db..1fd1f7bc 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -130,6 +130,19 @@ ocs.client_t :undoc-members: :show-inheritance: +ocs.common +---------- + +The ``common/`` directory contains driver code shared among agents. + +ocs.common.influxdb_drivers +``````````````````````````` + +.. automodule:: ocs.common.influxdb_drivers + :members: + :undoc-members: + :show-inheritance: + .. _ocs_agent_api: ocs.ocs_agent diff --git a/docs/developer/feeds.rst b/docs/developer/feeds.rst index ddf4e4ff..1c409c7c 100644 --- a/docs/developer/feeds.rst +++ b/docs/developer/feeds.rst @@ -179,6 +179,36 @@ Attempting to publish an invalid field name should raise an error by the agent. However, if invalid field names somehow make it to the aggregator, the aggregator will attempt to correct them before writing to disk. +Using InfluxDB Tags +''''''''''''''''''' +ocs v0.12.1 introduced improved InfluxDB tagging. This changes the structure +with which the InfluxDB Publisher agents writes to InfluxDB. In order to take +advantage of this improved tagging agents must supply tag information when +publishing data to their feeds. + +This data extends the existing message structure using the 'influxdb_tags' +key:: + + message = { + 'block_name': + 'timestamps': [ctime1, ctime2, ... ] + 'influxdb_tags': { + 'field_name_1': {'tag1': tag1_1, 'tag2': tag1_2, '_field': 'value'}, + 'field_name_2': {'tag1': tag2_1, 'tag2': tag2_2, '_field': 'value'} + } + 'data': { + 'field_name_1': [data1_1, data1_2, ...], + 'field_name_2': [data2_1, data2_2, ...] + } + } + +The field names within the 'data' and 'influxdb_tags' dicts must match, and +there must be a set of tags for every field. Each set of tags must also contain +the special '_field' key, which determine the field name within InfluxDB. In +this example it is just the word 'value', but a more practical example would +be 'temperature', 'resistance', or 'voltage' for a device measuring +temperatures in a cryostat. + Subscribing to a Feed --------------------- diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index dd5cd828..bde8fcfb 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -7,9 +7,9 @@ def timestamp2influxtime(time, protocol): Args: time: - ctime timestamp + Unix 'ctime' timestamp, i.e. ``1775500953.5108523`` protocol: - 'json' or line' + InfluxDB protocol to format timestamp for. Either 'json' or line'. """ if protocol == 'json': @@ -37,11 +37,18 @@ def _format_field_line(field_key, field_value): @dataclass class InfluxTags: - """Stores tags to apply to a set of data within an InfluxBlock.""" - #: Tags to apply to all data points + """Stores tags to apply to a set of data within an InfluxBlock. + + Examples: + >>> tags = InfluxTags(shared_tags={'feed': 'example_fed'}, + ... field_tags={'key1': 1, '_field': 'value'}) + + + """ + #: Tags to apply to all data points. shared_tags: dict - #: Tags to apply per field + #: Tags to apply per field, along with '_field' value to use. field_tags: dict = None @@ -310,26 +317,46 @@ def _convert_single_to_group(message): def format_data(data, feed, protocol): """Format the data from an OCS feed for publishing to InfluxDB. - The scheme here is as follows: - - agent_address is the "measurement" (conceptually like an SQL - table) - - feed names are an indexed "tag" on the data structure - (effectively a table column) - - keys within an OCS block's 'data' dictionary are the field names - (effectively a table column) + The scheme used depends on whether 'influxdb_tags' are published to the Feed. + + Without 'influxdb_tags' the measurement consists of the agent address, i.e. + ``address_root.instance_id``, there is a single tag for the feed name, and + each data field from the OCS feed is used directly as the field name in + InfluxDB. This structure, however, is not ideal for InfluxDB query + performance. + + When 'influxdb_tags' are provided by the agent then the measurement becomes + the agent class, and the address root and instance-id are added as tags. + The 'influxdb_tags' are also used to add additional tags and provide a + simple field name. See the examples below. Args: data (dict): - data from the OCS Feed subscription + Data from the OCS Feed subscription. feed (dict): - feed from the OCS Feed subscription, contains feed information - used to structure our influxdb query + Feed from the OCS Feed subscription, contains feed information + used to structure our influxdb query. protocol (str): Protocol for writing data. Either 'line' or 'json'. Returns: list: Data ready to publish to influxdb, in the specified protocol. + Examples: + >>> # without 'influxdb_tags' + >>> format_data(data, feed, protocol='line') + ['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088', + 'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088', + 'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088', + 'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088'] + + >>> # with 'influxdb_tags' + >>> format_data(data, feed, protocol='line') + ['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088', + 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088'] + """ # Load data into InfluxBlock objects. blocks = [] From eb9ed9cbbb448444d98f94e688fbbb11920494a0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 11 May 2026 18:19:07 -0400 Subject: [PATCH 08/10] Add note about tags possibly being empty --- ocs/ocs_feed.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index 0f6b4fe4..27fba495 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -259,6 +259,10 @@ def verify_influxdb_tags(message): This checks to make sure each tag has a '_field' provided, and that each field has a corresponding tag. + Note: + 'influxdb_tags' can be ``None``. This will be the case for any + agents that do not implement tagging. + Args: message (dict): 'message' dictionary value published (see Feed.publish_message for details). From 23de6d9411b534246c9018d2f744b072d085b3d4 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 11 May 2026 18:42:49 -0400 Subject: [PATCH 09/10] Check for extra influxdb tags and raise exception --- ocs/ocs_feed.py | 6 ++++++ tests/test_ocs_feed.py | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index 27fba495..b7af2058 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -289,6 +289,12 @@ def verify_influxdb_tags(message): error_msg = f"'influxdb_tags' does not contain tags for '{k}'" raise ValueError(error_msg) + # check for extra tags + for k in tag_fields: + if k not in message['data'].keys(): + error_msg = f"'influxdb_tags' contains extra tag '{k}'" + raise ValueError(error_msg) + @staticmethod def verify_message_data_type(value): """Aggregated Feeds can only store certain types of data. Here we check diff --git a/tests/test_ocs_feed.py b/tests/test_ocs_feed.py index f58d9779..1c65f143 100644 --- a/tests/test_ocs_feed.py +++ b/tests/test_ocs_feed.py @@ -331,6 +331,28 @@ def test_valid_multi_sample_input_w_invalid_tags_missing(self): with pytest.raises(ValueError): test_feed.publish_message(test_message) + def test_valid_multi_sample_input_w_extra_tags(self): + """Check if we supply extra influxdb_tags. + + """ + mock_agent = MagicMock() + test_feed = ocs_feed.Feed(mock_agent, 'test_feed', record=True) + + test_message = { + 'block_name': 'test', + 'timestamps': [time.time(), time.time() + 1], + 'influxdb_tags': {'key1': {'key': 1, '_field': 'value'}, + 'key2': {'key': 2, '_field': 'value'}, + 'key3': {'key': 3, '_field': 'value'}}, + 'data': { + 'key1': [1., 2.], + 'key2': [10, 5] + } + } + + with pytest.raises(ValueError): + test_feed.publish_message(test_message) + def test_valid_multi_sample_input_w_invalid_tags_field(self): """Check if we don't supply an influxdb_tags '_field' value. From 822a05d73bedd2609079c6c14126670378b2ff88 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 11 May 2026 19:42:18 -0400 Subject: [PATCH 10/10] Avoid block change if tags is None --- ocs/ocs_feed.py | 8 ++++-- tests/test_ocs_feed.py | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/ocs/ocs_feed.py b/ocs/ocs_feed.py index b7af2058..57f38be8 100644 --- a/ocs/ocs_feed.py +++ b/ocs/ocs_feed.py @@ -38,7 +38,9 @@ def append(self, d): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.append(d['timestamp']) - self.tags = d.get('influxdb_tags') + tags = d.get('influxdb_tags') + if tags is not None: + self.tags = d['influxdb_tags'] for k in self.data: self.data[k].append(d['data'][k]) @@ -51,7 +53,9 @@ def extend(self, block): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.extend(block['timestamps']) - self.tags = block.get('influxdb_tags') + tags = block.get('influxdb_tags') + if tags is not None: + self.tags = block['influxdb_tags'] for k in self.data: self.data[k].extend(block['data'][k]) diff --git a/tests/test_ocs_feed.py b/tests/test_ocs_feed.py index 1c65f143..1cdec95e 100644 --- a/tests/test_ocs_feed.py +++ b/tests/test_ocs_feed.py @@ -397,3 +397,66 @@ def test_block_append(): assert test_block.data['key1'][0] == data_samples assert test_block.timestamps[0] == time_samples + + +def test_block_append_influxdb_tags(): + """Test adding some data to a Block with changing influxdb tags. The tags + should not change when no tags are provided in the second append(). + + """ + test_block = ocs_feed.Block('test_block', ['key1']) + + time_samples = [1558044482.2398098, 1558044483.2398098, + 1558044484.2398098] + data_samples = [1, 2, 3] + + tags = {'key1': {'key': 1, '_field': 'value'}}, + data = {'timestamp': time_samples, + 'influxdb_tags': tags, + 'data': {'key1': data_samples}} + test_block.append(data) + + assert test_block.data['key1'][0] == data_samples + assert test_block.timestamps[0] == time_samples + + # Add new samples without tags + time_sample = 1558044485.2398098 + data_sample = 4 + + data = {'timestamp': time_sample, + 'data': {'key1': data_sample}} + test_block.append(data) + + assert test_block.tags == tags + + +def test_block_extend_influxdb_tags(): + """Test extending data in a Block with changing influxdb tags. The tags + should not change when no tags are provided in the second extend(). + + """ + test_block = ocs_feed.Block('test_block', ['key1']) + + time_samples = [1558044482.2398098, 1558044483.2398098, + 1558044484.2398098] + data_samples = [1, 2, 3] + + tags = {'key1': {'key': 1, '_field': 'value'}}, + data = {'timestamps': time_samples, + 'influxdb_tags': tags, + 'data': {'key1': data_samples}} + test_block.extend(data) + + assert test_block.data['key1'] == data_samples + assert test_block.timestamps == time_samples + + # Add new samples without tags + time_samples = [1558044485.2398098, 1558044486.2398098, + 1558044487.2398098] + data_samples = [4, 5, 6] + + data = {'timestamps': time_samples, + 'data': {'key1': data_samples}} + test_block.extend(data) + + assert test_block.tags == tags