update Development Guide to clarify project layout and external dependencies
This commit is contained in:
@@ -9,81 +9,100 @@ By following a *code-first approach*, developers ensure consistency, traceabilit
|
||||
Development must always begin in a local environment. This allows developers to rapidly iterate, test business logic, and validate DAG (Directed Acyclic Graph) structures without impacting production data.
|
||||
|
||||
### 2.1 Project Layout
|
||||
This repository (`template-code-location`) serves as the **single consolidated code location** for all data services workflows. It contains the jobs, ops, and configurations previously spread across `data-processing`, `dataframe-level-anonymisation`, and `field-level-pseudo-anonymisation`.
|
||||
This repository (`template-code-location`) serves as the **single consolidated code location** for all data services workflows. It imports jobs and ops from three external packages (`data-processing`, `dataframe-level-anonymisation`, and `field-level-pseudo-anonymisation`) which are installed as Git dependencies, and also provides a place for custom template jobs/ops.
|
||||
|
||||
```text
|
||||
template-code-location/
|
||||
├── src/
|
||||
│ └── template_code_location/
|
||||
│ ├── __init__.py
|
||||
│ ├── repository.py # Unified entry point (all jobs/sensors/resources)
|
||||
│ ├── data_processing/ # Data cleaning & transformation ops/jobs
|
||||
│ │ ├── config_models/
|
||||
│ │ ├── jobs.py
|
||||
│ │ └── ops.py
|
||||
│ ├── dataframe_level_anonymisation/ # k-anonymity, l-diversity, t-closeness
|
||||
│ │ ├── config_models/
|
||||
│ │ ├── jobs.py
|
||||
│ │ ├── ops.py
|
||||
│ │ └── utils.py
|
||||
│ ├── field_level_pseudo_anonymisation/ # Field-level encryption/hashing/redaction
|
||||
│ │ ├── config_models/
|
||||
│ │ ├── techniques/
|
||||
│ │ ├── jobs.py
|
||||
│ │ ├── ops.py
|
||||
│ │ ├── unstructured_ops.py
|
||||
│ │ └── utils.py
|
||||
│ ├── jobs/ # Template example jobs
|
||||
│ └── ops/ # Template example ops
|
||||
├── tests/ # All tests (migrated from source repos)
|
||||
│ ├── jobs/ # Custom jobs specific to this code location
|
||||
│ │ ├── __init__.py
|
||||
│ │ └── jobs.py
|
||||
│ └── ops/ # Custom ops specific to this code location
|
||||
│ ├── __init__.py
|
||||
│ └── ops.py
|
||||
├── tests/ # Unit & integration tests
|
||||
├── Dockerfile
|
||||
├── pyproject.toml
|
||||
├── pyproject.toml # Dependencies & external package sources
|
||||
└── README.md
|
||||
```
|
||||
|
||||
### 2.2 Code Examples (Ops, Jobs, and Definitions)
|
||||
### 2.2 External Dependencies (Git Packages)
|
||||
|
||||
The heavy-lifting logic lives in separate repositories, pulled in as installable Python packages via `pyproject.toml` and `[tool.uv.sources]`:
|
||||
|
||||
| Package | Purpose | Source |
|
||||
|---------|---------|--------|
|
||||
| `data-processing` | Data cleaning & transformation jobs | Git (branch: `develop`) |
|
||||
| `dataframe-level-anonymisation` | k-anonymity, l-diversity, t-closeness | Git (branch: `develop`) |
|
||||
| `field-level-pseudo-anonymisation` | Field-level encryption/hashing/redaction | Git (branch: `develop`) |
|
||||
| `util-services` | Shared resources, sensors, and logging | Git (tag: `v0.5.0`) |
|
||||
|
||||
These packages expose their jobs and ops which are then imported and registered in `repository.py`.
|
||||
|
||||
### 2.3 Code Examples (Ops, Jobs, and Definitions)
|
||||
The orchestration logic should be modular. Here is a practical example of how to construct a workflow.
|
||||
|
||||
**1. Defining Ops (ops.py)**
|
||||
**1. Defining Ops (`ops/ops.py`)**
|
||||
Ops are the core units of computation. Keep them focused on a single task.
|
||||
|
||||
```python
|
||||
from dagster import op
|
||||
|
||||
@op
|
||||
def fetch_raw_data() -> list:
|
||||
"""Fetches raw data from an external source."""
|
||||
def fetch_data() -> list:
|
||||
"""Fetches raw data from a source."""
|
||||
return [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
|
||||
|
||||
@op
|
||||
def process_data(data: list) -> dict:
|
||||
"""Transforms raw data into an aggregated format."""
|
||||
return {"processed_count": len(data), "status": "success"}
|
||||
"""Processes raw data and returns a summary."""
|
||||
return {"count": len(data), "status": "success"}
|
||||
```
|
||||
**2. Assembling Jobs (jobs.py)**
|
||||
|
||||
**2. Assembling Jobs (`jobs/jobs.py`)**
|
||||
Jobs link ops together to form a dependency graph (workflow).
|
||||
|
||||
```python
|
||||
from dagster import job
|
||||
from .ops import fetch_raw_data, process_data
|
||||
from ..ops.ops import fetch_data, process_data
|
||||
|
||||
@job
|
||||
def data_processing_job():
|
||||
"""A workflow that fetches and processes data."""
|
||||
raw_data = fetch_raw_data()
|
||||
process_data(raw_data)
|
||||
"""A simple job that fetches and processes data."""
|
||||
raw = fetch_data()
|
||||
process_data(raw)
|
||||
```
|
||||
**3. Registering Definitions (repository.py)**
|
||||
This file acts as the entry point for the Simpl-Open orchestration platform to discover your code.
|
||||
|
||||
**3. Registering Definitions (`repository.py`)**
|
||||
This file acts as the entry point for the Simpl-Open orchestration platform to discover your code. It imports jobs from local modules as well as from external packages.
|
||||
|
||||
```python
|
||||
from dagster import Definitions
|
||||
from .jobs import data_processing_job
|
||||
from util_services.resources import s3_resource
|
||||
from util_services.sensors import notify_success, notify_failure, notify_canceled
|
||||
from util_services.custom_json_logger import simpl_json_logger
|
||||
|
||||
# External package jobs
|
||||
from data_processing.jobs import remove_duplicates_job_s3, fill_missing_values_job_s3
|
||||
from dataframe_level_anonymisation.jobs import k_anonymity_job_s3, l_diversity_job_s3
|
||||
from field_level_pseudo_anonymisation.jobs import anonymise_pseudonymise_structured_job_s3
|
||||
|
||||
# Local template jobs
|
||||
from template_code_location.jobs.jobs import data_processing_job
|
||||
|
||||
# The platform will load this Definitions object
|
||||
defs = Definitions(
|
||||
jobs=[data_processing_job]
|
||||
# You can also declare schedules, sensors, and resources here
|
||||
jobs=[data_processing_job, remove_duplicates_job_s3, ...],
|
||||
sensors=[notify_success, notify_failure, notify_canceled],
|
||||
resources={"s3": s3_resource.configured({"resource_name": "selfS3"})},
|
||||
loggers={"simpl": simpl_json_logger},
|
||||
)
|
||||
```
|
||||
|
||||
### 2.3 Best Practices & Constraints
|
||||
### 2.4 Best Practices & Constraints
|
||||
|
||||
- **Separation of Concerns**: Keep orchestration logic (how ops connect) strictly separate from heavy business logic (which should ideally live in separate Python modules/classes).
|
||||
- **Naming Conventions**: Use snake_case for jobs and ops. Code locations should be named based on the domain they represent (e.g., inventory_sync_service).
|
||||
- **Dependency Management**: All dependencies must be explicitly declared in pyproject.toml or requirements.txt.
|
||||
|
||||
Reference in New Issue
Block a user