diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b61745b
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,13 @@
+FROM python:3.12-slim-bookworm
+
+WORKDIR /app
+
+COPY pyproject.toml .
+RUN pip install --no-cache-dir dagster dagster-webserver
+
+COPY src/ src/
+RUN pip install --no-cache-dir .
+
+EXPOSE 3000
+
+CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "template-code-location.repository"]
diff --git a/documents/Development Guide.md b/documents/Development Guide.md
new file mode 100644
index 0000000..0f140ad
--- /dev/null
+++ b/documents/Development Guide.md
@@ -0,0 +1,115 @@
+# Workflow Orchestration: Development and Deployment Guide
+
+## 1. Goal and Scope
+
+This guide explains how participants create, manage, and update workflows within the Simpl-Open orchestration platform.
+By following a *code-first approach*, developers ensure consistency, traceability, and reliability across all environments.
+
+## 2. Local Development
+Development must always begin in a local environment. This allows developers to iterate rapidly, test business logic, and validate DAG (Directed Acyclic Graph) structures without impacting production data.
+
+### 2.1 Project Layout
+To ensure compatibility with the Simpl-Open platform, every Dagster code location must adhere to the following directory structure:
+```text
+project-root/
+├── dagster_code_location/
+│   ├── jobs/               # Executable workflows
+│   ├── ops/                # Individual functional units (business logic)
+│   ├── resources/          # External connections (object storage, APIs, etc.)
+│   └── repository.py       # Central entry point for the code location
+├── tests/                  # Unit and integration tests
+├── Dockerfile              # Containerization instructions
+├── pyproject.toml          # Dependency management (Poetry/pip/uv)
+└── README.md               # Documentation
+```
+
+### 2.2 Code Examples (Ops, Jobs, and Definitions)
+The orchestration logic should be modular. The following example shows how to construct a workflow.
+
+**1. Defining Ops (ops/ops.py)**
+Ops are the core units of computation. Keep them focused on a single task.
+```python
+from dagster import op
+
+@op
+def fetch_raw_data() -> list:
+    """Fetches raw data from an external source."""
+    return [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
+
+@op
+def process_data(data: list) -> dict:
+    """Transforms raw data into an aggregated format."""
+    return {"processed_count": len(data), "status": "success"}
+```
+**2. Assembling Jobs (jobs/jobs.py)**
+Jobs link ops together to form a dependency graph (workflow).
+```python
+from dagster import job
+from ..ops.ops import fetch_raw_data, process_data
+
+@job
+def data_processing_job():
+    """A workflow that fetches and processes data."""
+    raw_data = fetch_raw_data()
+    process_data(raw_data)
+```
+**3. Registering Definitions (repository.py)**
+This file acts as the entry point for the Simpl-Open orchestration platform to discover your code.
+```python
+from dagster import Definitions
+from .jobs.jobs import data_processing_job
+
+# The platform will load this Definitions object
+defs = Definitions(
+    jobs=[data_processing_job],
+    # You can also declare schedules, sensors, and resources here
+)
+```
+
+### 2.3 Best Practices & Constraints
+- **Separation of Concerns**: Keep orchestration logic (how ops connect) strictly separate from heavy business logic, which should ideally live in separate Python modules or classes.
+- **Naming Conventions**: Use snake_case for jobs and ops. Name code locations after the domain they represent (e.g., inventory_sync_service).
+- **Dependency Management**: All dependencies must be explicitly declared in pyproject.toml or requirements.txt.
+- **Environment Agnosticism**: Do not hardcode credentials. Read configuration from environment variables.
+
+## 3. Publishing to Production (Gitea)
+
+Once local validation is complete, the code must be published to the centralized Gitea repository.
+
+1. **Repository Hosting**: All workflows are stored in Gitea instances within the agent environment.
+2. **Versioning**: Workflows are versioned using Git. Each version of a workflow must correspond to a specific Git commit.
+3. **Artifact Generation**: Workflows are packaged as Docker container images.
+   - Images must be pushed to the Gitea Integrated Container Registry.
+   - **Tagging Policy**: Use semantic versioning or Git commit SHAs. Avoid the `latest` tag in production so that deployments are reproducible and rollbacks are easy.
+
+## 4. Review and Approval Process
+
+To maintain quality and security standards, no code is pushed directly to the main branch.
+
+1. **Feature Branching**: Developers must push their changes to a dedicated feature branch.
+2. **Pull Request (PR)**: Open a Pull Request in Gitea from the feature branch to the main branch.
+3. **Peer Review**: At least one developer (other than the author) must review the code.
+   - Reviewers check for logic errors, security vulnerabilities, and adherence to the standards defined in Section 2.
+4. **Approval**: Once all comments are addressed and the reviewer sets the "Approve" status, the PR can be merged.
+
+## 5. Production Deployment
+
+After the code is merged and the artifact is published, the final step is deploying to the orchestration platform.
+
+### 5.1 Deployment Pipeline
+
+The deployment follows these automated steps:
+
+1. **CI/CD Trigger**: A merge to the main branch triggers the CI pipeline.
+2. **Image Build**: The pipeline builds the Docker image and pushes it to the Gitea Registry.
+3. **Manifest Update**: The deployment configuration (e.g., Helm values or Kubernetes manifests) is updated to reference the new image tag.
+4. **Platform Reload**: The Simpl-Open orchestration platform (Dagster) is notified of the change and reloads the code location.
+
+### 5.2 Verification
+
+To confirm a successful deployment:
+
+- **Dagster UI**: Navigate to the "Deployment" or "Code Locations" tab. Verify that the loaded image tag matches the latest Git commit.
+- **Health Check**: Trigger a "Test Run" of the job in the production environment using a limited data slice.
+- **Logs**: Monitor the initialization logs in the Dagster daemon to ensure the code location was loaded without schema or dependency errors.
+
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..5f31862
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,33 @@
+[build-system]
+requires = ["setuptools>=68.0", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "template-code-location"
+version = "0.0.1"
+requires-python = ">=3.12"
+dependencies = [
+    "dagster>=1.8.13",
+    "dagster-webserver>=1.8.13",
+    "dagster-postgres>=0.24.13",
+    "pandas>=3.0",
+    "pyarrow>=23.0",
+    "lxml>=6.0",
+    "xmltodict>=1.0",
+    "rdflib>=7.6",
+    "numpy>=2.4",
+    "great_expectations>=1.16",
+    "pandera>=0.31",
+    "scrapy>=2.15",
+    "BeautifulSoup4>=4.14",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest>=8.0.0",
+    "pytest-cov>=7.0.0",
+    "pytest-mock>=3.0.0"
+]
+
+[tool.setuptools.packages.find]
+where = ["src"]
diff --git a/src/template-code-location/__init__.py b/src/template-code-location/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/template-code-location/jobs/__init__.py b/src/template-code-location/jobs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/template-code-location/jobs/jobs.py b/src/template-code-location/jobs/jobs.py
new file mode 100644
index 0000000..d194c3e
--- /dev/null
+++ b/src/template-code-location/jobs/jobs.py
@@ -0,0 +1,9 @@
+from dagster import job
+from ..ops.ops import fetch_data, process_data
+
+
+@job
+def data_processing_job():
+    """A simple job that fetches and processes data."""
+    raw = fetch_data()
+    process_data(raw)
diff --git a/src/template-code-location/ops/__init__.py b/src/template-code-location/ops/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/template-code-location/ops/ops.py b/src/template-code-location/ops/ops.py
new file mode 100644
index 0000000..3d1a5e4
--- /dev/null
+++ b/src/template-code-location/ops/ops.py
@@ -0,0 +1,13 @@
+from dagster import op
+
+
+@op
+def fetch_data() -> list:
+    """Fetches raw data from a source."""
+    return [{"id": 1, "value": "A"}, {"id": 2, "value": "B"}]
+
+
+@op
+def process_data(data: list) -> dict:
+    """Processes raw data and returns a summary."""
+    return {"count": len(data), "status": "success"}
diff --git a/src/template-code-location/repository.py b/src/template-code-location/repository.py
new file mode 100644
index 0000000..10c73e6
--- /dev/null
+++ b/src/template-code-location/repository.py
@@ -0,0 +1,6 @@
+from dagster import Definitions
+from .jobs.jobs import data_processing_job
+
+defs = Definitions(
+    jobs=[data_processing_job],
+)
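
Section 2.3 of the Development Guide asks for configuration to come from environment variables rather than hardcoded credentials, but the template does not yet show what that could look like. The following is a minimal sketch using only the standard library. The class name `ObjectStoreSettings`, the function `load_settings`, and the `OBJECT_STORE_*` variable names are hypothetical and are not part of this patch or of Dagster.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ObjectStoreSettings:
    """Hypothetical connection settings for an object-storage resource."""

    endpoint: str
    access_key: str
    secret_key: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> ObjectStoreSettings:
    """Build settings from environment variables and fail fast if any are missing."""
    env = os.environ if env is None else env
    # Map each settings field to the (assumed) environment variable that feeds it.
    names = {
        "endpoint": "OBJECT_STORE_ENDPOINT",
        "access_key": "OBJECT_STORE_ACCESS_KEY",
        "secret_key": "OBJECT_STORE_SECRET_KEY",
    }
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return ObjectStoreSettings(**{field: env[var] for field, var in names.items()})
```

Passing the mapping explicitly, as in `load_settings({"OBJECT_STORE_ENDPOINT": "...", ...})`, keeps the function easy to unit-test without touching the real process environment. A code location could call `load_settings()` once when building its resources, so that a misconfigured container fails at load time rather than in the middle of a run.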