Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions intelmq/bots/experts/domain_suffix/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import requests.exceptions

from intelmq import VAR_STATE_PATH
from intelmq.lib.bot import ExpertBot
from intelmq.lib.exceptions import InvalidArgument
from intelmq.lib.utils import get_bots_settings, create_request_session
Expand All @@ -31,7 +32,7 @@
class DomainSuffixExpertBot(ExpertBot):
"""Extract the domain suffix from a domain and save it in the the domain_suffix field. Requires a local file with valid domain suffixes"""
field: str = None
suffix_file: str = None # TODO: should be pathlib.Path
suffix_file: str = f'{VAR_STATE_PATH}domain_suffix/public_suffix_list.dat' # TODO: should be pathlib.Path
autoupdate_cached_database: bool = True # Activate/deactivate update-database functionality

def init(self):
Expand All @@ -53,10 +54,11 @@ def process(self):

@staticmethod
def check(parameters):
if not os.path.exists(parameters.get('suffix_file', '')):
suffix_file = parameters.get('suffix_file', DomainSuffixExpertBot.suffix_file)
if not os.path.exists(suffix_file):
return [["error", "File given as parameter 'suffix_file' does not exist."]]
try:
with codecs.open(parameters['suffix_file'], encoding='UTF-8') as database:
with codecs.open(suffix_file, encoding='UTF-8') as database:
PublicSuffixList(source=database, only_icann=True)
except Exception as exc:
return [["error", "Error reading database: %r." % exc]]
Expand Down Expand Up @@ -86,7 +88,7 @@ def update_database(cls, verbose=False):
try:
for bot in runtime_conf:
if runtime_conf[bot]["module"] == __name__ and runtime_conf[bot]['parameters'].get('autoupdate_cached_database', True):
bots[bot] = runtime_conf[bot]["parameters"]["suffix_file"]
bots[bot] = runtime_conf[bot]["parameters"].get("suffix_file", cls.suffix_file)

except KeyError as e:
sys.exit(f"Database update failed. Your configuration of {bot} is missing key {e}.")
Expand Down
31 changes: 31 additions & 0 deletions intelmq/tests/bots/experts/domain_suffix/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

# -*- coding: utf-8 -*-
import os.path
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest import mock

import intelmq.lib.test as test
from intelmq.bots.experts.domain_suffix import expert as domain_suffix_expert
from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot


Expand Down Expand Up @@ -93,5 +98,31 @@ def test_wildcard(self):
self.assertMessageEqual(0, WILDCARD_OUTPUT)


class TestDomainSuffixDatabaseUpdate(unittest.TestCase):
def test_update_database_uses_default_suffix_file(self):
with tempfile.TemporaryDirectory() as tmp_dir:
suffix_file = Path(tmp_dir) / "domain_suffix" / "public_suffix_list.dat"
session = SimpleNamespace(get=mock.Mock(return_value=SimpleNamespace(
content=b"// public suffix list\ncom\n",
ok=True,
status_code=200,
url="https://publicsuffix.org/list/public_suffix_list.dat",
)))

with mock.patch.object(DomainSuffixExpertBot, "suffix_file", str(suffix_file)), \
mock.patch.object(domain_suffix_expert, "get_bots_settings", return_value={
"domain-suffix": {
"module": "intelmq.bots.experts.domain_suffix.expert",
"parameters": {},
}
}), \
mock.patch.object(domain_suffix_expert, "create_request_session", return_value=session), \
mock.patch.object(domain_suffix_expert, "IntelMQController") as controller:
DomainSuffixExpertBot.update_database()

self.assertEqual(suffix_file.read_bytes(), b"// public suffix list\ncom\n")
controller.return_value.bot_reload.assert_called_once_with("domain-suffix")


if __name__ == '__main__': # pragma: no cover
unittest.main()