9 changes: 7 additions & 2 deletions DOCUMENTATION.md
@@ -467,8 +467,13 @@ s = Scweet(
| `max_task_attempts` | `int` | `3` | Max retry attempts per task |
| `max_fallback_attempts` | `int` | `3` | Max fallback attempts on failure |
| `max_account_switches` | `int` | `2` | Max account switches per task |
| `scheduler_min_interval_s` | `int` | `300` | Minimum time interval split (seconds) |
| `n_splits` | `int` | `5` | Number of time interval splits for search |
| `scheduler_min_interval_s` | `int` | `300` | Legacy fallback (seconds): used as ``scheduler_exponential_min_s`` when that field is omitted |
| `scheduler_max_interval_s` | `int \| None` | `None` | Reserved / unused by the current hybrid scheduler |
| `scheduler_exponential_count` | `int` | `10` | How many slices at the **newest** end use exponential widths (capped by ``n_splits - 1`` when ``n_splits > 1``) |
| `scheduler_exponential_growth` | `float` | `2.0` | Multiplier between successive exponential slice widths (must be ``> 1``) |
| `scheduler_exponential_min_s` | `int` | `900` | Width of the newest slice (seconds), default 15 minutes |
| `scheduler_exponential_max_s` | `int` | `432000` | Maximum nominal width of any exponential slice (seconds), default 5 days |
| `n_splits` | `int` | `5` | Total number of search time windows: ``E`` exponential (newest) + ``n_splits - E`` equal-width windows over the older remainder |
| `priority` | `int` | `1` | Task priority |
| `proxy_check_on_lease` | `bool` | `True` | Verify proxy connectivity before leasing |
| `proxy_check_url` | `str` | `"https://x.com/robots.txt"` | URL for proxy check |
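Taken together, the new fields describe a hybrid schedule: a few narrow, exponentially growing windows at the newest end of the range, and equal-width windows over the older remainder. A minimal sketch of passing them, assuming `Scweet` forwards these keyword arguments to `ScweetConfig` as in the surrounding `s = Scweet(...)` example (import path and unrelated arguments are assumptions here):

```python
# Illustrative only: assumes Scweet accepts these scheduler fields as keyword arguments.
from Scweet import Scweet  # import path assumed

s = Scweet(
    n_splits=5,                          # total number of search windows
    scheduler_exponential_count=3,       # newest 3 windows use exponential widths
    scheduler_exponential_min_s=900,     # newest window spans 15 minutes
    scheduler_exponential_growth=2.0,    # each older exponential window doubles in width
    scheduler_exponential_max_s=432000,  # no exponential window wider than 5 days
    # scheduler_min_interval_s is only read as a legacy fallback when
    # scheduler_exponential_min_s is omitted (see the table above).
)
```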
5 changes: 5 additions & 0 deletions Scweet/config.py
@@ -51,6 +51,11 @@ class ScweetConfig(BaseModel):
max_fallback_attempts: int = Field(default=3, ge=1)
max_account_switches: int = Field(default=2, ge=0)
scheduler_min_interval_s: int = Field(default=300, ge=1)
scheduler_max_interval_s: Optional[int] = Field(default=None, ge=1)
scheduler_exponential_count: int = Field(default=10, ge=1)
scheduler_exponential_growth: float = Field(default=2.0, gt=1.0)
scheduler_exponential_min_s: int = Field(default=900, ge=1)
scheduler_exponential_max_s: int = Field(default=432000, ge=1)
n_splits: int = Field(default=5, ge=1)
priority: int = 1
proxy_check_on_lease: bool = True
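The `Field` constraints above are enforced at construction time. A short sketch, assuming `ScweetConfig` is importable from `Scweet.config` and that its remaining fields all have defaults:

```python
from pydantic import ValidationError

from Scweet.config import ScweetConfig  # import path assumed

cfg = ScweetConfig()
assert cfg.scheduler_exponential_count == 10       # default slice count
assert cfg.scheduler_exponential_min_s == 900      # newest slice: 15 minutes
assert cfg.scheduler_exponential_max_s == 432000   # widest slice: 5 days

try:
    ScweetConfig(scheduler_exponential_growth=1.0)  # rejected: growth must satisfy gt=1.0
except ValidationError as exc:
    print(exc)
```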
35 changes: 29 additions & 6 deletions Scweet/runner.py
@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime
from datetime import datetime, timezone
import hashlib
import inspect
import json
@@ -221,12 +221,19 @@ async def run_search(self, search_request, on_tweets_batch: Optional[Any] = None
run_id = str(created)

n_intervals = max(1, int(_cfg(self.config, "n_splits", 1)))
min_interval_s = max(1, int(_cfg(self.config, "scheduler_min_interval_s", 300)))
exp_min_raw = _cfg(self.config, "scheduler_exponential_min_s", None)
if exp_min_raw is None:
exp_min_raw = _cfg(self.config, "scheduler_min_interval_s", 300)
intervals = split_time_intervals(
base_query["since"],
base_query["until"],
n_intervals,
min_interval_s,
exponential_count=int(_cfg(self.config, "scheduler_exponential_count", 10)),
exponential_min_s=max(1, int(exp_min_raw)),
exponential_max_s=max(
1, int(_cfg(self.config, "scheduler_exponential_max_s", 432000))
),
exponential_growth=float(_cfg(self.config, "scheduler_exponential_growth", 2.0)),
)

priority = int(_cfg(self.config, "priority", 1))
@@ -956,9 +963,20 @@ async def _search_worker(

if unique_added > 0:
if global_limit is not None:
logger.info("Collected %d / %d tweets", total_collected, global_limit)
logger.info(
"%s ~ %s: Collected %d / %d tweets",
task_query.get("since"),
task_query.get("until"),
total_collected,
global_limit,
)
else:
logger.info("Collected %d tweets", total_collected)
logger.info(
"%s ~ %s: Collected %d tweets",
task_query.get("since"),
task_query.get("until"),
total_collected,
)

if page_unique_tweets and on_tweets_batch is not None:
try:
@@ -991,7 +1009,9 @@ async def _search_worker(
)
if stop_due_to_empty_pages:
logger.info(
"Search done (no more results) account=%s",
"%s ~ %s: Search done (no more results) account=%s",
task_query.get("since"),
task_query.get("until"),
account.get("username"),
)

@@ -1186,6 +1206,9 @@ def _normalize_timestamp(value: Optional[str], *, end_of_day: bool) -> Optional[
try:
parsed = datetime.strptime(value, _DATE_FMT)
if end_of_day:
now_utc = datetime.now(timezone.utc)
if parsed.date() == now_utc.date():
return now_utc.replace(tzinfo=None).strftime(_TS_FMT)
parsed = parsed.replace(hour=23, minute=59, second=59)
return parsed.strftime(_TS_FMT)
except Exception:
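The `_normalize_timestamp` change clamps an end-of-day timestamp that falls on the current UTC day to "now" instead of 23:59:59, presumably so the newest search window never extends into the future. A standalone sketch of that branch, with `_DATE_FMT` and `_TS_FMT` assumed to be `"%Y-%m-%d"` and `"%Y-%m-%d_%H:%M:%S_UTC"`:

```python
# Standalone sketch of the end-of-day clamp; format strings are assumptions.
from datetime import datetime, timezone

_DATE_FMT = "%Y-%m-%d"
_TS_FMT = "%Y-%m-%d_%H:%M:%S_UTC"

def normalize_end_of_day(value: str) -> str:
    parsed = datetime.strptime(value, _DATE_FMT)
    now_utc = datetime.now(timezone.utc)
    if parsed.date() == now_utc.date():
        # An `until` of "today" must not reach into the future: clamp to the current time.
        return now_utc.replace(tzinfo=None).strftime(_TS_FMT)
    return parsed.replace(hour=23, minute=59, second=59).strftime(_TS_FMT)

print(normalize_end_of_day("2020-01-01"))  # -> "2020-01-01_23:59:59_UTC"
```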
93 changes: 76 additions & 17 deletions Scweet/scheduler.py
@@ -12,31 +12,90 @@ def split_time_intervals(
since: str,
until: str,
n_intervals: int,
min_interval_seconds: int,
*,
exponential_count: int = 10,
exponential_min_s: int = 900,
exponential_max_s: int = 432000,
exponential_growth: float = 2.0,
) -> list[tuple[str, str]]:
"""Split [since, until] into bounded intervals using actor-style semantics."""
"""Split ``[since, until]`` into contiguous intervals (newest → oldest).

Intervals are ordered for task scheduling: the window ending at ``until``
appears first so concurrent workers dequeue recent time ranges before older
ones. Contiguous coverage of ``[since, until]`` is unchanged.

Up to ``min(exponential_count, n_intervals - 1)`` slices are carved from
``until`` backward. Nominal widths are ``exponential_min_s * exponential_growth**i``,
each capped by ``exponential_max_s`` and by remaining span to ``since``.

Any span left toward ``since`` is divided into ``n_intervals - E`` equal
windows, where ``E`` is the number of exponential slices produced (fewer if
the range is short). If exponential slices reach ``since`` exactly, no
uniform windows are added.
"""
since_dt = datetime.strptime(since, _TS_FMT)
until_dt = datetime.strptime(until, _TS_FMT)
total_seconds = (until_dt - since_dt).total_seconds()

if total_seconds <= 0:
return [(since, until)]

intervals_count = max(1, int(n_intervals))
min_interval_seconds = max(1, int(min_interval_seconds))
max_intervals_allowed = max(1, int(total_seconds // min_interval_seconds))
intervals_count = min(intervals_count, max_intervals_allowed)

interval_seconds = total_seconds / intervals_count
intervals: list[tuple[str, str]] = []
for idx in range(intervals_count):
start_dt = since_dt + timedelta(seconds=idx * interval_seconds)
if idx == intervals_count - 1:
end_dt = until_dt
else:
end_dt = since_dt + timedelta(seconds=(idx + 1) * interval_seconds)
intervals.append((start_dt.strftime(_TS_FMT), end_dt.strftime(_TS_FMT)))
return intervals
n_intervals = max(1, int(n_intervals))
if n_intervals == 1:
return [(since, until)]

exp_n_cfg = max(1, int(exponential_count))
exp_min = max(1, int(exponential_min_s))
exp_max = max(exp_min, int(exponential_max_s))
gf = float(exponential_growth)
if gf <= 1.0:
gf = 2.0

# Reserve at least one uniform slot when n_intervals > 1: E <= n_intervals - 1.
e_target = min(exp_n_cfg, n_intervals - 1)

widths = [min(exp_min * (gf**i), float(exp_max)) for i in range(e_target)]

exp_rev: list[tuple[datetime, datetime]] = []
cur_right = until_dt
for wi in widths:
if cur_right <= since_dt:
break
span_left = (cur_right - since_dt).total_seconds()
if span_left <= 0:
break
seg = min(float(wi), span_left)
cur_left = cur_right - timedelta(seconds=seg)
if cur_left < since_dt:
cur_left = since_dt
exp_rev.append((cur_left, cur_right))
cur_right = cur_left
if cur_left <= since_dt:
break

exp_chrono = list(reversed(exp_rev))
e_used = len(exp_chrono)

remainder_start = cur_right
rem_sec = (remainder_start - since_dt).total_seconds()

uniform: list[tuple[datetime, datetime]] = []
if rem_sec > 0:
m_uniform = n_intervals - e_used
if m_uniform < 1:
m_uniform = 1
step = rem_sec / m_uniform
t_left = since_dt
for j in range(m_uniform):
if j == m_uniform - 1:
t_right = remainder_start
else:
t_right = since_dt + timedelta(seconds=step * (j + 1))
uniform.append((t_left, t_right))
t_left = t_right

out = uniform + exp_chrono
return [(a.strftime(_TS_FMT), b.strftime(_TS_FMT)) for a, b in reversed(out)]


def build_tasks_for_intervals(
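A usage sketch for the hybrid splitter, assuming the module is importable as `Scweet.scheduler`; the expected output mirrors the one-hour case in the updated test below:

```python
from Scweet.scheduler import split_time_intervals  # import path assumed

intervals = split_time_intervals(
    "2026-02-01_00:00:00_UTC",
    "2026-02-01_01:00:00_UTC",
    n_intervals=10,
    exponential_count=10,
    exponential_min_s=900,
    exponential_growth=2.0,
)
# Newest-first: a 15-minute slice, a 30-minute slice, then the 15 minutes left toward `since`.
assert intervals == [
    ("2026-02-01_00:45:00_UTC", "2026-02-01_01:00:00_UTC"),
    ("2026-02-01_00:15:00_UTC", "2026-02-01_00:45:00_UTC"),
    ("2026-02-01_00:00:00_UTC", "2026-02-01_00:15:00_UTC"),
]
```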
42 changes: 36 additions & 6 deletions tests/test_runner.py
@@ -84,20 +84,50 @@ def test_split_time_intervals_and_task_generation():
since = "2026-02-01_00:00:00_UTC"
until = "2026-02-01_01:00:00_UTC"

intervals = split_time_intervals(since, until, n_intervals=10, min_interval_seconds=900)
assert len(intervals) == 4
assert intervals[0][0] == since
assert intervals[-1][1] == until
intervals = split_time_intervals(
since,
until,
n_intervals=10,
exponential_count=10,
exponential_min_s=900,
exponential_max_s=432000,
exponential_growth=2.0,
)
# Newest side: 15m + 30m + 15m = 1h; no uniform remainder. Recent-first order.
assert len(intervals) == 3
assert intervals[-1][0] == since
assert intervals[0][1] == until
assert intervals[0] == ("2026-02-01_00:45:00_UTC", until)
assert intervals[1] == ("2026-02-01_00:15:00_UTC", "2026-02-01_00:45:00_UTC")
assert intervals[2] == (since, "2026-02-01_00:15:00_UTC")

base_query = {"words": ["bitcoin"], "since": since, "until": until}
tasks = build_tasks_for_intervals(base_query, run_id="run-1", priority=7, intervals=intervals)
assert len(tasks) == 4
assert len({task["task_id"] for task in tasks}) == 4
assert len(tasks) == 3
assert len({task["task_id"] for task in tasks}) == 3
assert all(task["run_id"] == "run-1" for task in tasks)
assert all(task["priority"] == 7 for task in tasks)
assert all(task["query"]["cursor"] is None for task in tasks)


def test_split_time_intervals_uniform_tail():
since = "2026-02-01_00:00:00_UTC"
until = "2026-02-01_04:00:00_UTC"
intervals = split_time_intervals(
since,
until,
n_intervals=5,
exponential_count=2,
exponential_min_s=900,
exponential_max_s=432000,
exponential_growth=2.0,
)
assert len(intervals) == 5
assert intervals[-1][0] == since
assert intervals[0][1] == until
assert intervals[0][0] == "2026-02-01_03:45:00_UTC"


def test_in_memory_task_queue_lease_ack_retry_fail_cancel():
async def _run():
queue = InMemoryTaskQueue()