From c029379f3d0c7f9af0807819633c6fe7750a961d Mon Sep 17 00:00:00 2001 From: ultrasynergy Date: Wed, 22 Apr 2026 10:53:49 -0600 Subject: [PATCH 1/4] update the logging style of task query: added time window presentation --- Scweet/runner.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/Scweet/runner.py b/Scweet/runner.py index 5aabf2d..5652a26 100644 --- a/Scweet/runner.py +++ b/Scweet/runner.py @@ -956,9 +956,20 @@ async def _search_worker( if unique_added > 0: if global_limit is not None: - logger.info("Collected %d / %d tweets", total_collected, global_limit) + logger.info( + "%s ~ %s: Collected %d / %d tweets", + task_query.get("since"), + task_query.get("until"), + total_collected, + global_limit, + ) else: - logger.info("Collected %d tweets", total_collected) + logger.info( + "%s ~ %s: Collected %d tweets", + task_query.get("since"), + task_query.get("until"), + total_collected, + ) if page_unique_tweets and on_tweets_batch is not None: try: @@ -991,7 +1002,9 @@ async def _search_worker( ) if stop_due_to_empty_pages: logger.info( - "Search done (no more results) account=%s", + "%s ~ %s: Search done (no more results) account=%s", + task_query.get("since"), + task_query.get("until"), account.get("username"), ) From a54f9147d988b5631c5a82e26b312f0466c76cd6 Mon Sep 17 00:00:00 2001 From: ultrasynergy Date: Wed, 22 Apr 2026 14:02:16 -0600 Subject: [PATCH 2/4] fix: make the until value resolve to the current UTC time instead of 23:59:59 when its date is today (e.g. when until is not specified and defaults to the current datetime).
--- Scweet/runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Scweet/runner.py b/Scweet/runner.py index 5652a26..880415c 100644 --- a/Scweet/runner.py +++ b/Scweet/runner.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from datetime import datetime +from datetime import datetime, timezone import hashlib import inspect import json @@ -1199,6 +1199,9 @@ def _normalize_timestamp(value: Optional[str], *, end_of_day: bool) -> Optional[ try: parsed = datetime.strptime(value, _DATE_FMT) if end_of_day: + now_utc = datetime.now(timezone.utc) + if parsed.date() == now_utc.date(): + return now_utc.replace(tzinfo=None).strftime(_TS_FMT) parsed = parsed.replace(hour=23, minute=59, second=59) return parsed.strftime(_TS_FMT) except Exception: From 4de715f5df35e524944cf2fcf0b09347572465aa Mon Sep 17 00:00:00 2001 From: ultrasynergy Date: Wed, 22 Apr 2026 14:53:08 -0600 Subject: [PATCH 3/4] update time splitting logic for concurrent tasks to exponential from linear --- DOCUMENTATION.md | 9 ++++- Scweet/config.py | 5 +++ Scweet/runner.py | 11 +++++- Scweet/scheduler.py | 89 +++++++++++++++++++++++++++++++++++--------- tests/test_runner.py | 38 +++++++++++++++++-- 5 files changed, 127 insertions(+), 25 deletions(-) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 6f6946e..136144f 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -467,8 +467,13 @@ s = Scweet( | `max_task_attempts` | `int` | `3` | Max retry attempts per task | | `max_fallback_attempts` | `int` | `3` | Max fallback attempts on failure | | `max_account_switches` | `int` | `2` | Max account switches per task | -| `scheduler_min_interval_s` | `int` | `300` | Minimum time interval split (seconds) | -| `n_splits` | `int` | `5` | Number of time interval splits for search | +| `scheduler_min_interval_s` | `int` | `300` | Legacy fallback (seconds): used as ``scheduler_exponential_min_s`` when that field is omitted | +| `scheduler_max_interval_s` | `int \| None` | 
`None` | Reserved / unused by the current hybrid scheduler | +| `scheduler_exponential_count` | `int` | `10` | How many slices at the **newest** end use exponential widths (capped by ``n_splits - 1`` when ``n_splits > 1``) | +| `scheduler_exponential_growth` | `float` | `2.0` | Multiplier between successive exponential slice widths (must be ``> 1``) | +| `scheduler_exponential_min_s` | `int` | `900` | Width of the newest slice (seconds), default 15 minutes | +| `scheduler_exponential_max_s` | `int` | `432000` | Maximum nominal width of any exponential slice (seconds), default 5 days | +| `n_splits` | `int` | `5` | Total number of search time windows: ``E`` exponential (newest) + ``n_splits - E`` equal-width windows over the older remainder | | `priority` | `int` | `1` | Task priority | | `proxy_check_on_lease` | `bool` | `True` | Verify proxy connectivity before leasing | | `proxy_check_url` | `str` | `"https://x.com/robots.txt"` | URL for proxy check | diff --git a/Scweet/config.py b/Scweet/config.py index 98778c8..0337fc4 100644 --- a/Scweet/config.py +++ b/Scweet/config.py @@ -51,6 +51,11 @@ class ScweetConfig(BaseModel): max_fallback_attempts: int = Field(default=3, ge=1) max_account_switches: int = Field(default=2, ge=0) scheduler_min_interval_s: int = Field(default=300, ge=1) + scheduler_max_interval_s: Optional[int] = Field(default=None, ge=1) + scheduler_exponential_count: int = Field(default=10, ge=1) + scheduler_exponential_growth: float = Field(default=2.0, gt=1.0) + scheduler_exponential_min_s: int = Field(default=900, ge=1) + scheduler_exponential_max_s: int = Field(default=432000, ge=1) n_splits: int = Field(default=5, ge=1) priority: int = 1 proxy_check_on_lease: bool = True diff --git a/Scweet/runner.py b/Scweet/runner.py index 880415c..dfcdc3d 100644 --- a/Scweet/runner.py +++ b/Scweet/runner.py @@ -221,12 +221,19 @@ async def run_search(self, search_request, on_tweets_batch: Optional[Any] = None run_id = str(created) n_intervals = max(1, 
int(_cfg(self.config, "n_splits", 1))) - min_interval_s = max(1, int(_cfg(self.config, "scheduler_min_interval_s", 300))) + exp_min_raw = _cfg(self.config, "scheduler_exponential_min_s", None) + if exp_min_raw is None: + exp_min_raw = _cfg(self.config, "scheduler_min_interval_s", 300) intervals = split_time_intervals( base_query["since"], base_query["until"], n_intervals, - min_interval_s, + exponential_count=int(_cfg(self.config, "scheduler_exponential_count", 10)), + exponential_min_s=max(1, int(exp_min_raw)), + exponential_max_s=max( + 1, int(_cfg(self.config, "scheduler_exponential_max_s", 432000)) + ), + exponential_growth=float(_cfg(self.config, "scheduler_exponential_growth", 2.0)), ) priority = int(_cfg(self.config, "priority", 1)) diff --git a/Scweet/scheduler.py b/Scweet/scheduler.py index 8429aa0..3d73d42 100644 --- a/Scweet/scheduler.py +++ b/Scweet/scheduler.py @@ -12,9 +12,23 @@ def split_time_intervals( since: str, until: str, n_intervals: int, - min_interval_seconds: int, + *, + exponential_count: int = 10, + exponential_min_s: int = 900, + exponential_max_s: int = 432000, + exponential_growth: float = 2.0, ) -> list[tuple[str, str]]: - """Split [since, until] into bounded intervals using actor-style semantics.""" + """Split ``[since, until]`` into contiguous intervals (oldest → newest). + + Up to ``min(exponential_count, n_intervals - 1)`` slices are carved from + ``until`` backward. Nominal widths are ``exponential_min_s * exponential_growth**i``, + each capped by ``exponential_max_s`` and by remaining span to ``since``. + + Any span left toward ``since`` is divided into ``n_intervals - E`` equal + windows, where ``E`` is the number of exponential slices produced (fewer if + the range is short). If exponential slices reach ``since`` exactly, no + uniform windows are added. 
+ """ since_dt = datetime.strptime(since, _TS_FMT) until_dt = datetime.strptime(until, _TS_FMT) total_seconds = (until_dt - since_dt).total_seconds() @@ -22,21 +36,62 @@ def split_time_intervals( if total_seconds <= 0: return [(since, until)] - intervals_count = max(1, int(n_intervals)) - min_interval_seconds = max(1, int(min_interval_seconds)) - max_intervals_allowed = max(1, int(total_seconds // min_interval_seconds)) - intervals_count = min(intervals_count, max_intervals_allowed) - - interval_seconds = total_seconds / intervals_count - intervals: list[tuple[str, str]] = [] - for idx in range(intervals_count): - start_dt = since_dt + timedelta(seconds=idx * interval_seconds) - if idx == intervals_count - 1: - end_dt = until_dt - else: - end_dt = since_dt + timedelta(seconds=(idx + 1) * interval_seconds) - intervals.append((start_dt.strftime(_TS_FMT), end_dt.strftime(_TS_FMT))) - return intervals + n_intervals = max(1, int(n_intervals)) + if n_intervals == 1: + return [(since, until)] + + exp_n_cfg = max(1, int(exponential_count)) + exp_min = max(1, int(exponential_min_s)) + exp_max = max(exp_min, int(exponential_max_s)) + gf = float(exponential_growth) + if gf <= 1.0: + gf = 2.0 + + # Reserve at least one uniform slot when n_intervals > 1: E <= n_intervals - 1. 
+ e_target = min(exp_n_cfg, n_intervals - 1) + + widths = [min(exp_min * (gf**i), float(exp_max)) for i in range(e_target)] + + exp_rev: list[tuple[datetime, datetime]] = [] + cur_right = until_dt + for wi in widths: + if cur_right <= since_dt: + break + span_left = (cur_right - since_dt).total_seconds() + if span_left <= 0: + break + seg = min(float(wi), span_left) + cur_left = cur_right - timedelta(seconds=seg) + if cur_left < since_dt: + cur_left = since_dt + exp_rev.append((cur_left, cur_right)) + cur_right = cur_left + if cur_left <= since_dt: + break + + exp_chrono = list(reversed(exp_rev)) + e_used = len(exp_chrono) + + remainder_start = cur_right + rem_sec = (remainder_start - since_dt).total_seconds() + + uniform: list[tuple[datetime, datetime]] = [] + if rem_sec > 0: + m_uniform = n_intervals - e_used + if m_uniform < 1: + m_uniform = 1 + step = rem_sec / m_uniform + t_left = since_dt + for j in range(m_uniform): + if j == m_uniform - 1: + t_right = remainder_start + else: + t_right = since_dt + timedelta(seconds=step * (j + 1)) + uniform.append((t_left, t_right)) + t_left = t_right + + out = uniform + exp_chrono + return [(a.strftime(_TS_FMT), b.strftime(_TS_FMT)) for a, b in out] def build_tasks_for_intervals( diff --git a/tests/test_runner.py b/tests/test_runner.py index 768a4c0..2f7dd69 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -84,20 +84,50 @@ def test_split_time_intervals_and_task_generation(): since = "2026-02-01_00:00:00_UTC" until = "2026-02-01_01:00:00_UTC" - intervals = split_time_intervals(since, until, n_intervals=10, min_interval_seconds=900) - assert len(intervals) == 4 + intervals = split_time_intervals( + since, + until, + n_intervals=10, + exponential_count=10, + exponential_min_s=900, + exponential_max_s=432000, + exponential_growth=2.0, + ) + # Newest side: 15m + 30m + 15m = 1h; no uniform remainder. 
+ assert len(intervals) == 3 assert intervals[0][0] == since assert intervals[-1][1] == until + assert intervals[0] == (since, "2026-02-01_00:15:00_UTC") + assert intervals[1] == ("2026-02-01_00:15:00_UTC", "2026-02-01_00:45:00_UTC") + assert intervals[2] == ("2026-02-01_00:45:00_UTC", until) base_query = {"words": ["bitcoin"], "since": since, "until": until} tasks = build_tasks_for_intervals(base_query, run_id="run-1", priority=7, intervals=intervals) - assert len(tasks) == 4 - assert len({task["task_id"] for task in tasks}) == 4 + assert len(tasks) == 3 + assert len({task["task_id"] for task in tasks}) == 3 assert all(task["run_id"] == "run-1" for task in tasks) assert all(task["priority"] == 7 for task in tasks) assert all(task["query"]["cursor"] is None for task in tasks) +def test_split_time_intervals_uniform_tail(): + since = "2026-02-01_00:00:00_UTC" + until = "2026-02-01_04:00:00_UTC" + intervals = split_time_intervals( + since, + until, + n_intervals=5, + exponential_count=2, + exponential_min_s=900, + exponential_max_s=432000, + exponential_growth=2.0, + ) + assert len(intervals) == 5 + assert intervals[0][0] == since + assert intervals[-1][1] == until + assert intervals[-1][0] == "2026-02-01_03:45:00_UTC" + + def test_in_memory_task_queue_lease_ack_retry_fail_cancel(): async def _run(): queue = InMemoryTaskQueue() From d810056f7cd3f443764fcca17df137b7b723b2e2 Mon Sep 17 00:00:00 2001 From: ultrasynergy Date: Wed, 22 Apr 2026 14:53:56 -0600 Subject: [PATCH 4/4] fix time window execute order from oldest -> newest to newest -> oldest --- Scweet/scheduler.py | 8 ++++++-- tests/test_runner.py | 16 ++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/Scweet/scheduler.py b/Scweet/scheduler.py index 3d73d42..8114a93 100644 --- a/Scweet/scheduler.py +++ b/Scweet/scheduler.py @@ -18,7 +18,11 @@ def split_time_intervals( exponential_max_s: int = 432000, exponential_growth: float = 2.0, ) -> list[tuple[str, str]]: - """Split ``[since, 
until]`` into contiguous intervals (oldest → newest). + """Split ``[since, until]`` into contiguous intervals (newest → oldest). + + Intervals are ordered for task scheduling: the window ending at ``until`` + appears first so concurrent workers dequeue recent time ranges before older + ones. Contiguous coverage of ``[since, until]`` is unchanged. Up to ``min(exponential_count, n_intervals - 1)`` slices are carved from ``until`` backward. Nominal widths are ``exponential_min_s * exponential_growth**i``, @@ -91,7 +95,7 @@ def split_time_intervals( t_left = t_right out = uniform + exp_chrono - return [(a.strftime(_TS_FMT), b.strftime(_TS_FMT)) for a, b in out] + return [(a.strftime(_TS_FMT), b.strftime(_TS_FMT)) for a, b in reversed(out)] def build_tasks_for_intervals( diff --git a/tests/test_runner.py b/tests/test_runner.py index 2f7dd69..60acfac 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -93,13 +93,13 @@ def test_split_time_intervals_and_task_generation(): exponential_max_s=432000, exponential_growth=2.0, ) - # Newest side: 15m + 30m + 15m = 1h; no uniform remainder. + # Newest side: 15m + 30m + 15m = 1h; no uniform remainder. Recent-first order. 
assert len(intervals) == 3 - assert intervals[0][0] == since - assert intervals[-1][1] == until - assert intervals[0] == (since, "2026-02-01_00:15:00_UTC") + assert intervals[-1][0] == since + assert intervals[0][1] == until + assert intervals[0] == ("2026-02-01_00:45:00_UTC", until) assert intervals[1] == ("2026-02-01_00:15:00_UTC", "2026-02-01_00:45:00_UTC") - assert intervals[2] == ("2026-02-01_00:45:00_UTC", until) + assert intervals[2] == (since, "2026-02-01_00:15:00_UTC") base_query = {"words": ["bitcoin"], "since": since, "until": until} tasks = build_tasks_for_intervals(base_query, run_id="run-1", priority=7, intervals=intervals) @@ -123,9 +123,9 @@ def test_split_time_intervals_uniform_tail(): exponential_growth=2.0, ) assert len(intervals) == 5 - assert intervals[0][0] == since - assert intervals[-1][1] == until - assert intervals[-1][0] == "2026-02-01_03:45:00_UTC" + assert intervals[-1][0] == since + assert intervals[0][1] == until + assert intervals[0][0] == "2026-02-01_03:45:00_UTC" def test_in_memory_task_queue_lease_ack_retry_fail_cancel():