diff --git a/documentation/README.md b/documentation/README.md index 4b981907..86b52369 100644 --- a/documentation/README.md +++ b/documentation/README.md @@ -1347,15 +1347,16 @@ IPQualityScore MISP Expansion Module for IP reputation, Email Validation, Phone [[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/ipqs_fraud_and_risk_scoring.py)] - **features**: ->This Module takes the IP Address, Domain, URL, Email and Phone Number MISP Attributes as input to query the IPQualityScore API. +> This Module takes the IP Address, Domain, URL, Email, Phone Number, Username MISP Attributes as input to query the IPQualityScore API. > The results of the IPQualityScore API are than returned as IPQS Fraud and Risk Scoring Object. > The object contains a copy of the enriched attribute with added tags presenting the verdict based on fraud score,risk score and other attributes from IPQualityScore. +> This module takes an attachment or malware-sample attribute as input to query the IPQS Malware Scanner API. - **config**: >apikey - **input**: ->A MISP attribute of type IP Address(ip-src, ip-dst), Domain(hostname, domain), URL(url, uri), Email Address(email, email-src, email-dst, target-email, whois-registrant-email) and Phone Number(phone-number, whois-registrant-phone). +>A MISP attribute of type IP Address(ip-src, ip-dst), Domain(hostname, domain), URL(url, uri), Email Address(email, email-src, email-dst, target-email, whois-registrant-email), Phone Number(phone-number, whois-registrant-phone), File(attachment, malware-sample), Username(first-name, last-name, middle-name,github-username) and Password(text). - **output**: >IPQualityScore object, resulting from the query on the IPQualityScore API. diff --git a/misp_modules/modules/expansion/ipqs_fraud_and_risk_scoring.py b/misp_modules/modules/expansion/ipqs_fraud_and_risk_scoring.py index 7d2449ec..b5e5d63d 100644 --- a/misp_modules/modules/expansion/ipqs_fraud_and_risk_scoring.py +++ b/misp_modules/modules/expansion/ipqs_fraud_and_risk_scoring.py @@ -1,9 +1,23 @@ +"""IPQualityScore expansion module for MISP. + +Provides IPQualityScore lookup and enrichment for IPs, URLs, emails, phones, +usernames, passwords, and files. Implements RequestHandler and +IPQualityScoreParser to call the IPQualityScore API and build MISP objects. +""" + +# pylint: disable=too-many-lines + +import base64 +import io import json import logging +import time +import zipfile import requests from pymisp import Distribution, MISPAttribute, MISPEvent, MISPObject, MISPTag -from requests.exceptions import ConnectTimeout, HTTPError, InvalidURL, ProxyError +from requests.exceptions import (ConnectTimeout, HTTPError, InvalidURL, + ProxyError) from . import check_input_attribute, standard_error_message @@ -17,83 +31,158 @@ "whois-registrant-email", ] phone_query_input_type = ["phone-number", "whois-registrant-phone"] +username_query_input_type = [ + "first-name", + "last-name", + "middle-name", + "github-username" + ] +password_query_input_type = ["text"] +file_query_input_type = ["attachment", "malware-sample"] misperrors = {"error": "Error"} mispattributes = { - "input": ip_query_input_type + url_query_input_type + email_query_input_type + phone_query_input_type, + "input": ( + ip_query_input_type + + url_query_input_type + + email_query_input_type + + username_query_input_type + + password_query_input_type + + phone_query_input_type + + file_query_input_type + ), "format": "misp_standard", } moduleinfo = { "version": "0.1", "author": "David Mackler", "description": ( - "IPQualityScore MISP Expansion Module for IP reputation, Email Validation, Phone Number Validation, Malicious" - " Domain and Malicious URL Scanner." + "IPQualityScore MISP Expansion Module for IP reputation,\ + Email Validation, Phone Number Validation," + " Malicious Domain,Malicious URL Scanner,Malicious File Scanner " + "& Compromised username, Password, Email " ), "module-type": ["expansion", "hover"], "name": "IPQualityScore Lookup", "logo": "ipqualityscore.png", "requirements": ["A IPQualityScore API Key."], "features": ( - "This Module takes the IP Address, Domain, URL, Email and Phone Number MISP Attributes as input to query the" - " IPQualityScore API.\n The results of the IPQualityScore API are than returned as IPQS Fraud and Risk Scoring" - " Object. \n The object contains a copy of the enriched attribute with added tags presenting the verdict based" - " on fraud score,risk score and other attributes from IPQualityScore." + """This Module takes the IP Address, Domain, URL, Email, Phone Number, + Username,Password, and MISP Attributes as input to query the + IPQualityScore API. + The results of the IPQualityScore API are then returned as an + IPQS Fraud, Risk,and Exposure Scoring Object. + The object contains a copy of the enriched attribute with added tags + presenting the verdict based on fraud score, risk score, + darkweb exposure status, and other attributes from IPQualityScore. + This module also contains IPQS Darkweb Leak, IPQS Malware + File Scanner API's + IPQS Darkweb Leak - Monitor Dark Web Activity & + Compromised User Accounts, + IPQS Malware File Scanner - Detect malicious files. + """ ), "references": ["https://www.ipqualityscore.com/"], "input": ( - "A MISP attribute of type IP Address(ip-src, ip-dst), Domain(hostname, domain), URL(url, uri), Email" - " Address(email, email-src, email-dst, target-email, whois-registrant-email) and Phone Number(phone-number," - " whois-registrant-phone)." + "A MISP attribute of type IP Address(ip-src, ip-dst)," + "Domain(hostname, domain), URL(url, uri)," + " Email Address(email, email-src, email-dst, target-email," + " whois-registrant-email) and Phone" + " Number(phone-number, whois-registrant-phone)," + " Username(first-name, last-name, middle-name," + " github-username), Password(text)." ), - "output": "IPQualityScore object, resulting from the query on the IPQualityScore API.", + "output": "IPQualityScore object, resulting from the query " + "on the IPQualityScore API.", } -moduleconfig = ["apikey"] +moduleconfig = ["apikey", "base_url", "poll_delay"] logger = logging.getLogger("ipqualityscore") logger.setLevel(logging.DEBUG) -BASE_URL = "https://ipqualityscore.com/api/json" DEFAULT_DISTRIBUTION_SETTING = Distribution.your_organisation_only.value IP_ENRICH = "ip" URL_ENRICH = "url" EMAIL_ENRICH = "email" PHONE_ENRICH = "phone" +USERNAME_ENRICH = "username" +PASSWORD_ENRICH = "password" class RequestHandler: """A class for handling any outbound requests from this module.""" - def __init__(self, apikey): + def __init__(self, apikey, base_url): self.session = requests.Session() self.api_key = apikey + self.base_url = base_url - def get(self, url: str, headers: dict = None, params: dict = None) -> requests.Response: + def get( + self, + url: str, + headers: dict = None, + params: dict = None + ) -> requests.Response: """General get method to fetch the response from IPQualityScore.""" try: - response = self.session.get(url, headers=headers, params=params).json() + response = self.session.get(url, + headers=headers, + params=params).json() if str(response["success"]) != "True": msg = response["message"] - logger.error(f"Error: {msg}") + logger.error("Error: %s", {msg}) misperrors["error"] = msg + raise else: return response except (ConnectTimeout, ProxyError, InvalidURL) as error: msg = "Error connecting with the IPQualityScore." - logger.error(f"{msg} Error: {error}") + logger.error("%s Error: %s", msg, error) misperrors["error"] = msg def ipqs_lookup(self, reputation_type: str, ioc: str) -> requests.Response: """Do a lookup call.""" - url = f"{BASE_URL}/{reputation_type}" + url = f"{self.base_url.rstrip('/')}/{reputation_type}" payload = {reputation_type: ioc} headers = {"IPQS-KEY": self.api_key} try: response = self.get(url, headers, payload) except HTTPError as error: - msg = f"Error when requesting data from IPQualityScore. {error.response}: {error.response.reason}" + response = error.response + reason = getattr(response, "reason", "Unknown reason") + msg = ( + "Error when requesting data from IPQualityScore. " + f"{response}: {reason}" + ) logger.error(msg) misperrors["error"] = msg raise + if response is None: + return {} + return response + + def ipqs_darkweb_lookup( + self, + reputation_type: str, + ioc: str + ) -> requests.Response: + """Do a lookup call for darkweb.""" + url = f"{self.base_url.rstrip('/')}/leaked/{reputation_type}" + payload = {reputation_type: ioc} + headers = {"IPQS-KEY": self.api_key} + try: + response = self.get(url, headers, payload) + except HTTPError as error: + response = error.response + reason = getattr(response, "reason", "Unknown reason") + msg = ( + "Error when requesting data from IPQualityScore. " + f"{response}: {reason}" + ) + logger.error(msg) + misperrors["error"] = msg + raise + if response is None: + return {} return response @@ -135,7 +224,10 @@ def __init__(self, attribute): self.ipqs_object.template_id = "1" self.ipqs_object.description = "IPQS Fraud and Risk Scoring Data" setattr(self.ipqs_object, "meta-category", "network") - description = "An object containing the enriched attribute and related entities from IPQualityScore." + description = ( + "An object containing the enriched attribute and " + "related entities from IPQualityScore." + ) self.ipqs_object.from_dict( **{ "meta-category": "misc", @@ -155,6 +247,81 @@ def __init__(self, attribute): } ) self.ipqs_object.distribution = DEFAULT_DISTRIBUTION_SETTING + + self.username_data_items = [ + "success", + "message", + "request_hash", + "source", + "exposed", + "first_seen.human", + "first_seen.timestamp", + "first_seen.iso", + "request_id", + ] + + self.username_data_items_friendly_names = { + "success": "IPQS Darkweb Leak: Success", + "message": "IPQS Darkweb Leak: Message", + "request_hash": "IPQS Darkweb Leak: Request Hash", + "source": "IPQS Darkweb Leak: Source", + "exposed": "IPQS Darkweb Leak: Exposed", + "first_seen.human": "IPQS Darkweb Leak: First Seen Human", + "first_seen.timestamp": "IPQS Darkweb Leak: First Seen Timestamp", + "first_seen.iso": "IPQS Darkweb Leak: First Seen ISO", + "request_id": "IPQS Darkweb Leak: Request ID", + } + + self.password_data_items = [ + "success", + "message", + "request_hash", + "source", + "exposed", + "first_seen.human", + "first_seen.timestamp", + "first_seen.iso", + "request_id", + ] + + self.password_data_items_friendly_names = { + "success": "IPQS Darkweb Leak: Success", + "message": "IPQS Darkweb Leak: Message", + "request_hash": "IPQS Darkweb Leak: Request Hash", + "source": "IPQS Darkweb Leak: Source", + "exposed": "IPQS Darkweb Leak: Exposed", + "first_seen.human": "IPQS Darkweb Leak: First Seen Human", + "first_seen.timestamp": "IPQS Darkweb Leak: First Seen Timestamp", + "first_seen.iso": "IPQS Darkweb Leak: First Seen ISO", + "request_id": "IPQS Darkweb Leak: Request ID", + } + + self.leaked_email_data_items = [ + "success", + "message", + "request_hash", + "source", + "exposed", + "first_seen.human", + "first_seen.timestamp", + "first_seen.iso", + "plain_text_password", + "request_id", + ] + + self.leaked_email_data_items_friendly_names = { + "success": "IPQS Darkweb Leak: Success", + "message": "IPQS Darkweb Leak: Message", + "request_hash": "IPQS Darkweb Leak: Request Hash", + "source": "IPQS Darkweb Leak: Source", + "exposed": "IPQS Darkweb Leak: Exposed", + "first_seen.human": "IPQS Darkweb Leak: First Seen Human", + "first_seen.timestamp": "IPQS Darkweb Leak: First Seen Timestamp", + "first_seen.iso": "IPQS Darkweb Leak: First Seen ISO", + "plain_text_password": "IPQS Darkweb Leak: Plain Text Password", + "request_id": "IPQS Darkweb Leak: Request ID", + } + self.ip_data_items = [ "fraud_score", "country_code", @@ -168,6 +335,13 @@ def __init__(self, attribute): "timezone", "mobile", "host", + "shared_connection", + "dynamic_connection", + "frequent_abuser", + "high_risk_attacks", + "security_scanner", + "trusted_network", + "abuse_events", "proxy", "vpn", "tor", @@ -193,6 +367,13 @@ def __init__(self, attribute): "timezone": "IPQS: Timezone", "mobile": "IPQS: Mobile", "host": "IPQS: Host", + "shared_connection": "IPQS: Shared Connection", + "dynamic_connection": "IPQS: Dynamic Connection", + "frequent_abuser": "IPQS: Frequent Abuser", + "high_risk_attacks": "IPQS: High Risk Attacks", + "security_scanner": "IPQS: Security Scanner", + "trusted_network": "IPQS: Trusted Network", + "abuse_events": "IPQS: Abuse Events", "proxy": "IPQS: Proxy", "vpn": "IPQS: VPN", "tor": "IPQS: TOR", @@ -208,11 +389,31 @@ def __init__(self, attribute): self.url_data_items = [ "unsafe", "domain", + "root_domain", "ip_address", "server", "domain_rank", + "content_type", + "status_code", + "page_size", "dns_valid", "parking", + "country_code", + "language_code", + "domain_trust", + "redirected", + "short_link_redirect", + "hosted_content", + "page_title", + "risky_tld", + "spf_record", + "dmarc_record", + "technologies", + "a_records", + "mx_records", + "ns_records", + "final_url", + "scanned_url", "spamming", "malware", "phishing", @@ -220,16 +421,38 @@ def __init__(self, attribute): "adult", "risk_score", "category", - "domain_age", + "domain_age.human", + "domain_age.timestamp", + "domain_age.iso", ] self.url_data_items_friendly_names = { "unsafe": "IPQS: Unsafe", "domain": "IPQS: Domain", + "root_domain": "IPQS: Root Domain", "ip_address": "IPQS: IP Address", "server": "IPQS: Server", "domain_rank": "IPQS: Domain Rank", + "content_type": "IPQS: Content Type", + "status_code": "IPQS: Status Code", + "page_size": "IPQS: Page Size", "dns_valid": "IPQS: DNS Valid", "parking": "IPQS: Parking", + "country_code": "IPQS: Country Code", + "language_code": "IPQS: Language Code", + "domain_trust": "IPQS: Domain Trust", + "redirected": "IPQS: Redirected", + "short_link_redirect": "IPQS: Short Link Redirect", + "hosted_content": "IPQS: Hosted Content", + "page_title": "IPQS: Page Title", + "risky_tld": "IPQS: Risky TLD", + "spf_record": "IPQS: SPF Record", + "dmarc_record": "IPQS: DMARC Record", + "technologies": "IPQS: Technologies", + "a_records": "IPQS: A Records", + "mx_records": "IPQS: MX Records", + "ns_records": "IPQS: NS Records", + "final_url": "IPQS: Final URL", + "scanned_url": "IPQS: Scanned URL", "spamming": "IPQS: Spamming", "malware": "IPQS: Malware", "phishing": "IPQS: Phishing", @@ -237,8 +460,11 @@ def __init__(self, attribute): "adult": "IPQS: Adult", "risk_score": "IPQS: Risk Score", "category": "IPQS: Category", - "domain_age": "IPQS: Domain Age", + "domain_age.human": "IPQS: Domain Age Human", + "domain_age.timestamp": "IPQS: Domain Age Timestamp", + "domain_age.iso": "IPQS: Domain Age ISO", } + self.email_data_items = [ "valid", "disposable", @@ -251,6 +477,18 @@ def __init__(self, attribute): "honeypot", "deliverability", "frequent_complainer", + "domain_trust", + "domain_velocity", + "user_activity", + "associated_names.status", + "associated_names.names", + "associated_phone_numbers.status", + "associated_phone_numbers.phone_numbers", + "risky_tld", + "spf_record", + "dmarc_record", + "mx_records", + "a_records", "spam_trap_score", "catch_all", "timed_out", @@ -260,8 +498,12 @@ def __init__(self, attribute): "suggested_domain", "leaked", "sanitized_email", - "domain_age", - "first_seen", + "domain_age.human", + "domain_age.timestamp", + "domain_age.iso", + "first_seen.human", + "first_seen.timestamp", + "first_seen.iso", ] self.email_data_items_friendly_names = { "valid": "IPQS: Valid", @@ -284,8 +526,28 @@ def __init__(self, attribute): "suggested_domain": "IPQS: Suggested Domain", "leaked": "IPQS: Leaked", "sanitized_email": "IPQS: Sanitized Email", - "domain_age": "IPQS: Domain Age", - "first_seen": "IPQS: First Seen", + "domain_trust": "IPQS: Domain Trust", + "domain_velocity": "IPQS: Domain Velocity", + "user_activity": "IPQS: User Activity", + "associated_names.status": "IPQS: Associated Names Status", + "associated_names.names": "IPQS: Associated Names", + "associated_phone_numbers.status": ( + "IPQS: Associated Phone Number Status" + ), + "associated_phone_numbers.phone_numbers": ( + "IPQS: Associated Phone Numbers" + ), + "risky_tld": "IPQS: Risky TLD", + "spf_record": "IPQS: SPF Record", + "dmarc_record": "IPQS: DMARC Record", + "mx_records": "IPQS: MX Record", + "a_records": "IPQS: A Records", + "domain_age.human": "IPQS: Domain Age", + "domain_age.timestamp": "IPQS: Domain Age Timestamp", + "domain_age.iso": "IPQS: Domain Age ISO", + "first_seen.human": "IPQS: First Seen Human", + "first_seen.timestamp": "IPQS: First Seen Timestamp", + "first_seen.iso": "IPQS: First Seen ISO", } self.phone_data_items = [ "formatted", @@ -297,6 +559,13 @@ def __init__(self, attribute): "prepaid", "risky", "active", + "sms_domain", + "associated_email_addresses.status", + "associated_email_addresses.emails", + "user_activity", + "mnc", + "mcc", + "spammer", "carrier", "line_type", "country", @@ -309,6 +578,9 @@ def __init__(self, attribute): "name", "timezone", "do_not_call", + "tcpa_blacklist", + "accurate_country_code", + "sms_email", ] self.phone_data_items_friendly_names = { "formatted": "IPQS: Formatted", @@ -323,6 +595,20 @@ def __init__(self, attribute): "carrier": "IPQS: Carrier", "line_type": "IPQS: Line Type", "country": "IPQS: Country", + "sms_domain": "IPQS: SMS Domain", + "associated_email_addresses.status": ( + "IPQS: Associated Email Addresses Status" + ), + "associated_email_addresses.emails": ( + "IPQS: Associated Email Addresses" + ), + "user_activity": "IPQS: User Activity", + "mnc": "IPQS: MNC", + "mcc": "IPQS: MCC", + "spammer": "IPQS: Spammer", + "tcpa_blacklist": "IPQS: TCPA Blacklist", + "accurate_country_code": "IPQS: Accurate Country Code", + "sms_email": "IPQS: SMS Email", "city": "IPQS: City", "zip_code": "IPQS: Zip Code", "region": "IPQS: Region", @@ -333,12 +619,6 @@ def __init__(self, attribute): "timezone": "IPQS: Timezone", "do_not_call": "IPQS: Do Not Call", } - self.timestamp_items_friendly_name = { - "human": " Human", - "timestamp": " Timestamp", - "iso": " ISO", - } - self.timestamp_items = ["human", "timestamp", "iso"] def criticality_color(self, criticality) -> str: """method which maps the color to the criticality level""" @@ -366,15 +646,81 @@ def add_tag(self, tag_name: str, hex_color: str = None) -> None: self.enriched_attribute.add_tag(tag) def ipqs_parser(self, query_response, enrich_type): - """helper method to call the enrichment function according to the type""" + """ + helper method to call the enrichment function according to the type + """ + flatten_json_response = self.flatten_json(query_response) if enrich_type == IP_ENRICH: - self.ip_reputation_data(query_response) + self.ip_reputation_data(flatten_json_response) elif enrich_type == URL_ENRICH: - self.url_reputation_data(query_response) + self.url_reputation_data(flatten_json_response) elif enrich_type == EMAIL_ENRICH: - self.email_reputation_data(query_response) + email_reputation = self.flatten_json( + query_response.get("email_reputation", {})) + leaked_email = self.flatten_json( + query_response.get("leaked_email", {})) + self.email_reputation_data(email_reputation, leaked_email) elif enrich_type == PHONE_ENRICH: - self.phone_reputation_data(query_response) + self.phone_reputation_data(flatten_json_response) + elif enrich_type == USERNAME_ENRICH: + self.username_reputation_data(flatten_json_response) + elif enrich_type == PASSWORD_ENRICH: + self.password_reputation_data(flatten_json_response) + + def flatten_json(self, data, parent_key="", sep="."): + """Flatten only dicts; lists become comma-joined values.""" + items = {} + for key, value in data.items(): + new_key = f"{parent_key}{sep}{key}" if parent_key else key + if isinstance(value, dict): + # If value is a dict → recurse + items.update(self.flatten_json(value, new_key, sep)) + elif isinstance(value, list): + # If value is a list → join values as comma-separated string + items[new_key] = ", ".join(map(str, value)) + else: + # Normal values + items[new_key] = value + return items + + def username_reputation_data(self, query_response): + """method to create object for Username""" + comment = "Results from IPQualityScore Username Exposure API" + for username_data_item in self.username_data_items: + if username_data_item in query_response: + data_item = self.username_data_items_friendly_names[ + username_data_item] + data_item_value = str(query_response[ + username_data_item]) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) + self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") + self.misp_event.add_object(self.ipqs_object) + + def password_reputation_data(self, query_response): + """method to create object for Password""" + comment = "Results from IPQualityScore Password Exposure API" + for password_data_item in self.password_data_items: + if password_data_item in query_response: + data_item = self.password_data_items_friendly_names[ + password_data_item + ] + data_item_value = str(query_response[ + password_data_item + ]) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, + data_item, + data_item_value + )) + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) + self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") + self.misp_event.add_object(self.ipqs_object) def ip_reputation_data(self, query_response): """method to create object for IP address""" @@ -383,12 +729,15 @@ def ip_reputation_data(self, query_response): if ip_data_item in query_response: data_item = self.ip_data_items_friendly_names[ip_data_item] data_item_value = str(query_response[ip_data_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) if ip_data_item == "fraud_score": fraud_score = int(data_item_value) self.ip_address_risk_scoring(fraud_score) - self.ipqs_object.add_attribute("Enriched attribute", **self.enriched_attribute) + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") self.misp_event.add_object(self.ipqs_object) @@ -418,20 +767,12 @@ def url_reputation_data(self, query_response): comment = "Results from IPQualityScore Malicious URL Scanner API" for url_data_item in self.url_data_items: if url_data_item in query_response: - data_item_value = "" - if url_data_item == "domain_age": - for timestamp_item in self.timestamp_items: - data_item = ( - self.url_data_items_friendly_names[url_data_item] - + self.timestamp_items_friendly_name[timestamp_item] - ) - data_item_value = str(query_response[url_data_item][timestamp_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) - else: - data_item = self.url_data_items_friendly_names[url_data_item] - data_item_value = str(query_response[url_data_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) + data_item = self.url_data_items_friendly_names[url_data_item] + data_item_value = str(query_response[url_data_item]) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) if url_data_item == "malware": malware = data_item_value if url_data_item == "phishing": @@ -440,7 +781,8 @@ def url_reputation_data(self, query_response): risk_score = int(data_item_value) self.url_risk_scoring(risk_score, malware, phishing) - self.ipqs_object.add_attribute("Enriched attribute", **self.enriched_attribute) + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") self.misp_event.add_object(self.ipqs_object) @@ -466,37 +808,44 @@ def url_risk_scoring(self, score, malware, phishing): tag_name = f'IPQS:VERDICT="{risk_criticality}"' self.add_tag(tag_name, hex_color) - def email_reputation_data(self, query_response): + def email_reputation_data(self, email_reputation_resp, leaked_email_resp): """method to create object for Email Address""" comment = "Results from IPQualityScore Email Verification API" disposable = False valid = False fraud_score = 0 for email_data_item in self.email_data_items: - if email_data_item in query_response: - data_item_value = "" - if email_data_item not in ("domain_age", "first_seen"): - data_item = self.email_data_items_friendly_names[email_data_item] - data_item_value = str(query_response[email_data_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) - else: - for timestamp_item in self.timestamp_items: - data_item = ( - self.email_data_items_friendly_names[email_data_item] - + self.timestamp_items_friendly_name[timestamp_item] - ) - data_item_value = str(query_response[email_data_item][timestamp_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) - + if email_data_item in email_reputation_resp: + data_item = self.email_data_items_friendly_names[ + email_data_item + ] + data_item_value = str(email_reputation_resp[ + email_data_item + ]) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) if email_data_item == "disposable": disposable = data_item_value if email_data_item == "valid": valid = data_item_value if email_data_item == "fraud_score": fraud_score = int(data_item_value) - self.email_address_risk_scoring(fraud_score, disposable, valid) - self.ipqs_object.add_attribute("Enriched attribute", **self.enriched_attribute) + for leaked_email_data_item in self.leaked_email_data_items: + if leaked_email_data_item in leaked_email_resp: + data_item = self.leaked_email_data_items_friendly_names[ + leaked_email_data_item + ] + data_item_value = str( + leaked_email_resp[leaked_email_data_item] + ) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) + + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") self.misp_event.add_object(self.ipqs_object) @@ -528,9 +877,13 @@ def phone_reputation_data(self, query_response): comment = "Results from IPQualityScore Phone Number Validation API" for phone_data_item in self.phone_data_items: if phone_data_item in query_response: - data_item = self.phone_data_items_friendly_names[phone_data_item] + data_item = self.phone_data_items_friendly_names[ + phone_data_item + ] data_item_value = str(query_response[phone_data_item]) - self.ipqs_object.add_attribute(**parse_attribute(comment, data_item, data_item_value)) + if len(data_item_value) > 0: + self.ipqs_object.add_attribute(**parse_attribute( + comment, data_item, data_item_value)) if phone_data_item == "active": active = data_item_value if phone_data_item == "valid": @@ -539,7 +892,8 @@ def phone_reputation_data(self, query_response): fraud_score = int(data_item_value) self.phone_address_risk_scoring(fraud_score, valid, active) - self.ipqs_object.add_attribute("Enriched attribute", **self.enriched_attribute) + self.ipqs_object.add_attribute("Enriched attribute", + **self.enriched_attribute) self.ipqs_object.add_reference(self.attribute["uuid"], "related-to") self.misp_event.add_object(self.ipqs_object) @@ -570,7 +924,8 @@ def get_results(self): def handler(q=False): - """The function which accepts a JSON document to expand the values and return a dictionary of the expanded + """The function which accepts a JSON document to expand the\ + values and return a dictionary of the expanded values.""" if q is False: return False @@ -580,31 +935,142 @@ def handler(q=False): misperrors["error"] = "IPQualityScore apikey is missing" return misperrors apikey = request["config"].get("apikey") - # check attribute is added to the event - if not request.get("attribute") or not check_input_attribute(request["attribute"]): - return {"error": f"{standard_error_message}, which should contain at least a type, a value and an uuid."} + base_url = request.get("config").get("base_url") + if ( + not request.get("attribute") + or not check_input_attribute(request["attribute"]) + ): + error_msg = ( + f"{standard_error_message}, which should contain at least a type, " + "a value, and a uuid." + ) + return {"error": error_msg} attribute = request["attribute"] attribute_type = attribute["type"] attribute_value = attribute["value"] + headers = {"IPQS-KEY": apikey} + # check if the attribute type is supported by IPQualityScore if attribute_type not in mispattributes["input"]: - return {"error": "Unsupported attributes type for IPqualityScore Enrichment"} - request_handler = RequestHandler(apikey) + return { + "error": "Unsupported attributes type for IPQS Enrichment" + } + request_handler = RequestHandler(apikey, base_url) enrich_type = "" + json_response = {} if attribute_type in ip_query_input_type: enrich_type = IP_ENRICH - json_response = request_handler.ipqs_lookup(IP_ENRICH, attribute_value) + json_response = request_handler.ipqs_lookup(IP_ENRICH, + attribute_value) elif attribute_type in url_query_input_type: enrich_type = URL_ENRICH - json_response = request_handler.ipqs_lookup(URL_ENRICH, attribute_value) + json_response = request_handler.ipqs_lookup(URL_ENRICH, + attribute_value) elif attribute_type in email_query_input_type: enrich_type = EMAIL_ENRICH - json_response = request_handler.ipqs_lookup(EMAIL_ENRICH, attribute_value) + json_response1 = request_handler.ipqs_lookup(EMAIL_ENRICH, + attribute_value) + json_response2 = request_handler.ipqs_darkweb_lookup(EMAIL_ENRICH, + attribute_value) + json_response = { + "email_reputation": json_response1, + "leaked_email": json_response2 + } elif attribute_type in phone_query_input_type: enrich_type = PHONE_ENRICH - json_response = request_handler.ipqs_lookup(PHONE_ENRICH, attribute_value) + json_response = request_handler.ipqs_lookup(PHONE_ENRICH, + attribute_value) + elif attribute_type in username_query_input_type: + enrich_type = USERNAME_ENRICH + json_response = request_handler.ipqs_darkweb_lookup(USERNAME_ENRICH, + attribute_value) + elif attribute_type in password_query_input_type: + enrich_type = PASSWORD_ENRICH + json_response = request_handler.ipqs_darkweb_lookup(PASSWORD_ENRICH, + attribute_value) + elif attribute_type in file_query_input_type: + try: + data = request.get("attribute").get("data", "") + if "malware-sample" in attribute_type: + sample_filename = attribute_value.split("|")[0] + logger.info("Processing malware-sample: %s", sample_filename) + decoded = base64.b64decode(data) + with zipfile.ZipFile(io.BytesIO(decoded)) as zf: + zipped_file = zf.namelist()[0] + data = zf.read(zipped_file, pwd=b"infected") + elif "attachment" in attribute_type: + sample_filename = request["attachment"] + logger.info("Processing attachment: %s", sample_filename) + data = base64.b64decode(data) + else: + logger.warning("No file supplied in request") + misperrors["error"] = ( + "No malware sample or attachment supplied" + ) + return misperrors + except Exception: + logger.exception("Sample processing failed") + misperrors["error"] = "Unable to process submitted sample data" + return misperrors + + poll_delay = request.get("config", {}).get("poll_delay", "1") + try: + poll_delay = int(poll_delay) + except Exception: + poll_delay = 1 + try: + files = {"file": (sample_filename, io.BytesIO(data))} + max_retries = 3 + retries = 0 + headers = {"IPQS-KEY": apikey} + try: + if base_url and apikey: + if not base_url: + misperrors["error"] = ( + "IPQS configuration missing: " + "Please check the Base URL." + ) + return misperrors + if not apikey: + misperrors["error"] = ( + "IPQS configuration missing: " + "Please check the API Key." + ) + return misperrors + response = requests.post( + f"{base_url.strip('/')}/malware/lookup/", + headers=headers, files=files, timeout=30 + ) + json_response = response.json() + if json_response.get("success") is True: + if json_response.get("status", False) == "cached": + return ipqs_process(json_response) + response = requests.post( + f"{base_url.strip('/')}/malware/scan/", + headers=headers, files=files, timeout=30 + ) + json_response = response.json() + payload = {"request_id": json_response.get("request_id")} + while (retries <= max_retries and + json_response.get("status") == "pending"): + retries += 1 + time.sleep(poll_delay) + response = requests.get( + f"{base_url.strip('/')}/postback/", + headers=headers, json=payload, timeout=30 + ) + json_response = response.json() + return ipqs_process(json_response) + except requests.exceptions.RequestException: + time.sleep(poll_delay) + logger.info("IPQS response status: %s", response.status_code) + + except Exception: + logger.exception("IPQS submission or polling failed") + misperrors["error"] = "IPQS submission failed" + return misperrors parser = IPQualityScoreParser(attribute) parser.ipqs_parser(json_response, enrich_type) @@ -612,12 +1078,65 @@ def handler(q=False): def introspection(): - """The function that returns a dict of the supported attributes (input and output) by your expansion module.""" + """The function that returns a dict of the supported \ + attributes (input and output) by your expansion module.""" return mispattributes def version(): - """The function that returns a dict with the version and the associated meta-data including potential + """The function that returns a dict with the version and \ + the associated meta-data including potential configurations required of the module.""" moduleinfo["config"] = moduleconfig return moduleinfo + + +def ipqs_process(ipqsdata): + """Process the JSON file returned by IPQS where 'result' is a list""" + if not ipqsdata: + misperrors["error"] = "Unable to parse results." + return misperrors + + try: + sample = ipqsdata + r = {"results": []} + + # 1. Determine Tags by checking the results list + # We look for any entry that is 'detected' to mark the whole sample + results_list = sample.get("result", []) + # 2. Extract Base File Metadata + # We apply the tags here so the MD5/SHA256 are categorized in MISP/SOAR + field_map = { + "md5": "md5", + "sha1": "sha1", + "file_name": "filename", + "file_hash": "sha256", + "file_size": "size-in-bytes", + "file_type": "mime-type", + } + + for key, misp_type in field_map.items(): + if sample.get(key): + r["results"].append( + { + "types": misp_type, + "values": sample[key], + } + ) + + # 3. Process the results list (Threat Names / Detections) + for res in results_list: + tags = [ + f"Detected:{res.get('detected', 'none')}", + f"Error:{res.get('error', '')}", + ] + if res.get("name"): + r["results"].append( + {"types": "text", "values": res["name"], "tags": tags} + ) + logger.info("IPQS submission processed successfully") + return r + except Exception as e: + logger.error("Error processing IPQS data: %s", str(e)) + misperrors["error"] = f"Processing error: {str(e)}" + return misperrors