# Workflow Orchestration: Development and Deployment Guide ## 1. Goal and Scope The purpose of this document is to provide a comprehensive guide for participants to create, manage, update and delete workflows within the Simpl-Open orchestration platform. By following a *code-first approach*, developers ensure consistency, traceability, and reliability across all environments. ## 2. Local Development Development must always begin in a local environment. This allows developers to rapidly iterate, test business logic, and validate DAG (Directed Acyclic Graph) structures without impacting production data. ### 2.1 Project Layout This repository (`template-code-location`) serves as the **single consolidated code location** for all data services workflows. It imports jobs and ops from three external packages (`data-processing`, `dataframe-level-anonymisation`, and `field-level-pseudo-anonymisation`) which are installed as Git dependencies, and also provides a place for custom template jobs/ops. ```text template-code-location/ ├── src/ │ └── template_code_location/ │ ├── __init__.py │ ├── repository.py # Unified entry point (all jobs/sensors/resources) │ ├── jobs/ # Custom jobs specific to this code location │ │ ├── __init__.py │ │ └── jobs.py │ └── ops/ # Custom ops specific to this code location │ ├── __init__.py │ └── ops.py ├── tests/ # Unit & integration tests ├── Dockerfile ├── pyproject.toml # Dependencies & external package sources └── README.md ``` ### 2.2 External Dependencies (Git Packages) The heavy-lifting logic lives in separate repositories, pulled in as installable Python packages via `pyproject.toml` and `[tool.uv.sources]`: | Package | Purpose | Source | |---------|---------|--------| | `data-processing` | Data cleaning & transformation jobs | Git (branch: `develop`) | | `dataframe-level-anonymisation` | k-anonymity, l-diversity, t-closeness | Git (branch: `develop`) | | `field-level-pseudo-anonymisation` | Field-level encryption/hashing/redaction | Git (branch: `develop`) | | `util-services` | Shared resources, sensors, and logging | Git (tag: `v0.5.0`) | These packages expose their jobs and ops which are then imported and registered in `repository.py`. ### 2.3 Code Examples (Ops, Jobs, and Definitions) The orchestration logic should be modular. ### 2.3.1 Workflow creation Here is a practical example of how to construct a workflow. **1. Defining Ops (`ops/ops.py`)** Ops are the core units of computation. Keep them focused on a single task. ```python from dagster import op @op def fetch_data() -> list: """Fetches raw data from a source.""" return [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}] @op def process_data(data: list) -> dict: """Processes raw data and returns a summary.""" return {"count": len(data), "status": "success"} ``` **2. Assembling Jobs (`jobs/jobs.py`)** Jobs link ops together to form a dependency graph (workflow). ```python from dagster import job from ..ops.ops import fetch_data, process_data @job def data_processing_job(): """A simple job that fetches and processes data.""" raw = fetch_data() process_data(raw) ``` **3. Registering Definitions (`repository.py`)** This file acts as the entry point for the Simpl-Open orchestration platform to discover your code. It imports jobs from local modules as well as from external packages. ```python from dagster import Definitions from util_services.resources import s3_resource from util_services.sensors import notify_success, notify_failure, notify_canceled from util_services.custom_json_logger import simpl_json_logger # External package jobs from data_processing.jobs import remove_duplicates_job_s3, fill_missing_values_job_s3 from dataframe_level_anonymisation.jobs import k_anonymity_job_s3, l_diversity_job_s3 from field_level_pseudo_anonymisation.jobs import anonymise_pseudonymise_structured_job_s3 # Local template jobs from template_code_location.jobs.jobs import data_processing_job defs = Definitions( jobs=[data_processing_job, remove_duplicates_job_s3, ...], sensors=[notify_success, notify_failure, notify_canceled], resources={"s3": s3_resource.configured({"resource_name": "selfS3"})}, loggers={"simpl": simpl_json_logger}, ) ``` ### 2.3.2 Workflow edit Updating a workflow, means changing the job name, parameters or definition. Each modification means getting a new version of that workflow. To make the version visible, the name of the job can include a version tag (eg: `data_processing_job_ver_2`). **1. Change Jobs (`jobs/jobs.py`)** Change workflow name, parameters or definition in the `jobs.py` file. **2. Change Ops (`ops/ops.py`)** If workflow ops are updated, this must be done in `ops.py` file. **3. Change Tests (`tests/`)** If jobs or obs are updated, check out their related `tests` and update them accordingly. **4. Update Registering Definitions (`repository.py`)** If jobs names are updated, make sure to update the import and definitions. ### 2.3.3 Workflow deletion Before deleting an existing workflow, first check its status (if is needed or referenced) in the **Dagster UI**: - Navigate to the "Runs". Identify a given workflow using the "Filter" button or navigation "Newer", "Older" buttons. - Check details and status: open workflow details by clicking on the target, or just check the status value. - Check logs: open workflow logs by clicking on the uuid. Here are two practical examples how to delete a workflow: **Temporary delete workflow** **1. Clean Registering Definitions (`repository.py`)** Comment or delete workflow definition from the `repository.py`. **Permanently delete workflow** **1. Clean Jobs (`jobs/jobs.py`)** Delete workflow definition from the `jobs.py` file. **2. Clean Ops (`ops/ops.py`)** If workflow links also ops, delete their definition from the `ops.py` file. **3. Clean Tests (`tests/`)** If jobs or obs have dedicated unittest, check out the `tests/` folder and delete them. **4. Clean Scheduler** If the workflow has any scheduler, check and adjust it accordingly. ### 2.4 Best Practices & Constraints - **Separation of Concerns**: Keep orchestration logic (how ops connect) strictly separate from heavy business logic (which should ideally live in separate Python modules/classes). - **Naming Conventions**: Use snake_case for jobs and ops. Code locations should be named based on the domain they represent (e.g., inventory_sync_service). - **Dependency Management**: All dependencies must be explicitly declared in pyproject.toml or requirements.txt. - **Environment Agnosticism**: Avoid hardcoding credentials. Use environment variables to handle configuration. ## 3. Publishing to Production (Gitea) Once the local validation is complete, the code must be published to the centralized Gitea repository. 1. **Repository Hosting**: All workflows are stored in Gitea instances within the agent environment. 2. **Versioning**: Workflows are versioned using Git. Each version of a workflow must correspond to a specific Git commit. 3. **Artifact Generation**: Workflows are packaged as Docker container images. - Images must be pushed to the Gitea Integrated Container Registry. - **Tagging Policy**: Use semantic versioning or Git commit SHAs. Avoid using the latest tag in production to ensure idempotency and easy rollbacks. ## 4. Review and Approval Process To maintain high-quality standards and security, no code is deployed directly to the main branch. 1. **Feature Branching**: Developers must push their changes to a dedicated feature branch. 2. **Pull Request (PR)**: Open a Pull Request in Gitea from the feature branch to the main branch. 3. **Peer Review**: At least one developer (other than the author) must review the code. - Reviewers check for logic errors, security vulnerabilities, and adherence to the standards defined in Section 2. 4. **Approval**: Once comments are addressed and the reviewer provides an "Approve" status, the PR can be merged. ## 5. Production Deployment After the code is merged and the artifact is published, the final step is deploying to the orchestration platform. ### 5.1 Deployment Pipeline The deployment follows these automated steps: 1. **CI/CD Trigger**: A merge to the main branch triggers the CI pipeline. 2. **Image Build**: The pipeline builds the Docker image and pushes it to the Gitea Registry. 3. **Manifest Update**: The deployment configuration (e.g., Helm values or Kubernetes manifests) is updated to reference the new image tag. 4. **Platform Reload**: The Simpl-Open orchestration platform (Dagster) is notified of the change. ### 5.2 Verification To confirm a successful deployment: - **Dagster UI**: - Navigate to the "Deployment" or "Code Locations" tab. Verify that the loaded image tag matches the latest Git commit. - Navigate to the "Runs" tab. Identify deleted workflow. Click on it's target to open details. "Pipeline not found" message shows that workflow was deleted successfully. Click on it's uuid to open logs. Logs must be available after deletion. - **Health Check**: Trigger a "Test Run" of the job in the production environment using a limited data slice. - **Logs**: Monitor the initialization logs in the Dagster daemon to ensure the code location was loaded without schema or dependency errors.