
influxdb: Add ability to specify tags #467

Open
BrianJKoopman wants to merge 13 commits into main from koopman/influxdb-tagging

Conversation

@BrianJKoopman (Member) commented Apr 7, 2026

Description

This PR (built on top of the refactor in #466) adds the ability to specify tags for each field published to InfluxDB by including the new 'influxdb_tags' dict in the message published on an OCS Feed.

The new tagging structure is only applied for agents that publish this new 'influxdb_tags' dict. Messages with this dict will now look like:

    message = {
        'block_name': <key identifying this group of co-sampled data>,
        'timestamps': [ctime1, ctime2, ...],
        'influxdb_tags': {
            'field_name_1': {'tag1': tag1_1, 'tag2': tag1_2, '_field': 'value'},
            'field_name_2': {'tag1': tag2_1, 'tag2': tag2_2, '_field': 'value'}
        },
        'data': {
            'field_name_1': [data1_1, data1_2, ...],
            'field_name_2': [data2_1, data2_2, ...]
        }
    }

Each 'tag' provided for the (OCS) fields will be applied; the data will be published to a measurement named after the 'agent_class', and the (InfluxDB) field key will be the string provided in '_field'.
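To make the mapping concrete, here is a minimal sketch (not the actual ocs publisher code) of how a message carrying 'influxdb_tags' could be flattened into line protocol. The function name, the `feed_tags` argument, and the example tag values are all illustrative, chosen to match the FakeDataAgent example below:

```python
def to_line_protocol(measurement, feed_tags, message):
    """Sketch: build InfluxDB line-protocol strings from an OCS-style message dict."""
    lines = []
    for field, tags in message['influxdb_tags'].items():
        influx_field = tags['_field']
        # Everything except the special '_field' entry becomes an InfluxDB tag.
        tag_set = dict(feed_tags)
        tag_set.update({k: v for k, v in tags.items() if k != '_field'})
        tag_str = ','.join(f'{k}={v}' for k, v in tag_set.items())
        for t, value in zip(message['timestamps'], message['data'][field]):
            # Line protocol timestamps are in nanoseconds.
            lines.append(f'{measurement},{tag_str} {influx_field}={value} {int(t * 1e9)}')
    return lines

message = {
    'block_name': 'temps',
    'timestamps': [1775502374.078489],
    'influxdb_tags': {
        'channel_00': {'address_root': 'observatory', 'instance_id': 'fake-data1',
                       'channel': 0, '_field': 'temperature'},
    },
    'data': {'channel_00': [0.20307]},
}
lines = to_line_protocol('FakeDataAgent', {'feed': 'false_temperatures'}, message)
```

Here `lines[0]` comes out in the tagged form shown below, with one line-protocol point per (field, timestamp) pair.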

With the new tags, this changes the structure of a write from something like:

['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088',
 'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088',
 'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088',
 'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088']

to:

['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088']

Motivation and Context

Resolves #175.

Some agents in socs have a large number of fields, and writing Grafana dashboards that query all of them results in a large number of queries to InfluxDB. Tagging, as performed here, should enable writing more efficient queries.

The old structure in OCS violated essentially every schema and data-layout recommendation made by InfluxDB: we were encoding data in both the measurement names and the field keys. The new structure fixes both of these issues.
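As an illustration of the kind of query the new schema permits (hypothetical InfluxQL, using the measurement, tag, and field names from the FakeDataAgent example above), a single query grouped by the 'channel' tag can replace one query per field key:

```sql
-- One query for all channels, instead of one per channel_XX field key:
SELECT mean("temperature") FROM "FakeDataAgent"
  WHERE "instance_id" = 'fake-data1' AND time > now() - 1h
  GROUP BY "channel", time(10s)
```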

How Has This Been Tested?

I have run the agent locally in both 'json' and 'line' protocol modes and verified how the data is presented in InfluxDB through Grafana while running the modified FakeDataAgent.

This results in being able to make queries like this:
[Screenshot: Grafana query editor using the new measurement, tags, and field]

I also ran an unmodified FakeDataAgent alongside the modified one. While writing to InfluxDB I also wrote to .g3 file via the Aggregator agent.

I'm not 100% sure (so checking this during review would be great), but I don't see the influxdb_tags making it into the .g3 files:

$ spt3g-dump 1775508288.g3
Frame (Housekeeping) [
"description" (spt3g.core.G3String) => "HK data"
"hkagg_type" (spt3g.core.G3Int) => 0
"hkagg_version" (spt3g.core.G3Int) => 2
"session_id" (spt3g.core.G3Int) => 476608725980044724
"start_time" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"hkagg_type" (spt3g.core.G3Int) => 1
"hkagg_version" (spt3g.core.G3Int) => 2
"providers" (spt3g.core.G3VectorFrameObject) => [0x561e443ba4c0]
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443a7820]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443dea00]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443f4470]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]

Trying to look more closely:

$ python
Python 3.11.9 (main, Jul 29 2024, 17:04:41) [GCC 14.1.1 20240720] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from so3g import hk
>>> scanner = hk.HKArchiveScanner()
>>> import glob
>>> files = glob.glob('*.g3')
>>> files
['1775508987.g3', '1775503131.g3', '1775508113.g3', '1775500288.g3', '1775509110.g3', '1775505088.g3', '1775500466.g3', '1775501885.g3', '1775504376.g3', '1775507755.g3', '1775500111.g3', '1775507576.g3', '1775503488.g3', '1775500999.g3', '1775507399.g3', '1775508664.g3', '1775506339.g3', '1775507933.g3', '1775503665.g3', '1775508542.g3', '1775506162.g3', '1775500642.g3', '1775506693.g3', '1775502597.g3', '1775508468.g3', '1775507221.g3', '1775506868.g3', '1775507046.g3', '1775501707.g3', '1775502242.g3', '1775509287.g3', '1775503310.g3', '1775506517.g3', '1775502954.g3', '1775508816.g3', '1775504198.g3', '1775501530.g3', '1775500819.g3', '1775504021.g3', '1775502775.g3', '1775508288.g3', '1775502065.g3', '1775502422.g3', '1775504912.g3', '1775504734.g3', '1775501351.g3', '1775503842.g3', '1775505863.g3', '1775501176.g3', '1775505984.g3', '1775504555.g3']
>>> for f in files:
...     scanner.process_file(f)
...
>>> arc = scanner.finalize()
>>> arc.simple('temperature')
(array([], dtype=float64), array([], dtype=float64))
>>> arc.simple('temperatures')
(array([], dtype=float64), array([], dtype=float64))
>>> arc.simple('channel_00')
(array([1.77550005e+09, 1.77550005e+09, 1.77550005e+09, ...,
       1.77550928e+09, 1.77550928e+09, 1.77550929e+09]), array([0.13146559, 0.12866655, 0.13144499, ..., 0.13128748, 0.1345487 ,
       0.13324801]))
>>> arc.simple('channel_01')
(array([1.77550005e+09, 1.77550005e+09, 1.77550005e+09, ...,
       1.77550928e+09, 1.77550928e+09, 1.77550929e+09]), array([0.37091482, 0.36679523, 0.36781594, ..., 0.08374199, 0.0795275 ,
       0.08244495]))
>>> arc.simple('influxdb_tags')
(array([], dtype=float64), array([], dtype=float64))

So I think the old structure remains in place within .g3.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.

@BrianJKoopman (Member, Author) commented May 1, 2026

In order to make the comparison easier I generated some 2 minute .g3 files with and without tagging: influxdb-tag-vs-no-tag-g3-files.tar.gz

Can someone take a look at these for differences? Maybe @kmharrington or @mhasself?

EDIT: These just have a fake data agent (w/2 channels), the registry, influxdb v1 publisher agent, and the aggregator agent running.

@mhasself (Member) left a review comment

Looks good -- some comments, but I think they are minor (despite how many paragraphs were spent).

I agree that this should have no impact on the G3 data.

Comment thread on ocs/ocs_feed.py (Outdated):
raise Exception("Block structure does not match: {}".format(self.name))

self.timestamps.extend(block['timestamps'])
self.tags = block.get('influxdb_tags')
@mhasself (Member):
Shouldn't it be required that the latest tags match the previous tags?

I think, as written, someone could do this:

  • publish some data with tags={...} fully populated -- these get cached.
  • publish another point, with tags=None -- this gets cached.
  • If the buffer is now flushed, the result will have tags=None, because that was the most recent thing.

That would pass all checks, but the stored tags (when the block gets flushed) would be None. It's probably not what the user wanted. I think either:

  • Require user to pass the same tags every time they push data to this block OR
  • If tags=None, don't change the stored tags in the block.

At some level, data producers are doomed to always provide tags, so the first option makes more sense to me.

Granted, this would require modification to the aggregator agent (so far unchanged), because you'd need to initialize each new Block (in Provider.save_to_block) with influxdb_tags set from the first transmission. That is messier in the present instance but seems logically safer on the whole. I think the best way would be to have a constructor for Block that parses the block data dict (just as Block.append and Block.extend do) and sets everything from that (including influxdb_tags). This would make the Aggregator, in the long run, less fragile vis-à-vis stuff in the feed that it doesn't care about.
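The two suggestions above can be sketched together in a toy Block (this is not the real ocs.ocs_feed.Block; the class shape and exception messages are simplified for illustration): tags are fixed by the constructor from the first transmission, later messages that omit tags keep the stored ones, and later messages with different tags are rejected.

```python
class Block:
    """Toy sketch of a feed block that caches influxdb_tags at construction."""

    def __init__(self, message):
        self.name = message['block_name']
        self.timestamps = list(message['timestamps'])
        self.data = {k: list(v) for k, v in message['data'].items()}
        # Tags are set once, from the first transmission.
        self.tags = message.get('influxdb_tags')

    def extend(self, message):
        if set(message['data']) != set(self.data):
            raise Exception("Block structure does not match: {}".format(self.name))
        new_tags = message.get('influxdb_tags')
        # If tags are omitted, keep the stored ones; if provided, require a match.
        if new_tags is not None and new_tags != self.tags:
            raise Exception("influxdb_tags do not match: {}".format(self.name))
        self.timestamps.extend(message['timestamps'])
        for k, v in message['data'].items():
            self.data[k].extend(v)
```

With this shape, publishing a point without tags no longer clobbers the cached tags, and a producer that changes its tags mid-stream fails loudly instead of silently rewriting the schema.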

@BrianJKoopman (Member, Author):
This is a good point. By the same logic, can a block's structure change if you, say, publish data, flush, then publish a modified block structure (i.e. drop a key in d['data'])? I'm wondering if it's enough to add a check similar to the 'Block structure does not match' check at the top of append and extend here.

Since your second option didn't require changes to the aggregator, I implemented it in 39eb27f for now. I plan to convert this discussion to an issue, and we can improve the handling in the aggregator separately. Sound good?

Base automatically changed from koopman/influxdb-format-data-refactor to main May 5, 2026 23:45
@BrianJKoopman (Member, Author):
Thanks for the review, I'll address your comments soon.

A quick note to myself, I shouldn't have squashed #466. Before merging this I should rebase on main and drop the first three commits.

@BrianJKoopman BrianJKoopman requested a review from mhasself May 12, 2026 00:11
@BrianJKoopman (Member, Author):

I think the CI checks are blocked by the merge conflicts. I'll do that rebase and drop those first three commits once the review is finalized.



Development

Successfully merging this pull request may close these issues.

Improve tagging in InfluxDB Publisher Agent
