-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexport_state_store.py
More file actions
145 lines (127 loc) · 4.64 KB
/
export_state_store.py
File metadata and controls
145 lines (127 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Shared export_state.json locking and atomic I/O for API and CLI."""
from __future__ import annotations
import json
import os
import tempfile
import threading
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, cast
from models.export import ExportStateDict
# Optional platform locking backends. Each name is bound to the real module
# when it can be imported and to ``None`` otherwise; annotating the names as
# ``Any`` lets both bindings type-check.
fcntl: Any
try:
    import fcntl as _fcntl_mod
except ImportError:
    fcntl = None
else:
    fcntl = _fcntl_mod

msvcrt: Any
try:
    import msvcrt as _msvcrt_mod
except ImportError:
    msvcrt = None
else:
    msvcrt = _msvcrt_mod

# Last-resort registry of in-process locks keyed by state path. It is consulted
# only when neither backend above could be imported, so it cannot coordinate
# separate processes. The guard protects the registry dict itself.
_fallback_locks: dict[str, threading.Lock] = {}
_fallback_locks_guard = threading.Lock()

# Default location of the export state file, under the user's home directory.
EXPORT_STATE_FILE = os.path.join(
    os.path.expanduser("~"),
    ".claude-code-chat-browser",
    "export_state.json",
)
def _fallback_lock_for(path: str) -> threading.Lock:
    """Return the in-process lock registered for *path*, creating it on first use.

    The registry is read and updated while holding ``_fallback_locks_guard``
    so that concurrent callers asking for the same path get the same lock.
    """
    with _fallback_locks_guard:
        existing = _fallback_locks.get(path)
        if existing is None:
            existing = threading.Lock()
            _fallback_locks[path] = existing
        return existing
@contextmanager
def export_state_lock(state_path: str | None = None) -> Iterator[None]:
    """Serialize export_state.json reads/writes across processes.

    Holds an exclusive lock for the duration of the ``with`` body. The lock is
    taken on a sidecar ``<state_path>.lock`` file (its parent directory is
    created if needed), never on the state file itself.

    POSIX: blocking ``flock(LOCK_EX)`` on the sidecar. Windows:
    ``msvcrt.locking`` with ``LK_LOCK`` on the first byte of the sidecar; per
    the ``msvcrt`` documentation ``LK_LOCK`` retries for roughly ten seconds
    and then raises ``OSError``, so unlike the POSIX path it does not wait
    indefinitely. If neither module is available, falls back to a per-path
    ``threading.Lock`` (same process only).

    *state_path* defaults to ``EXPORT_STATE_FILE``. ``OSError`` from creating,
    opening or locking the sidecar propagates to the caller.
    """
    path = EXPORT_STATE_FILE if state_path is None else state_path

    # No OS-level locking available: same-process serialization only.
    if fcntl is None and msvcrt is None:
        with _fallback_lock_for(path):
            yield
        return

    # Both OS-level backends lock the same sidecar file.
    lock_path = path + ".lock"
    dir_name = os.path.dirname(lock_path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    if fcntl is not None:
        # ``with`` guarantees the handle is closed even if unlocking raises.
        with open(lock_path, "a+", encoding="utf-8") as posix_fp:
            fcntl.flock(posix_fp.fileno(), fcntl.LOCK_EX)
            # Only reached once the lock is held, so the unlock below never
            # runs for a lock that was not acquired.
            try:
                yield
            finally:
                fcntl.flock(posix_fp.fileno(), fcntl.LOCK_UN)
        return

    # Windows: make sure the sidecar exists without truncating a file another
    # process may already have created (append mode creates but never clears).
    with open(lock_path, "ab"):
        pass
    with open(lock_path, "r+b") as win_fp:
        # Keep one byte in the file so the locked range covers real content.
        if os.path.getsize(lock_path) == 0:
            win_fp.write(b"\x00")
            win_fp.flush()
        # msvcrt.locking operates from the current file position.
        win_fp.seek(0)
        msvcrt.locking(win_fp.fileno(), msvcrt.LK_LOCK, 1)
        try:
            yield
        finally:
            win_fp.seek(0)
            msvcrt.locking(win_fp.fileno(), msvcrt.LK_UNLCK, 1)
def load_export_state_from_disk(state_path: str | None = None) -> ExportStateDict:
    """Read the export state file and normalize its shape.

    Intended to be called while holding :func:`export_state_lock` so the read
    is consistent with concurrent writers.

    Returns ``{}`` when the file is missing, cannot be read or parsed, or does
    not contain a JSON object. A legacy flat ``{session_id: mtime, ...}``
    object (one with neither a ``sessions`` nor a ``lastExportTime`` key) is
    wrapped as ``{"sessions": ...}``. Otherwise the parsed object is returned,
    with ``sessions`` replaced by an empty dict if it is absent or not a dict.
    """
    target = state_path if state_path is not None else EXPORT_STATE_FILE
    if not os.path.isfile(target):
        return {}

    # Best-effort read: any failure is treated as "no usable state".
    try:
        with open(target, encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        return {}

    if not isinstance(loaded, dict):
        return {}

    # Legacy layout: the top-level object *is* the sessions mapping.
    has_modern_key = "sessions" in loaded or "lastExportTime" in loaded
    if not has_modern_key:
        return {"sessions": loaded}

    if isinstance(loaded.get("sessions"), dict):
        return cast(ExportStateDict, loaded)

    # Modern layout but unusable ``sessions``: reset it on a copy.
    repaired = {**loaded, "sessions": {}}
    return cast(ExportStateDict, repaired)
def atomic_write_export_state(
    state: ExportStateDict, state_path: str | None = None
) -> None:
    """Persist *state* as indented JSON without exposing a partial file.

    The state is serialized first, then written to a temporary file in the
    destination directory, flushed and fsynced, and finally moved over the
    destination with ``os.replace``. The destination directory is created if
    it does not exist. On any failure after the temporary file is created it
    is removed (best effort) and the original exception propagates.

    Call under :func:`export_state_lock` matching *state_path*.

    Raises ``ValueError`` if *state* cannot be serialized to JSON.
    """
    target = state_path if state_path is not None else EXPORT_STATE_FILE
    parent = os.path.dirname(target)
    if not parent:
        parent = "."
    os.makedirs(parent, exist_ok=True)

    # Serialize before touching the filesystem so a bad state leaves no temp file.
    try:
        serialized = json.dumps(state, indent=2)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"export state is not JSON-serializable: {exc}") from exc

    # Same directory as the target so the final rename stays on one filesystem.
    tmp_fd, scratch_path = tempfile.mkstemp(dir=parent, suffix=".tmp")
    committed = False
    try:
        with os.fdopen(tmp_fd, "w", encoding="utf-8") as out:
            out.write(serialized)
            out.flush()
            os.fsync(out.fileno())
        os.replace(scratch_path, target)
        committed = True
    finally:
        if not committed:
            # Best-effort cleanup; the in-flight exception keeps propagating.
            try:
                if os.path.exists(scratch_path):
                    os.unlink(scratch_path)
            except OSError:
                pass