Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/python-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13']
fail-fast: false
steps:
- uses: actions/checkout@v6
Expand Down Expand Up @@ -69,4 +69,4 @@ jobs:
run: poetry run mypy

- name: Upload coverage
uses: codecov/codecov-action@v5
uses: codecov/codecov-action@v5
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
default_stages: [commit, push]
default_stages: [pre-commit, pre-push]
default_language_version:
# force all unspecified python hooks to run python3
python: python3
Expand Down
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This file is a concise, repo-specific guide for agentic coding tools.
If anything here conflicts with code or CI, follow the code/CI.

## Quick facts
- Primary language: Python (>=3.9)
- Primary language: Python (>=3.10)
- Build/packaging: Poetry
- Tests: pytest + coverage
- Lint/format: black, isort, flynt (pre-commit)
Expand Down Expand Up @@ -93,4 +93,4 @@ If anything here conflicts with code or CI, follow the code/CI.
## Notes for agents
- Prefer editing minimal sections; do not reformat unrelated code.
- Run targeted tests when possible (single file or single test).
- Keep changes compatible with Python 3.9+.
- Keep changes compatible with Python 3.10+.
93 changes: 44 additions & 49 deletions pathable/accessors.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
"""Pathable accessors module"""

import stat
import sys
from collections import OrderedDict
from collections.abc import Hashable
from collections.abc import Mapping
from collections.abc import Sequence
from pathlib import Path
from typing import Any
from typing import Generic
from typing import Optional
from typing import TypeVar
from typing import Union

from pathable.protocols import Subscriptable
from pathable.types import LookupKey
Expand Down Expand Up @@ -45,7 +42,7 @@ def __eq__(self, other: object) -> Any:
return NotImplemented
return self.node == other.node

def stat(self, parts: Sequence[K]) -> Union[dict[str, Any], None]:
def stat(self, parts: Sequence[K]) -> dict[str, Any] | None:
raise NotImplementedError

def keys(self, parts: Sequence[K]) -> Sequence[K]:
Expand Down Expand Up @@ -156,8 +153,9 @@ def _is_traversable_node(cls, node: N) -> bool:
@classmethod
def _get_node(cls, node: N, parts: Sequence[K]) -> N:
current = node
get_subnode = cls._get_subnode
for part in parts:
current = cls._get_subnode(current, part)
current = get_subnode(current, part)
return current

@classmethod
Expand All @@ -171,14 +169,10 @@ def _get_subnode(cls, node: N, part: K) -> N:

class PathAccessor(NodeAccessor[Path, str, bytes]):

def stat(self, parts: Sequence[str]) -> Union[dict[str, Any], None]:
def stat(self, parts: Sequence[str]) -> dict[str, Any] | None:
subpath = self.node.joinpath(*parts)
try:
# Avoid following symlinks (Python 3.10+)
if sys.version_info >= (3, 10):
stat = subpath.stat(follow_symlinks=False)
else:
stat = subpath.lstat()
stat = subpath.stat(follow_symlinks=False)
except OSError:
return None
return {
Expand Down Expand Up @@ -252,14 +246,14 @@ def _get_subnode(cls, node: Path, part: str) -> Path:


class SubscriptableAccessor(
NodeAccessor[Union[Subscriptable[SK, SV], SV], SK, SV], Generic[SK, SV]
NodeAccessor[Subscriptable[SK, SV] | SV, SK, SV], Generic[SK, SV]
):
"""Accessor for subscriptable content."""

@classmethod
def _get_subnode(
cls, node: Union[Subscriptable[SK, SV], SV], part: SK
) -> Union[Subscriptable[SK, SV], SV]:
cls, node: Subscriptable[SK, SV] | SV, part: SK
) -> Subscriptable[SK, SV] | SV:
if not isinstance(node, Subscriptable):
raise KeyError(part)
try:
Expand All @@ -271,13 +265,13 @@ def _get_subnode(
class CachedSubscriptableAccessor(
SubscriptableAccessor[CSK, CSV], Generic[CSK, CSV]
):
def __init__(self, node: Union[Subscriptable[CSK, CSV], CSV]):
def __init__(self, node: Subscriptable[CSK, CSV] | CSV):
super().__init__(node)

# Per-instance cache: avoids global strong references and id-reuse hazards.
# Default maxsize matches functools.lru_cache default (128).
self._cache_enabled = True
self._cache_maxsize: Optional[int] = 128
self._cache_maxsize: int | None = 128
self._cache: OrderedDict[tuple[CSK, ...], CSV] = OrderedDict()

def clear_cache(self) -> None:
Expand All @@ -289,7 +283,7 @@ def disable_cache(self) -> None:
self._cache_enabled = False
self._cache.clear()

def enable_cache(self, *, maxsize: Optional[int] = 128) -> None:
def enable_cache(self, *, maxsize: int | None = 128) -> None:
"""Enable caching for this accessor instance.

Args:
Expand Down Expand Up @@ -331,23 +325,23 @@ class LookupAccessor(CachedSubscriptableAccessor[LookupKey, LookupValue]):

@classmethod
def _is_traversable_node(cls, node: LookupNode) -> bool:
return isinstance(node, Mapping) or isinstance(node, list)
return isinstance(node, Mapping | list)

def stat(self, parts: Sequence[LookupKey]) -> Union[dict[str, Any], None]:
def stat(self, parts: Sequence[LookupKey]) -> dict[str, Any] | None:
try:
node = self[parts]
except KeyError:
return None

if self._is_traversable_node(node):
return {
"type": type(node).__name__,
"length": len(node),
}
try:
length = len(node)
except TypeError:
length = None
length: int | None
match node:
case Mapping() | list():
length = len(node)
case _:
try:
length = len(node)
except TypeError:
length = None

return {
"type": type(node).__name__,
Expand All @@ -360,38 +354,39 @@ def contains(self, parts: Sequence[LookupKey], key: LookupKey) -> bool:
except KeyError:
return False

if isinstance(node, Mapping):
return key in node

if isinstance(node, list):
return isinstance(key, int) and 0 <= key < len(node)

return False
match node:
case Mapping():
return key in node
case list() as items:
return isinstance(key, int) and 0 <= key < len(items)
case _:
return False

def require_child(
self, parts: Sequence[LookupKey], key: LookupKey
) -> None:
# Validate parent path for intermediate diagnostics.
node = self[parts]

if isinstance(node, Mapping):
if key not in node:
raise KeyError(key)
return

if isinstance(node, list):
if not (isinstance(key, int) and 0 <= key < len(node)):
match node:
case Mapping():
if key not in node:
raise KeyError(key)
return
case list() as items:
if not (isinstance(key, int) and 0 <= key < len(items)):
raise KeyError(key)
return
case _:
raise KeyError(key)
return

raise KeyError(key)

def keys(self, parts: Sequence[LookupKey]) -> Sequence[LookupKey]:
node = self[parts]
if isinstance(node, Mapping):
return list(node.keys())
if isinstance(node, list):
return list(range(len(node)))
match node:
case Mapping():
return list(node.keys())
case list() as items:
return list(range(len(items)))
# Non-traversable leaf.
if parts:
raise KeyError(parts[-1])
Expand Down
30 changes: 13 additions & 17 deletions pathable/parsers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Pathable parsers module"""

from __future__ import annotations

from collections.abc import Hashable
from typing import Sequence

Expand All @@ -12,6 +10,17 @@ def parse_parts(
parts: Sequence[Hashable | None], sep: str = SEPARATOR
) -> list[Hashable]:
"""Parse (filter and split) path parts."""

def append_split(part: str) -> None:
if not part or part == ".":
return
if sep_check in part:
for split_part in reversed(part.split(sep_check)):
if split_part and split_part != ".":
append(split_part)
return
append(part)

parsed: list[Hashable] = []
append = parsed.append
sep_check = sep
Expand All @@ -24,24 +33,11 @@ def parse_parts(
continue
# Fast-path: str is most common.
if isinstance(part, str):
if part and part != ".":
if sep_check in part:
for x in reversed(part.split(sep_check)):
if x and x != ".":
append(x)
else:
append(part)
append_split(part)
continue
# Fast-path: bytes, decode then treat as str.
if isinstance(part, bytes):
part = part.decode("ascii")
if part and part != ".":
if sep_check in part:
for x in reversed(part.split(sep_check)):
if x and x != ".":
append(x)
else:
append(part)
append_split(part.decode("ascii"))
continue
# Fallback: Hashable (covers e.g. tuple, custom keys).
if isinstance(part, Hashable):
Expand Down
Loading