39 commits
6fe9c36
test first class method
mberrien-fitzsimons Mar 6, 2025
8ce7eb9
updated variable inputs to init and individual methods
mberrien-fitzsimons Mar 7, 2025
40cac6f
added dock strings to all methods
mberrien-fitzsimons Mar 7, 2025
d550d0c
updated local path organization
mberrien-fitzsimons Mar 10, 2025
36d9651
updated way that dag is initialized
mberrien-fitzsimons Mar 10, 2025
7c4734f
added global to dag
mberrien-fitzsimons Mar 10, 2025
5d590b9
updated dag call structure
mberrien-fitzsimons Mar 10, 2025
efb510d
updated way params is called within Dag instantion
mberrien-fitzsimons Mar 10, 2025
db969c8
small update
mberrien-fitzsimons Mar 10, 2025
b335d66
updated file_sources to include .keys
mberrien-fitzsimons Mar 10, 2025
2fa3399
updated dag to using eacustomdag
mberrien-fitzsimons Mar 10, 2025
58a0332
updated param type to array
mberrien-fitzsimons Mar 10, 2025
5ec70e4
updated sharefile to snowflake dag builder script to include global e…
mberrien-fitzsimons Mar 11, 2025
929d9ad
removed global function because it did not work for explosing dag id
mberrien-fitzsimons Mar 11, 2025
f7178ac
updated s3.py in attempt to fix metadata_column bug
mberrien-fitzsimons Mar 11, 2025
9ef191f
updated class so that it would construct required folder structure
mberrien-fitzsimons Mar 12, 2025
111dc3b
updated input variable name
mberrien-fitzsimons Mar 12, 2025
248f4d6
updated way that local path is structured
mberrien-fitzsimons Mar 12, 2025
89a340b
updated where date and timestamp are instantiated
mberrien-fitzsimons Mar 12, 2025
85f91eb
updated timestamp code to use airflow runtime instead of datetime now…
mberrien-fitzsimons Mar 12, 2025
9882e64
updated timestamp to go back to original form
mberrien-fitzsimons Mar 12, 2025
9644caf
updated way filepath is created
mberrien-fitzsimons Mar 12, 2025
74e8615
save final changes to sharefile to snowflake dag before complete refa…
mberrien-fitzsimons Mar 12, 2025
01e8438
updated sharefile to snowflake dag builder class to remove unecessary…
mberrien-fitzsimons Mar 12, 2025
9a90112
updated sharefile sources
mberrien-fitzsimons Mar 13, 2025
bd8408e
re-ordered init arguments
mberrien-fitzsimons Mar 14, 2025
bd50225
update to readme documentation
mberrien-fitzsimons Mar 20, 2025
6eff93a
updated readme with yaml file showing file_sources
mberrien-fitzsimons Mar 20, 2025
df7cc14
added additional information to yaml section of readme
mberrien-fitzsimons Mar 20, 2025
32b37ff
updated s3 to snowflake task
mberrien-fitzsimons Mar 21, 2025
e791ca2
updated sharefile to snowflake dag builder
mberrien-fitzsimons Mar 21, 2025
0f4ef7c
added print statement for folder ID to see what is happening
mberrien-fitzsimons Mar 24, 2025
6f57bf6
updated transfer_to_s3
mberrien-fitzsimons Mar 24, 2025
344ada0
removed partial method
mberrien-fitzsimons Mar 24, 2025
950ca35
added controle flow for choosing tansfer from s3 to snowflake step
mberrien-fitzsimons Mar 25, 2025
3dab30f
updated sharefile to working state
mberrien-fitzsimons Mar 27, 2025
f76a18a
updated file to handle both single file and folder. testing in TX dev…
mberrien-fitzsimons Mar 27, 2025
47b0166
This was doe in order to access changes to the sharefiletodiskoperato…
mberrien-fitzsimons Mar 27, 2025
5baa75b
fixed mistake in imports
mberrien-fitzsimons Mar 27, 2025
88 changes: 88 additions & 0 deletions README.md
@@ -623,7 +623,95 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w

</details>

## SharefileTransferToSnowflakeDagBuilder
`SharefileTransferToSnowflakeDagBuilder` builds an Airflow DAG that transfers files from ShareFile to Snowflake. The DAG retrieves files from a specified ShareFile location, optionally converts CSV files to JSONL, uploads the files to an S3 bucket, and loads the data into a Snowflake database.

<details>
<summary>Arguments:</summary>

-----

| Argument | Description |
|-------------------------|--------------------------------------------------------------------------|
| dag_id | Unique identifier for the DAG |
| airflow_default_args | Default Airflow arguments for the DAG |
| file_sources | Dictionary of file sources and their ShareFile paths (more info in Example Yaml File section) |
| local_base_path | Base path for storing downloaded files locally |
| sharefile_conn_id | Airflow connection ID for ShareFile |
| base_s3_destination_key | S3 key prefix for storing uploaded files |
| s3_conn_id | Airflow connection ID for AWS S3 |
| snowflake_conn_id | Airflow connection ID for Snowflake |
| database | Snowflake database where data will be loaded |
| schema | Snowflake schema where data will be loaded |
| full_refresh | Boolean flag indicating whether to perform a full table refresh |
| schedule_interval | DAG execution schedule (default is None) |
| transform_csv_to_jsonl | Boolean flag to convert CSV files to JSONL before uploading to S3 |
| delete_remote | Boolean flag to delete the original file from ShareFile after transfer |
| delete_local_csv | Boolean flag to delete the local CSV file after transformation |
| transfer_s3_to_snowflake | Optional custom Python callable to run for the S3-to-Snowflake step instead of the default `S3ToSnowflakeOperator` |

Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwargs.

-----

</details>

<details>
<summary>Example Yaml File:</summary>

```yaml
default_args:
  owner:
  run_as_user:
  depends_on_past:
  start_date:
  email:
  email_on_failure: False
  retries: 0
  trigger_rule:
  retry_delay:
  execution_timeout:
  sla:

connections:
  s3:
    s3_conn_id:
    s3_bucket:
    s3_region:
    base_s3_destination_key:
  snowflake:
    snowflake_conn_id:
    snowflake_stage:
    snowflake_db:
    snowflake_schema:
  sharefile:
    sharefile_conn_id:
    sharefile_base_path:

variables:
  tmp_dir:
  local_path:
  schedule_interval:
  delete_remote:

file_sources:
  filename1:
    sharefile_path: /Path/to/document/folder/
    dest_table: schema.table
    truncate: True
    colnames:
      - col1
      - col2
      - col3
  filename2:
    sharefile_path: /Path/to/document/folder/
    dest_table: schema.table
    truncate: True
    colnames:
      - col1
      - col2
      - col3
```
</details>
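
As a rough illustration of how a config like the one in the Example Yaml File section might be wired into the builder, the sketch below maps a parsed YAML dictionary onto the arguments from the table above. The helper name `builder_kwargs_from_config`, the nesting of keys under `connections` and `variables`, and the key-to-argument mapping (for example `variables.local_path` to `local_base_path`) are assumptions made for illustration; they are not part of the package.

```python
from typing import Any, Dict


def builder_kwargs_from_config(dag_id: str,
                               config: Dict[str, Any],
                               full_refresh: bool = False) -> Dict[str, Any]:
    """Map a parsed YAML config onto the builder's arguments (hypothetical mapping)."""
    connections = config.get("connections", {})
    s3 = connections.get("s3", {})
    snowflake = connections.get("snowflake", {})
    sharefile = connections.get("sharefile", {})
    variables = config.get("variables", {})

    return {
        "dag_id": dag_id,
        "airflow_default_args": config.get("default_args", {}),
        "file_sources": config.get("file_sources", {}),
        "local_base_path": variables.get("local_path"),
        "sharefile_conn_id": sharefile.get("sharefile_conn_id"),
        "base_s3_destination_key": s3.get("base_s3_destination_key"),
        "s3_conn_id": s3.get("s3_conn_id"),
        "snowflake_conn_id": snowflake.get("snowflake_conn_id"),
        "database": snowflake.get("snowflake_db"),
        "schema": snowflake.get("snowflake_schema"),
        "full_refresh": full_refresh,
        "schedule_interval": variables.get("schedule_interval"),
        "delete_remote": bool(variables.get("delete_remote", False)),
    }


# Placeholder values only; these are not real connection IDs or paths.
example_config = {
    "default_args": {"owner": "airflow", "retries": 0},
    "connections": {
        "s3": {"s3_conn_id": "my_s3", "base_s3_destination_key": "sharefile/raw"},
        "snowflake": {"snowflake_conn_id": "my_snowflake",
                      "snowflake_db": "raw", "snowflake_schema": "sharefile"},
        "sharefile": {"sharefile_conn_id": "my_sharefile"},
    },
    "variables": {"local_path": "/tmp/sharefile", "schedule_interval": None},
    "file_sources": {
        "filename1": {"sharefile_path": "/Path/to/document/folder/"},
    },
}

builder_kwargs = builder_kwargs_from_config("sharefile_to_snowflake", example_config)
print(builder_kwargs["database"], sorted(builder_kwargs["file_sources"]))
```

The resulting dictionary could then be unpacked into `SharefileTransferToSnowflakeDagBuilder(**builder_kwargs)`, followed by a call to `build_sharefile_to_snowflake_dag()` to obtain the DAG.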

# Providers
Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators.
2 changes: 1 addition & 1 deletion ea_airflow_util/dags/s3_to_snowflake_dag.py
@@ -36,7 +36,7 @@ def __init__(self,
         is_manual_upload: bool = False,

         pool: str,
-        full_replace: bool = False,  #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level
+        full_replace: bool = False,

         do_delete_from_source: bool = True,
         **kwargs
213 changes: 213 additions & 0 deletions ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py
@@ -0,0 +1,213 @@
import logging
import os
from datetime import datetime
from typing import Callable, List, Optional, Union

from airflow import DAG
from airflow.models import Param
from airflow.operators.python import PythonOperator

from edu_edfi_airflow.callables import s3

from ea_airflow_util.callables.airflow import skip_if_not_in_params_list
from ea_airflow_util.callables import jsonl, snowflake
from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator
from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator
from ea_airflow_util.callables.airflow import xcom_pull_template
from ea_airflow_util.providers.sharefile.hooks.sharefile import SharefileHook

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from ea_airflow_util import EACustomDAG

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

class SharefileTransferToSnowflakeDagBuilder:
    """
    This class is responsible for creating an Apache Airflow DAG that automates the process of transferring
    files from ShareFile to Snowflake.

    Parameters:
    - dag_id (str): ID of the DAG to be created.
    - airflow_default_args (dict): Default arguments to pass to the DAG.
    - file_sources (dict): A mapping of file source names to their ShareFile paths.
    - local_base_path (str): Base local path for downloading files.
    - sharefile_conn_id (str): Airflow connection ID for ShareFile.
    - base_s3_destination_key (str): Base S3 key (path prefix) for uploads.
    - s3_conn_id (str): Airflow connection ID for AWS S3.
    - snowflake_conn_id (str): Airflow connection ID for Snowflake.
    - database (str): Snowflake database name.
    - schema (str): Snowflake schema name.
    - full_refresh (bool): If True, performs a full refresh load in Snowflake.
    - schedule_interval (str or timedelta, optional): Airflow schedule interval for the DAG.
    - transform_csv_to_jsonl (bool, optional): If True, converts downloaded CSV files to JSONL format.
    - delete_remote (bool, optional): If True, deletes the original file from ShareFile after download.
    - delete_local_csv (bool, optional): If True, deletes the CSV after transforming to JSONL.
    - transfer_s3_to_snowflake (Callable, optional): Custom Python callable for the S3 to Snowflake transfer step;
      it is wrapped in a PythonOperator by `build_python_operator`.
    - **kwargs: Additional arguments passed to Airflow operators.
    """
    def __init__(self,
        dag_id: str,
        airflow_default_args: dict,
        file_sources: dict,
        local_base_path: str,
        sharefile_conn_id: str,
        base_s3_destination_key: str,
        s3_conn_id: str,
        snowflake_conn_id: str,
        database: str,
        schema: str,
        full_refresh: bool,
        schedule_interval = None,
        transform_csv_to_jsonl: bool = False,
        delete_remote: bool = False,
        delete_local_csv: bool = False,
        transfer_s3_to_snowflake = None,
        **kwargs
    ):
        self.dag_id = dag_id
        self.airflow_default_args = airflow_default_args
        self.file_sources = file_sources
        self.schedule_interval = schedule_interval

        self.local_base_path = local_base_path
        self.transform_csv_to_jsonl = transform_csv_to_jsonl

        self.sharefile_conn_id = sharefile_conn_id
        self.delete_remote = delete_remote

        self.delete_local_csv = delete_local_csv

        self.base_s3_destination_key = base_s3_destination_key
        self.s3_conn_id = s3_conn_id

        self.snowflake_conn_id = snowflake_conn_id
        self.database = database
        self.schema = schema
        self.full_refresh = full_refresh
        self.transfer_s3_to_snowflake = transfer_s3_to_snowflake

        self.logger = logging.getLogger(self.__class__.__name__)

        self.params_dict = {
            "file_sources": Param(
                default=list(self.file_sources.keys()) if self.file_sources else [],
                examples=list(self.file_sources.keys()),
                type="array",
                description="Newline-separated list of file sources to pull from ShareFile",
            ),
        }
Contributor

My understanding of the ShareFile API is that it's possible to fetch files in at least three ways:

  1. by path + name (my/folder/my_file.csv), returning exactly one file
  2. by GUID (abcd-efg012345678-901234567-gfedcba), returning exactly one file
  3. by path alone (/my/folder/), returning a list of all files under the path

I believe we primarily use (1) and (3) for various use-cases. (2) is probably less important.

It seems like this code does (3), I can't quite tell if it does (1). What did you test with?

Contributor Author

So, this particular operator that I'm using (SharefileToDiskOperator) only takes a path. It pulls in all files in the given directory. Should I look for a way to input one file at a time? @tomreitz

Contributor

I'm not sure. A single path is probably fine for now, at least. This could be good to ask other DEs about, to get a sense of how existing ShareFile DAGs work (on the idea that eventually everyone would migrate their DAGs to use this class).


        self.dag = EACustomDAG(
            dag_id=self.dag_id,
            default_args=self.airflow_default_args,
            schedule_interval=self.schedule_interval,
            params=self.params_dict,
            catchup=False
        )

    def build_python_operator(self,
        python_callable: Callable,
        **kwargs
    ) -> PythonOperator:
        """
        Wrap a custom Python callable in a PythonOperator, e.g. for the optional
        custom S3-to-Snowflake step.

        :param python_callable: Callable to run as the task.
        :param kwargs: Keyword arguments passed to the callable as `op_kwargs`.
        :return: A PythonOperator attached to this builder's DAG.
        """
        callable_name = python_callable.__name__.strip('<>')  # Remove brackets around lambdas
        # The task ID is derived from the callable name only, so it is unique within
        # the DAG only when the callable is used for a single file source.
        task_id = f"custom_python_callable__{callable_name}"

        return PythonOperator(
            task_id=task_id,
            python_callable=python_callable,
            op_kwargs=kwargs,
            dag=self.dag
        )

    def build_sharefile_to_snowflake_dag(self, **kwargs):
Contributor (@tomreitz, Mar 18, 2025)

It could be helpful if engineers could use parts of this DAG but override other parts when needed. A specific motivating example: I recently had to build a ShareFile-to-Snowflake DAG; I would be excited to use this class instead, but I'm not sure I can... because of the custom Snowflake file_format I had to specify.

You already allow passing in of kwargs, which is good. You could leverage that to allow engineers using this class to pass in overrides for specific tasks (maybe other things too). For example, if you did

    if not transfer_s3_to_snowflake:
        transfer_s3_to_snowflake = S3ToSnowflakeOperator(
            ...

then I could use most of the class and just override the one task:

def transfer_s3_to_snowflake():
    ...
    snowflake_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
    snowflake_conn = snowflake_hook.get_connection(snowflake_conn_id)
    database = snowflake_conn.extra_dejson["database"]
    schema = "..."
    endpoint = "..."
    ...
    qry_copy_into = f"""
        COPY INTO {database}.{schema}.{endpoint}
            (pull_date, pull_timestamp, file_row_number, filename, {header})
        FROM (
            SELECT
                TO_DATE('{mtime_date}') AS pull_date,
                TO_TIMESTAMP('{mtime_timestamp}') AS pull_timestamp,
                metadata$file_row_number AS file_row_number,
                metadata$filename AS filename
                ...
            FROM @{database}.util.airflow_stage/{s3_file}
            (file_format => 'csv_iso88591') t -- HERE'S THE CUSTOM FILE FORMAT...
        )
        force = true;
        """
    cursor_log = snowflake_hook.run(sql=[qry_delete, qry_copy_into], autocommit=False)

my_dag = SharefileTransferToSnowflakeDagBuilder(...)
my_dag.build_sharefile_to_snowflake_dag(transfer_s3_to_snowflake)

I'd be curious for @jayckaiser's opinion about this - it's similar, but less explicit, to how we allow custom transform scripts to be run on files in the SFTPtoSnowflake DAG class. (Oh, and do we need that functionality here too?)
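
A minimal sketch of the fallback pattern described in this comment, using plain-Python stand-ins rather than real Airflow operators. The names `default_load`, `custom_load`, and `choose_load_step` are hypothetical and exist only for this illustration.

```python
from typing import Callable, Optional


def default_load(source: str) -> str:
    # Stand-in for the default S3ToSnowflakeOperator step.
    return f"default load for {source}"


def custom_load(source: str) -> str:
    # Stand-in for a caller-supplied task, e.g. one that needs a custom file format.
    return f"custom load for {source} (file_format => 'csv_iso88591')"


def choose_load_step(custom: Optional[Callable[[str], str]] = None) -> Callable[[str], str]:
    # Use the caller's override when one is given; otherwise fall back to the default.
    return custom if custom is not None else default_load


print(choose_load_step()("filename1"))
print(choose_load_step(custom_load)("filename1"))
```

The rest of the pipeline stays unchanged in both cases; only the final load step is swapped.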

        """
        Builds the DAG with tasks for each file source, including:
        - Conditional execution based on DAG parameters.
        - Downloading from ShareFile to local disk (supports both folder and single file paths).
        - Optional transformation from CSV to JSONL.
        - Upload to S3.
        - Load into Snowflake using the default or a custom operator.

        Returns:
            airflow.DAG: The constructed Airflow DAG instance.
        """
        for file, details in self.file_sources.items():

            check_if_file_in_param = PythonOperator(
                task_id=f"check_{file}",
                python_callable=skip_if_not_in_params_list,
                op_kwargs={
                    'param_name': "file_sources",
                    'value': file
                },
                dag=self.dag,
                **kwargs
            )

            transfer_sharefile_to_disk = SharefileToDiskOperator(
                task_id=f"transfer_{file}_to_disk",
                sharefile_conn_id=self.sharefile_conn_id,
                sharefile_path=details['sharefile_path'],
                local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file),
                delete_remote=self.delete_remote,
                dag=self.dag,
                **kwargs
            )

            transform_to_jsonl = PythonOperator(
                task_id=f"transform_{file}_to_jsonl",
                python_callable=jsonl.translate_csv_file_to_jsonl,
                op_kwargs={
                    'local_path': xcom_pull_template(transfer_sharefile_to_disk),
                    'output_path': None,
                    'delete_csv': self.delete_local_csv,
                    'metadata_dict': {'file_source': file},
                },
                dag=self.dag,
                **kwargs
            )

            transfer_disk_to_s3 = PythonOperator(
                task_id=f"transfer_{file}_to_s3",
                python_callable=s3.local_filepath_to_s3,
                op_kwargs={
                    'local_filepath': xcom_pull_template(transfer_sharefile_to_disk),
                    's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}",
Contributor

One thing to consider here is sharding - by tenant, or along other dimensions. If we want to load data from Sharefile paths like

  • data/district1/2024/file.csv
  • data/district1/2025/file.csv
  • data/district2/2024/file.csv
  • data/district2/2025/file.csv
  • ...

We could do that by looping over the years/tenant codes and creating a separate DAG for each. It could be nice if they were all in one DAG though, with separate tasks (or task groups?) for each tenant. Supporting this would complicate this DAG class, but could be quite useful.

One way to do it could be to allow for an optional structure like

shard_dimensions = {
     "district": ["district1", "district2", ...],
     "year": ["2024", "2025"]
}
base_s3_destination_key = "data/{district}/{year}/file.csv"

then make a task [group] per combination of dimension values, support formatted paths, and pass the shards down to other tasks:

from itertools import product
...

shard_groups = []
if shard_dimensions:
    for item in product(*shard_dimensions.values()):
        shard_groups.append(dict(zip(shard_dimensions.keys(), item)))
else: shard_groups.append({})

for shard_group in shard_groups:
    path = base_s3_destination_key.format(**shard_group)
    ...
    # and possibly pass the `shard_group` data into S3ToSnowflakeOperator()
    # via `column_names_str` and `metadata_columns`

(This isn't a requirement to add to the class, just something to think about. It could be added later.)
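
The shard expansion sketched in this comment can be run on its own as follows. This is only an illustration of the idea under the comment's own example values; the helper name `expand_shards` is hypothetical and the builder class does not currently support sharding.

```python
from itertools import product
from typing import Dict, List


def expand_shards(shard_dimensions: Dict[str, List[str]]) -> List[Dict[str, str]]:
    # With no dimensions, return a single empty shard so callers can still loop once.
    if not shard_dimensions:
        return [{}]
    keys = list(shard_dimensions)
    # One dict per combination of dimension values, e.g. {"district": ..., "year": ...}.
    return [dict(zip(keys, combo))
            for combo in product(*(shard_dimensions[key] for key in keys))]


shard_dimensions = {
    "district": ["district1", "district2"],
    "year": ["2024", "2025"],
}
path_template = "data/{district}/{year}/file.csv"

paths = [path_template.format(**shard) for shard in expand_shards(shard_dimensions)]
for path in paths:
    print(path)
```

Each resulting shard dictionary could drive one task or task group, with the formatted path standing in for the ShareFile path or S3 key of that shard.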

                    's3_conn_id': self.s3_conn_id
                },
                dag=self.dag,
                **kwargs
            )

            if not self.transfer_s3_to_snowflake:
                transfer_s3_to_snowflake = S3ToSnowflakeOperator(
                    task_id=f"{file}_s3_to_snowflake",
                    snowflake_conn_id=self.snowflake_conn_id,
                    database=self.database,
                    schema=self.schema,
                    table_name=file,
                    s3_destination_key=self.base_s3_destination_key,
                    full_refresh=self.full_refresh,
                    dag=self.dag,
                    **kwargs
                )
            else:
                transfer_s3_to_snowflake = self.build_python_operator(self.transfer_s3_to_snowflake)

            if self.transform_csv_to_jsonl:
                check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake
            else:
                check_if_file_in_param >> transfer_sharefile_to_disk >> transfer_disk_to_s3 >> transfer_s3_to_snowflake

        return self.dag
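
To make the templated S3 key used in `transfer_disk_to_s3` easier to follow, here is a small standalone sketch of how that key is composed and what it might look like once Airflow renders `{{ds_nodash}}` and `{{ts_nodash}}`. The helpers `s3_key_template` and `render_for_preview` are hypothetical; the preview uses plain string replacement as a stand-in for Airflow's Jinja rendering, the sample date values are made up, and unlike the diff this sketch strips a trailing slash from the base key.

```python
def s3_key_template(base_key: str, file_source: str) -> str:
    # Compose base key, run date, run timestamp, and file source name.
    # The double-brace placeholders are left for Airflow to render at run time.
    return "/".join([base_key.rstrip("/"), "{{ds_nodash}}", "{{ts_nodash}}", file_source])


def render_for_preview(template: str, ds_nodash: str, ts_nodash: str) -> str:
    # Stand-in for Jinja rendering, for illustration only.
    return (template
            .replace("{{ds_nodash}}", ds_nodash)
            .replace("{{ts_nodash}}", ts_nodash))


template = s3_key_template("sharefile/raw", "filename1")
print(template)
print(render_for_preview(template, "20250312", "20250312T000000"))
```

Each DAG run therefore writes under its own date and timestamp prefix, so repeated runs for the same file source do not overwrite one another.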