From b58e399130691ef93869b9d2003dbb45d4de4c5b Mon Sep 17 00:00:00 2001 From: ILay Date: Mon, 27 Apr 2026 18:52:34 +0200 Subject: [PATCH] update data processing jobs to use structured data functions --- .../data_processing/jobs.py | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/template_code_location/data_processing/jobs.py b/src/template_code_location/data_processing/jobs.py index 54fb939..674e3a1 100644 --- a/src/template_code_location/data_processing/jobs.py +++ b/src/template_code_location/data_processing/jobs.py @@ -1,8 +1,8 @@ from dagster import job from util_services.util_ops import ( preview_dataframe, - read_csv_from_s3, - write_csv_to_s3, + read_structured_from_s3, + write_df_to_s3, ) from .ops import ( remove_duplicates, @@ -21,10 +21,10 @@ from .ops import ( "resource_type": "RD_DATA" }) def remove_duplicates_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = remove_duplicates(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @@ -33,10 +33,10 @@ def remove_duplicates_job_s3(): "resource_type": "RD_DATA" }) def fill_missing_values_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = fill_missing_values(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @@ -45,10 +45,10 @@ def fill_missing_values_job_s3(): "resource_type": "RD_DATA" }) def standardize_categorical_values_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = standardize_categorical_values(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @@ -57,10 +57,10 @@ def standardize_categorical_values_job_s3(): "resource_type": "RD_DATA" }) def correct_typos_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = correct_typos(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @job(tags={ @@ -68,10 +68,10 @@ def correct_typos_job_s3(): "resource_type": "RD_DATA" }) def normalize_numeric_min_max_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = normalize_numeric_min_max(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @job(tags={ @@ -79,10 +79,10 @@ def normalize_numeric_min_max_job_s3(): "resource_type": "RD_DATA" }) def normalize_datetime_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = normalize_datetime(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @job(tags={ @@ -90,10 +90,10 @@ def normalize_datetime_job_s3(): "resource_type": "RD_DATA" }) def normalize_coordinates_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = normalize_coordinates(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @job(tags={ @@ -101,10 +101,10 @@ def normalize_coordinates_job_s3(): "resource_type": "RD_DATA" }) def add_global_aggregations_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = add_global_aggregations(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df) @job(tags={ @@ -112,8 +112,8 @@ def add_global_aggregations_job_s3(): "resource_type": "RD_DATA" }) def filter_dataset_job_s3(): - org_df = read_csv_from_s3() + org_df = read_structured_from_s3() anon_df = filter_dataset(org_df) preview_dataframe(org_df) - write_csv_to_s3(anon_df) + write_df_to_s3(anon_df) preview_dataframe(anon_df)