""" Test suite for data restoration (depseudonymisation) of unstructured text. ## Test Coverage Summary ### Acceptance Criteria Coverage: - AC1 (Data Restoration with Valid Key): 2 tests - AC2 (Restoration Denial - Missing Key): 1 test - AC3 (Restoration Denial - Unauthorized Access): 1 test - AC4 (Restoration Denial - Invalid Key): 1 test - Additional Coverage: 2 tests (edge cases) ### Test Pattern: - Each test uses build_op_context with .model_dump() for configuration - Tests validate dual outputs (data, metrics) - Tests verify complete restoration of original text - Tests validate security controls and error handling - Tests use descriptive names mapping to AC scenarios """ import pytest from unittest.mock import patch from cryptography.fernet import Fernet from dagster import build_op_context from src.field_level_pseudo_anonymisation.unstructured_ops import ( depseudonymize_unstructured, ) from src.field_level_pseudo_anonymisation.config_models.unstructured_config import ( DepseudonymizeUnstructuredConfig, DecryptConfig, DepseudoTechniqueConfig, ) @pytest.fixture def fernet_key() -> bytes: """Generate a valid Fernet key for encryption in tests.""" return Fernet.generate_key() @pytest.fixture def encrypted_text_data(fernet_key: bytes) -> dict: """ Create encrypted data for testing decryption. Returns a dict with: - original_text: The unencrypted text - encrypted_text: Text with PII values encrypted in {encrypt:...} format """ original_text = "My name is John Doe and my email is john.doe@example.com." fernet = Fernet(fernet_key) encrypted_name = fernet.encrypt(b"John Doe").decode() encrypted_email = fernet.encrypt(b"john.doe@example.com").decode() encrypted_text = ( f"My name is {{encrypt:{encrypted_name}}} and my email is {{encrypt:{encrypted_email}}}." ) return { "original_text": original_text, "encrypted_text": encrypted_text, } # ---------------------- AC1: Data Restoration with Valid Key -------------------------------- @patch("src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key") def test_ac1_restore_encrypted_pii_entities_with_valid_key( mock_create_get_key, fernet_key: bytes, encrypted_text_data: dict ): """AC1: Restore encrypted PII entities with a valid key from secret management tool.""" # Arrange - Mock the Vault key retrieval to return the valid key mock_create_get_key.return_value = fernet_key config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig(technique=DecryptConfig(type="decrypt", key_name="test_key")) ] ) context = build_op_context(op_config=config.model_dump()) # Act - Request data restoration result_gen = depseudonymize_unstructured( context, input_text=encrypted_text_data["encrypted_text"] ) data_output = next(result_gen) metrics_output = next(result_gen) # Assert - Verify successful restoration # 1. All original values are restored exactly assert ( data_output.value == encrypted_text_data["original_text"] ), "Original text should be fully restored" # 2. Correct output structure assert data_output.output_name == "data", "Output should be named 'data'" # 3. Metrics show correct number of restored entities assert ( metrics_output.value["total_depseudo_count"] == 2 ), "Should restore 2 encrypted entities (name and email)" # 4. System retrieved key from secret management tool mock_create_get_key.assert_called_once_with("decrypt", "test_key") @patch("src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key") def test_ac1_restore_multiple_pii_types_with_valid_key(mock_create_get_key, fernet_key: bytes): """AC1: Restore multiple encrypted PII entity types (name, email, phone) with a valid key.""" # Arrange - Create text with multiple PII types encrypted original_text = "Contact John Doe at john.doe@example.com or call 555-1234." fernet = Fernet(fernet_key) encrypted_name = fernet.encrypt(b"John Doe").decode() encrypted_email = fernet.encrypt(b"john.doe@example.com").decode() encrypted_phone = fernet.encrypt(b"555-1234").decode() encrypted_text = ( f"Contact {{encrypt:{encrypted_name}}} at " f"{{encrypt:{encrypted_email}}} or call {{encrypt:{encrypted_phone}}}." ) mock_create_get_key.return_value = fernet_key config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig( technique=DecryptConfig(type="decrypt", key_name="multi_pii_key") ) ] ) context = build_op_context(op_config=config.model_dump()) # Act result_gen = depseudonymize_unstructured(context, input_text=encrypted_text) data_output = next(result_gen) metrics_output = next(result_gen) # Assert assert data_output.value == original_text, "All PII types should be restored" assert ( metrics_output.value["total_depseudo_count"] == 3 ), "Should restore 3 encrypted entities (name, email, phone)" mock_create_get_key.assert_called_once_with("decrypt", "multi_pii_key") # ------------------- AC2: Restoration Denial when Key is Missing ---------------------------- @patch("src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key") def test_ac2_restoration_denial_when_key_missing(mock_create_get_key, encrypted_text_data: dict): """AC2: Deny restoration when decryption key is missing from secret management tool.""" # Arrange - Mock Vault to indicate key is missing mock_create_get_key.side_effect = ValueError( "Fernet key 'non_existent_key' not found in Vault for decrypt." ) config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig( technique=DecryptConfig(type="decrypt", key_name="non_existent_key") ) ] ) context = build_op_context(op_config=config.model_dump()) # Act & Assert - Verify system fails the restoration request with pytest.raises( ValueError, match="Fernet key 'non_existent_key' not found in Vault for decrypt.", ) as exc_info: list(depseudonymize_unstructured(context, input_text=encrypted_text_data["encrypted_text"])) # Verify error message is clear and actionable assert "not found in Vault" in str( exc_info.value ), "Error message should indicate key is missing from Vault" # Verify system attempted to retrieve the key (logged attempt) mock_create_get_key.assert_called_once_with("decrypt", "non_existent_key") # ------------- AC3: Restoration Denial when Access is Unauthorized -------------------------- @patch("src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key") def test_ac3_restoration_denial_when_unauthorized_access( mock_create_get_key, encrypted_text_data: dict ): """AC3: Deny restoration when participant is not authorized to access the decryption key.""" # Arrange - Mock Vault to deny access mock_create_get_key.side_effect = ValueError("Access denied to secret: unauthorized_key") config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig( technique=DecryptConfig(type="decrypt", key_name="unauthorized_key") ) ] ) context = build_op_context(op_config=config.model_dump()) # Act & Assert - Verify system denies access with pytest.raises(ValueError, match="Access denied to secret: unauthorized_key") as exc_info: list(depseudonymize_unstructured(context, input_text=encrypted_text_data["encrypted_text"])) # Verify error message clearly indicates access denial assert "Access denied" in str( exc_info.value ), "Error message should clearly indicate access was denied" # Verify the unauthorized access attempt was logged (function was called) mock_create_get_key.assert_called_once_with("decrypt", "unauthorized_key") # ------------------- AC4: Restoration Denial when Key is Invalid ---------------------------- @patch("src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key") def test_ac4_restoration_denial_when_key_invalid(mock_create_get_key, encrypted_text_data: dict): """AC4: Deny restoration when decryption key does not correspond to the encrypted fields.""" # Arrange - Mock Vault to return a different (wrong) key invalid_key = Fernet.generate_key() # A different, incorrect key mock_create_get_key.return_value = invalid_key config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig(technique=DecryptConfig(type="decrypt", key_name="wrong_key")) ] ) context = build_op_context(op_config=config.model_dump()) # Act & Assert - Verify system fails the restoration with pytest.raises(ValueError, match="Invalid Fernet token") as exc_info: list(depseudonymize_unstructured(context, input_text=encrypted_text_data["encrypted_text"])) # Verify error message indicates decryption failure assert "Invalid Fernet token" in str( exc_info.value ), "Error message should indicate the key is invalid for this data" # Verify key was retrieved (system attempted decryption) mock_create_get_key.assert_called_once_with("decrypt", "wrong_key") # -------------------------------- Additional Edge Cases ---------------------------------------- def test_depseudonymize_unstructured_no_decrypt_config(): """Edge case: Text is returned unchanged when no decryption techniques are configured.""" # Arrange original_text = "This text has no {encrypt:values} to decrypt." config = DepseudonymizeUnstructuredConfig(used_function=[]) # No techniques context = build_op_context(op_config=config.model_dump()) # Act result_gen = depseudonymize_unstructured(context, input_text=original_text) result_output = next(result_gen) metrics_output = next(result_gen) # Assert assert ( result_output.value == original_text ), "Text should remain unchanged when no decryption is configured" assert ( metrics_output.value["total_depseudo_count"] == 0 ), "Should report zero decryptions performed" def test_depseudonymize_unstructured_empty_text(): """Edge case: Empty input text is returned unchanged with zero decryptions performed.""" # Arrange empty_text = "" config = DepseudonymizeUnstructuredConfig( used_function=[ DepseudoTechniqueConfig(technique=DecryptConfig(type="decrypt", key_name="test_key")) ] ) context = build_op_context(op_config=config.model_dump()) # Act with patch( "src.field_level_pseudo_anonymisation.unstructured_ops.create_get_encryption_key" ) as mock_key: mock_key.return_value = Fernet.generate_key() result_gen = depseudonymize_unstructured(context, input_text=empty_text) result_output = next(result_gen) metrics_output = next(result_gen) # Assert assert result_output.value == "", "Empty text should remain empty" assert ( metrics_output.value["total_depseudo_count"] == 0 ), "Should report zero decryptions for empty text"