"""
Shared pytest fixtures and helpers for field-level pseudonymisation tests.

This module provides:
- Mock Vault client for testing without real Vault connections
- Sample data fixtures
- Configuration fixtures for encryption/decryption operations
- Helper functions for running ops and managing test Vault storage
"""

from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from cryptography.fernet import Fernet
from dagster import build_op_context
from hvac.exceptions import Forbidden, InvalidPath

from template_code_location.field_level_pseudo_anonymisation.config_models.structured_config import (
    AnonymisePseudonymizeStructuredConfig,
    DepseudonymizeStructuredConfig,
    EncryptConfig,
    DecryptConfig,
    PseudoTechniqueConfig,
    DepseudoTechniqueConfig,
)
from template_code_location.field_level_pseudo_anonymisation.ops import (
    anonymize_pseudonymize_structured,
    depseudonymize_structured,
)


# -------------------------------- Mock Vault Storage ----------------------------------------

# In-memory Vault simulation for tests. Both dicts are keyed by
# "<mount_point>/<path>" and are always mutated in place (never rebound), so
# any module holding a reference to them keeps seeing the live state.
_test_vault_storage = {}
_test_vault_access_control = {}  # For simulating access control

# Location under which the key helpers below store and look up keys.
_VAULT_KEY_PREFIX = "secret/PseudonymKeys"

# Number of rows produced by the `large_dataset` fixture.
_LARGE_DATASET_ROWS = 10_000


def _vault_key_path(key_name: str) -> str:
    """Return the simulated Vault path used for the key named ``key_name``."""
    return f"{_VAULT_KEY_PREFIX}/{key_name}"


@pytest.fixture(autouse=True)
def mock_vault_client():
    """
    Auto-use fixture that mocks the hvac.Client to avoid real Vault connections.

    Uses an in-memory dict to simulate Vault storage for tests.
    Includes access control simulation for AC3.

    Storage and access-control state are emptied before every test. The KV v2
    methods ``read_secret_version``, ``create_or_update_secret`` and
    ``delete_metadata_and_all_versions`` of the mocked client are backed by
    the in-memory dicts.

    Yields:
        MagicMock: The instance returned by the patched ``hvac.Client``.
    """
    # Clear in place instead of rebinding the globals: rebinding would leave
    # any `from <this module> import _test_vault_storage` reference pointing
    # at a stale dict.
    _test_vault_storage.clear()
    _test_vault_access_control.clear()

    def mock_read_secret(path, mount_point):
        """Mock reading secret from Vault with access control"""
        full_path = f"{mount_point}/{path}"

        # Access control is checked first: an entry with a falsy flag means
        # "denied"; paths without an entry are readable.
        if not _test_vault_access_control.get(full_path, True):
            raise Forbidden(f"Access denied to secret: {full_path}")

        try:
            secret_value = _test_vault_storage[full_path]
        except KeyError:
            raise InvalidPath(f"Secret not found: {full_path}") from None

        # Same nesting the original mock produced for a read response.
        return {"data": {"data": {"value": secret_value}}}

    def mock_create_or_update_secret(path, mount_point, secret):
        """Mock creating/updating secret in Vault"""
        _test_vault_storage[f"{mount_point}/{path}"] = secret["value"]

    def mock_delete_metadata(path, mount_point):
        """Mock deleting secret from Vault"""
        full_path = f"{mount_point}/{path}"
        _test_vault_storage.pop(full_path, None)
        _test_vault_access_control.pop(full_path, None)

    # NOTE(review): this patches the name `hvac.Client` only; code that did
    # `from hvac import Client` before the patch would presumably bypass it -
    # confirm against how the ops module imports hvac.
    with patch("hvac.Client") as mock_client_class:
        mock_instance = MagicMock()
        kv_v2 = mock_instance.secrets.kv.v2
        kv_v2.read_secret_version.side_effect = mock_read_secret
        kv_v2.create_or_update_secret.side_effect = mock_create_or_update_secret
        kv_v2.delete_metadata_and_all_versions.side_effect = mock_delete_metadata
        mock_client_class.return_value = mock_instance
        yield mock_instance


# -------------------------------- Sample Data Fixtures ----------------------------------------


@pytest.fixture
def sample_df():
    """
    Fixture providing a sample structured dataset with PII data.

    Represents typical data that requires pseudonymisation and restoration.
    """
    return pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5],
            "name": [
                "Alice Smith",
                "Bob Jones",
                "Charlie Brown",
                "David Wilson",
                "Eva Garcia",
            ],
            "email": [
                "alice@example.com",
                "bob@example.com",
                "charlie@example.com",
                "david@example.com",
                "eva@example.com",
            ],
            "ssn": [
                "123-45-6789",
                "234-56-7890",
                "345-67-8901",
                "456-78-9012",
                "567-89-0123",
            ],
            "age": [25, 30, 35, 40, 45],
            "salary": [50000.0, 60000.0, 70000.0, 80000.0, 90000.0],
            "department": ["HR", "IT", "Finance", "IT", "HR"],
        }
    )


# -------------------------------- Configuration Fixtures ----------------------------------------


@pytest.fixture
def encrypt_config_single_field():
    """
    Configuration for encrypting a single field (email).

    Used to create pseudonymised data for restoration tests.
    """
    return AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(
                    type="encrypt",
                    columns=["email"],
                    key_name="test_restoration_key_single",
                )
            )
        ]
    )


@pytest.fixture
def decrypt_config_single_field():
    """
    Configuration for decrypting a single field (email).

    Used to restore original values.
    """
    return DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt",
                    columns=["email"],
                    key_name="test_restoration_key_single",
                )
            )
        ]
    )


@pytest.fixture
def encrypt_config_multiple_fields():
    """
    Configuration for encrypting multiple fields (name, email, ssn).

    Tests restoration of multiple sensitive fields.
    """
    return AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(
                    type="encrypt",
                    columns=["name", "email", "ssn"],
                    key_name="test_restoration_key_multi",
                )
            )
        ]
    )


@pytest.fixture
def decrypt_config_multiple_fields():
    """
    Configuration for decrypting multiple fields (name, email, ssn).
    """
    return DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt",
                    columns=["name", "email", "ssn"],
                    key_name="test_restoration_key_multi",
                )
            )
        ]
    )


@pytest.fixture
def encrypt_config_partial_fields():
    """
    Configuration for encrypting only some fields (email, ssn).

    Tests partial restoration scenarios.
    """
    return AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(
                    type="encrypt",
                    columns=["email", "ssn"],
                    key_name="test_restoration_key_partial",
                )
            )
        ]
    )


@pytest.fixture
def decrypt_config_partial_fields():
    """
    Configuration for decrypting only some fields (email, ssn).
    """
    return DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt",
                    columns=["email", "ssn"],
                    key_name="test_restoration_key_partial",
                )
            )
        ]
    )


@pytest.fixture
def authorized_multi_key_scenario():
    """
    Fixture for testing multi-key authorization scenarios.

    Sets up two keys: one authorized, one denied.

    Yields:
        dict: ``{"authorized": "authorized_key", "unauthorized": "unauthorized_key"}``
    """
    key_names = ("authorized_key", "unauthorized_key")

    # Start from a clean slate for both keys.
    for key_name in key_names:
        clear_vault_key(key_name)

    # Create authorized key by generating it
    set_vault_key("authorized_key", Fernet.generate_key().decode())

    # Create unauthorized key and deny access
    set_vault_key("unauthorized_key", Fernet.generate_key().decode())
    deny_vault_access("unauthorized_key")

    yield {"authorized": "authorized_key", "unauthorized": "unauthorized_key"}

    # Cleanup
    for key_name in key_names:
        clear_vault_key(key_name)


@pytest.fixture
def large_dataset():
    """
    Fixture providing a large dataset (10,000 rows) for performance testing.

    Reusable across multiple performance tests.
    """
    row_ids = range(1, _LARGE_DATASET_ROWS + 1)
    departments = ["HR", "IT", "Finance", "Sales"]
    return pd.DataFrame(
        {
            "id": row_ids,
            "email": [f"user{i}@example.com" for i in row_ids],
            "name": [f"User {i}" for i in row_ids],
            "ssn": [f"{i:03d}-{i:02d}-{i:04d}" for i in row_ids],
            "age": [20 + (i % 50) for i in row_ids],
            "salary": [30000.0 + (i * 10) for i in row_ids],
            "department": [departments[i % len(departments)] for i in row_ids],
        }
    )


@pytest.fixture(scope="session")
def vault_test_keys():
    """
    Session-scoped fixture to pre-generate test keys for faster test execution.

    Avoids repeated key generation in each test.

    Returns:
        dict: Maps ``"test_key_0"`` .. ``"test_key_9"`` to freshly generated
        Fernet keys decoded to ``str``.
    """
    return {f"test_key_{i}": Fernet.generate_key().decode() for i in range(10)}


@pytest.fixture
def cleanup_test_keys(request):
    """
    Fixture to automatically cleanup test keys after each test.

    Use with: @pytest.mark.usefixtures("cleanup_test_keys")

    After the test, every stored path containing ``"test_"`` is removed from
    the mock Vault, together with its access-control flag (consistent with
    ``clear_vault_key``). ``request`` is unused and kept only so the fixture
    signature stays unchanged.
    """
    yield

    # Snapshot the matching paths first: the dict must not change size while
    # it is being iterated.
    stale_paths = [path for path in _test_vault_storage if "test_" in path]
    for path in stale_paths:
        _test_vault_storage.pop(path, None)
        _test_vault_access_control.pop(path, None)


# -------------------------------- Helper Functions ----------------------------------------


def config_to_dagster_dict(config):
    """
    Convert Pydantic config to Dagster-compatible dictionary.

    For AnonymisePseudonymizeStructuredConfig (uses discriminated Union):
        Pydantic v2 outputs: {'technique': {'type': 'encrypt', 'columns': [...], 'key_name': '...'}}
        Dagster expects: {'technique': {'encrypt': {'columns': [...], 'key_name': '...'}}}

    For DepseudonymizeStructuredConfig (direct DecryptConfig, no Union):
        Pydantic v2 outputs: {'technique': {'type': 'decrypt', 'columns': [...], 'key_name': '...'}}
        Dagster expects: Same flat structure with 'type' field

    Args:
        config: Pydantic config instance (AnonymisePseudonymizeStructuredConfig
            or DepseudonymizeStructuredConfig)

    Returns:
        dict: Dagster-compatible configuration dictionary
    """
    config_dict = config.model_dump()

    # Only AnonymisePseudonymizeStructuredConfig needs its techniques nested;
    # every other config is returned exactly as dumped.
    if not isinstance(config, AnonymisePseudonymizeStructuredConfig):
        return config_dict

    for func_config in config_dict.get("used_function", []):
        technique = func_config.get("technique")
        # Pydantic outputs a flat dict with a 'type' field for discriminated unions
        if isinstance(technique, dict) and "type" in technique:
            technique_type = technique["type"]
            # Everything except the discriminator becomes the nested payload
            technique_data = {k: v for k, v in technique.items() if k != "type"}
            # Nest under the discriminator key for Dagster
            func_config["technique"] = {technique_type: technique_data}

    return config_dict


def run_encrypt_op(config, df):
    """
    Helper function to execute the anonymize_pseudonymize_structured op.

    Args:
        config: AnonymisePseudonymizeStructuredConfig instance
        df: Input pandas DataFrame

    Returns:
        tuple: (result_df, metrics) - Output DataFrame and metrics dict
    """
    context = build_op_context(op_config=config_to_dagster_dict(config))
    result_df, metrics = anonymize_pseudonymize_structured(context, df=df)
    # The op's two outputs are unwrapped through their `.value` attribute.
    return result_df.value, metrics.value


def run_decrypt_op(config, df):
    """
    Helper function to execute the depseudonymize_structured op.

    Args:
        config: DepseudonymizeStructuredConfig instance
        df: Input pandas DataFrame

    Returns:
        tuple: (result_df, metrics) - Output DataFrame and metrics dict
    """
    context = build_op_context(op_config=config_to_dagster_dict(config))
    result_df, metrics = depseudonymize_structured(context, df=df)
    # The op's two outputs are unwrapped through their `.value` attribute.
    return result_df.value, metrics.value


def clear_vault_key(key_name: str) -> None:
    """
    Helper function to clear a key from the simulated Vault storage for test isolation.

    Removes both the stored value and any access-control flag; a key that
    does not exist is ignored.

    Args:
        key_name: Name of the key to delete from Vault
    """
    full_path = _vault_key_path(key_name)
    _test_vault_storage.pop(full_path, None)
    _test_vault_access_control.pop(full_path, None)


def set_vault_key(key_name: str, key_value: str) -> None:
    """
    Helper function to set a key in the simulated Vault storage.

    Args:
        key_name: Name of the key
        key_value: Value of the key (Fernet key as string)
    """
    _test_vault_storage[_vault_key_path(key_name)] = key_value


def deny_vault_access(key_name: str) -> None:
    """
    Helper function to deny access to a key for authorization testing (AC3).

    Args:
        key_name: Name of the key to deny access to
    """
    _test_vault_access_control[_vault_key_path(key_name)] = False


def get_vault_key(key_name: str) -> bytes:
    """
    Helper function to retrieve a key from the simulated Vault storage.

    Reads the storage dict directly, so access-control flags set through
    ``deny_vault_access`` are not consulted here.

    Args:
        key_name: Name of the key to retrieve

    Returns:
        bytes: The encryption key

    Raises:
        InvalidPath: If no key named ``key_name`` is stored.
    """
    try:
        stored_value = _test_vault_storage[_vault_key_path(key_name)]
    except KeyError:
        raise InvalidPath(f"Key not found: {key_name}") from None
    return stored_value.encode()