diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ba883cfc5..5e98bcc425 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o ### Tests - `intelmq.tests.bots.parsers.openphish.test_parser_commercial`: Replace dummy key with more obvious dummy key, generate raw from input (PR#2665 by Sebastian Wagner, fixes #2663). - `intelmq.tests.bots.experts.gethostbyname.test_expert`: Update IP address (PR#2697 by Sebastian Wagner). +- `intelmq.lib.test`: Support comparing messages with regular expressions, reducing the workload to adapt to frequently changing externally controlled data like AS names and IP addresses (PR#2700 by Sebastian Wagner). ### Tools - `intelmq.lib.bot_debugger`: Optionally read input messages from stdin instead of parameter value (PR#2678 by Sebastian Wager). diff --git a/docs/dev/bot-development.md b/docs/dev/bot-development.md index 9531ab96c1..52ee318c2f 100644 --- a/docs/dev/bot-development.md +++ b/docs/dev/bot-development.md @@ -488,6 +488,14 @@ When calling the file directly, only the tests in this file for the bot will be See the `testing` section about how to run the tests. +#### Regular expression matching + +The expected message (the one passed to `assertMessageEqual` in the example above) can contain regular expressions as values, created with `re.compile`. +See `intelmq/tests/bots/experts/cymru_whois/test_expert.py` for an example. +Each pattern is applied with `re.search` to the string representation of the actual value, so it may match anywhere in that value. +To match the full string, anchor the pattern, for example `^begintoend$`: `^` matches the start of the string and `$` the end. + + ### Cache Bots can use a Redis database as cache instance. Use the `intelmq.lib.utils.Cache` class to set this up and/or look at existing bots, like the `cymru_whois` expert how the cache can be used. Bots must set a TTL for all keys that are cached to avoid caches growing endless over time. Bots must use the Redis databases >= 10, but not those already used by other bots. 
Look at `find intelmq -type f -name '*.py' -exec grep -r 'redis_cache_db' {} +` to see which databases are already used. diff --git a/intelmq/lib/test.py b/intelmq/lib/test.py index f49821eff2..ae55f63d2d 100644 --- a/intelmq/lib/test.py +++ b/intelmq/lib/test.py @@ -587,8 +587,29 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d event_dict['output'] = json.loads(event_dict['output']) if 'output' in expected: expected['output'] = json.loads(expected['output']) + self.assertDictRegexEqual(expected=expected, actual=event_dict) - self.assertDictEqual(expected, event_dict) + def assertDictRegexEqual(self, expected, actual, msg=None): + """ + Works the same way as self.assertDictEqual but supports regular expressions. + Only works on flat dictionaries (messages); both dicts must have the same keys. + If a value in the expected dict is a re.Pattern object, str(actual value) is matched against it with re.search. + + Examples (failures are reported as assertion failures, nothing is returned): + >>> assertDictRegexEqual({'text': 'basic', 'other': re.compile('RIPE')}, + {'text': 'basic', 'other': 'AFRINIC'}) + fails: the pattern 'RIPE' is not found in 'AFRINIC' + >>> assertDictRegexEqual({'number': re.compile('4')}, + {'number': 140}) + passes: 140 is converted to '140' before matching + """ + self.assertEqual(set(expected.keys()), set(actual.keys()), msg=msg) + for key, exp_val in expected.items(): + act_val = actual[key] + if isinstance(exp_val, re.Pattern): + self.assertRegex(str(act_val), exp_val, msg=msg) + else: + self.assertEqual(exp_val, act_val, msg=msg) def tearDown(self): """ diff --git a/intelmq/tests/bots/experts/cymru_whois/test_expert.py b/intelmq/tests/bots/experts/cymru_whois/test_expert.py index 3f5377a29e..463cb2f5e4 100644 --- a/intelmq/tests/bots/experts/cymru_whois/test_expert.py +++ b/intelmq/tests/bots/experts/cymru_whois/test_expert.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2015 Sebastian Wagner +# SPDX-FileCopyrightText: 2015-2021 CERT.at GmbH, 2015-2017, 2019 agesic.gub.uy, Tomás Lima, 2023 Aaron Kaplan, 2023 CERT.ee, 2024-2026 Institute for Common Good Technology # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -6,6 +6,8 @@ 
import json import unittest +from re import compile as re_compile, IGNORECASE as re_IGNORECASE + import intelmq.lib.test as test from intelmq.bots.experts.cymru_whois.expert import CymruExpertBot @@ -20,7 +22,7 @@ "source.network": "78.104.0.0/16", "source.allocated": "2007-06-07T00:00:00+00:00", "source.asn": 1853, - "source.as_name": "ACONET ACOnet Backbone, AT", + "source.as_name": re_compile("ACONET", flags=re_IGNORECASE), "time.observation": "2015-01-01T00:00:00+00:00", } EXAMPLE_INPUT6 = {"__type": "Event", @@ -31,7 +33,7 @@ "destination.ip": "2001:500:88:200::8", # iana.org "destination.registry": "ARIN", "destination.allocated": "2010-02-18T00:00:00+00:00", - "destination.as_name": "ICANN-DC, US", + "destination.as_name": re_compile("ICANN", flags=re_IGNORECASE), "destination.geolocation.cc": "US", "time.observation": "2015-01-01T00:00:00+00:00", "destination.asn": 16876, @@ -52,7 +54,7 @@ "source.network": "78.104.0.0/16", "source.allocated": "2007-06-07T00:00:00+00:00", "source.asn": 1853, - "source.as_name": "ACONET ACOnet Backbone, AT", + "source.as_name": re_compile("ACONET", flags=re_IGNORECASE), "time.observation": "2015-01-01T00:00:00+00:00", } UNEXPECTED_UNICODE = {"__type": "Event", diff --git a/intelmq/tests/bots/experts/gethostbyname/test_expert.py b/intelmq/tests/bots/experts/gethostbyname/test_expert.py index bfbe92e922..a983892cff 100644 --- a/intelmq/tests/bots/experts/gethostbyname/test_expert.py +++ b/intelmq/tests/bots/experts/gethostbyname/test_expert.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2016 Sebastian Wagner +# SPDX-FileCopyrightText: 2016-2021 CERT.at GmbH, 2024-2026 Institute for Common Good Technology # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -9,6 +9,8 @@ import unittest +from re import compile as re_compile, IGNORECASE as re_IGNORECASE + import intelmq.lib.test as test from intelmq.bots.experts.gethostbyname.expert import GethostbynameExpertBot @@ -20,8 +22,9 @@ EXAMPLE_OUTPUT = {"__type": "Event", "source.fqdn": 
"iana.org", "destination.fqdn": "nic.at", - "source.ip": "192.0.43.8", - "destination.ip": "131.130.249.233", + # the addresses change frequently, just test there is anything here. Syntax is checked by harmonization. + "source.ip": re_compile("^[0-9.]+$"), + "destination.ip": re_compile("^[0-9.]+$"), "time.observation": "2015-01-01T00:00:00+00:00" } NONEXISTING_INPUT = {"__type": "Event", @@ -34,7 +37,7 @@ } EXAMPLE_URL_OUTPUT = {"__type": "Event", "source.url": "http://iana.org", - "source.ip": "192.0.43.8", + "source.ip": re_compile("^[0-9.]+$"), } EXISITNG_INPUT = {"__type": "Event", "source.fqdn": "iana.org",