first commit

This commit is contained in:
Matteo Basile
2026-06-04 17:00:42 +02:00
commit 0173ceecb3
34 changed files with 909 additions and 0 deletions

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Test package for Simpl-Open credential delivery."""

10
tests/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
"""Pytest configuration for Simpl-Open credential delivery tests."""
import pytest
from dagster import build_op_context
@pytest.fixture
def op_context():
"""Fixture for creating a basic op context."""
return build_op_context()

View File

@@ -0,0 +1,21 @@
"""Tests for workflow config mapping."""
from simpl_open_credential_delivery.config import (
simpl_open_gitea_workflow_config_mapping_fn,
)
def test_config_mapping_routes_inputs_to_ops():
cfg = simpl_open_gitea_workflow_config_mapping_fn(
{
"repository_identifier": "simpl-open/platform-automation",
"consumer_email": "consumer@example.local",
"access_level": "read-only",
"token_name_prefix": "simpl-open",
"token_expiration_days": 30,
}
)
assert cfg["ops"]["generate_gitea_credentials"]["config"]["repository_identifier"] == "simpl-open/platform-automation"
assert cfg["ops"]["generate_gitea_credentials"]["config"]["consumer_email"] == "consumer@example.local"
assert cfg["ops"]["send_credentials_email"]["config"] == {}

View File

@@ -0,0 +1,70 @@
"""Tests for Gitea credential generation."""
import pytest
from dagster import build_op_context
from simpl_open_credential_delivery.ops.generate_gitea_credentials import (
generate_gitea_credentials,
)
class FakeResponse:
def __init__(self, status_code, payload=None, text="OK"):
self.status_code = status_code
self._payload = payload or {}
self.text = text
self.content = b"{}" if payload is not None else b""
def json(self):
return self._payload
def test_generate_gitea_credentials_success(monkeypatch, op_context):
responses = [
FakeResponse(201),
FakeResponse(201, {"sha1": "token-123"}),
FakeResponse(204),
]
def fake_request(*args, **kwargs):
return responses.pop(0)
monkeypatch.setenv("GITEA_BASE_URL", "https://gitea.example.local")
monkeypatch.setenv("GITEA_ADMIN_TOKEN", "admin-token")
monkeypatch.setenv("GITEA_DEFAULT_OWNER", "simpl-open")
monkeypatch.setattr("requests.request", fake_request)
op_context = build_op_context(
op_config={
"repository_identifier": "platform-automation",
"consumer_email": "consumer@example.local",
"access_level": "read-only",
"token_name_prefix": "simpl-open",
"token_expiration_days": 30,
}
)
result = generate_gitea_credentials(op_context)
assert result["username"] == "consumer"
assert result["access_token"] == "token-123"
assert result["repository_access_url"] == "https://gitea.example.local/simpl-open/platform-automation"
def test_generate_gitea_credentials_requires_gitea_base_url(monkeypatch, op_context):
monkeypatch.delenv("GITEA_BASE_URL", raising=False)
monkeypatch.setenv("GITEA_ADMIN_TOKEN", "admin-token")
op_context = build_op_context(
op_config={
"repository_identifier": "simpl-open/platform-automation",
"consumer_email": "consumer@example.local",
"access_level": "read-only",
"token_name_prefix": "simpl-open",
"token_expiration_days": 30,
}
)
with pytest.raises(ValueError, match="GITEA_BASE_URL"):
generate_gitea_credentials(op_context)

View File

@@ -0,0 +1,74 @@
"""Tests for email credential delivery."""
import pytest
from dagster import build_op_context
from simpl_open_credential_delivery.ops.send_credentials_email import (
send_credentials_email,
)
from simpl_open_credential_delivery.services.email_service import (
EmailDeliveryService,
)
class FakeSMTP:
def __init__(self, host, port, timeout=None):
self.host = host
self.port = port
self.timeout = timeout
self.started_tls = False
self.logged_in = False
self.sent_messages = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self):
self.started_tls = True
def login(self, username, password):
self.logged_in = True
def send_message(self, message):
self.sent_messages.append(message)
def test_send_credentials_email_success(monkeypatch, op_context):
fake_smtp = FakeSMTP("smtp.example.local", 587)
monkeypatch.setenv("SMTP_HOST", "smtp.example.local")
monkeypatch.setenv("SMTP_PORT", "587")
monkeypatch.setenv("SMTP_FROM_ADDRESS", "no-reply@example.local")
monkeypatch.setenv("SMTP_USE_TLS", "true")
monkeypatch.setenv("EMAIL_SUBJECT_TEMPLATE", "Subject {repository_identifier}")
monkeypatch.setenv("EMAIL_BODY_TEMPLATE", "Hello {consumer_email} {access_token}")
monkeypatch.setattr("smtplib.SMTP", lambda host, port, timeout=None: fake_smtp)
result = send_credentials_email(
op_context,
{
"repository_identifier": "simpl-open/platform-automation",
"consumer_email": "consumer@example.local",
"access_level": "read-only",
"username": "application-consumer",
"access_token": "token-123",
"repository_access_url": "https://gitea.example.local/simpl-open/platform-automation",
},
)
assert result["email_delivery_status"] == "SENT"
assert result["email_message_id"] == ""
assert fake_smtp.started_tls is True
assert fake_smtp.sent_messages
def test_email_delivery_service_requires_sender(monkeypatch):
monkeypatch.setenv("SMTP_HOST", "smtp.example.local")
monkeypatch.setenv("SMTP_PORT", "587")
monkeypatch.delenv("SMTP_FROM_ADDRESS", raising=False)
with pytest.raises(Exception, match="SMTP_FROM_ADDRESS"):
EmailDeliveryService.from_environment()