Update data processing jobs to use structured data functions (replace read_csv_from_s3 with read_structured_from_s3 and write_csv_to_s3 with write_df_to_s3)

This commit is contained in:
ILay
2026-04-27 18:52:34 +02:00
parent bdfbe3d310
commit b58e399130

View File

@@ -1,8 +1,8 @@
from dagster import job from dagster import job
from util_services.util_ops import ( from util_services.util_ops import (
preview_dataframe, preview_dataframe,
read_csv_from_s3, read_structured_from_s3,
write_csv_to_s3, write_df_to_s3,
) )
from .ops import ( from .ops import (
remove_duplicates, remove_duplicates,
@@ -21,10 +21,10 @@ from .ops import (
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def remove_duplicates_job_s3(): def remove_duplicates_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = remove_duplicates(org_df) anon_df = remove_duplicates(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@@ -33,10 +33,10 @@ def remove_duplicates_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def fill_missing_values_job_s3(): def fill_missing_values_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = fill_missing_values(org_df) anon_df = fill_missing_values(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@@ -45,10 +45,10 @@ def fill_missing_values_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def standardize_categorical_values_job_s3(): def standardize_categorical_values_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = standardize_categorical_values(org_df) anon_df = standardize_categorical_values(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@@ -57,10 +57,10 @@ def standardize_categorical_values_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def correct_typos_job_s3(): def correct_typos_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = correct_typos(org_df) anon_df = correct_typos(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@job(tags={ @job(tags={
@@ -68,10 +68,10 @@ def correct_typos_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def normalize_numeric_min_max_job_s3(): def normalize_numeric_min_max_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = normalize_numeric_min_max(org_df) anon_df = normalize_numeric_min_max(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@job(tags={ @job(tags={
@@ -79,10 +79,10 @@ def normalize_numeric_min_max_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def normalize_datetime_job_s3(): def normalize_datetime_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = normalize_datetime(org_df) anon_df = normalize_datetime(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@job(tags={ @job(tags={
@@ -90,10 +90,10 @@ def normalize_datetime_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def normalize_coordinates_job_s3(): def normalize_coordinates_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = normalize_coordinates(org_df) anon_df = normalize_coordinates(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@job(tags={ @job(tags={
@@ -101,10 +101,10 @@ def normalize_coordinates_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def add_global_aggregations_job_s3(): def add_global_aggregations_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = add_global_aggregations(org_df) anon_df = add_global_aggregations(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)
@job(tags={ @job(tags={
@@ -112,8 +112,8 @@ def add_global_aggregations_job_s3():
"resource_type": "RD_DATA" "resource_type": "RD_DATA"
}) })
def filter_dataset_job_s3(): def filter_dataset_job_s3():
org_df = read_csv_from_s3() org_df = read_structured_from_s3()
anon_df = filter_dataset(org_df) anon_df = filter_dataset(org_df)
preview_dataframe(org_df) preview_dataframe(org_df)
write_csv_to_s3(anon_df) write_df_to_s3(anon_df)
preview_dataframe(anon_df) preview_dataframe(anon_df)