added example for schedule, reset for demo
All checks were successful
Check Deleted Workflows / check-deleted-workflows (pull_request) Successful in 56s
All checks were successful
Check Deleted Workflows / check-deleted-workflows (pull_request) Successful in 56s
This commit is contained in:
@@ -1,12 +1,8 @@
|
|||||||
from dagster import job
|
from dagster import ScheduleDefinition, job
|
||||||
from util_services.util_ops import preview_dataframe
|
from util_services.util_ops import preview_dataframe
|
||||||
|
|
||||||
from ..ops.ops import (
|
from ..ops.ops import (fetch_data, generate_example_dataset, process_data,
|
||||||
fetch_data,
|
transform_example_dataset)
|
||||||
generate_example_dataset,
|
|
||||||
process_data,
|
|
||||||
transform_example_dataset,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@job
|
@job
|
||||||
@@ -15,6 +11,15 @@ def data_processing_job():
|
|||||||
raw = fetch_data()
|
raw = fetch_data()
|
||||||
process_data(raw)
|
process_data(raw)
|
||||||
|
|
||||||
|
data_processing_job_daily_9am = ScheduleDefinition(
|
||||||
|
job=data_processing_job,
|
||||||
|
cron_schedule="0 9 * * *",
|
||||||
|
)
|
||||||
|
|
||||||
|
@job
|
||||||
|
def example_job():
|
||||||
|
original_df = generate_example_dataset()
|
||||||
|
preview_dataframe(original_df)
|
||||||
|
|
||||||
@job
|
@job
|
||||||
def example_dataframe_demo_job():
|
def example_dataframe_demo_job():
|
||||||
@@ -23,3 +28,5 @@ def example_dataframe_demo_job():
|
|||||||
preview_dataframe(original_df)
|
preview_dataframe(original_df)
|
||||||
transformed_df = transform_example_dataset(original_df)
|
transformed_df = transform_example_dataset(original_df)
|
||||||
preview_dataframe(transformed_df)
|
preview_dataframe(transformed_df)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,5 @@ def transform_example_dataset(df: pd.DataFrame) -> pd.DataFrame:
|
|||||||
df["price_band"] = df["total_price"].apply(
|
df["price_band"] = df["total_price"].apply(
|
||||||
lambda value: "high" if value >= 25 else "standard"
|
lambda value: "high" if value >= 25 else "standard"
|
||||||
)
|
)
|
||||||
|
|
||||||
df["total_price"] = df["quantity"] * df["unit_price"]
|
|
||||||
|
|
||||||
return df
|
return df
|
||||||
|
|||||||
@@ -3,18 +3,21 @@ from dataframe_level_anonymisation.jobs import (k_anonymity_job_s3,
|
|||||||
l_diversity_job_s3,
|
l_diversity_job_s3,
|
||||||
t_closeness_job_s3)
|
t_closeness_job_s3)
|
||||||
from template_code_location.jobs.jobs import (data_processing_job,
|
from template_code_location.jobs.jobs import (data_processing_job,
|
||||||
example_dataframe_demo_job)
|
data_processing_job_daily_9am,
|
||||||
|
example_dataframe_demo_job,
|
||||||
|
example_job)
|
||||||
from util_services.custom_json_logger import simpl_json_logger
|
from util_services.custom_json_logger import simpl_json_logger
|
||||||
from util_services.resources import s3_resource
|
from util_services.resources import s3_resource
|
||||||
|
|
||||||
defs = Definitions(
|
defs = Definitions(
|
||||||
jobs=[
|
jobs=[
|
||||||
#data_processing_job,
|
example_job,
|
||||||
example_dataframe_demo_job,
|
data_processing_job,
|
||||||
k_anonymity_job_s3,
|
k_anonymity_job_s3,
|
||||||
l_diversity_job_s3,
|
l_diversity_job_s3,
|
||||||
t_closeness_job_s3,
|
t_closeness_job_s3,
|
||||||
],
|
],
|
||||||
|
schedules=[data_processing_job_daily_9am],
|
||||||
sensors=[],
|
sensors=[],
|
||||||
resources={"s3": s3_resource.configured({"resource_name": "selfS3"})},
|
resources={"s3": s3_resource.configured({"resource_name": "selfS3"})},
|
||||||
loggers={"simpl": simpl_json_logger},
|
loggers={"simpl": simpl_json_logger},
|
||||||
|
|||||||
Reference in New Issue
Block a user