From 16284c236a577222982d64555fd3fa3cedf0ca45 Mon Sep 17 00:00:00 2001 From: Hunter Senft-Grupp Date: Sun, 24 Nov 2019 14:34:58 -0500 Subject: [PATCH 1/4] Updated implementations and removed container volume mounting. Added python_callable extension. --- .gitignore | 2 + bin/sync-deps | 3 - deps/dev-requirements.in | 9 + deps/dev-requirements.txt | 35 ++- deps/docs-requirements.in | 2 - deps/docs-requirements.txt | 124 ---------- deps/linting-requirements.in | 3 - deps/linting-requirements.txt | 133 ---------- deps/requirements.in | 9 +- deps/requirements.txt | 12 +- deps/testing-requirements.in | 3 - deps/testing-requirements.txt | 113 --------- examples/dags/extensions/python_callable.py | 83 +++++++ examples/dags/sensors.py | 4 +- images/example-tasks/Dockerfile | 6 +- images/example-tasks/code/call.py | 3 + images/example-tasks/code/xcom.py | 13 + images/example-tasks/scripts/xcom_push.py | 1 - images/service/airflow.cfg | 1 + images/service/deps/requirements.txt | 2 +- release.rst | 12 + setup.py | 3 - src/airflow_docker/__init__.py | 1 + src/airflow_docker/constants.py | 25 ++ src/airflow_docker/context.py | 47 ++++ src/airflow_docker/ext/python_callable.py | 80 ++++++ src/airflow_docker/operator.py | 258 ++++++++++++-------- src/airflow_docker/plugin.py | 1 + src/airflow_docker/utils.py | 37 +++ src/airflow_docker/views/config.py | 5 +- src/airflow_docker/views/extensions.py | 5 +- tox.ini | 8 +- 32 files changed, 537 insertions(+), 506 deletions(-) delete mode 100644 deps/docs-requirements.in delete mode 100644 deps/docs-requirements.txt delete mode 100644 deps/linting-requirements.in delete mode 100644 deps/linting-requirements.txt delete mode 100644 deps/testing-requirements.in delete mode 100644 deps/testing-requirements.txt create mode 100644 examples/dags/extensions/python_callable.py create mode 100644 images/example-tasks/code/call.py create mode 100644 images/example-tasks/code/xcom.py create mode 100644 release.rst create mode 100644 
src/airflow_docker/constants.py create mode 100644 src/airflow_docker/context.py create mode 100644 src/airflow_docker/ext/python_callable.py diff --git a/.gitignore b/.gitignore index 531e82d..18adb85 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,5 @@ venv.bak/ # mypy .mypy_cache/ + +pip-wheel-metadata/ diff --git a/bin/sync-deps b/bin/sync-deps index 3ff95c5..e402cb6 100755 --- a/bin/sync-deps +++ b/bin/sync-deps @@ -5,10 +5,7 @@ set -ex # Library Deps export SLUGIFY_USES_TEXT_UNIDECODE=${SLUGIFY_USES_TEXT_UNIDECODE:-yes} pip-compile -o deps/requirements.txt --upgrade-package airflow-docker-helper deps/requirements.in -pip-compile -o deps/testing-requirements.txt deps/requirements.txt deps/testing-requirements.in pip-compile -o deps/dev-requirements.txt --upgrade-package releasely deps/requirements.txt deps/dev-requirements.in -pip-compile -o deps/docs-requirements.txt deps/requirements.txt deps/testing-requirements.txt deps/docs-requirements.in -pip-compile -o deps/linting-requirements.txt deps/requirements.txt deps/testing-requirements.txt deps/docs-requirements.txt deps/linting-requirements.in # Service Image Deps pip-compile -o images/service/deps/requirements.txt images/service/deps/requirements.in deps/requirements.txt diff --git a/deps/dev-requirements.in b/deps/dev-requirements.in index 905ad43..0761668 100644 --- a/deps/dev-requirements.in +++ b/deps/dev-requirements.in @@ -1 +1,10 @@ releasely +sphinx +sphinx_rtd_theme +isort +black +flake8 +pytest +pytest-cov +mock; python_version < '3' + diff --git a/deps/dev-requirements.txt b/deps/dev-requirements.txt index bc597d3..c144cb3 100644 --- a/deps/dev-requirements.txt +++ b/deps/dev-requirements.txt @@ -6,13 +6,16 @@ # airflow-docker-helper==0.2.0 airflow-queue-stats==0.1.4 +alabaster==0.7.12 # via sphinx alembic==1.2.1 amqp==2.5.2 -apache-airflow[docker]==1.10.5 +apache-airflow[celery,docker]==1.10.5 apispec[yaml]==3.0.0 +appdirs==1.4.3 # via black attrs==19.3.0 babel==2.7.0 billiard==3.6.1.0 
+black==19.10b0 boto3==1.7.84 botocore==1.10.84 bumpversion==0.5.3 # via releasely @@ -24,6 +27,7 @@ click==7.0 colorama==0.4.1 colorlog==4.0.2 configparser==3.5.3 +coverage==4.5.4 # via pytest-cov croniter==0.3.30 defusedxml==0.6.0 dill==0.2.9 @@ -31,6 +35,8 @@ docker-pycreds==0.4.0 docker==3.7.3 docutils==0.15.2 dumb-init==1.2.2 +entrypoints==0.3 # via flake8 +flake8==3.7.9 flask-admin==1.5.3 flask-appbuilder==1.13.1 flask-babel==0.12.2 @@ -47,8 +53,10 @@ funcsigs==1.0.0 future==0.16.0 gunicorn==19.9.0 idna==2.8 +imagesize==1.1.0 # via sphinx importlib-metadata==0.23 iso8601==0.1.12 +isort==4.3.21 itsdangerous==1.1.0 jinja2==2.10.3 jmespath==0.9.4 @@ -63,16 +71,26 @@ markupsafe==1.1.1 marshmallow-enum==1.5.1 marshmallow-sqlalchemy==0.19.0 marshmallow==2.19.5 +mccabe==0.6.1 # via flake8 more-itertools==7.2.0 numpy==1.17.3 ordereddict==1.1 +packaging==19.2 # via pytest, sphinx pandas==0.25.2 +pathspec==0.6.0 # via black pendulum==1.4.4 +pluggy==0.13.1 # via pytest prison==0.1.0 psutil==5.6.3 +py==1.8.0 # via pytest +pycodestyle==2.5.0 # via flake8 +pyflakes==2.1.1 # via flake8 pygments==2.4.2 pyjwt==1.7.1 +pyparsing==2.4.5 # via packaging pyrsistent==0.15.4 +pytest-cov==2.8.1 +pytest==5.3.0 python-daemon==2.1.2 python-dateutil==2.8.0 python-editor==1.0.4 @@ -80,22 +98,35 @@ python3-openid==3.1.0 pytz==2019.3 pytzdata==2019.3 pyyaml==5.1.2 +regex==2019.11.1 # via black releasely==1.0.5 requests==2.22.0 s3transfer==0.1.13 setproctitle==1.1.10 six==1.12.0 +snowballstemmer==2.0.0 # via sphinx +sphinx-rtd-theme==0.4.3 +sphinx==2.2.1 +sphinxcontrib-applehelp==1.0.1 # via sphinx +sphinxcontrib-devhelp==1.0.1 # via sphinx +sphinxcontrib-htmlhelp==1.0.2 # via sphinx +sphinxcontrib-jsmath==1.0.1 # via sphinx +sphinxcontrib-qthelp==1.0.2 # via sphinx +sphinxcontrib-serializinghtml==1.1.3 # via sphinx sqlalchemy==1.3.10 tabulate==0.8.5 tenacity==4.12.0 termcolor==1.1.0 text-unidecode==1.2 thrift==0.11.0 +toml==0.10.0 # via black tornado==5.1.1 +typed-ast==1.4.0 # via black 
tzlocal==1.5.1 unicodecsv==0.14.1 urllib3==1.25.6 vine==1.3.0 +wcwidth==0.1.7 # via pytest websocket-client==0.56.0 werkzeug==0.16.0 wtforms==2.2.1 @@ -103,4 +134,4 @@ zipp==0.6.0 zope.deprecation==4.4.0 # The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, zope.deprecation +# setuptools==42.0.0 # via jsonschema, python-daemon, sphinx, zope.deprecation diff --git a/deps/docs-requirements.in b/deps/docs-requirements.in deleted file mode 100644 index 8213302..0000000 --- a/deps/docs-requirements.in +++ /dev/null @@ -1,2 +0,0 @@ -sphinx -sphinx_rtd_theme diff --git a/deps/docs-requirements.txt b/deps/docs-requirements.txt deleted file mode 100644 index 8ae9d80..0000000 --- a/deps/docs-requirements.txt +++ /dev/null @@ -1,124 +0,0 @@ -# -# This file is autogenerated by pip-compile -# To update, run: -# -# pip-compile --output-file=deps/docs-requirements.txt deps/docs-requirements.in deps/requirements.txt deps/testing-requirements.txt -# -airflow-docker-helper==0.2.0 -airflow-queue-stats==0.1.4 -alabaster==0.7.12 # via sphinx -alembic==1.2.1 -amqp==2.5.2 -apache-airflow[docker]==1.10.5 -apispec[yaml]==3.0.0 -atomicwrites==1.3.0 -attrs==19.3.0 -babel==2.7.0 -billiard==3.6.1.0 -boto3==1.7.84 -botocore==1.10.84 -cached-property==1.5.1 -celery==4.3.0 -certifi==2019.9.11 -chardet==3.0.4 -click==7.0 -colorama==0.4.1 -colorlog==4.0.2 -configparser==3.5.3 -coverage==4.5.4 -croniter==0.3.30 -defusedxml==0.6.0 -dill==0.2.9 -docker-pycreds==0.4.0 -docker==3.7.3 -docutils==0.15.2 -dumb-init==1.2.2 -flask-admin==1.5.3 -flask-appbuilder==1.13.1 -flask-babel==0.12.2 -flask-caching==1.3.3 -flask-jwt-extended==3.24.0 -flask-login==0.4.1 -flask-openid==1.2.5 -flask-sqlalchemy==2.4.1 -flask-swagger==0.2.13 -flask-wtf==0.14.2 -flask==1.1.1 -flower==0.9.3 -funcsigs==1.0.0 -future==0.16.0 -gunicorn==19.9.0 -idna==2.8 -imagesize==1.1.0 # via sphinx -importlib-metadata==0.23 -iso8601==0.1.12 -itsdangerous==1.1.0 
-jinja2==2.10.3 -jmespath==0.9.4 -json-merge-patch==0.2 -jsonschema==3.1.1 -kombu==4.6.3 -lazy-object-proxy==1.4.2 -lockfile==0.12.2 -mako==1.1.0 -markdown==2.6.11 -markupsafe==1.1.1 -marshmallow-enum==1.5.1 -marshmallow-sqlalchemy==0.19.0 -marshmallow==2.19.5 -more-itertools==7.2.0 -numpy==1.17.3 -ordereddict==1.1 -packaging==19.2 -pandas==0.25.2 -pendulum==1.4.4 -pluggy==0.13.0 -prison==0.1.0 -psutil==5.6.3 -py==1.8.0 -pygments==2.4.2 -pyjwt==1.7.1 -pyparsing==2.4.2 -pyrsistent==0.15.4 -pytest-cov==2.8.1 -pytest==5.2.1 -python-daemon==2.1.2 -python-dateutil==2.8.0 -python-editor==1.0.4 -python3-openid==3.1.0 -pytz==2019.3 -pytzdata==2019.3 -pyyaml==5.1.2 -requests==2.22.0 -s3transfer==0.1.13 -setproctitle==1.1.10 -six==1.12.0 -snowballstemmer==2.0.0 # via sphinx -sphinx-rtd-theme==0.4.3 -sphinx==2.2.0 -sphinxcontrib-applehelp==1.0.1 # via sphinx -sphinxcontrib-devhelp==1.0.1 # via sphinx -sphinxcontrib-htmlhelp==1.0.2 # via sphinx -sphinxcontrib-jsmath==1.0.1 # via sphinx -sphinxcontrib-qthelp==1.0.2 # via sphinx -sphinxcontrib-serializinghtml==1.1.3 # via sphinx -sqlalchemy==1.3.10 -tabulate==0.8.5 -tenacity==4.12.0 -termcolor==1.1.0 -text-unidecode==1.2 -thrift==0.11.0 -tornado==5.1.1 -tzlocal==1.5.1 -unicodecsv==0.14.1 -urllib3==1.25.6 -vine==1.3.0 -wcwidth==0.1.7 -websocket-client==0.56.0 -werkzeug==0.16.0 -wtforms==2.2.1 -zipp==0.6.0 -zope.deprecation==4.4.0 - -# The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, sphinx, zope.deprecation diff --git a/deps/linting-requirements.in b/deps/linting-requirements.in deleted file mode 100644 index fc4bc95..0000000 --- a/deps/linting-requirements.in +++ /dev/null @@ -1,3 +0,0 @@ -isort -black -flake8 diff --git a/deps/linting-requirements.txt b/deps/linting-requirements.txt deleted file mode 100644 index 0409379..0000000 --- a/deps/linting-requirements.txt +++ /dev/null @@ -1,133 +0,0 @@ -# -# This file is autogenerated by pip-compile -# 
To update, run: -# -# pip-compile --output-file=deps/linting-requirements.txt deps/docs-requirements.txt deps/linting-requirements.in deps/requirements.txt deps/testing-requirements.txt -# -airflow-docker-helper==0.2.0 -airflow-queue-stats==0.1.4 -alabaster==0.7.12 -alembic==1.2.1 -amqp==2.5.2 -apache-airflow[docker]==1.10.5 -apispec[yaml]==3.0.0 -appdirs==1.4.3 # via black -atomicwrites==1.3.0 -attrs==19.3.0 -babel==2.7.0 -billiard==3.6.1.0 -black==19.3b0 -boto3==1.7.84 -botocore==1.10.84 -cached-property==1.5.1 -celery==4.3.0 -certifi==2019.9.11 -chardet==3.0.4 -click==7.0 -colorama==0.4.1 -colorlog==4.0.2 -configparser==3.5.3 -coverage==4.5.4 -croniter==0.3.30 -defusedxml==0.6.0 -dill==0.2.9 -docker-pycreds==0.4.0 -docker==3.7.3 -docutils==0.15.2 -dumb-init==1.2.2 -entrypoints==0.3 # via flake8 -flake8==3.7.8 -flask-admin==1.5.3 -flask-appbuilder==1.13.1 -flask-babel==0.12.2 -flask-caching==1.3.3 -flask-jwt-extended==3.24.0 -flask-login==0.4.1 -flask-openid==1.2.5 -flask-sqlalchemy==2.4.1 -flask-swagger==0.2.13 -flask-wtf==0.14.2 -flask==1.1.1 -flower==0.9.3 -funcsigs==1.0.0 -future==0.16.0 -gunicorn==19.9.0 -idna==2.8 -imagesize==1.1.0 -importlib-metadata==0.23 -iso8601==0.1.12 -isort==4.3.21 -itsdangerous==1.1.0 -jinja2==2.10.3 -jmespath==0.9.4 -json-merge-patch==0.2 -jsonschema==3.1.1 -kombu==4.6.3 -lazy-object-proxy==1.4.2 -lockfile==0.12.2 -mako==1.1.0 -markdown==2.6.11 -markupsafe==1.1.1 -marshmallow-enum==1.5.1 -marshmallow-sqlalchemy==0.19.0 -marshmallow==2.19.5 -mccabe==0.6.1 # via flake8 -more-itertools==7.2.0 -numpy==1.17.3 -ordereddict==1.1 -packaging==19.2 -pandas==0.25.2 -pendulum==1.4.4 -pluggy==0.13.0 -prison==0.1.0 -psutil==5.6.3 -py==1.8.0 -pycodestyle==2.5.0 # via flake8 -pyflakes==2.1.1 # via flake8 -pygments==2.4.2 -pyjwt==1.7.1 -pyparsing==2.4.2 -pyrsistent==0.15.4 -pytest-cov==2.8.1 -pytest==5.2.1 -python-daemon==2.1.2 -python-dateutil==2.8.0 -python-editor==1.0.4 -python3-openid==3.1.0 -pytz==2019.3 -pytzdata==2019.3 -pyyaml==5.1.2 
-requests==2.22.0 -s3transfer==0.1.13 -setproctitle==1.1.10 -six==1.12.0 -snowballstemmer==2.0.0 -sphinx-rtd-theme==0.4.3 -sphinx==2.2.0 -sphinxcontrib-applehelp==1.0.1 -sphinxcontrib-devhelp==1.0.1 -sphinxcontrib-htmlhelp==1.0.2 -sphinxcontrib-jsmath==1.0.1 -sphinxcontrib-qthelp==1.0.2 -sphinxcontrib-serializinghtml==1.1.3 -sqlalchemy==1.3.10 -tabulate==0.8.5 -tenacity==4.12.0 -termcolor==1.1.0 -text-unidecode==1.2 -thrift==0.11.0 -toml==0.10.0 # via black -tornado==5.1.1 -tzlocal==1.5.1 -unicodecsv==0.14.1 -urllib3==1.25.6 -vine==1.3.0 -wcwidth==0.1.7 -websocket-client==0.56.0 -werkzeug==0.16.0 -wtforms==2.2.1 -zipp==0.6.0 -zope.deprecation==4.4.0 - -# The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, sphinx, zope.deprecation diff --git a/deps/requirements.in b/deps/requirements.in index 7e49b12..ded96a9 100644 --- a/deps/requirements.in +++ b/deps/requirements.in @@ -1,4 +1,11 @@ -apache-airflow[docker]==1.10.5 +apache-airflow[celery,docker]==1.10.5 airflow-docker-helper>=0.2.0 airflow-queue-stats==0.1.4 boto3>=1.7.0,<1.8.0 + +# Added due to strange kombu version incompatibility. 
+# Taken from https://github.com/apache/airflow/blob/1.10.5/setup.py#L158 +celery~=4.3 +flower>=0.7.3, <1.0 +tornado>=4.2.0, <6.0 +kombu==4.6.3 diff --git a/deps/requirements.txt b/deps/requirements.txt index 8f02a5d..dfaf958 100644 --- a/deps/requirements.txt +++ b/deps/requirements.txt @@ -8,7 +8,7 @@ airflow-docker-helper==0.2.0 airflow-queue-stats==0.1.4 alembic==1.2.1 # via apache-airflow amqp==2.5.2 # via kombu -apache-airflow[docker]==1.10.5 +apache-airflow[celery,docker]==1.10.5 apispec[yaml]==3.0.0 # via flask-appbuilder attrs==19.3.0 # via jsonschema babel==2.7.0 # via flask-babel, flower @@ -16,7 +16,7 @@ billiard==3.6.1.0 # via celery boto3==1.7.84 botocore==1.10.84 # via boto3, s3transfer cached-property==1.5.1 # via apache-airflow -celery==4.3.0 # via apache-airflow, flower +celery==4.3.0 certifi==2019.9.11 # via requests chardet==3.0.4 # via requests click==7.0 # via flask, flask-appbuilder @@ -41,7 +41,7 @@ flask-sqlalchemy==2.4.1 # via flask-appbuilder flask-swagger==0.2.13 # via apache-airflow flask-wtf==0.14.2 # via apache-airflow, flask-appbuilder flask==1.1.1 # via apache-airflow, flask-admin, flask-appbuilder, flask-babel, flask-caching, flask-jwt-extended, flask-login, flask-openid, flask-sqlalchemy, flask-swagger, flask-wtf -flower==0.9.3 # via apache-airflow +flower==0.9.3 funcsigs==1.0.0 # via apache-airflow future==0.16.0 # via apache-airflow gunicorn==19.9.0 # via apache-airflow @@ -53,7 +53,7 @@ jinja2==2.10.3 # via apache-airflow, flask, flask-babel jmespath==0.9.4 # via boto3, botocore json-merge-patch==0.2 # via apache-airflow jsonschema==3.1.1 # via flask-appbuilder -kombu==4.6.3 # via apache-airflow, celery +kombu==4.6.3 lazy-object-proxy==1.4.2 # via apache-airflow lockfile==0.12.2 # via python-daemon mako==1.1.0 # via alembic @@ -89,7 +89,7 @@ tenacity==4.12.0 # via apache-airflow termcolor==1.1.0 # via apache-airflow text-unidecode==1.2 # via apache-airflow thrift==0.11.0 # via apache-airflow -tornado==5.1.1 # via 
apache-airflow, flower +tornado==5.1.1 tzlocal==1.5.1 # via apache-airflow, pendulum unicodecsv==0.14.1 # via apache-airflow urllib3==1.25.6 # via requests @@ -101,4 +101,4 @@ zipp==0.6.0 # via importlib-metadata zope.deprecation==4.4.0 # via apache-airflow # The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, zope.deprecation +# setuptools==42.0.0 # via jsonschema, python-daemon, zope.deprecation diff --git a/deps/testing-requirements.in b/deps/testing-requirements.in deleted file mode 100644 index 9d2289d..0000000 --- a/deps/testing-requirements.in +++ /dev/null @@ -1,3 +0,0 @@ -pytest -pytest-cov -mock; python_version < '3' diff --git a/deps/testing-requirements.txt b/deps/testing-requirements.txt deleted file mode 100644 index e71898c..0000000 --- a/deps/testing-requirements.txt +++ /dev/null @@ -1,113 +0,0 @@ -# -# This file is autogenerated by pip-compile -# To update, run: -# -# pip-compile --output-file=deps/testing-requirements.txt deps/requirements.txt deps/testing-requirements.in -# -airflow-docker-helper==0.2.0 -airflow-queue-stats==0.1.4 -alembic==1.2.1 -amqp==2.5.2 -apache-airflow[docker]==1.10.5 -apispec[yaml]==3.0.0 -atomicwrites==1.3.0 # via pytest -attrs==19.3.0 -babel==2.7.0 -billiard==3.6.1.0 -boto3==1.7.84 -botocore==1.10.84 -cached-property==1.5.1 -celery==4.3.0 -certifi==2019.9.11 -chardet==3.0.4 -click==7.0 -colorama==0.4.1 -colorlog==4.0.2 -configparser==3.5.3 -coverage==4.5.4 # via pytest-cov -croniter==0.3.30 -defusedxml==0.6.0 -dill==0.2.9 -docker-pycreds==0.4.0 -docker==3.7.3 -docutils==0.15.2 -dumb-init==1.2.2 -flask-admin==1.5.3 -flask-appbuilder==1.13.1 -flask-babel==0.12.2 -flask-caching==1.3.3 -flask-jwt-extended==3.24.0 -flask-login==0.4.1 -flask-openid==1.2.5 -flask-sqlalchemy==2.4.1 -flask-swagger==0.2.13 -flask-wtf==0.14.2 -flask==1.1.1 -flower==0.9.3 -funcsigs==1.0.0 -future==0.16.0 -gunicorn==19.9.0 -idna==2.8 -importlib-metadata==0.23 
-iso8601==0.1.12 -itsdangerous==1.1.0 -jinja2==2.10.3 -jmespath==0.9.4 -json-merge-patch==0.2 -jsonschema==3.1.1 -kombu==4.6.3 -lazy-object-proxy==1.4.2 -lockfile==0.12.2 -mako==1.1.0 -markdown==2.6.11 -markupsafe==1.1.1 -marshmallow-enum==1.5.1 -marshmallow-sqlalchemy==0.19.0 -marshmallow==2.19.5 -more-itertools==7.2.0 -numpy==1.17.3 -ordereddict==1.1 -packaging==19.2 # via pytest -pandas==0.25.2 -pendulum==1.4.4 -pluggy==0.13.0 # via pytest -prison==0.1.0 -psutil==5.6.3 -py==1.8.0 # via pytest -pygments==2.4.2 -pyjwt==1.7.1 -pyparsing==2.4.2 # via packaging -pyrsistent==0.15.4 -pytest-cov==2.8.1 -pytest==5.2.1 -python-daemon==2.1.2 -python-dateutil==2.8.0 -python-editor==1.0.4 -python3-openid==3.1.0 -pytz==2019.3 -pytzdata==2019.3 -pyyaml==5.1.2 -requests==2.22.0 -s3transfer==0.1.13 -setproctitle==1.1.10 -six==1.12.0 -sqlalchemy==1.3.10 -tabulate==0.8.5 -tenacity==4.12.0 -termcolor==1.1.0 -text-unidecode==1.2 -thrift==0.11.0 -tornado==5.1.1 -tzlocal==1.5.1 -unicodecsv==0.14.1 -urllib3==1.25.6 -vine==1.3.0 -wcwidth==0.1.7 # via pytest -websocket-client==0.56.0 -werkzeug==0.16.0 -wtforms==2.2.1 -zipp==0.6.0 -zope.deprecation==4.4.0 - -# The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, zope.deprecation diff --git a/examples/dags/extensions/python_callable.py b/examples/dags/extensions/python_callable.py new file mode 100644 index 0000000..48cf5db --- /dev/null +++ b/examples/dags/extensions/python_callable.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import datetime, timedelta +from airflow import DAG + +from airflow_docker.operator import Operator + + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2018, 10, 10), + 'retries': 0, +} + +dag = DAG( + 'python-callable', + default_args=default_args, + schedule_interval=None, +) + +with dag: + no_context = Operator( + task_id='call-python-callable-no-context', + image='airflowdocker/example-tasks:latest', + python_callable="call:call_me", + op_args=[ + 'foo', + 'bar', + ], + op_kwargs={ + 'integer': 1, + 'float': 2.0, + 'bool': True, + 'nested': [ + 'foo', + 'baz', + ], + }, + ) + + use_context = Operator( + task_id='call-python-callable-with-context', + image='airflowdocker/example-tasks:latest', + provide_context=True, + python_callable="call:call_me", + ) + + xcom_push = Operator( + task_id='call-python-callable-xcom-push', + image='airflowdocker/example-tasks:latest', + python_callable="xcom:push", + op_kwargs={ + "data_to_push": [ + "this", + "that", + ] + } + ) + + xcom_pull = Operator( + task_id='call-python-callable-xcom-pull', + image='airflowdocker/example-tasks:latest', + python_callable="xcom:pull", + op_args=[ + "{{ task_instance.xcom_pull(task_ids='call-python-callable-xcom-push') | tojson }}" + ] + ) + xcom_pull.set_upstream(xcom_push) + diff --git a/examples/dags/sensors.py b/examples/dags/sensors.py index 7f4535b..b995c97 100644 --- a/examples/dags/sensors.py +++ b/examples/dags/sensors.py @@ -73,7 +73,7 @@ soft_fail=False, timeout=30, command=json.dumps([ - "python", "sensor.py", 
"FAIL", + "sensor.py", "FAIL", ]), ) @@ -92,7 +92,7 @@ soft_fail=True, timeout=30, command=json.dumps([ - "python", "sensor.py", "FAIL", + "sensor.py", "FAIL", ]), ) diff --git a/images/example-tasks/Dockerfile b/images/example-tasks/Dockerfile index db22f33..02e9b8f 100644 --- a/images/example-tasks/Dockerfile +++ b/images/example-tasks/Dockerfile @@ -2,7 +2,11 @@ FROM python:3.6.6-alpine RUN pip install airflow-docker-helper +WORKDIR /usr/local/lib/airflow-docker +COPY scripts/ scripts +COPY code/ code + WORKDIR /usr/local/lib/airflow-docker/scripts -COPY scripts/ . +ENV PYTHONPATH /usr/local/lib/airflow-docker/code ENTRYPOINT ["python"] diff --git a/images/example-tasks/code/call.py b/images/example-tasks/code/call.py new file mode 100644 index 0000000..a5572ab --- /dev/null +++ b/images/example-tasks/code/call.py @@ -0,0 +1,3 @@ +def call_me(*args, **kwargs): + print(args) + print(kwargs) diff --git a/images/example-tasks/code/xcom.py b/images/example-tasks/code/xcom.py new file mode 100644 index 0000000..f72146b --- /dev/null +++ b/images/example-tasks/code/xcom.py @@ -0,0 +1,13 @@ +import json + + +def push(data_to_push): + return data_to_push + + +def pull(xcom_data): + print(xcom_data) + print(type(xcom_data)) + data = json.loads(xcom_data) + for item in data: + print("Got: {}".format(item)) diff --git a/images/example-tasks/scripts/xcom_push.py b/images/example-tasks/scripts/xcom_push.py index a09e6bd..ffac370 100644 --- a/images/example-tasks/scripts/xcom_push.py +++ b/images/example-tasks/scripts/xcom_push.py @@ -1,5 +1,4 @@ from airflow_docker_helper import client -import sys client.xcom_push(key="foo", value={"bar": "baz"}) client.xcom_push(key="foo2", value={"bar2": "baz2"}) diff --git a/images/service/airflow.cfg b/images/service/airflow.cfg index aad8202..5589027 100644 --- a/images/service/airflow.cfg +++ b/images/service/airflow.cfg @@ -652,3 +652,4 @@ tolerations = extension_paths = airflow_docker.ext.environment_preset:EnvironmentPresetExtension 
airflow_docker.ext.aws.role_assumption:AWSRoleAssumptionExtension + airflow_docker.ext.python_callable:PythonCallableExtension diff --git a/images/service/deps/requirements.txt b/images/service/deps/requirements.txt index 360e9ff..f7a4e80 100644 --- a/images/service/deps/requirements.txt +++ b/images/service/deps/requirements.txt @@ -108,4 +108,4 @@ zipp==0.6.0 zope.deprecation==4.4.0 # The following packages are considered to be unsafe in a requirements file: -# setuptools==41.4.0 # via jsonschema, python-daemon, zope.deprecation +# setuptools==42.0.0 # via jsonschema, python-daemon, zope.deprecation diff --git a/release.rst b/release.rst new file mode 100644 index 0000000..46236e8 --- /dev/null +++ b/release.rst @@ -0,0 +1,12 @@ +RELEASE_TYPE: major + +* Replaced various dev requirements files with a single set of `dev` dependencies +* Removed host/container volume mounts and replaced them with pure docker volumes +* Added docker-based worker/container two-way communication +* Added an extension equivalent to the PythonOperator, for running a python callable in a user provided image. 
+* Updated `do_meta_operation` implementations to work with existing `airflow-docker-helper` and `python_callable` based mechanisms for two-way communication + +Authors: + +* Hunter Senft-Grupp + diff --git a/setup.py b/setup.py index 9d56c3d..eff6db7 100644 --- a/setup.py +++ b/setup.py @@ -49,9 +49,6 @@ def parse_requirements(filename): python_requires=">=3,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*", install_requires=parse_requirements("deps/requirements.in"), extras_require={ - "testing": parse_requirements("deps/testing-requirements.in"), - "docs": parse_requirements("deps/docs-requirements.in"), - "linting": parse_requirements("deps/linting-requirements.in"), "dev": parse_requirements("deps/dev-requirements.in"), }, entry_points={ diff --git a/src/airflow_docker/__init__.py b/src/airflow_docker/__init__.py index d5bcf30..94e2039 100644 --- a/src/airflow_docker/__init__.py +++ b/src/airflow_docker/__init__.py @@ -16,6 +16,7 @@ __version__ = "1.1.3" import airflow + from airflow_docker.operator import ( BranchOperator, Operator, diff --git a/src/airflow_docker/constants.py b/src/airflow_docker/constants.py new file mode 100644 index 0000000..78be096 --- /dev/null +++ b/src/airflow_docker/constants.py @@ -0,0 +1,25 @@ +import airflow_docker_helper + +META_PATH_DIR = airflow_docker_helper.META_PATH_DIR +"""This is the legacy subdirectory where airflow-docker run time files used to live.""" + +CONTAINER_RUN_DIR = "/var/run/airflow-docker" +"""The current reserved run time directory, mounted as a volume in running containers.""" + +BRANCH_OPERATOR_FILENAME = airflow_docker_helper.BRANCH_OPERATOR_FILENAME +"""The non-python callable filename for the branch operator result.""" + +SHORT_CIRCUIT_OPERATOR_FILENAME = airflow_docker_helper.SHORT_CIRCUIT_OPERATOR_FILENAME +"""The non-python callable filename for the branch operator result.""" + +SENSOR_OPERATOR_FILENAME = airflow_docker_helper.SENSOR_OPERATOR_FILENAME +"""The non-python callable filename for the 
sensor operator result.""" + +CONTEXT_FILENAME = airflow_docker_helper.CONTEXT_FILENAME +"""The filename for the serialized context info.""" + +XCOM_PUSH_FILENAME = airflow_docker_helper.XCOM_PUSH_FILENAME +"""The filename for runtime xcom pushed data.""" + +RESULT_FILENAME = "result.json" +"""The python callable filename for the return result.""" diff --git a/src/airflow_docker/context.py b/src/airflow_docker/context.py new file mode 100644 index 0000000..c872d25 --- /dev/null +++ b/src/airflow_docker/context.py @@ -0,0 +1,47 @@ +def serialize_context(context): + return { + "dag": serialize_dag(context["dag"]), + "ds": context["ds"], + "next_ds": context["next_ds"], + "next_ds_nodash": context["next_ds_nodash"], + "prev_ds": context["prev_ds"], + "prev_ds_nodash": context["prev_ds_nodash"], + "ds_nodash": context["ds_nodash"], + "ts": context["ts"], + "ts_nodash": context["ts_nodash"], + "ts_nodash_with_tz": context["ts_nodash_with_tz"], + "yesterday_ds": context["yesterday_ds"], + "yesterday_ds_nodash": context["yesterday_ds_nodash"], + "tomorrow_ds": context["tomorrow_ds"], + "tomorrow_ds_nodash": context["tomorrow_ds_nodash"], + "END_DATE": context["END_DATE"], + "end_date": context["end_date"], + "dag_run": serialize_dag_run(context["dag_run"]), + "run_id": context["run_id"], + "execution_date": context["execution_date"].isoformat(), + "prev_execution_date": context["prev_execution_date"].isoformat(), + "next_execution_date": context["next_execution_date"].isoformat(), + "latest_date": context["latest_date"], + "params": context["params"], + "task": serialize_task(context["task"]), + "task_instance": serialize_task_instance(context["task_instance"]), + "ti": serialize_task_instance(context["ti"]), + "task_instance_key_str": context["task_instance_key_str"], + "test_mode": context["test_mode"], + } + + +def serialize_dag(dag): + return {} + + +def serialize_dag_run(dag_run): + return {} + + +def serialize_task(task): + return {} + + +def 
serialize_task_instance(task_instance): + return {} diff --git a/src/airflow_docker/ext/python_callable.py b/src/airflow_docker/ext/python_callable.py new file mode 100644 index 0000000..0b85721 --- /dev/null +++ b/src/airflow_docker/ext/python_callable.py @@ -0,0 +1,80 @@ +import base64 +import json +import textwrap + +from airflow.utils.db import provide_session + +from airflow_docker.constants import CONTAINER_RUN_DIR +from airflow_docker.context import serialize_context + +PYTHON_ENTRYPOINT = textwrap.dedent( + """\ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import base64 +import json +import {python_import_path} + +args = json.loads(base64.b64decode("{args}".encode('ascii'))) +kwargs = json.loads(base64.b64decode("{kwargs}".encode('ascii'))) +result = {python_import_path}.{callable}(*args, **kwargs) +with open('{container_run_dir}/result.json', 'w') as f: + json.dump(result, f) +""" +) + + +class PythonCallableExtension: + kwargs = {"python_callable", "op_args", "op_kwargs"} + + @classmethod + @provide_session + def post_prepare_environment( + cls, operator, config, context, host_tmp_dir, session=None + ): + python_callable = operator.extra_kwargs.get("python_callable", None) + args = operator.extra_kwargs.get("op_args", []) + op_kwargs = operator.extra_kwargs.get("op_kwargs", {}) + + if operator.provide_context: + context = serialize_context(context) + context.update(op_kwargs) + kwargs = context + else: + kwargs = op_kwargs + + if operator.command is None and python_callable is not None: + operator.log.info( + "Preparing entry point script to call: {}".format(python_callable) + ) + module, callable = python_callable.split(":") + entrypoint = PYTHON_ENTRYPOINT.format( + python_import_path=module, + callable=callable, + args=base64.b64encode(json.dumps(args).encode("utf-8")).decode("ascii"), + kwargs=base64.b64encode(json.dumps(kwargs).encode("utf-8")).decode( + "ascii" + ), + container_run_dir=CONTAINER_RUN_DIR, + ) + 
operator.container_data["entrypoint.py"] = entrypoint.encode("utf-8") + operator.log.debug( + "Set the entrypoint.py script: \n{}".format( + operator.container_data["entrypoint.py"] + ) + ) + + if operator.entrypoint is None: + operator.entrypoint = "/usr/bin/env python" + operator.log.info( + "Set the docker entrypoint: {}".format(operator.entrypoint) + ) + + operator.command = json.dumps(["/var/run/airflow-docker/entrypoint.py"]) + operator.log.info("Set the docker command: {}".format(operator.command)) + + else: + if any([python_callable, args, op_kwargs]): + raise ValueError( + "It appears you are trying to use the PythonCallExtension. You must pass 'call' to the operator." + ) diff --git a/src/airflow_docker/operator.py b/src/airflow_docker/operator.py index 383602b..e9d8d90 100644 --- a/src/airflow_docker/operator.py +++ b/src/airflow_docker/operator.py @@ -35,23 +35,36 @@ # specific language governing permissions and limitations # under the License. import ast +import io import json +import os -import airflow.configuration as conf -import airflow_docker_helper import six -from airflow.exceptions import AirflowConfigException, AirflowException +from airflow.exceptions import AirflowException from airflow.hooks.docker_hook import DockerHook from airflow.models import BaseOperator, SkipMixin from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults -from airflow.utils.file import TemporaryDirectory -from airflow_docker.conf import get_boolean_default, get_default -from airflow_docker.ext import delegate_to_extensions, register_extensions -from airflow_docker.utils import get_config from docker import APIClient, tls -DEFAULT_HOST_TEMPORARY_DIRECTORY = "/tmp/airflow" +from airflow_docker.conf import get_boolean_default, get_default +from airflow_docker.constants import ( + BRANCH_OPERATOR_FILENAME, + CONTAINER_RUN_DIR, + CONTEXT_FILENAME, + META_PATH_DIR, + RESULT_FILENAME, + SENSOR_OPERATOR_FILENAME, 
+ SHORT_CIRCUIT_OPERATOR_FILENAME, + XCOM_PUSH_FILENAME, +) +from airflow_docker.context import serialize_context +from airflow_docker.ext import delegate_to_extensions, register_extensions +from airflow_docker.utils import ( + get_config, + make_tar_data_stream, + process_tar_data_stream, +) class ShortCircuitMixin(SkipMixin): @@ -111,12 +124,6 @@ class BaseDockerOperator(object): """ Execute a command inside a docker container. - A temporary directory is created on the host and - mounted into a container to allow storing files - that together exceed the default disk size of 10GB in a container. - The path to the mounted directory can be accessed - via the environment variable ``AIRFLOW_TMP_DIR``. - If a login to a private registry is required prior to pulling the image, a Docker connection needs to be configured in Airflow and the connection ID be provided with the parameter ``docker_conn_id``. @@ -272,9 +279,11 @@ def __init__( self.shm_size = shm_size self.provide_context = provide_context + self.container_data = {} + """A mapping of filenames to file data in bytes.""" + self.cli = None self.container = None - self._host_client = None # Shim for attaching a test client def get_hook(self): return DockerHook( @@ -303,63 +312,73 @@ def _execute(self, context): if "status" in output: self.log.info("%s", output["status"]) - with TemporaryDirectory( - prefix="airflowtmp", dir=self.host_tmp_base_dir - ) as host_tmp_dir: - self.environment["AIRFLOW_TMP_DIR"] = self.tmp_dir - additional_volumes = ["{0}:{1}".format(host_tmp_dir, self.tmp_dir)] - - # Hook for creating mounted meta directories - self.prepare_host_tmp_dir(context, host_tmp_dir) - self.prepare_environment(context, host_tmp_dir) - - if self.provide_context: - self.write_context(context, host_tmp_dir) - - self.container = self.cli.create_container( - command=self.get_command(), - entrypoint=self.entrypoint, - environment=self.environment, - host_config=self.cli.create_host_config( - 
auto_remove=self.auto_remove, - binds=self.volumes + additional_volumes, - network_mode=self.network_mode, - shm_size=self.shm_size, - dns=self.dns, - dns_search=self.dns_search, - cpu_shares=int(round(self.cpus * 1024)), - mem_limit=self.mem_limit, - ), - image=self.image, - user=self.user, - working_dir=self.working_dir, + self.environment["AIRFLOW_TMP_DIR"] = self.tmp_dir + additional_volumes = [ + os.path.join(self.tmp_dir, META_PATH_DIR), + CONTAINER_RUN_DIR, + ] + + self.prepare_environment(context) + + if self.provide_context: + self.container_data[CONTEXT_FILENAME] = json.dumps( + serialize_context(context) + ).encode("utf-8") + + self.container = self.cli.create_container( + command=self.get_command(), + entrypoint=self.entrypoint, + environment=self.environment, + host_config=self.cli.create_host_config( + auto_remove=False, # The operator implementation will be responsible for removing the container + binds=self.volumes + additional_volumes, + network_mode=self.network_mode, + shm_size=self.shm_size, + dns=self.dns, + dns_search=self.dns_search, + cpu_shares=int(round(self.cpus * 1024)), + mem_limit=self.mem_limit, + ), + image=self.image, + user=self.user, + working_dir=self.working_dir, + ) + + self.log.info("Container created: %s", self.container["Id"]) + + self.put_container_data(root=CONTAINER_RUN_DIR, spec=self.container_data) + + self.put_container_data( + root=os.path.join(self.tmp_dir, META_PATH_DIR), spec=self.container_data + ) + + self.cli.start(self.container["Id"]) + + line = "" + for line in self.cli.logs(container=self.container["Id"], stream=True): + line = line.strip() + if hasattr(line, "decode"): + line = line.decode("utf-8") + self.log.info(line) + + result = self.cli.wait(self.container["Id"]) + if result["StatusCode"] != 0: + raise AirflowException("docker container failed: " + repr(result)) + + self.gather_container_data(path=os.path.join(self.tmp_dir, META_PATH_DIR)) + self.gather_container_data(path=CONTAINER_RUN_DIR) + + for 
row in self.get_xcom_data(): + self.xcom_push(context, key=row["key"], value=row["value"]) + + if self.xcom_push_flag: + return ( + self.cli.logs(container=self.container["Id"]) + if self.xcom_all + else str(line) ) - self.cli.start(self.container["Id"]) - - line = "" - for line in self.cli.logs(container=self.container["Id"], stream=True): - line = line.strip() - if hasattr(line, "decode"): - line = line.decode("utf-8") - self.log.info(line) - - result = self.cli.wait(self.container["Id"]) - if result["StatusCode"] != 0: - raise AirflowException("docker container failed: " + repr(result)) - - # Move the in-container xcom-pushes into airflow. - result = self.host_client.get_xcom_push_data(host_tmp_dir) - for row in result: - self.xcom_push(context, key=row["key"], value=row["value"]) - - if self.xcom_push_flag: - return ( - self.cli.logs(container=self.container["Id"]) - if self.xcom_all - else str(line) - ) - return self.do_meta_operation(context, host_tmp_dir) + return self.do_meta_operation(context) def get_command(self): if self.command is not None and self.command.strip().find("[") == 0: @@ -372,6 +391,11 @@ def on_kill(self): if self.cli is not None: self.log.info("Stopping docker container") self.cli.stop(self.container["Id"]) + self.maybe_remove() + + def maybe_remove(self): + if self.cli is not None and self.auto_remove: + self.cli.remove_container(self.container["Id"], v=True) def __get_tls_config(self): tls_config = None @@ -386,37 +410,43 @@ def __get_tls_config(self): self.docker_url = self.docker_url.replace("tcp://", "https://") return tls_config - def do_meta_operation(self, context, host_tmp_dir): - pass + def do_meta_operation(self, context): + return _maybe_decode_keys( + data=self.container_data, keys=[RESULT_FILENAME], default=None + ) - def prepare_environment(self, context, host_tmp_dir): - delegate_to_extensions(self, "post_prepare_environment", context, host_tmp_dir) + def prepare_environment(self, context): + delegate_to_extensions(self, 
"post_prepare_environment", context, None) - def prepare_host_tmp_dir(self, context, host_tmp_dir): - self.host_client.make_meta_dir(host_tmp_dir) - host_meta_dir = airflow_docker_helper.get_host_meta_path(host_tmp_dir) - self.log.info("Making host meta dir: {}".format(host_meta_dir)) + @staticmethod + def get_config(): + return get_config() - def write_context(self, context, host_tmp_dir): - self.host_client.write_context(context, host_tmp_dir) + def put_container_data(self, root, spec): + tar_stream = make_tar_data_stream(tar_spec=spec) - @property - def host_tmp_base_dir(self): - try: - return conf.get("worker", "host_temporary_directory") - except AirflowConfigException: - return DEFAULT_HOST_TEMPORARY_DIRECTORY + return self.cli.put_archive( + container=self.container["Id"], path=root, data=tar_stream, + ) - def host_meta_dir(self, context, host_tmp_dir): - return airflow_docker_helper.get_host_meta_path(host_tmp_dir) + def gather_container_data(self, path): + data_stream, info = self.cli.get_archive( + container=self.container["Id"], path=path, + ) + data = process_tar_data_stream(data_stream, path) + self.container_data.update(data) - @property - def host_client(self): - return self._host_client or airflow_docker_helper.host + def get_xcom_data(self): + if XCOM_PUSH_FILENAME not in self.container_data: + return [] - @staticmethod - def get_config(): - return get_config() + result = self.container_data[XCOM_PUSH_FILENAME] + result = [ + json.loads(row.decode("utf-8").strip()) + for row in result.split(b"\n") + if row.decode("utf-8").strip() + ] + return result class Operator(BaseDockerOperator, BaseOperator): @@ -426,17 +456,45 @@ def execute(self, context): class Sensor(BaseDockerOperator, BaseSensorOperator): def poke(self, context): + # In mode=poke, this operator will hold state in between each run. + # We should clear out the sensor result before running. 
+def _maybe_decode_keys(data, keys, default=None):
+    for key in keys:
+        if key in data:
+            return json.loads(data[key].decode("utf-8"))
+
+    return default
name of the environment for the current airflow environment. """ return Variable.get("env") + + +def make_tar_data_stream(tar_spec): + tar_stream = io.BytesIO() + with tarfile.TarFile(fileobj=tar_stream, mode="w") as tar: + for key, bytes_data in tar_spec.items(): + tarinfo = tarfile.TarInfo(name=key) + tarinfo.size = len(bytes_data) + tarinfo.mtime = time.time() + tar.addfile(tarinfo, io.BytesIO(bytes_data)) + + tar_stream.seek(0) + + return tar_stream + + +def process_tar_data_stream(tar_data_stream, root): + data_file = io.BytesIO() + for chunk in tar_data_stream: + data_file.write(chunk) + data_file.seek(0) + + root, base = os.path.split(root) + base = f"{base}/" + data = {} + + with tarfile.open(mode="r", fileobj=data_file) as t: + for filename in t.getnames(): + file = t.extractfile(filename) + if file: + key = filename.split(base, 1)[-1] # Gets the path after the first root + data[key] = file.read() + file.close() + return data diff --git a/src/airflow_docker/views/config.py b/src/airflow_docker/views/config.py index 3ba3b08..80e69ff 100644 --- a/src/airflow_docker/views/config.py +++ b/src/airflow_docker/views/config.py @@ -1,14 +1,15 @@ import json from airflow.plugins_manager import AirflowPlugin -from airflow_docker.utils import get_config -from airflow_docker.views import template_folder from flask import Blueprint from flask_admin import BaseView, expose from flask_appbuilder import BaseView as AppBuilderBaseView from pygments import highlight, lexers from pygments.formatters import HtmlFormatter +from airflow_docker.utils import get_config +from airflow_docker.views import template_folder + def render_config(self): config = get_config() diff --git a/src/airflow_docker/views/extensions.py b/src/airflow_docker/views/extensions.py index 0c7c227..efbdf72 100644 --- a/src/airflow_docker/views/extensions.py +++ b/src/airflow_docker/views/extensions.py @@ -1,12 +1,13 @@ import json from airflow.plugins_manager import AirflowPlugin -from 
airflow_docker.operator import BaseDockerOperator -from airflow_docker.views import template_folder from flask import Blueprint from flask_admin import BaseView, expose from flask_appbuilder import BaseView as AppBuilderBaseView +from airflow_docker.operator import BaseDockerOperator +from airflow_docker.views import template_folder + def render_extensions(self): extensions_classes = BaseDockerOperator._extensions diff --git a/tox.ini b/tox.ini index 0670779..fb2756f 100644 --- a/tox.ini +++ b/tox.ini @@ -10,8 +10,8 @@ basepython = py3: python3.6 commands = - pip install -r deps/testing-requirements.txt - pip install airflow-docker[testing] + pip install -r deps/dev-requirements.txt + pip install airflow-docker[dev] pytest --cov airflow_docker --cov-report= -v {posargs:} tests/unit setenv = @@ -36,7 +36,7 @@ setenv = [testenv:lint] basepython = python3.6 commands = - pip install airflow-docker[linting] + pip install airflow-docker[dev] flake8 tests/ src/ isort -rc -c src/airflow_docker/ tests/ black --diff --check src/airflow_docker/ tests/ @@ -48,7 +48,7 @@ setenv = [testenv:format] basepython = python3.6 commands = - pip install airflow-docker[linting] + pip install airflow-docker[dev] isort -rc src/airflow_docker/ tests/ black src/airflow_docker/ tests/ setenv = From 8d2f13a60ac1cc6de315551e0e26ed645bcf1a3e Mon Sep 17 00:00:00 2001 From: Hunter Senft-Grupp Date: Sun, 24 Nov 2019 15:10:35 -0500 Subject: [PATCH 2/4] Updated aws role assumption to work as expected. 
--- src/airflow_docker/ext/aws/role_assumption.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/airflow_docker/ext/aws/role_assumption.py b/src/airflow_docker/ext/aws/role_assumption.py index 756cdf0..2313374 100644 --- a/src/airflow_docker/ext/aws/role_assumption.py +++ b/src/airflow_docker/ext/aws/role_assumption.py @@ -3,9 +3,12 @@ import boto3 +from airflow_docker.constants import CONTAINER_RUN_DIR + MAX_ROLE_DURATION_SECONDS = 60 * 60 * 12 # 12 hours in seconds MAX_ROLE_SESSION_NAME_LENGTH = 64 ROLE_SESSION_NAME_TEMPLATE = "{dag_run_id}__{task_instance_try}__{task_id}" +AWS_CREDENTIALS_FILENAME = "aws-credentials" credential_key_map = { "aws_access_key_id": "AccessKeyId", @@ -67,13 +70,6 @@ def get_credentials(context, role_arn, role_session_duration=MAX_ROLE_DURATION_S return raw_credentials -def write_credentials(credentials, credentials_path): - credentials_file_contents = aws_credentials_file_format(**credentials) - os.makedirs(os.path.dirname(credentials_path), exist_ok=True) - with open(credentials_path, "wb") as f: - f.write(credentials_file_contents.encode("utf-8")) - - def find_role_session_duration(role_arn): iam = boto3.client("iam") response = iam.get_role(RoleName=parse_role_name_from_arn(role_arn)) @@ -134,9 +130,8 @@ def post_prepare_environment( operator.log.info("Assuming role: {}".format(role_arn)) - host_credentials_path = os.path.join(host_tmp_dir, ".aws", "credentials") container_credentials_path = os.path.join( - operator.tmp_dir, ".aws", "credentials" + CONTAINER_RUN_DIR, AWS_CREDENTIALS_FILENAME ) raw_credentials = get_credentials( @@ -146,9 +141,10 @@ def post_prepare_environment( ) log_credentials(operator, raw_credentials) credentials = format_credentials_data(raw_credentials) - write_credentials( - credentials=credentials, credentials_path=host_credentials_path - ) + credentials_file_contents = aws_credentials_file_format(**credentials) + operator.container_data[ + 
AWS_CREDENTIALS_FILENAME + ] = credentials_file_contents.encode("utf-8") operator.environment[ "AWS_SHARED_CREDENTIALS_FILE" From b3edcc90177f0bcceb4a42320edd3dbfd10926bb Mon Sep 17 00:00:00 2001 From: Hunter Senft-Grupp Date: Sun, 24 Nov 2019 14:54:13 -0500 Subject: [PATCH 3/4] Updated all source code files with proper license statement --- src/airflow_docker/__init__.py | 2 +- src/airflow_docker/conf.py | 15 +++++++++++++++ src/airflow_docker/constants.py | 15 +++++++++++++++ src/airflow_docker/context.py | 15 +++++++++++++++ src/airflow_docker/ext/__init__.py | 15 +++++++++++++++ src/airflow_docker/ext/aws/__init__.py | 15 +++++++++++++++ src/airflow_docker/ext/aws/role_assumption.py | 15 +++++++++++++++ src/airflow_docker/ext/environment_preset.py | 15 +++++++++++++++ src/airflow_docker/ext/python_callable.py | 15 +++++++++++++++ src/airflow_docker/operator.py | 2 +- src/airflow_docker/plugin.py | 15 +++++++++++++++ src/airflow_docker/utils.py | 15 +++++++++++++++ src/airflow_docker/views/__init__.py | 15 +++++++++++++++ src/airflow_docker/views/config.py | 15 +++++++++++++++ src/airflow_docker/views/extensions.py | 15 +++++++++++++++ 15 files changed, 197 insertions(+), 2 deletions(-) diff --git a/src/airflow_docker/__init__.py b/src/airflow_docker/__init__.py index 94e2039..77a30e6 100644 --- a/src/airflow_docker/__init__.py +++ b/src/airflow_docker/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright 2019 Hunter Senft-Grupp +# Copyright 2019 Contributing Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/src/airflow_docker/conf.py b/src/airflow_docker/conf.py index d51b9d8..53f7c50 100644 --- a/src/airflow_docker/conf.py +++ b/src/airflow_docker/conf.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. def get_boolean_default(key, default): import airflow.configuration as conf diff --git a/src/airflow_docker/constants.py b/src/airflow_docker/constants.py index 78be096..122226a 100644 --- a/src/airflow_docker/constants.py +++ b/src/airflow_docker/constants.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import airflow_docker_helper META_PATH_DIR = airflow_docker_helper.META_PATH_DIR diff --git a/src/airflow_docker/context.py b/src/airflow_docker/context.py index c872d25..61c0b5f 100644 --- a/src/airflow_docker/context.py +++ b/src/airflow_docker/context.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. def serialize_context(context): return { "dag": serialize_dag(context["dag"]), diff --git a/src/airflow_docker/ext/__init__.py b/src/airflow_docker/ext/__init__.py index f5e4305..88b4c66 100644 --- a/src/airflow_docker/ext/__init__.py +++ b/src/airflow_docker/ext/__init__.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import functools import importlib diff --git a/src/airflow_docker/ext/aws/__init__.py b/src/airflow_docker/ext/aws/__init__.py index e69de29..fe10e6f 100644 --- a/src/airflow_docker/ext/aws/__init__.py +++ b/src/airflow_docker/ext/aws/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/airflow_docker/ext/aws/role_assumption.py b/src/airflow_docker/ext/aws/role_assumption.py index 2313374..218bfba 100644 --- a/src/airflow_docker/ext/aws/role_assumption.py +++ b/src/airflow_docker/ext/aws/role_assumption.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os import textwrap diff --git a/src/airflow_docker/ext/environment_preset.py b/src/airflow_docker/ext/environment_preset.py index 1798032..fbc0c47 100644 --- a/src/airflow_docker/ext/environment_preset.py +++ b/src/airflow_docker/ext/environment_preset.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import functools from airflow.models import Variable diff --git a/src/airflow_docker/ext/python_callable.py b/src/airflow_docker/ext/python_callable.py index 0b85721..fd02517 100644 --- a/src/airflow_docker/ext/python_callable.py +++ b/src/airflow_docker/ext/python_callable.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import base64 import json import textwrap diff --git a/src/airflow_docker/operator.py b/src/airflow_docker/operator.py index e9d8d90..02745a4 100644 --- a/src/airflow_docker/operator.py +++ b/src/airflow_docker/operator.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright 2019 Hunter Senft-Grupp +# Copyright 2019 Contributing Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/airflow_docker/plugin.py b/src/airflow_docker/plugin.py index c0faedd..abe0180 100644 --- a/src/airflow_docker/plugin.py +++ b/src/airflow_docker/plugin.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from airflow.plugins_manager import AirflowPlugin from airflow_docker.operator import ( diff --git a/src/airflow_docker/utils.py b/src/airflow_docker/utils.py index 9b02e77..42a89b7 100644 --- a/src/airflow_docker/utils.py +++ b/src/airflow_docker/utils.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import functools import io import json diff --git a/src/airflow_docker/views/__init__.py b/src/airflow_docker/views/__init__.py index 3234f94..5d57a6c 100644 --- a/src/airflow_docker/views/__init__.py +++ b/src/airflow_docker/views/__init__.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import pkg_resources template_folder = pkg_resources.resource_filename("airflow_docker", "views/templates") diff --git a/src/airflow_docker/views/config.py b/src/airflow_docker/views/config.py index 80e69ff..908d7ad 100644 --- a/src/airflow_docker/views/config.py +++ b/src/airflow_docker/views/config.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2019 Contributing Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
Subject: [PATCH 4/4] Remove extraneous import