diff --git a/.circleci/config.yml b/.circleci/config.yml
deleted file mode 100644
index da2e751..0000000
--- a/.circleci/config.yml
+++ /dev/null
@@ -1,65 +0,0 @@
-version: 2
-
-references:
-  container_config: &container_config
-    docker:
-      # Main Python container
-      - image: circleci/python:3.6.2
-        environment:
-          TARGET_POSTGRES_HOST: test_postgres
-          TARGET_POSTGRES_PORT: 5432
-          TARGET_POSTGRES_USER: test_user
-          TARGET_POSTGRES_PASSWORD: my-secret-passwd
-          TARGET_POSTGRES_DBNAME: tap_postgres_test
-          TARGET_POSTGRES_SCHEMA: test_target_postgres
-
-      # PostgreSQL service container image used as test source database
-      - image: postgres:11.4
-        name: test_postgres
-        environment:
-          POSTGRES_USER: test_user
-          POSTGRES_PASSWORD: my-secret-passwd
-          POSTGRES_DB: tap_postgres_test
-        ports:
-          - 5432:5432
-
-jobs:
-  build:
-    <<: *container_config
-
-    steps:
-      - checkout
-
-      - run:
-          name: 'Setup virtual environment'
-          command: |
-            python3 -m venv ./virtualenvs/target-postgres
-            . ./virtualenvs/target-postgres/bin/activate
-            pip install --upgrade pip
-            pip install -e .[test]
-
-      - run:
-          name: 'Pylinting'
-          command: |
-            . ./virtualenvs/target-postgres/bin/activate
-            pylint --rcfile .pylintrc --disable duplicate-code target_postgres/
-
-      - run:
-          name: 'Unit Tests'
-          command: |
-            . ./virtualenvs/target-postgres/bin/activate
-            export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf
-            nosetests --where=tests/unit
-
-      - run:
-          name: 'Integration Tests'
-          command: |
-            . ./virtualenvs/target-postgres/bin/activate
-            export LOGGING_CONF_FILE=$(pwd)/sample_logging.conf
-            nosetests --where=tests/integration/
-
-workflows:
-  version: 2
-  build:
-    jobs:
-      - build
\ No newline at end of file
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..fd63b9c
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,44 @@
+name: CI
+
+on:
+  pull_request:
+  push:
+    branches:
+      - master
+
+jobs:
+  lint_and_test:
+    name: Linting and Testing
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: [ 3.6, 3.7, 3.8 ]
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      - name: Start PG test container
+        run: docker-compose up -d --build db
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Setup virtual environment
+        run: make venv
+
+      - name: Pylinting
+        run: make pylint
+
+      - name: Unit Tests
+        run: make unit_test
+
+      - name: Integration Tests
+        env:
+          LOGGING_CONF_FILE: ./sample_logging.conf
+        run: make integration_test
+
+      - name: Shutdown PG test container
+        run: docker-compose down
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..805a326
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,28 @@
+venv:
+	python3 -m venv venv ;\
+	. ./venv/bin/activate ;\
+	pip install --upgrade pip setuptools wheel ;\
+	pip install -e .[test]
+
+pylint:
+	. ./venv/bin/activate ;\
+	pylint --rcfile .pylintrc target_postgres/
+
+unit_test:
+	. ./venv/bin/activate ;\
+	pytest --cov=target_postgres --cov-fail-under=44 tests/unit -v
+
+# A plain `export VAR=value` recipe line would die with its sub-shell;
+# $(eval export ...) promotes the variables into make's own environment
+# so the dependent integration_test recipe inherits them.
+env:
+	$(eval export TARGET_POSTGRES_PORT=5432)
+	$(eval export TARGET_POSTGRES_DBNAME=target_db)
+	$(eval export TARGET_POSTGRES_USER=my_user)
+	$(eval export TARGET_POSTGRES_PASSWORD=secret)
+	$(eval export TARGET_POSTGRES_HOST=localhost)
+	$(eval export TARGET_POSTGRES_SCHEMA=public)
+
+integration_test: env
+	. ./venv/bin/activate ;\
+	pytest tests/integration --cov=target_postgres --cov-fail-under=87 -v
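The integration tests build their connection settings from these `TARGET_POSTGRES_*` variables via `tests/utils.get_test_config()`. A minimal sketch of what such a helper can look like (the real implementation in `tests/utils.py` may differ; the field names follow the documented `config.json` keys):

```python
import os

def get_test_config():
    """Assemble a target-postgres config dict from the TARGET_POSTGRES_*
    variables exported by the `env` make target. This is a sketch of the
    helper used by the test suite, not the actual implementation."""
    return {
        'host': os.environ['TARGET_POSTGRES_HOST'],
        'port': int(os.environ['TARGET_POSTGRES_PORT']),
        'user': os.environ['TARGET_POSTGRES_USER'],
        'password': os.environ['TARGET_POSTGRES_PASSWORD'],
        'dbname': os.environ['TARGET_POSTGRES_DBNAME'],
        'default_target_schema': os.environ['TARGET_POSTGRES_SCHEMA'],
    }
```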
diff --git a/README.md b/README.md
index e8f839e..ce53899 100644
--- a/README.md
+++ b/README.md
@@ -23,43 +23,43 @@ installation instructions for [Mac](http://docs.python-guide.org/en/latest/start
 It's recommended to use a virtualenv:
 
 ```bash
-  python3 -m venv venv
-  pip install pipelinewise-target-postgres
-```
-
-or
-
-```bash
-  python3 -m venv venv
-  . venv/bin/activate
-  pip install --upgrade pip
-  pip install .
+make venv
 ```
 
 ### To run
 
-Like any other target that's following the singer specificiation:
+Like any other target that follows the Singer specification:
 
 `some-singer-tap | target-postgres --config [config.json]`
 
-It's reading incoming messages from STDIN and using the properites in `config.json` to upload data into Postgres.
+It reads incoming messages from STDIN and uses the properties in `config.json` to upload data into Postgres.
 
 **Note**: To avoid version conflicts run `tap` and `targets` in separate virtual environments.
 
-### Configuration settings
-Running the the target connector requires a `config.json` file. An example with the minimal settings:
+#### Spin up a PG DB
+
+Use the provided docker-compose file to spin up a PostgreSQL database:
 
-   ```json
-   {
-     "host": "localhost",
-     "port": 5432,
-     "user": "my_user",
-     "password": "secret",
-     "dbname": "my_db_name",
-     "default_target_schema": "my_target_schema"
-   }
-   ```
+```bash
+docker-compose up -d --build db
+```
+
+
+### Configuration settings
+
+Running the target connector requires a `config.json` file. An example with the minimal settings:
+
+```json
+{
+  "host": "localhost",
+  "port": 5432,
+  "user": "my_user",
+  "password": "secret",
+  "dbname": "target_db",
+  "default_target_schema": "public"
+}
+```
 
 Full list of options in `config.json`:
@@ -96,33 +96,29 @@
    export TARGET_POSTGRES_SCHEMA=<postgres-schema>
    ```
 
-2. Install python dependencies in a virtual env and run nose unit and integration tests
+**PS**: `make integration_test` pre-defines these variables via the `env` target in the Makefile
+
+
+2. Install python dependencies in a virtual env and run unit and integration tests
 ```
-  python3 -m venv venv
-  . venv/bin/activate
-  pip install --upgrade pip
-  pip install .[test]
+  make venv
 ```
 
 3. To run unit tests:
 ```
-  nosetests --where=tests/unit
+  make unit_test
 ```
 
 4. To run integration tests:
 ```
-  nosetests --where=tests/integration
+  make integration_test
 ```
 
 ### To run pylint:
 
 1. Install python dependencies and run python linter
 ```
-  python3 -m venv venv
-  . venv/bin/activate
-  pip install --upgrade pip
-  pip install .[test]
-  pylint --rcfile .pylintrc --disable duplicate-code target_postgres/
+  make venv pylint
 ```
 
 ## License
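Before piping data in, it can help to sanity-check the dockerized database. Below is a quick connectivity test with psycopg2, the driver this target already depends on, using the credentials from the example `config.json` above:

```python
import psycopg2

# Minimal connectivity check against the docker-compose Postgres,
# using the credentials from the example config.json.
with psycopg2.connect(
    host='localhost',
    port=5432,
    user='my_user',
    password='secret',
    dbname='target_db',
) as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT version()')
        print(cur.fetchone()[0])
conn.close()
```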
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..a73f79c
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,11 @@
+version: "3"
+
+services:
+  db:
+    image: postgres:12-alpine
+    environment:
+      POSTGRES_DB: "target_db"
+      POSTGRES_USER: "my_user"
+      POSTGRES_PASSWORD: "secret"
+    ports:
+      - 5432:5432
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 2cbdf6b..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-singer-python==5.1.1
-psycopg2==2.7.5
-inflection==0.3.1
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 32b8743..e1b5140 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 with open('README.md') as f:
-  long_description = f.read()
+    long_description = f.read()
 
 setup(name="pipelinewise-target-postgres",
       version="2.1.1",
@@ -25,17 +25,16 @@
       ],
       extras_require={
          "test": [
-              'nose==1.3.7',
-              'mock==3.0.5',
-              'pylint==2.4.4',
-              'nose-cov==1.6'
-          ]
+              'pytest==6.2.1',
+              'pylint==2.6.0',
+              'pytest-cov==2.10.1',
+          ]
       },
       entry_points="""
           [console_scripts]
           target-postgres=target_postgres:main
       """,
       packages=["target_postgres"],
-      package_data = {},
+      package_data={},
       include_package_data=True,
-)
+      )
diff --git a/target_postgres/__init__.py b/target_postgres/__init__.py
index 49ed923..5283317 100644
--- a/target_postgres/__init__.py
+++ b/target_postgres/__init__.py
@@ -94,7 +94,6 @@ def persist_lines(config, lines) -> None:
     stream_to_sync = {}
     total_row_count = {}
     batch_size_rows = config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS)
-    parallelism = config.get("parallelism", -1)
 
     # Loop over lines from stdin
     for line in lines:
@@ -127,8 +126,9 @@ def persist_lines(config, lines) -> None:
                         raise InvalidValidationOperationException(
                             f"Data validation failed and cannot load to destination. RECORD: {o['record']}\n"
                             "multipleOf validations that allows long precisions are not supported (i.e. with 15 digits"
-                            "or more) Try removing 'multipleOf' methods from JSON schema.")
-                    raise RecordValidationException(f"Record does not pass schema validation. RECORD: {o['record']}")
+                            " or more). Try removing 'multipleOf' methods from the JSON schema.") from ex
+                    raise RecordValidationException(
+                        f"Record does not pass schema validation. RECORD: {o['record']}") from ex
 
                 primary_key_string = stream_to_sync[stream].record_primary_key_string(o['record'])
                 if not primary_key_string:
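The `raise ... from ex` additions in `persist_lines` chain the new exception to the original validation error, so tracebacks show both causes. A self-contained illustration of the idiom (the validator below is invented for the example; only `RecordValidationException` mirrors a real class in `target_postgres`):

```python
class RecordValidationException(Exception):
    """Illustrative stand-in for the exception class in target_postgres."""


def validate(record):
    # Invented validator, just to have something that fails.
    if 'id' not in record:
        raise KeyError('id')


try:
    try:
        validate({})
    except KeyError as ex:
        # `from ex` stores the original error on __cause__, so tracebacks
        # print both exceptions, linked by "The above exception was the
        # direct cause of the following exception".
        raise RecordValidationException('Record does not pass schema validation') from ex
except RecordValidationException as err:
    print(repr(err.__cause__))  # KeyError('id')
```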
%s", schema_name, query) self.query(query) @@ -524,7 +524,7 @@ def get_tables(self): def get_table_columns(self, table_name): return self.query("""SELECT column_name, data_type FROM information_schema.columns - WHERE lower(table_name) = %s AND lower(table_schema) = %s""", (table_name.replace("\"", "").lower(), + WHERE lower(table_name) = %s AND lower(table_schema) = '%s'""", (table_name.replace("\"", "").lower(), self.schema_name.lower())) def update_columns(self): @@ -582,13 +582,13 @@ def sync_table(self): stream_schema_message = self.stream_schema_message stream = stream_schema_message['stream'] table_name = self.table_name(stream, without_schema=True) - found_tables = [table for table in (self.get_tables()) if f'"{table["table_name"].lower()}"' == table_name] + found_tables = [table for table in (self.get_tables()) if f'{table["table_name"].lower()}' == table_name] if len(found_tables) == 0: query = self.create_table_query() - self.logger.info("Table '%s' does not exist. Creating... %s", table_name, query) + self.logger.info("Table %s does not exist. Creating... %s", table_name, query) self.query(query) self.grant_privilege(self.schema_name, self.grantees, self.grant_select_on_all_tables_in_schema) else: - self.logger.info("Table '%s' exists", table_name) + self.logger.info("Table %s exists", table_name) self.update_columns() diff --git a/tests/integration/test_target_postgres.py b/tests/integration/test_target_postgres.py index 5547efa..4caf465 100644 --- a/tests/integration/test_target_postgres.py +++ b/tests/integration/test_target_postgres.py @@ -1,12 +1,12 @@ import unittest import os import json -import mock import datetime import target_postgres -from target_postgres import RecordValidationException -from nose.tools import assert_raises +from unittest import mock + +from target_postgres import RecordValidationException from target_postgres.db_sync import DbSync from psycopg2.errors import InvalidTextRepresentation @@ -30,6 +30,7 @@ class TestIntegration(unittest.TestCase): @classmethod def setUp(cls): cls.config = test_utils.get_test_config() + print(cls.config) cls.maxDiff = None postgres = DbSync(cls.config) if cls.config['default_target_schema']: @@ -277,13 +278,13 @@ def assert_binary_data_is_in_postgres(self, table_name, should_metadata_columns_ def test_invalid_json(self): """Receiving invalid JSONs should raise an exception""" tap_lines = test_utils.get_test_tap_lines('invalid-json.json') - with assert_raises(json.decoder.JSONDecodeError): + with self.assertRaises(json.decoder.JSONDecodeError): target_postgres.persist_lines(self.config, tap_lines) def test_message_order(self): """RECORD message without a previously received SCHEMA message should raise an exception""" tap_lines = test_utils.get_test_tap_lines('invalid-message-order.json') - with assert_raises(Exception): + with self.assertRaises(Exception): target_postgres.persist_lines(self.config, tap_lines) def test_loading_tables(self): @@ -547,7 +548,7 @@ def test_column_name_change(self): [{'c_int': 1, 'c_pk': 1, 'c_varchar': '1'}]) # Table two should have versioned column - self.assertEquals( + self.assertEqual( self.remove_metadata_columns_from_rows(table_two), [ {previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'c_int': 1, 'c_pk': 1, @@ -637,13 +638,13 @@ def test_grant_privileges(self): # Granting not existing group should raise exception postgres.query("DROP SCHEMA IF EXISTS {} CASCADE".format(self.config['default_target_schema'])) - with assert_raises(Exception): + with 
diff --git a/tests/integration/test_target_postgres.py b/tests/integration/test_target_postgres.py
index 5547efa..4caf465 100644
--- a/tests/integration/test_target_postgres.py
+++ b/tests/integration/test_target_postgres.py
@@ -1,12 +1,12 @@
 import unittest
 import os
 import json
-import mock
 import datetime
 import target_postgres
 
-from target_postgres import RecordValidationException
-from nose.tools import assert_raises
+from unittest import mock
+
+from target_postgres import RecordValidationException
 from target_postgres.db_sync import DbSync
 from psycopg2.errors import InvalidTextRepresentation
@@ -30,6 +30,7 @@ class TestIntegration(unittest.TestCase):
     @classmethod
     def setUp(cls):
         cls.config = test_utils.get_test_config()
+        print(cls.config)
         cls.maxDiff = None
         postgres = DbSync(cls.config)
         if cls.config['default_target_schema']:
@@ -277,13 +278,13 @@ def assert_binary_data_is_in_postgres(self, table_name, should_metadata_columns_
     def test_invalid_json(self):
         """Receiving invalid JSONs should raise an exception"""
         tap_lines = test_utils.get_test_tap_lines('invalid-json.json')
-        with assert_raises(json.decoder.JSONDecodeError):
+        with self.assertRaises(json.decoder.JSONDecodeError):
             target_postgres.persist_lines(self.config, tap_lines)
 
     def test_message_order(self):
         """RECORD message without a previously received SCHEMA message should raise an exception"""
         tap_lines = test_utils.get_test_tap_lines('invalid-message-order.json')
-        with assert_raises(Exception):
+        with self.assertRaises(Exception):
             target_postgres.persist_lines(self.config, tap_lines)
 
     def test_loading_tables(self):
@@ -547,7 +548,7 @@ def test_column_name_change(self):
             [{'c_int': 1, 'c_pk': 1, 'c_varchar': '1'}])
 
         # Table two should have versioned column
-        self.assertEquals(
+        self.assertEqual(
             self.remove_metadata_columns_from_rows(table_two),
             [
                 {previous_column_name: datetime.datetime(2019, 2, 1, 15, 12, 45), 'c_int': 1, 'c_pk': 1,
@@ -637,13 +638,13 @@ def test_grant_privileges(self):
         # Granting not existing group should raise exception
         postgres.query("DROP SCHEMA IF EXISTS {} CASCADE".format(self.config['default_target_schema']))
-        with assert_raises(Exception):
+        with self.assertRaises(Exception):
             self.config['default_target_schema_select_permissions'] = 'group_not_exists_1'
             target_postgres.persist_lines(self.config, tap_lines)
 
         # Granting not existing list of groups should raise exception
         postgres.query("DROP SCHEMA IF EXISTS {} CASCADE".format(self.config['default_target_schema']))
-        with assert_raises(Exception):
+        with self.assertRaises(Exception):
             self.config['default_target_schema_select_permissions'] = ['group_not_exists_1', 'group_not_exists_2']
             target_postgres.persist_lines(self.config, tap_lines)
@@ -691,7 +692,7 @@ def test_flush_streams_with_no_intermediate_flushes(self, mock_emit_state):
         target_postgres.persist_lines(self.config, tap_lines)
 
         # State should be emitted only once with the latest received STATE message
-        self.assertEquals(
+        self.assertEqual(
             mock_emit_state.mock_calls,
             [
                 mock.call({"currently_syncing": None, "bookmarks": {
@@ -719,7 +720,7 @@ def test_flush_streams_with_intermediate_flushes(self, mock_emit_state):
         target_postgres.persist_lines(self.config, tap_lines)
 
         # State should be emitted multiple times, updating the positions only in the stream which got flushed
-        self.assertEquals(
+        self.assertEqual(
             mock_emit_state.call_args_list,
             [
                 # Flush #1 - Flushed edgydata until lsn: 108197216
@@ -794,7 +795,7 @@ def test_flush_streams_with_intermediate_flushes_on_all_streams(self, mock_emit_
         target_postgres.persist_lines(self.config, tap_lines)
 
         # State should be emitted 6 times, flushing every stream and updating every stream position
-        self.assertEquals(
+        self.assertEqual(
             mock_emit_state.call_args_list,
             [
                 # Flush #1 - Flush every stream until lsn: 108197216
@@ -863,12 +864,12 @@ def test_record_validation(self):
         # Loading invalid records when record validation enabled should fail at ...
         self.config['validate_records'] = True
 
-        with assert_raises(RecordValidationException):
+        with self.assertRaises(RecordValidationException):
             target_postgres.persist_lines(self.config, tap_lines)
 
         # Loading invalid records when record validation disabled should fail at load time
         self.config['validate_records'] = False
 
-        with assert_raises(InvalidTextRepresentation):
+        with self.assertRaises(InvalidTextRepresentation):
             target_postgres.persist_lines(self.config, tap_lines)
 
     def test_loading_tables_with_custom_temp_dir(self):
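These test changes swap nose's `assert_raises` and the external `mock` package for their standard-library equivalents. A minimal illustrative test (not taken from the suite) showing both replacement idioms:

```python
import json
import unittest
from unittest import mock


class ExampleTest(unittest.TestCase):
    def test_assert_raises(self):
        # stdlib replacement for nose.tools.assert_raises
        with self.assertRaises(ZeroDivisionError):
            1 / 0

    def test_patching(self):
        # `from unittest import mock` replaces the external mock package
        with mock.patch('json.loads', return_value={'ok': True}) as patched:
            self.assertEqual(json.loads('ignored'), {'ok': True})
        patched.assert_called_once()


if __name__ == '__main__':
    unittest.main()
```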
diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py
index f784e67..85bb503 100644
--- a/tests/unit/test_db_sync.py
+++ b/tests/unit/test_db_sync.py
@@ -1,5 +1,4 @@
 import unittest
-from nose.tools import assert_raises
 
 import target_postgres
 
@@ -31,7 +30,7 @@ def test_config_validation(self):
         # Empty configuration should fail - (nr_of_errors >= 0)
         self.assertGreater(len(validator(empty_config)), 0)
 
-        # Minimal configuratino should pass - (nr_of_errors == 0)
+        # Minimal configuration should pass - (nr_of_errors == 0)
         self.assertEqual(len(validator(minimal_config)), 0)
 
         # Configuration without schema references - (nr_of_errors >= 0)
@@ -72,21 +71,21 @@ def test_column_type_mapping(self):
         json_arr = {"type": ["array"]}
 
         # Mapping from JSON schema types to Postgres column types
-        self.assertEquals(mapper(json_str) , 'character varying')
-        self.assertEquals(mapper(json_str_or_null) , 'character varying')
-        self.assertEquals(mapper(json_dt) , 'timestamp without time zone')
-        self.assertEquals(mapper(json_dt_or_null) , 'timestamp without time zone')
-        self.assertEquals(mapper(json_t) , 'time without time zone')
-        self.assertEquals(mapper(json_t_or_null) , 'time without time zone')
-        self.assertEquals(mapper(json_num) , 'double precision')
-        self.assertEquals(mapper(json_smallint) , 'smallint')
-        self.assertEquals(mapper(json_int) , 'integer')
-        self.assertEquals(mapper(json_bigint) , 'bigint')
-        self.assertEquals(mapper(json_nobound_int) , 'numeric')
-        self.assertEquals(mapper(json_int_or_str) , 'character varying')
-        self.assertEquals(mapper(json_bool) , 'boolean')
-        self.assertEquals(mapper(json_obj) , 'jsonb')
-        self.assertEquals(mapper(json_arr) , 'jsonb')
+        self.assertEqual(mapper(json_str), 'character varying')
+        self.assertEqual(mapper(json_str_or_null), 'character varying')
+        self.assertEqual(mapper(json_dt), 'timestamp without time zone')
+        self.assertEqual(mapper(json_dt_or_null), 'timestamp without time zone')
+        self.assertEqual(mapper(json_t), 'time without time zone')
+        self.assertEqual(mapper(json_t_or_null), 'time without time zone')
+        self.assertEqual(mapper(json_num), 'double precision')
+        self.assertEqual(mapper(json_smallint), 'smallint')
+        self.assertEqual(mapper(json_int), 'integer')
+        self.assertEqual(mapper(json_bigint), 'bigint')
+        self.assertEqual(mapper(json_nobound_int), 'numeric')
+        self.assertEqual(mapper(json_int_or_str), 'character varying')
+        self.assertEqual(mapper(json_bool), 'boolean')
+        self.assertEqual(mapper(json_obj), 'jsonb')
+        self.assertEqual(mapper(json_arr), 'jsonb')
 
     def test_stream_name_to_dict(self):
         """Test identifying catalog, schema and table names from fully qualified stream and table names"""