Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a7b0d47
add support for python 3.11
toluaina Oct 27, 2022
815fab9
change build matrix to 3.11-dev
toluaina Oct 27, 2022
c7142d7
change ci build template
toluaina Oct 27, 2022
60f7e14
change to non-dev version of python 3.11
toluaina Oct 27, 2022
d619c8d
add python 3.11 to pyproject.toml
toluaina Oct 27, 2022
a4b5af0
update package deps
toluaina Oct 31, 2022
39540bc
fixed tests
toluaina Oct 31, 2022
3fe7ffe
handle search_phase_execution_exception with incompatible data types …
toluaina Nov 8, 2022
dc191af
Fix call to `sorted()` on dicts (#381)
loren Nov 13, 2022
d559b6f
reformatting files
toluaina Nov 13, 2022
22b665b
Use STREAM_RESULTS in call to execute (#383)
loren Nov 15, 2022
46d661b
bumped up requirements
toluaina Nov 17, 2022
07d6351
Query optimization for root level nodes
toluaina Nov 18, 2022
3d79c48
rename load_config to config_loader and more robust query chunking
toluaina Nov 19, 2022
5502f36
add mutually_exclusive option to analyze
toluaina Nov 19, 2022
d4bd36e
add test for query builder sync in batches
toluaina Nov 19, 2022
374874b
updated tests
toluaina Nov 20, 2022
36f4284
only validate each node once in polling mode
toluaina Nov 22, 2022
04ee786
ensure qsize is > 0 before bulk_pop
toluaina Nov 23, 2022
f8f48f8
remove poorly implemented extra args
toluaina Nov 23, 2022
d549e91
updated requirements
toluaina Dec 1, 2022
b0f6cbd
Make Sync a singleton based on unique node attributes
toluaina Dec 2, 2022
50823ba
add check for database existence
toluaina Dec 3, 2022
a7d3c16
update elasticsearch docker version
toluaina Dec 3, 2022
6d827fb
update elasticsearch docker version
toluaina Dec 3, 2022
718c1a2
Pin flake8 to stable 5.0.4
toluaina Dec 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# USE_ASYNC=False
# JOIN_QUERIES=True
# STREAM_RESULTS=True
# db polling interval
# POLL_INTERVAL=0.1
# FILTER_CHUNK_SIZE=5000

# Elasticsearch
# ELASTICSEARCH_SCHEME=http
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/python-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: [3.7, 3.8, 3.9, '3.10']
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
services:
postgres:
image: debezium/postgres:15
Expand All @@ -25,7 +25,7 @@ jobs:
ports:
- 6379:6379
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
ports:
- 9200:9200
- 9300:9300
Expand All @@ -34,9 +34,9 @@ jobs:
network.host: 127.0.0.1
http.host: 0.0.0.0
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
4 changes: 2 additions & 2 deletions bin/bootstrap
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import logging
import click

from pgsync.sync import Sync
from pgsync.utils import get_config, load_config, show_settings
from pgsync.utils import config_loader, get_config, show_settings

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,7 +54,7 @@ def main(teardown, config, user, password, host, port, verbose):

show_settings(config)

for document in load_config(config):
for document in config_loader(config):
sync: Sync = Sync(
document, verbose=verbose, repl_slots=False, **kwargs
)
Expand Down
6 changes: 4 additions & 2 deletions bin/es_mapping
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ from elasticsearch import Elasticsearch, helpers

from pgsync.settings import ELASTICSEARCH_TIMEOUT, ELASTICSEARCH_VERIFY_CERTS
from pgsync.urls import get_elasticsearch_url
from pgsync.utils import get_config, load_config, timeit
from pgsync.utils import config_loader, get_config, timeit

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -114,7 +114,9 @@ def main(config):
"""Create custom NGram analyzer for the default mapping."""

config: str = get_config(config)
for index in set([document["index"] for document in load_config(config)]):
for index in set(
[document["index"] for document in config_loader(config)]
):
create_es_mapping(index)


Expand Down
132 changes: 102 additions & 30 deletions bin/parallel_sync
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ and row numbers.
import asyncio
import multiprocessing
import os
import re
import sys
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
Expand All @@ -54,9 +55,32 @@ from typing import Generator, Optional, Union
import click
import sqlalchemy as sa

from pgsync.settings import BLOCK_SIZE
from pgsync.settings import BLOCK_SIZE, CHECKPOINT_PATH
from pgsync.sync import Sync
from pgsync.utils import get_config, load_config, show_settings, timeit
from pgsync.utils import (
compiled_query,
config_loader,
get_config,
show_settings,
timeit,
)


def save_ctid(page: int, row: int, name: str) -> None:
    """Persist the last processed ctid (page, row) checkpoint for *name*.

    The checkpoint is written to a hidden file under CHECKPOINT_PATH so a
    subsequent run can resume from where this one left off (see read_ctid).
    """
    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
    # "w" (not "w+"): the file is write-only here; truncate and replace
    # any previous checkpoint.
    with open(checkpoint_file, "w") as fp:
        fp.write(f"{page},{row}\n")


def read_ctid(name: str) -> tuple:
    """Return the checkpointed (page, row) ctid for *name*.

    Reads the checkpoint file written by save_ctid. Returns
    (None, None) when no checkpoint exists yet.
    """
    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r") as fp:
            # file contains a single "page,row" line
            pairs: list = fp.read().split()[0].split(",")
        page: int = int(pairs[0])
        row: int = int(pairs[1])
        return page, row
    return None, None


def logical_slot_changes(
Expand Down Expand Up @@ -92,17 +116,71 @@ class Task:


@timeit
def fetch_tasks(doc: dict, block_size: Optional[int] = None) -> Generator:
def fetch_tasks(
doc: dict,
block_size: Optional[int] = None,
) -> Generator:
block_size = block_size or BLOCK_SIZE
pages: dict = {}
sync: Sync = Sync(doc)
page: Optional[int] = None
row: Optional[int] = None
name: str = re.sub(
"[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
)
page, row = read_ctid(name=name)
statement: sa.sql.Select = sa.select(
[
sa.literal_column("1").label("x"),
sa.literal_column("1").label("y"),
sa.column("ctid"),
]
).select_from(sync.tree.root.model)

# filter by Page
if page:
statement = statement.where(
sa.cast(
sa.func.SPLIT_PART(
sa.func.REPLACE(
sa.func.REPLACE(
sa.cast(sa.column("ctid"), sa.Text),
"(",
"",
),
")",
"",
),
",",
1,
),
sa.Integer,
)
> page
)

# filter by Row
if row:
statement = statement.where(
sa.cast(
sa.func.SPLIT_PART(
sa.func.REPLACE(
sa.func.REPLACE(
sa.cast(sa.column("ctid"), sa.Text),
"(",
"",
),
")",
"",
),
",",
2,
),
sa.Integer,
)
> row
)

i: int = 1
for _, _, ctid in sync.fetchmany(statement):
value: list = ctid[0].split(",")
Expand Down Expand Up @@ -175,7 +253,6 @@ def multithreaded(
queue.put(task)

queue.join() # block until all tasks are done

logical_slot_changes(doc, verbose=verbose, validate=validate)


Expand All @@ -194,7 +271,6 @@ def multiprocess(
list(executor.map(task.process, tasks))
except Exception as e:
sys.stdout.write(f"Exception: {e}\n")

logical_slot_changes(doc, verbose=verbose, validate=validate)


Expand All @@ -209,13 +285,9 @@ def multithreaded_async(
sys.stdout.write("Multi-threaded async\n")
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=nprocs)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_tasks(executor, tasks, doc, verbose=verbose, validate=validate)
)
finally:
event_loop.close()

event_loop.run_until_complete(
run_tasks(executor, tasks, doc, verbose=verbose, validate=validate)
)
logical_slot_changes(doc, verbose=verbose, validate=validate)


Expand All @@ -234,9 +306,8 @@ def multiprocess_async(
event_loop.run_until_complete(
run_tasks(executor, tasks, doc, verbose=verbose, validate=validate)
)
finally:
event_loop.close()

except KeyboardInterrupt:
pass
logical_slot_changes(doc, verbose=verbose, validate=validate)


Expand All @@ -247,24 +318,19 @@ async def run_tasks(
verbose: bool = False,
validate: bool = False,
) -> None:
event_loop = asyncio.get_event_loop()
sync: Optional[Sync] = None
if isinstance(executor, ThreadPoolExecutor):
# threads can share a common Sync object
sync: Sync = Sync(doc, verbose=verbose, validate=validate)
tasks: list = [
event_loop.run_in_executor(
executor, run_task, task, sync, None, verbose, validate
)
for task in tasks
]
else:
tasks = [
sync = Sync(doc, verbose=verbose, validate=validate)
event_loop = asyncio.get_event_loop()
completed, pending = await asyncio.wait(
[
event_loop.run_in_executor(
executor, run_task, task, None, doc, verbose, validate
executor, run_task, task, sync, doc, verbose, validate
)
for task in tasks
]
completed, pending = await asyncio.wait(tasks)
)
results: list = [task.result() for task in completed]
print("results: {!r}".format(results))
print("exiting")
Expand All @@ -286,7 +352,14 @@ def run_task(
sync.index,
sync.sync(ctid=task, txmin=txmin, txmax=txmax),
)
print("run_task complete")
if len(task) > 0:
page: int = max(task.keys())
row: int = max(task[page])
name: str = re.sub(
"[^0-9a-zA-Z_]+", "", f"{sync.database.lower()}_{sync.index}"
)
save_ctid(page=page, row=row, name=name)

return 1


Expand Down Expand Up @@ -331,14 +404,13 @@ def main(config, nprocs, mode, verbose):
"""
TODO:
- Track progress across cpus/threads
- Save ctid
- Handle KeyboardInterrupt Exception
"""

show_settings()
config: str = get_config(config)

for document in load_config(config):
for document in config_loader(config):
tasks: Generator = fetch_tasks(document)
if mode == "synchronous":
synchronous(tasks, document, verbose=verbose)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ services:
image: redis
command: redis-server --requirepass PLEASE_CHANGE_ME
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.6
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
ports:
- "9201:9200"
- "9301:9300"
Expand Down
4 changes: 2 additions & 2 deletions examples/airbnb/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pgsync.base import pg_engine, subtransactions
from pgsync.helper import teardown
from pgsync.utils import get_config, load_config
from pgsync.utils import config_loader, get_config


@click.command()
Expand All @@ -21,7 +21,7 @@ def main(config):

config: str = get_config(config)
teardown(drop_db=False, config=config)
document: dict = next(load_config(config))
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
Expand Down
4 changes: 2 additions & 2 deletions examples/airbnb/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pgsync.base import create_database, pg_engine
from pgsync.helper import teardown
from pgsync.utils import get_config, load_config
from pgsync.utils import config_loader, get_config

Base = declarative_base()

Expand Down Expand Up @@ -91,7 +91,7 @@ class Review(Base):


def setup(config: str) -> None:
for document in load_config(config):
for document in config_loader(config):
database: str = document.get("database", document["index"])
create_database(database)
with pg_engine(database) as engine:
Expand Down
4 changes: 2 additions & 2 deletions examples/ancestry/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pgsync.base import pg_engine, subtransactions
from pgsync.helper import teardown
from pgsync.utils import get_config, load_config
from pgsync.utils import config_loader, get_config


@click.command()
Expand All @@ -18,7 +18,7 @@ def main(config):

config: str = get_config(config)
teardown(drop_db=False, config=config)
document: dict = next(load_config(config))
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
Expand Down
4 changes: 2 additions & 2 deletions examples/ancestry/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pgsync.base import create_database, pg_engine
from pgsync.helper import teardown
from pgsync.utils import get_config, load_config
from pgsync.utils import config_loader, get_config

Base = declarative_base()

Expand Down Expand Up @@ -49,7 +49,7 @@ class GreatGrandChild(Base):


def setup(config: str) -> None:
for document in load_config(config):
for document in config_loader(config):
database: str = document.get("database", document["index"])
create_database(database)
with pg_engine(database) as engine:
Expand Down
Loading