Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o
### Tests
- `intelmq.tests.bots.parsers.openphish.test_parser_commercial`: Replace dummy key with more obvious dummy key, generate raw from input (PR#2665 by Sebastian Wagner, fixes #2663).
- `intelmq.tests.bots.experts.gethostbyname.test_expert`: Update IP address (PR#2697 by Sebastian Wagner).
- `intelmq.lib.test`: Support comparing messages with regular expressions, reducing the workload to adapt to frequently changing externally controlled data like AS names and IP addresses (PR#2700 by Sebastian Wagner).

### Tools
- `intelmq.lib.bot_debugger`: Optionally read input messages from stdin instead of parameter value (PR#2678 by Sebastian Wagner).
Expand Down
8 changes: 8 additions & 0 deletions docs/dev/bot-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,14 @@ When calling the file directly, only the tests in this file for the bot will be

See the `testing` section about how to run the tests.

#### Regular expression matching

The comparison message (the expected message passed to `assertMessageEqual` in the example above) can contain regular expressions as values, created with `re.compile`.
See `intelmq/tests/bots/experts/cymru_whois/test_expert.py` for an example.
The pattern is applied with `re.search` to the string representation of the actual value, so it matches anywhere in the value.
To match a full string, use a pattern in the form of `^begintoend$`, using `^` for the start of the string and `$` for the end.


### Cache

Bots can use a Redis database as cache instance. Use the `intelmq.lib.utils.Cache` class to set this up and/or look at existing bots, such as the `cymru_whois` expert, to see how the cache can be used. Bots must set a TTL for all keys that are cached to avoid caches growing endlessly over time. Bots must use the Redis databases >= 10, but not those already used by other bots. Look at `find intelmq -type f -name '*.py' -exec grep -r 'redis_cache_db' {} +` to see which databases are already used.
Expand Down
23 changes: 22 additions & 1 deletion intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,29 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
event_dict['output'] = json.loads(event_dict['output'])
if 'output' in expected:
expected['output'] = json.loads(expected['output'])
self.assertDictRegexEqual(expected=expected, actual=event_dict)

self.assertDictEqual(expected, event_dict)
def assertDictRegexEqual(self, expected, actual, msg=None):
    """
    Assert that two flat dictionaries are equal, with regular expression support.

    Works the same way as self.assertDictEqual, except that a value in the
    expected dict which is a compiled pattern (re.Pattern) is not compared
    for equality: it is searched (re.search semantics, via self.assertRegex)
    in the string representation of the corresponding actual value.
    Only works on flat dictionaries (messages): patterns nested inside other
    values are compared like any other object.

    Parameters:
        expected: dict with the expected values; values may be re.Pattern objects
        actual: dict with the values to check
        msg: optional text added to the failure message

    Returns:
        None. A mismatch is reported by raising the test case's assertion
        failure (AssertionError), as with the other assert* methods:
        either the key sets differ, a pattern is not found in the actual
        value, or a plain value is not equal to the actual value.

    Examples:
        Fails, as 'RIPE' is not found in 'AFRINIC':
            self.assertDictRegexEqual({'text': 'basic', 'other': re.compile('RIPE')},
                                      {'text': 'basic', 'other': 'AFRINIC'})
        Passes, as the number 140 is converted to '140', which contains '4':
            self.assertDictRegexEqual({'number': re.compile('4')},
                                      {'number': 140})
    """
    # Both dictionaries must have exactly the same keys, otherwise actual[key]
    # below could fail or additional keys in actual would go unnoticed.
    self.assertEqual(set(expected.keys()), set(actual.keys()), msg=msg)
    for key, exp_val in expected.items():
        act_val = actual[key]
        # Name the offending key in the failure output: unlike the diff of
        # assertDictEqual, the per-value assertions do not show it on their own.
        key_msg = f"key {key!r}" if msg is None else f"{msg} (key {key!r})"
        if isinstance(exp_val, re.Pattern):
            # str() allows matching non-string values such as numbers
            self.assertRegex(str(act_val), exp_val, msg=key_msg)
        else:
            self.assertEqual(exp_val, act_val, msg=key_msg)

def tearDown(self):
"""
Expand Down
10 changes: 6 additions & 4 deletions intelmq/tests/bots/experts/cymru_whois/test_expert.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# SPDX-FileCopyrightText: 2015 Sebastian Wagner
# SPDX-FileCopyrightText: 2015-2021 CERT.at GmbH, 2015-2017, 2019 agesic.gub.uy, Tomás Lima, 2023 Aaron Kaplan, 2023 CERT.ee, 2024-2026 Institute for Common Good Technology
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import json
import unittest

from re import compile as re_compile, IGNORECASE as re_IGNORECASE
Comment thread
e3rd marked this conversation as resolved.

import intelmq.lib.test as test
from intelmq.bots.experts.cymru_whois.expert import CymruExpertBot

Expand All @@ -20,7 +22,7 @@
"source.network": "78.104.0.0/16",
"source.allocated": "2007-06-07T00:00:00+00:00",
"source.asn": 1853,
"source.as_name": "ACONET ACOnet Backbone, AT",
"source.as_name": re_compile("ACONET", flags=re_IGNORECASE),
"time.observation": "2015-01-01T00:00:00+00:00",
}
EXAMPLE_INPUT6 = {"__type": "Event",
Expand All @@ -31,7 +33,7 @@
"destination.ip": "2001:500:88:200::8", # iana.org
"destination.registry": "ARIN",
"destination.allocated": "2010-02-18T00:00:00+00:00",
"destination.as_name": "ICANN-DC, US",
"destination.as_name": re_compile("ICANN", flags=re_IGNORECASE),
"destination.geolocation.cc": "US",
"time.observation": "2015-01-01T00:00:00+00:00",
"destination.asn": 16876,
Expand All @@ -52,7 +54,7 @@
"source.network": "78.104.0.0/16",
"source.allocated": "2007-06-07T00:00:00+00:00",
"source.asn": 1853,
"source.as_name": "ACONET ACOnet Backbone, AT",
"source.as_name": re_compile("ACONET", flags=re_IGNORECASE),
"time.observation": "2015-01-01T00:00:00+00:00",
}
UNEXPECTED_UNICODE = {"__type": "Event",
Expand Down
11 changes: 7 additions & 4 deletions intelmq/tests/bots/experts/gethostbyname/test_expert.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2016 Sebastian Wagner
# SPDX-FileCopyrightText: 2016-2021 CERT.at GmbH, 2024-2026 Institute for Common Good Technology
#
# SPDX-License-Identifier: AGPL-3.0-or-later

Expand All @@ -9,6 +9,8 @@

import unittest

from re import compile as re_compile, IGNORECASE as re_IGNORECASE

import intelmq.lib.test as test
from intelmq.bots.experts.gethostbyname.expert import GethostbynameExpertBot

Expand All @@ -20,8 +22,9 @@
EXAMPLE_OUTPUT = {"__type": "Event",
"source.fqdn": "iana.org",
"destination.fqdn": "nic.at",
"source.ip": "192.0.43.8",
"destination.ip": "131.130.249.233",
# the addresses change frequently, just test there is anything here. Syntax is checked by harmonization.
"source.ip": re_compile("^[0-9.]+$"),
"destination.ip": re_compile("^[0-9.]+$"),
"time.observation": "2015-01-01T00:00:00+00:00"
}
NONEXISTING_INPUT = {"__type": "Event",
Expand All @@ -34,7 +37,7 @@
}
EXAMPLE_URL_OUTPUT = {"__type": "Event",
"source.url": "http://iana.org",
"source.ip": "192.0.43.8",
"source.ip": re_compile("^[0-9.]+$"),
}
EXISITNG_INPUT = {"__type": "Event",
"source.fqdn": "iana.org",
Expand Down
Loading