Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 306 additions & 5 deletions extensions/business/cybersec/red_mesh/pentester_api_01.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@

"""

import ipaddress
import random

from urllib.parse import urlparse

from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin
from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module
from .redmesh_llm_agent_mixin import _RedMeshLlmAgentMixin
Expand All @@ -47,7 +50,7 @@
RISK_CRED_PENALTY_CAP,
)

__VER__ = '0.8.2'
__VER__ = '0.9.0'

_CONFIG = {
**BasePlugin.CONFIG,
Expand Down Expand Up @@ -94,6 +97,11 @@
"SCANNER_IDENTITY": "probe.redmesh.local", # EHLO domain for SMTP probes
"SCANNER_USER_AGENT": "", # HTTP User-Agent (empty = default requests UA)

# RedMesh attestation submission
"ATTESTATION_PRIVATE_KEY": "",
"ATTESTATION_ENABLED": True,
"ATTESTATION_MIN_SECONDS_BETWEEN_SUBMITS": 86400,

'VALIDATION_RULES': {
**BasePlugin.CONFIG['VALIDATION_RULES'],
},
Expand All @@ -120,6 +128,7 @@ class PentesterApi01Plugin(BasePlugin, _RedMeshLlmAgentMixin):
List of job_ids that completed locally (used for status responses).
"""
CONFIG = _CONFIG
REDMESH_ATTESTATION_DOMAIN = "0xced141225d43c56d8b224d12f0b9524a15dc86df0113c42ffa4bc859309e0d40"


def on_init(self):
Expand Down Expand Up @@ -211,6 +220,239 @@ def Pd(self, s, *args, score=-1, **kwargs):
return


def _attestation_get_tenant_private_key(self):
private_key = self.cfg_attestation_private_key
if private_key:
private_key = private_key.strip()
if not private_key:
return None
return private_key

@staticmethod
def _attestation_pack_cid_obfuscated(report_cid) -> str:
if not isinstance(report_cid, str) or len(report_cid.strip()) == 0:
return "0x" + ("00" * 10)
cid = report_cid.strip()
if len(cid) >= 10:
masked = cid[:5] + cid[-5:]
else:
masked = cid.ljust(10, "_")
safe = "".join(ch if 32 <= ord(ch) <= 126 else "_" for ch in masked)[:10]
data = safe.encode("ascii", errors="ignore")
if len(data) < 10:
data = data + (b"_" * (10 - len(data)))
return "0x" + data[:10].hex()

@staticmethod
def _attestation_extract_host(target):
if not isinstance(target, str):
return None
target = target.strip()
if not target:
return None
if "://" in target:
parsed = urlparse(target)
if parsed.hostname:
return parsed.hostname
host = target.split("/", 1)[0]
if host.count(":") == 1 and "." in host:
host = host.split(":", 1)[0]
return host

def _attestation_pack_ip_obfuscated(self, target) -> str:
host = self._attestation_extract_host(target)
if not host:
return "0x0000"
if ".." in host:
parts = host.split("..")
if len(parts) == 2 and all(part.isdigit() for part in parts):
first_octet = int(parts[0])
last_octet = int(parts[1])
if 0 <= first_octet <= 255 and 0 <= last_octet <= 255:
return f"0x{first_octet:02x}{last_octet:02x}"
try:
ip_obj = ipaddress.ip_address(host)
except Exception:
return "0x0000"
if ip_obj.version != 4:
return "0x0000"
octets = host.split(".")
first_octet = int(octets[0])
last_octet = int(octets[-1])
return f"0x{first_octet:02x}{last_octet:02x}"

@staticmethod
def _attestation_pack_execution_id(job_id) -> str:
if not isinstance(job_id, str):
raise ValueError("job_id must be a string")
job_id = job_id.strip()
if len(job_id) != 8:
raise ValueError("job_id must be exactly 8 characters")
try:
data = job_id.encode("ascii")
except UnicodeEncodeError as exc:
raise ValueError("job_id must contain only ASCII characters") from exc
return "0x" + data.hex()


def _attestation_get_worker_eth_addresses(self, workers: dict) -> list[str]:
if not isinstance(workers, dict):
return []
eth_addresses = []
for node_addr in workers.keys():
eth_addr = self.bc.node_addr_to_eth_addr(node_addr)
if not isinstance(eth_addr, str) or not eth_addr.startswith("0x"):
raise ValueError(f"Unable to convert worker node to EVM address: {node_addr}")
eth_addresses.append(eth_addr)
eth_addresses.sort()
return eth_addresses

def _attestation_pack_node_hashes(self, workers: dict) -> str:
eth_addresses = self._attestation_get_worker_eth_addresses(workers)
if len(eth_addresses) == 0:
return "0x" + ("00" * 32)
digest = self.bc.eth_hash_message(types=["address[]"], values=[eth_addresses], as_hex=True)
if isinstance(digest, str) and digest.startswith("0x"):
return digest
return "0x" + str(digest)

def _submit_redmesh_test_attestation(self, job_id: str, job_specs: dict, workers: dict, vulnerability_score=0):
self.P(f"[ATTESTATION] Test attestation requested for job {job_id} (score={vulnerability_score})")
if not self.cfg_attestation_enabled:
self.P("[ATTESTATION] Attestation is disabled via config. Skipping.", color='y')
return None
tenant_private_key = self._attestation_get_tenant_private_key()
if tenant_private_key is None:
self.P(
"[ATTESTATION] Tenant private key is missing. "
"Expected env var 'R1EN_ATTESTATION_PRIVATE_KEY'. Skipping.",
color='y'
)
return None

run_mode = str(job_specs.get("run_mode", "SINGLEPASS")).upper()
test_mode = 1 if run_mode == "CONTINUOUS_MONITORING" else 0
node_count = len(workers) if isinstance(workers, dict) else 0
target = job_specs.get("target")
execution_id = self._attestation_pack_execution_id(job_id)
report_cid = workers.get(self.ee_addr, {}).get("report_cid", None) #TODO: use the correct CID
node_eth_address = self.bc.eth_address
ip_obfuscated = self._attestation_pack_ip_obfuscated(target)
cid_obfuscated = self._attestation_pack_cid_obfuscated(report_cid)

self.P(
f"[ATTESTATION] Submitting test attestation: job={job_id}, mode={'CONTINUOUS' if test_mode else 'SINGLEPASS'}, "
f"nodes={node_count}, score={vulnerability_score}, target={ip_obfuscated}, "
f"cid={cid_obfuscated}, sender={node_eth_address}"
)
tx_hash = self.bc.submit_attestation(
function_name="submitRedmeshTestAttestation",
function_args=[
test_mode,
node_count,
vulnerability_score,
execution_id,
ip_obfuscated,
cid_obfuscated,
],
signature_types=["bytes32", "uint8", "uint16", "uint8", "bytes8", "bytes2", "bytes10"],
signature_values=[
self.REDMESH_ATTESTATION_DOMAIN,
test_mode,
node_count,
vulnerability_score,
execution_id,
ip_obfuscated,
cid_obfuscated,
],
tx_private_key=tenant_private_key,
)

result = {
"job_id": job_id,
"tx_hash": tx_hash,
"test_mode": "C" if test_mode == 1 else "S",
"node_count": node_count,
"vulnerability_score": vulnerability_score,
"execution_id": execution_id,
"report_cid": report_cid,
"node_eth_address": node_eth_address,
}
self.P(
"Submitted RedMesh test attestation for "
f"{job_id} (tx: {tx_hash}, node: {node_eth_address}, score: {vulnerability_score})",
color='g'
)
return result

def _submit_redmesh_job_start_attestation(self, job_id: str, job_specs: dict, workers: dict):
self.P(f"[ATTESTATION] Job-start attestation requested for job {job_id}")
if not self.cfg_attestation_enabled:
self.P("[ATTESTATION] Attestation is disabled via config. Skipping.", color='y')
return None
tenant_private_key = self._attestation_get_tenant_private_key()
if tenant_private_key is None:
self.P(
"[ATTESTATION] Tenant private key is missing. "
"Expected env var 'R1EN_ATTESTATION_PRIVATE_KEY'. Skipping.",
color='y'
)
return None

run_mode = str(job_specs.get("run_mode", "SINGLEPASS")).upper()
test_mode = 1 if run_mode == "CONTINUOUS_MONITORING" else 0
node_count = len(workers) if isinstance(workers, dict) else 0
target = job_specs.get("target")
execution_id = self._attestation_pack_execution_id(job_id)
node_eth_address = self.bc.eth_address
ip_obfuscated = self._attestation_pack_ip_obfuscated(target)
node_hashes = self._attestation_pack_node_hashes(workers)

worker_addrs = list(workers.keys()) if isinstance(workers, dict) else []
self.P(
f"[ATTESTATION] Submitting job-start attestation: job={job_id}, mode={'CONTINUOUS' if test_mode else 'SINGLEPASS'}, "
f"nodes={node_count}, target={ip_obfuscated}, node_hashes={node_hashes}, "
f"workers={worker_addrs}, sender={node_eth_address}"
)
tx_hash = self.bc.submit_attestation(
function_name="submitRedmeshJobStartAttestation",
function_args=[
test_mode,
node_count,
execution_id,
node_hashes,
ip_obfuscated,
],
signature_types=["bytes32", "uint8", "uint16", "bytes8", "bytes32", "bytes2"],
signature_values=[
self.REDMESH_ATTESTATION_DOMAIN,
test_mode,
node_count,
execution_id,
node_hashes,
ip_obfuscated,
],
tx_private_key=tenant_private_key,
)

result = {
"job_id": job_id,
"tx_hash": tx_hash,
"test_mode": "C" if test_mode == 1 else "S",
"node_count": node_count,
"execution_id": execution_id,
"node_hashes": node_hashes,
"ip_obfuscated": ip_obfuscated,
"node_eth_address": node_eth_address,
}
self.P(
"Submitted RedMesh job-start attestation for "
f"{job_id} (tx: {tx_hash}, node: {node_eth_address}, node_count: {node_count})",
color='g'
)
return result


def __post_init(self):
"""
Perform warmup: reconcile existing jobs in CStore, migrate legacy keys,
Expand Down Expand Up @@ -1107,23 +1349,63 @@ def _maybe_finalize_pass(self):
# ═══════════════════════════════════════════════════
pass_date_started = self._get_timeline_date(job_specs, "pass_started") or self._get_timeline_date(job_specs, "created")
pass_date_completed = self.time()
pass_history.append({
pass_record = ({
"pass_nr": job_pass,
"date_started": pass_date_started,
"date_completed": pass_date_completed,
"duration": round(pass_date_completed - pass_date_started, 2) if pass_date_started else None,
"reports": {addr: w.get("report_cid") for addr, w in workers.items()}
})
now_ts = self.time()

# Compute risk score for this pass
aggregated_for_score = self._collect_aggregated_report(workers)
risk_score = 0
if aggregated_for_score:
risk_result = self._compute_risk_score(aggregated_for_score)
pass_history[-1]["risk_score"] = risk_result["score"]
pass_history[-1]["risk_breakdown"] = risk_result["breakdown"]
job_specs["risk_score"] = risk_result["score"]
pass_record["risk_score"] = risk_result["score"]
pass_record["risk_breakdown"] = risk_result["breakdown"]
risk_score = risk_result["score"]
job_specs["risk_score"] = risk_score
self.P(f"Risk score for job {job_id} pass {job_pass}: {risk_result['score']}/100")

should_submit_attestation = True
if run_mode == "CONTINUOUS_MONITORING":
last_attestation_at = job_specs.get("last_attestation_at")
min_interval = self.cfg_attestation_min_seconds_between_submits
if last_attestation_at is not None and now_ts - last_attestation_at < min_interval:
elapsed = round(now_ts - last_attestation_at)
self.P(
f"[ATTESTATION] Skipping test attestation for job {job_id}: "
f"last submitted {elapsed}s ago, min interval is {min_interval}s",
color='y'
)
should_submit_attestation = False

if should_submit_attestation:
# Best-effort on-chain summary; failures must not block pass finalization.
try:
redmesh_test_attestation = self._submit_redmesh_test_attestation(
job_id=job_id,
job_specs=job_specs,
workers=workers,
vulnerability_score=risk_score
)
if redmesh_test_attestation is not None:
pass_record["redmesh_test_attestation"] = redmesh_test_attestation
job_specs["last_attestation_at"] = now_ts
except Exception as exc:
import traceback
self.P(
f"[ATTESTATION] Failed to submit test attestation for job {job_id}: {exc}\n"
f" Type: {type(exc).__name__}\n"
f" Args: {exc.args}\n"
f" Traceback:\n{traceback.format_exc()}",
color='r'
)

pass_history.append(pass_record)

# Handle SINGLEPASS - set FINALIZED and exit (no scheduling)
if run_mode == "SINGLEPASS":
job_specs["job_status"] = "FINALIZED"
Expand Down Expand Up @@ -1594,6 +1876,25 @@ def launch_test(
actor_type="user"
)
self._emit_timeline_event(job_specs, "started", "Scan started", actor=self.ee_id, actor_type="node")

try:
redmesh_job_start_attestation = self._submit_redmesh_job_start_attestation(
job_id=job_id,
job_specs=job_specs,
workers=workers,
)
if redmesh_job_start_attestation is not None:
job_specs["redmesh_job_start_attestation"] = redmesh_job_start_attestation
except Exception as exc:
import traceback
self.P(
f"[ATTESTATION] Failed to submit job-start attestation for job {job_id}: {exc}\n"
f" Type: {type(exc).__name__}\n"
f" Args: {exc.args}\n"
f" Traceback:\n{traceback.format_exc()}",
color='r'
)

self.chainstore_hset(
hkey=self.cfg_instance_id,
key=job_id,
Expand Down