From 60096f840dceb11e860cfb256c7636209c16ddcc Mon Sep 17 00:00:00 2001 From: Richard Mrasek Date: Fri, 12 Jun 2026 13:29:01 +0200 Subject: [PATCH] added example for schedule, reset for demo --- src/template_code_location/jobs/jobs.py | 21 ++++++++++++++------- src/template_code_location/ops/ops.py | 4 +--- src/template_code_location/repository.py | 9 ++++++--- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/template_code_location/jobs/jobs.py b/src/template_code_location/jobs/jobs.py index 0fb52df..23401d0 100644 --- a/src/template_code_location/jobs/jobs.py +++ b/src/template_code_location/jobs/jobs.py @@ -1,12 +1,8 @@ -from dagster import job +from dagster import ScheduleDefinition, job from util_services.util_ops import preview_dataframe -from ..ops.ops import ( - fetch_data, - generate_example_dataset, - process_data, - transform_example_dataset, -) +from ..ops.ops import (fetch_data, generate_example_dataset, process_data, + transform_example_dataset) @job @@ -15,6 +11,15 @@ def data_processing_job(): raw = fetch_data() process_data(raw) +data_processing_job_daily_9am = ScheduleDefinition( + job=data_processing_job, + cron_schedule="0 9 * * *", +) + +@job +def example_job(): + original_df = generate_example_dataset() + preview_dataframe(original_df) @job def example_dataframe_demo_job(): @@ -23,3 +28,5 @@ def example_dataframe_demo_job(): preview_dataframe(original_df) transformed_df = transform_example_dataset(original_df) preview_dataframe(transformed_df) + + diff --git a/src/template_code_location/ops/ops.py b/src/template_code_location/ops/ops.py index 145a5a8..994c18b 100644 --- a/src/template_code_location/ops/ops.py +++ b/src/template_code_location/ops/ops.py @@ -34,7 +34,5 @@ def transform_example_dataset(df: pd.DataFrame) -> pd.DataFrame: df["price_band"] = df["total_price"].apply( lambda value: "high" if value >= 25 else "standard" ) - - df["total_price"] = df["quantity"] * df["unit_price"] - + return df diff --git a/src/template_code_location/repository.py b/src/template_code_location/repository.py index 718a395..5e985d0 100644 --- a/src/template_code_location/repository.py +++ b/src/template_code_location/repository.py @@ -3,18 +3,21 @@ from dataframe_level_anonymisation.jobs import (k_anonymity_job_s3, l_diversity_job_s3, t_closeness_job_s3) from template_code_location.jobs.jobs import (data_processing_job, - example_dataframe_demo_job) + data_processing_job_daily_9am, + example_dataframe_demo_job, + example_job) from util_services.custom_json_logger import simpl_json_logger from util_services.resources import s3_resource defs = Definitions( jobs=[ - #data_processing_job, - example_dataframe_demo_job, + example_job, + data_processing_job, k_anonymity_job_s3, l_diversity_job_s3, t_closeness_job_s3, ], + schedules=[data_processing_job_daily_9am], sensors=[], resources={"s3": s3_resource.configured({"resource_name": "selfS3"})}, loggers={"simpl": simpl_json_logger}, -- 2.49.1