Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,26 @@ and this project adheres to

## [Unreleased]

### Added

- AsyncClient with full async/await support using httpx
- Simple `search()` API accepting raw query strings
- `get_domain()` method for domain lookups
- Streaming `bulk_export_stream()` for memory-efficient exports
- `serialize_queries()` helper to reduce query serialization duplication
- Async example in `example/example_async_client.py`

### Changed

- Use `__get` in both sync and async clients for a uniform internal API
- Widen query type from `Query` to `AbstractQuery` to accept `RawQuery` directly
- Updated l9format requirement from =1.3.2 to =1.4.0 ([ae676d9])
- Updated l9format requirement from =1.4.0 to =2.0.0 ([df916e5], [#68])

### Fixed

- Return `SuccessResponse` for HTTP 204 No Content instead of `ErrorResponse`

### Added

- Add Python 3.11, 3.12, and 3.14 support ([d111628])
Expand Down
36 changes: 36 additions & 0 deletions example/example_async_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Example usage of the async LeakIX client."""

import asyncio

import decouple

from leakix import AsyncClient, Scope

# LeakIX API key, looked up under the "API_KEY" setting via python-decouple.
API_KEY = decouple.config("API_KEY")


async def example_search_services():
    """Query the SERVICE scope with a raw query string and print each hit.

    Opens an ``AsyncClient`` for the duration of the call, asserts that the
    API answered with HTTP 200, then prints ip, port and summary per event.
    """
    raw_query = "+country:FR +port:22"
    async with AsyncClient(api_key=API_KEY) as client:
        result = await client.search(raw_query, scope=Scope.SERVICE)
        assert result.status_code() == 200
        events = result.json()
        for hit in events:
            print(f"{hit.ip}:{hit.port} - {hit.summary}")


async def example_search_leaks():
    """Query the LEAK scope with a raw query string and print each hit.

    Opens an ``AsyncClient`` for the duration of the call, asserts that the
    API answered with HTTP 200, then prints host and summary per event.
    """
    raw_query = "+plugin:GitConfigHttpPlugin"
    async with AsyncClient(api_key=API_KEY) as client:
        result = await client.search(raw_query, scope=Scope.LEAK)
        assert result.status_code() == 200
        events = result.json()
        for hit in events:
            print(f"{hit.host} - {hit.summary}")


async def main():
    """Run the async examples one after the other (services, then leaks)."""
    await example_search_services()
    await example_search_leaks()


# Only start the event loop when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
36 changes: 35 additions & 1 deletion example/example_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import decouple

from leakix import Client
from leakix import Client, Scope
from leakix.field import CountryField, Operator, PluginField, TimeField
from leakix.plugin import Plugin
from leakix.query import MustNotQuery, MustQuery, RawQuery
Expand Down Expand Up @@ -118,6 +118,36 @@ def example_get_subdomains():
print(response.json())


def example_search_simple():
    """Run a raw query-string search in the LEAK scope and print each IP.

    The query uses the same syntax as the LeakIX website search box.
    """
    result = CLIENT.search("+plugin:GitConfigHttpPlugin", scope=Scope.LEAK)
    events = result.json()
    for hit in events:
        print(hit.ip)


def example_search_service():
    """Run a multi-filter raw search in the SERVICE scope; print ip and port."""
    result = CLIENT.search("+country:FR +port:22", scope=Scope.SERVICE)
    events = result.json()
    for hit in events:
        print(hit.ip, hit.port)


def example_get_domain():
    """Look up a domain and print its services and leaks on success.

    Nothing is printed when the lookup does not succeed.
    """
    lookup = CLIENT.get_domain("example.com")
    if not lookup.is_success():
        return
    print("Services:", lookup.json()["services"])
    print("Leaks:", lookup.json()["leaks"])


def example_bulk_export_stream():
    """Stream a bulk export and print the IP of every event.

    Aggregations are consumed one at a time from ``bulk_export_stream``
    instead of being collected first, which suits large datasets.
    """
    plugin_filter = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin))
    stream = CLIENT.bulk_export_stream(queries=[plugin_filter])
    for group in stream:
        for hit in group.events:
            print(hit.ip)


if __name__ == "__main__":
example_get_host_filter_plugin()
example_get_service_filter_plugin()
Expand All @@ -131,3 +161,7 @@ def example_get_subdomains():
example_bulk_service()
example_bulk_export_last_event()
example_get_subdomains()
example_search_simple()
example_search_service()
example_get_domain()
example_bulk_export_stream()
4 changes: 3 additions & 1 deletion leakix/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from importlib.metadata import version

from leakix.async_client import AsyncClient as AsyncClient
from leakix.base import HostResult as HostResult
from leakix.client import Client as Client
from leakix.client import HostResult as HostResult
from leakix.client import Scope as Scope
from leakix.domain import L9Subdomain as L9Subdomain
from leakix.field import (
Expand Down Expand Up @@ -71,6 +72,7 @@

__all__ = [
"__version__",
"AsyncClient",
"Client",
"HostResult",
"L9Subdomain",
Expand Down
188 changes: 188 additions & 0 deletions leakix/async_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Async LeakIX API client using httpx."""

import json
from collections.abc import AsyncIterator
from typing import Any, cast

import httpx
from l9format import l9format

from leakix.base import DEFAULT_URL, BaseClient
from leakix.client import Scope
from leakix.query import AbstractQuery, serialize_queries
from leakix.response import (
AbstractResponse,
ErrorResponse,
RateLimitResponse,
SuccessResponse,
)

# Default value for AsyncClient's ``timeout`` argument, which is handed to
# httpx.AsyncClient(timeout=...) unchanged.
DEFAULT_TIMEOUT = 30.0


class AsyncClient(BaseClient):
    """Async client for the LeakIX API.

    Mirrors the sync Client API but uses httpx for async operations.
    All request methods return an AbstractResponse for consistency with the
    sync client; ``bulk_export_stream`` is the exception and is an async
    generator of ``L9Aggregation`` objects.

    The underlying ``httpx.AsyncClient`` is created lazily on the first
    request and released by ``close()`` or on leaving an ``async with`` block.
    """

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str | None = DEFAULT_URL,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> None:
        """Store connection settings; no network resources are opened here.

        Args:
            api_key: Optional API key forwarded to ``BaseClient``.
            base_url: API base URL forwarded to ``BaseClient``.
            timeout: Timeout handed to ``httpx.AsyncClient``.
        """
        super().__init__(api_key=api_key, base_url=base_url)
        self.timeout = timeout
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client."""
        # Re-create the client if it was never built or has been closed, so
        # the instance stays usable after close().
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
            self._client = None

    async def __aenter__(self) -> "AsyncClient":
        """Enter the async context; the HTTP client is still created lazily."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Close the HTTP client when leaving the async context."""
        await self.close()

    @staticmethod
    def _parse_aggregation_line(line: str) -> l9format.L9Aggregation:
        """Decode one JSON line of a bulk export into an L9Aggregation."""
        return cast(
            l9format.L9Aggregation,
            l9format.L9Aggregation.from_dict(json.loads(line)),
        )

    async def __get(
        self, path: str, params: dict[str, Any] | None = None
    ) -> AbstractResponse:
        """Make a GET request and return an AbstractResponse.

        Status mapping: 200 -> SuccessResponse (``[]`` when the body is
        empty), 429 -> RateLimitResponse, 204 -> SuccessResponse with ``[]``,
        anything else -> ErrorResponse carrying the decoded JSON body.
        """
        client = await self._get_client()
        r = await client.get(path, params=params)
        if r.status_code == 200:
            response_json = r.json() if r.content else []
            return SuccessResponse(response=r, response_json=response_json)
        elif r.status_code == 429:
            return RateLimitResponse(response=r)
        elif r.status_code == 204:
            return SuccessResponse(response=r, response_json=[])
        else:
            return ErrorResponse(response=r, response_json=r.json())

    async def get(
        self,
        scope: Scope,
        queries: list[AbstractQuery] | None = None,
        page: int = 0,
    ) -> AbstractResponse:
        """Search LeakIX for services or leaks.

        Returns the raw (unparsed) search response for ``scope``.

        Raises:
            ValueError: If ``page`` is negative.
        """
        if page < 0:
            raise ValueError("Page argument must be a positive integer")
        serialized_query = serialize_queries(queries)
        return await self.__get(
            "/search",
            params={"scope": scope.value, "q": serialized_query, "page": page},
        )

    async def get_service(
        self, queries: list[AbstractQuery] | None = None, page: int = 0
    ) -> AbstractResponse:
        """Shortcut for get with scope=Scope.SERVICE, with events parsed."""
        return self._parse_events(
            await self.get(Scope.SERVICE, queries=queries, page=page)
        )

    async def get_leak(
        self, queries: list[AbstractQuery] | None = None, page: int = 0
    ) -> AbstractResponse:
        """Shortcut for get with scope=Scope.LEAK, with events parsed."""
        return self._parse_events(
            await self.get(Scope.LEAK, queries=queries, page=page)
        )

    async def search(
        self, query: str, scope: Scope = Scope.LEAK, page: int = 0
    ) -> AbstractResponse:
        """
        Simple search using a raw query string (same syntax as the website).

        Example:
            >>> await client.search("+plugin:GitConfigHttpPlugin", scope=Scope.LEAK)

        Raises:
            ValueError: If ``page`` is negative.
        """
        if page < 0:
            raise ValueError("Page argument must be a positive integer")
        r = await self.__get(
            "/search",
            params={"scope": scope.value, "q": query, "page": page},
        )
        return self._parse_events(r)

    async def get_host(self, ipv4: str) -> AbstractResponse:
        """Returns the list of services and associated leaks for a given host."""
        return self._parse_host_result(await self.__get(f"/host/{ipv4}"))

    async def get_domain(self, domain: str) -> AbstractResponse:
        """Returns the list of services and associated leaks for a given domain."""
        return self._parse_host_result(await self.__get(f"/domain/{domain}"))

    async def get_plugins(self) -> AbstractResponse:
        """Returns the list of plugins the authenticated user has access to."""
        return self._parse_plugins(await self.__get("/api/plugins"))

    async def get_subdomains(self, domain: str) -> AbstractResponse:
        """Returns the list of subdomains for a given domain."""
        return self._parse_subdomains(await self.__get(f"/api/subdomains/{domain}"))

    async def bulk_export(
        self, queries: list[AbstractQuery] | None = None
    ) -> AbstractResponse:
        """Bulk export leaks (Pro API feature).

        Streams ``/bulk/search`` and collects every non-empty line as an
        ``L9Aggregation``. Status codes are mapped the same way as for the
        regular GET helper.
        """
        serialized_query = serialize_queries(queries)
        client = await self._get_client()
        async with client.stream(
            "GET", "/bulk/search", params={"q": serialized_query}
        ) as r:
            if r.status_code == 200:
                response_json = []
                async for line in r.aiter_lines():
                    if line:  # skip blank separator lines
                        response_json.append(self._parse_aggregation_line(line))
                return SuccessResponse(response=r, response_json=response_json)
            elif r.status_code == 429:
                # A streamed body is not loaded automatically. Read it while
                # the stream is still open so the returned response remains
                # usable after this context closes (same as the error branch).
                await r.aread()
                return RateLimitResponse(response=r)
            elif r.status_code == 204:
                return SuccessResponse(response=r, response_json=[])
            else:
                await r.aread()
                return ErrorResponse(response=r, response_json=r.json())

    async def bulk_export_stream(
        self, queries: list[AbstractQuery] | None = None
    ) -> AsyncIterator[l9format.L9Aggregation]:
        """
        Streaming version of bulk_export. Yields L9Aggregation objects one by one.
        More memory efficient for large result sets.

        Note: any status other than 200 (including rate limiting and errors)
        ends the iteration without yielding anything and without raising.
        """
        serialized_query = serialize_queries(queries)
        client = await self._get_client()
        async with client.stream(
            "GET", "/bulk/search", params={"q": serialized_query}
        ) as r:
            if r.status_code != 200:
                return
            async for line in r.aiter_lines():
                if line:  # skip blank separator lines
                    yield self._parse_aggregation_line(line)
73 changes: 73 additions & 0 deletions leakix/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Shared logic between sync and async LeakIX clients."""

import dataclasses
from importlib.metadata import version
from typing import Any, cast

from l9format import l9format
from l9format.l9format import Model

from leakix.domain import L9Subdomain
from leakix.plugin import APIResult
from leakix.response import AbstractResponse

# Base URL used by BaseClient when no (or an empty) base_url is supplied.
DEFAULT_URL = "https://leakix.net"


@dataclasses.dataclass
class HostResult(Model):
    """Payload of a host/domain lookup, as consumed by ``_parse_host_result``."""

    # Capitalized names presumably mirror the API's JSON keys - TODO confirm.
    # Both fields default to None when absent.
    Services: list[l9format.L9Event] | None = None
    Leaks: list[l9format.L9Event] | None = None


class BaseClient:
    """Shared initialization and response transformation logic.

    Holds the API key, base URL and default request headers, plus static
    helpers that turn the raw JSON of a successful response into typed
    objects. Non-successful responses pass through the helpers untouched.
    """

    MAX_RESULTS_PER_PAGE = 20

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str | None = DEFAULT_URL,
    ) -> None:
        """Store credentials and build the default request headers.

        Args:
            api_key: Optional API key; sent as the ``api-key`` header when set.
            base_url: API base URL; a falsy value falls back to DEFAULT_URL.
        """
        self.api_key = api_key
        self.base_url = base_url if base_url else DEFAULT_URL
        self.headers: dict[str, str] = {
            "Accept": "application/json",
            "User-agent": f"leakix-client-python/{version('leakix')}",
        }
        if api_key:
            self.headers["api-key"] = api_key

    @staticmethod
    def _parse_events(response: AbstractResponse) -> AbstractResponse:
        """Parse raw JSON dicts into L9Event objects on a success response."""
        if response.is_success():
            response.response_json = [
                l9format.L9Event.from_dict(res) for res in response.response_json
            ]
        return response

    @staticmethod
    def _parse_host_result(response: AbstractResponse) -> AbstractResponse:
        """Parse a host/domain response into {services, leaks} format.

        A successful response with an empty payload (for example the ``[]``
        produced for an empty body or HTTP 204) yields
        ``{"services": None, "leaks": None}`` instead of being handed to
        ``HostResult.from_dict``, which expects a mapping.
        """
        if response.is_success():
            data: Any = response.json()
            if data:
                formatted = cast(HostResult, HostResult.from_dict(data))
                services, leaks = formatted.Services, formatted.Leaks
            else:
                # Nothing to parse: report "no data" in the same shape.
                services, leaks = None, None
            response.response_json = {
                "services": services,
                "leaks": leaks,
            }
        return response

    @staticmethod
    def _parse_plugins(response: AbstractResponse) -> AbstractResponse:
        """Parse raw JSON dicts into APIResult objects on a success response."""
        if response.is_success():
            response.response_json = [APIResult.from_dict(d) for d in response.json()]
        return response

    @staticmethod
    def _parse_subdomains(response: AbstractResponse) -> AbstractResponse:
        """Parse raw JSON dicts into L9Subdomain objects on a success response."""
        if response.is_success():
            response.response_json = [L9Subdomain.from_dict(d) for d in response.json()]
        return response
Loading
Loading