diff --git a/src/template_code_location/jobs/jobs.py b/src/template_code_location/jobs/jobs.py
index d194c3e..0fb52df 100644
--- a/src/template_code_location/jobs/jobs.py
+++ b/src/template_code_location/jobs/jobs.py
@@ -1,5 +1,12 @@
 from dagster import job
-from ..ops.ops import fetch_data, process_data
+from util_services.util_ops import preview_dataframe
+
+from ..ops.ops import (
+    fetch_data,
+    generate_example_dataset,
+    process_data,
+    transform_example_dataset,
+)
 
 
 @job
@@ -7,3 +14,12 @@ def data_processing_job():
     """A simple job that fetches and processes data."""
     raw = fetch_data()
     process_data(raw)
+
+
+@job
+def example_dataframe_demo_job():
+    """Demonstrates generating and transforming a pandas DataFrame with previews."""
+    original_df = generate_example_dataset()
+    preview_dataframe(original_df)
+    transformed_df = transform_example_dataset(original_df)
+    preview_dataframe(transformed_df)
diff --git a/src/template_code_location/ops/ops.py b/src/template_code_location/ops/ops.py
index 3d1a5e4..70f732d 100644
--- a/src/template_code_location/ops/ops.py
+++ b/src/template_code_location/ops/ops.py
@@ -1,3 +1,4 @@
+import pandas as pd
 from dagster import op
 
 
@@ -11,3 +12,44 @@ def fetch_data() -> list:
 def process_data(data: list) -> dict:
     """Processes raw data and returns a summary."""
     return {"count": len(data), "status": "success"}
+
+
+@op
+def generate_example_dataset() -> pd.DataFrame:
+    """Generates a small example dataset as a pandas DataFrame."""
+    return pd.DataFrame(
+        [
+            {"order_id": 1001, "customer": "Alice", "quantity": 2, "unit_price": 9.50},
+            {"order_id": 1002, "customer": "Bob", "quantity": 1, "unit_price": 15.00},
+            {"order_id": 1003, "customer": "Charlie", "quantity": 4, "unit_price": 7.25},
+            {"order_id": 1004, "customer": "Dora", "quantity": 3, "unit_price": 11.00},
+        ]
+    )
+
+
+@op
+def transform_example_dataset(df: pd.DataFrame) -> pd.DataFrame:
+    """Adds ``total_price`` (when absent) and ``price_band`` columns.
+
+    Rows whose total price is at least 25 are labelled "high"; all others are
+    labelled "standard".
+
+    Args:
+        df: Order rows carrying ``quantity`` and ``unit_price`` columns, or an
+            existing ``total_price`` column.
+
+    Returns:
+        A new DataFrame with the added columns; the input is left unmodified.
+    """
+    # Copy so the upstream op's output, which the job also previews, is not
+    # mutated in place.
+    transformed = df.copy()
+    # generate_example_dataset() emits no total_price column, so derive it.
+    if "total_price" not in transformed.columns:
+        transformed["total_price"] = (
+            transformed["quantity"] * transformed["unit_price"]
+        )
+    transformed["price_band"] = transformed["total_price"].apply(
+        lambda value: "high" if value >= 25 else "standard"
+    )
+    return transformed
diff --git a/src/template_code_location/repository.py b/src/template_code_location/repository.py
index 036312d..298adb0 100644
--- a/src/template_code_location/repository.py
+++ b/src/template_code_location/repository.py
@@ -2,11 +2,11 @@ from dagster import Definitions
 from dataframe_level_anonymisation.jobs import (k_anonymity_job_s3,
                                                 l_diversity_job_s3,
                                                 t_closeness_job_s3)
+from template_code_location.jobs.jobs import (data_processing_job,
+                                              example_dataframe_demo_job)
 from util_services.custom_json_logger import simpl_json_logger
 from util_services.resources import s3_resource
 
-from template_code_location.jobs.jobs import data_processing_job
-
 defs = Definitions(
     jobs=[
         data_processing_job,