diff --git a/Dockerfile b/Dockerfile index b61745b..0c997fb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,13 +1,68 @@ FROM python:3.12-slim-bookworm +# --- Install uv (pinned for reproducibility) --- +COPY --from=ghcr.io/astral-sh/uv:0.10.8 /uv /uvx /bin/ + WORKDIR /app +# Create non-root user with explicit UID/GID 1000 +RUN addgroup --gid 1000 appgroup && \ + adduser --uid 1000 --gid 1000 --disabled-password --gecos "" appuser + +# Install system dependencies: +# - git: required to fetch util-services from GitLab (tool.uv.sources) +# - build-essential / gcc / g++ / python3-dev / cmake: native extensions +# (scrubadub-spacy → spaCy, pycanon, etc.) +# - curl: optional healthcheck / runtime tooling +RUN apt-get update && apt-get upgrade -y \ + && apt-get install -y --no-install-recommends \ + build-essential=12.9 \ + cmake=3.25.1-1 \ + gcc=4:12.2.0-3 \ + g++=4:12.2.0-3 \ + python3-dev=3.11.2-1+b1 \ + git=1:2.39.5-0+deb12u3 \ + curl=7.88.1-10+deb12u14 \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* \ + && rm -rf /tmp/* \ + && rm -rf /var/tmp/* + +# Pre-own /app so appuser can write to it +RUN chown -R appuser:appgroup /app + +# Copy project metadata and source COPY pyproject.toml . -RUN pip install --no-cache-dir dagster dagster-webserver +COPY src/ ./src/ -COPY src/ src/ -RUN pip install --no-cache-dir . +# uv environment knobs: +# UV_COMPILE_BYTECODE → compile .pyc files at install time for faster cold start +# UV_LINK_MODE=copy → copy files instead of symlinks (required in Docker layers) +# UV_SYSTEM_PYTHON=1 → install into the system Python (no extra venv needed) +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_SYSTEM_PYTHON=1 -EXPOSE 3000 +# Install the project and all dependencies, respecting [tool.uv.sources] +# (git source for util-services and pytorch-cpu index for torch) +# BuildKit cache mount keeps the uv package cache across builds +RUN --mount=type=cache,target=/root/.cache/uv \ + uv pip install . -CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "template-code-location.repository"] +ENV PYTHONPATH="/app/src" + +# Make /app writable for the non-root user (e.g. spaCy model downloads) +RUN chown -R 1000:1000 /app && chmod -R u+w /app + +# Provide a real home directory for appuser +RUN mkdir -p /home/appuser && chown -R 1000:1000 /home/appuser +ENV HOME=/home/appuser + +USER appuser + +# Sanity-check: fail the build early if the dagster CLI is missing +RUN dagster --version + +EXPOSE 4000 + +CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "src/template_code_location/repository.py"] diff --git a/documents/Development Guide.md b/documents/Development Guide.md index 0f140ad..6582768 100644 --- a/documents/Development Guide.md +++ b/documents/Development Guide.md @@ -9,64 +9,100 @@ By following a *code-first approach*, developers ensure consistency, traceabilit Development must always begin in a local environment. This allows developers to rapidly iterate, test business logic, and validate DAG (Directed Acyclic Graph) structures without impacting production data. ### 2.1 Project Layout -To ensure compatibility with the Simpl-Open platform, every Dagster code location must adhere to the following directory structure: +This repository (`template-code-location`) serves as the **single consolidated code location** for all data services workflows. It imports jobs and ops from three external packages (`data-processing`, `dataframe-level-anonymisation`, and `field-level-pseudo-anonymisation`) which are installed as Git dependencies, and also provides a place for custom template jobs/ops. + ```text -project-root/ -├── dagster_code_location/ -│ ├── jobs/ # Executable workflows -│ ├── ops/ # Individual functional units (business logic) -│ ├── resources/ # External connections (Object storage, APIs, etc...) -│ └── repository.py # Central entry point for the code location -├── tests/ # Unit and integration tests -├── Dockerfile # Containerization instructions -├── pyproject.toml # Dependency management (Poetry/Pip/UV) -└── README.md # Documentation +template-code-location/ +├── src/ +│ └── template_code_location/ +│ ├── __init__.py +│ ├── repository.py # Unified entry point (all jobs/sensors/resources) +│ ├── jobs/ # Custom jobs specific to this code location +│ │ ├── __init__.py +│ │ └── jobs.py +│ └── ops/ # Custom ops specific to this code location +│ ├── __init__.py +│ └── ops.py +├── tests/ # Unit & integration tests +├── Dockerfile +├── pyproject.toml # Dependencies & external package sources +└── README.md ``` -### 2.2 Code Examples (Ops, Jobs, and Definitions) +### 2.2 External Dependencies (Git Packages) + +The heavy-lifting logic lives in separate repositories, pulled in as installable Python packages via `pyproject.toml` and `[tool.uv.sources]`: + +| Package | Purpose | Source | +|---------|---------|--------| +| `data-processing` | Data cleaning & transformation jobs | Git (branch: `develop`) | +| `dataframe-level-anonymisation` | k-anonymity, l-diversity, t-closeness | Git (branch: `develop`) | +| `field-level-pseudo-anonymisation` | Field-level encryption/hashing/redaction | Git (branch: `develop`) | +| `util-services` | Shared resources, sensors, and logging | Git (tag: `v0.5.0`) | + +These packages expose their jobs and ops which are then imported and registered in `repository.py`. + +### 2.3 Code Examples (Ops, Jobs, and Definitions) The orchestration logic should be modular. Here is a practical example of how to construct a workflow. -**1. Defining Ops (ops.py)** +**1. Defining Ops (`ops/ops.py`)** Ops are the core units of computation. Keep them focused on a single task. + ```python from dagster import op @op -def fetch_raw_data() -> list: - """Fetches raw data from an external source.""" +def fetch_data() -> list: + """Fetches raw data from a source.""" return [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}] @op def process_data(data: list) -> dict: - """Transforms raw data into an aggregated format.""" - return {"processed_count": len(data), "status": "success"} + """Processes raw data and returns a summary.""" + return {"count": len(data), "status": "success"} ``` -**2. Assembling Jobs (jobs.py)** + +**2. Assembling Jobs (`jobs/jobs.py`)** Jobs link ops together to form a dependency graph (workflow). + ```python from dagster import job -from .ops import fetch_raw_data, process_data +from ..ops.ops import fetch_data, process_data @job def data_processing_job(): - """A workflow that fetches and processes data.""" - raw_data = fetch_raw_data() - process_data(raw_data) + """A simple job that fetches and processes data.""" + raw = fetch_data() + process_data(raw) ``` -**3. Registering Definitions (repository.py)** -This file acts as the entry point for the Simpl-Open orchestration platform to discover your code. + +**3. Registering Definitions (`repository.py`)** +This file acts as the entry point for the Simpl-Open orchestration platform to discover your code. It imports jobs from local modules as well as from external packages. + ```python from dagster import Definitions -from .jobs import data_processing_job +from util_services.resources import s3_resource +from util_services.sensors import notify_success, notify_failure, notify_canceled +from util_services.custom_json_logger import simpl_json_logger + +# External package jobs +from data_processing.jobs import remove_duplicates_job_s3, fill_missing_values_job_s3 +from dataframe_level_anonymisation.jobs import k_anonymity_job_s3, l_diversity_job_s3 +from field_level_pseudo_anonymisation.jobs import anonymise_pseudonymise_structured_job_s3 + +# Local template jobs +from template_code_location.jobs.jobs import data_processing_job -# The platform will load this Definitions object defs = Definitions( - jobs=[data_processing_job] - # You can also declare schedules, sensors, and resources here + jobs=[data_processing_job, remove_duplicates_job_s3, ...], + sensors=[notify_success, notify_failure, notify_canceled], + resources={"s3": s3_resource.configured({"resource_name": "selfS3"})}, + loggers={"simpl": simpl_json_logger}, ) ``` -### 2.3 Best Practices & Constraints +### 2.4 Best Practices & Constraints + - **Separation of Concerns**: Keep orchestration logic (how ops connect) strictly separate from heavy business logic (which should ideally live in separate Python modules/classes). - **Naming Conventions**: Use snake_case for jobs and ops. Code locations should be named based on the domain they represent (e.g., inventory_sync_service). - **Dependency Management**: All dependencies must be explicitly declared in pyproject.toml or requirements.txt. diff --git a/pipeline.variables.sh b/pipeline.variables.sh index 3292612..4a3f9c4 100644 --- a/pipeline.variables.sh +++ b/pipeline.variables.sh @@ -1 +1 @@ -PROJECT_VERSION_NUMBER="0.0.1" \ No newline at end of file +PROJECT_VERSION_NUMBER="0.1.0" \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ca2cdc0..f85da15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,25 +4,35 @@ build-backend = "setuptools.build_meta" [project] name = "template-code-location" -version = "0.0.1" -description = "Template code location for data processings services" +version = "0.1.0" +description = "Consolidated code location for all data services workflows" requires-python = ">=3.12" dependencies = [ + # Dagster core "dagster>=1.8.13", - "dagster-webserver>=1.8.13", - "dagster-postgres>=0.24.13", - "pandas>=3.0", - "pyarrow>=23.0", - "lxml>=6.0", - "xmltodict>=1.0", - "rdflib>=7.6", - "numpy>=2.4", - "great_expectations>=1.16", - "pandera>=0.31", - "scrapy>=2.15", - "BeautifulSoup4>=4.14", + # Util services — resolved via [tool.uv.sources] (git) + "util-services", + # Code location packages — resolved via [tool.uv.sources] (git) + "data-processing", + "dataframe-level-anonymisation", + "field-level-pseudo-anonymisation", ] +[tool.uv] +exclude-dependencies = ["transformers", "spacy-transformers"] + +[tool.uv.sources] +torch = { index = "pytorch-cpu" } +util-services = { git = "https://code.europa.eu/simpl/simpl-open/development/data-services/util-services.git", rev = "v0.5.0" } +data-processing = { git = "https://code.europa.eu/simpl/simpl-open/development/data-services/data-processing.git", branch = "develop" } +dataframe-level-anonymisation = { git = "https://code.europa.eu/simpl/simpl-open/development/data-services/dataframe-level-anonymisation.git", branch = "develop" } +field-level-pseudo-anonymisation = { git = "https://code.europa.eu/simpl/simpl-open/development/data-services/field-level-pseudo-anonymisation.git", branch = "develop" } + +[[tool.uv.index]] +name = "pytorch-cpu" +url = "https://download.pytorch.org/whl/cpu" +explicit = true + [project.optional-dependencies] dev = [ "pytest>=8.0.0", diff --git a/src/template-code-location/repository.py b/src/template-code-location/repository.py deleted file mode 100644 index 10c73e6..0000000 --- a/src/template-code-location/repository.py +++ /dev/null @@ -1,6 +0,0 @@ -from dagster import Definitions -from .jobs.jobs import data_processing_job - -defs = Definitions( - jobs=[data_processing_job], -) diff --git a/src/template-code-location/__init__.py b/src/template_code_location/__init__.py similarity index 100% rename from src/template-code-location/__init__.py rename to src/template_code_location/__init__.py diff --git a/src/template-code-location/jobs/__init__.py b/src/template_code_location/jobs/__init__.py similarity index 100% rename from src/template-code-location/jobs/__init__.py rename to src/template_code_location/jobs/__init__.py diff --git a/src/template-code-location/jobs/jobs.py b/src/template_code_location/jobs/jobs.py similarity index 100% rename from src/template-code-location/jobs/jobs.py rename to src/template_code_location/jobs/jobs.py diff --git a/src/template-code-location/ops/__init__.py b/src/template_code_location/ops/__init__.py similarity index 100% rename from src/template-code-location/ops/__init__.py rename to src/template_code_location/ops/__init__.py diff --git a/src/template-code-location/ops/ops.py b/src/template_code_location/ops/ops.py similarity index 100% rename from src/template-code-location/ops/ops.py rename to src/template_code_location/ops/ops.py diff --git a/src/template_code_location/repository.py b/src/template_code_location/repository.py new file mode 100644 index 0000000..94f3746 --- /dev/null +++ b/src/template_code_location/repository.py @@ -0,0 +1,68 @@ +from dagster import Definitions +from util_services.resources import s3_resource +from util_services.sensors import ( + notify_success, + notify_failure, + notify_canceled +) +from util_services.custom_json_logger import simpl_json_logger + +# Data processing jobs +from data_processing.jobs import ( + remove_duplicates_job_s3, + fill_missing_values_job_s3, + standardize_categorical_values_job_s3, + correct_typos_job_s3, + normalize_numeric_min_max_job_s3, + normalize_datetime_job_s3, + normalize_coordinates_job_s3, + add_global_aggregations_job_s3, + filter_dataset_job_s3, +) + +# Dataframe-level anonymisation jobs +from dataframe_level_anonymisation.jobs import ( + k_anonymity_job_s3, + l_diversity_job_s3, + t_closeness_job_s3, + read_write_semistructured_job_s3, +) + +# Field-level pseudo-anonymisation jobs +from field_level_pseudo_anonymisation.jobs import ( + anonymise_pseudonymise_structured_job_s3, + depseudonymise_structured_job_s3, + anonymise_pseudonymise_unstructured_job_s3, + depseudonymise_unstructured_job_s3, +) + +from template_code_location.jobs.jobs import data_processing_job + +defs = Definitions( + jobs=[ + data_processing_job, + # Data processing + remove_duplicates_job_s3, + fill_missing_values_job_s3, + standardize_categorical_values_job_s3, + correct_typos_job_s3, + normalize_numeric_min_max_job_s3, + normalize_datetime_job_s3, + normalize_coordinates_job_s3, + add_global_aggregations_job_s3, + filter_dataset_job_s3, + # Dataframe-level anonymisation + k_anonymity_job_s3, + l_diversity_job_s3, + t_closeness_job_s3, + read_write_semistructured_job_s3, + # Field-level pseudo-anonymisation + anonymise_pseudonymise_structured_job_s3, + depseudonymise_structured_job_s3, + anonymise_pseudonymise_unstructured_job_s3, + depseudonymise_unstructured_job_s3, + ], + sensors=[notify_success, notify_failure, notify_canceled], + resources={"s3": s3_resource.configured({"resource_name": "selfS3"})}, + loggers={"simpl": simpl_json_logger}, +)