Files
template-code-location/tests/field_level_pseudo_anonymisation/test_decrypt_structured.py

1091 lines
40 KiB
Python

"""
Test suite for data restoration (depseudonymization) operations.
This test suite validates the data restoration feature against the following Acceptance Criteria:
## Test Coverage Summary
### Acceptance Criteria Coverage:
- AC1 (Data Restoration with Valid Key): 7 tests
- AC2 (Restoration Denial - Missing Key): 3 tests
- AC3 (Restoration Denial - Unauthorized Access): 2 tests
- AC4 (Restoration Denial - Invalid Key): 3 tests
- Additional Coverage: 3 tests
### Test Pattern:
- Each test uses build_op_context with .model_dump() for configuration
- Tests validate dual outputs (data, metrics)
- Tests verify complete restoration of original values
- Tests validate security controls and error handling
"""
import pandas as pd
import pytest
from cryptography.fernet import Fernet
from template_code_location.field_level_pseudo_anonymisation.config_models.structured_config import (
AnonymisePseudonymizeStructuredConfig,
DepseudonymizeStructuredConfig,
EncryptConfig,
DecryptConfig,
PseudoTechniqueConfig,
DepseudoTechniqueConfig,
)
# Import helper functions (fixtures are auto-discovered by pytest)
from .conftest import (
run_encrypt_op,
run_decrypt_op,
clear_vault_key,
set_vault_key,
deny_vault_access,
get_vault_key,
)
# -------------------------------- Test Markers Configuration --------------------------------
# Register custom markers
# NOTE(review): each statement below reads an attribute from ``pytest.mark``
# and assigns it back under the same name; no pytest registration API is
# called here. Presumably the markers are (or should be) declared in the
# project's pytest configuration (``markers`` ini option) or in a conftest
# ``pytest_configure`` hook via ``config.addinivalue_line("markers", ...)``
# -- TODO confirm, then consider removing these lines.
pytest.mark.slow = pytest.mark.slow
pytest.mark.security = pytest.mark.security
pytest.mark.edge_case = pytest.mark.edge_case
pytest.mark.integration = pytest.mark.integration
# ---------------------- AC1: Data Restoration with Valid Key --------------------------------
def test_ac1_restore_single_encrypted_field_with_valid_key(
    sample_df, encrypt_config_single_field, decrypt_config_single_field
):
    """
    AC1: Data Restoration using Secret Management Tool-Stored Decryption Key

    Scenario: Restore a single encrypted field with a valid key
        Given: The email field of the sample dataset has been encrypted
        And: A key is present in Vault under ``test_restoration_key_single``
        When: The dataset is decrypted with the single-field decrypt config
        Then: Both outputs (data, metrics) are returned
        And: Every email value equals its original
        And: The row count and the non-encrypted columns are unchanged
    """
    vault_key_name = "test_restoration_key_single"
    # Drop any key left behind under this name before encrypting
    clear_vault_key(vault_key_name)

    # Pseudonymisation phase
    pseudonymised_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())
    email_is_plaintext = pseudonymised_df["email"].equals(sample_df["email"])
    assert not email_is_plaintext, "Email field should be encrypted"
    stored_key = get_vault_key(vault_key_name)
    assert stored_key is not None, "Encryption key should exist in Vault"

    # Depseudonymisation phase
    restored_df, metrics = run_decrypt_op(decrypt_config_single_field, pseudonymised_df.copy())
    assert restored_df is not None, "Restored DataFrame should not be None"
    assert metrics is not None, "Metrics should not be None"

    # Compare the whole column first, then row by row for a precise failure message
    expected_emails = sample_df["email"]
    actual_emails = restored_df["email"]
    assert actual_emails.equals(
        expected_emails
    ), "Email field should be restored to original values"
    for row, (expected, actual) in enumerate(zip(expected_emails, actual_emails)):
        assert (
            expected == actual
        ), f"Row {row}: Original '{expected}' should match restored '{actual}'"

    assert len(restored_df) == len(sample_df), "Row count should be preserved during restoration"

    # Columns that were never encrypted must come back untouched
    for untouched in ("name", "age", "department"):
        assert restored_df[untouched].equals(
            sample_df[untouched]
        ), "Non-encrypted fields should remain unchanged"
def test_ac1_restore_multiple_encrypted_fields_with_valid_key(
    sample_df, encrypt_config_multiple_fields, decrypt_config_multiple_fields
):
    """
    AC1: Data Restoration of multiple encrypted fields with a valid key

    Scenario: Restore name, email and ssn after they were encrypted together
        Given: The sample dataset with name, email and ssn encrypted
        When: The dataset is decrypted with the multi-field decrypt config
        Then: name, email and ssn equal their original values
        And: age and salary are unchanged
    """
    clear_vault_key("test_restoration_key_multi")

    # Column -> label used in the assertion messages
    encrypted_labels = {"name": "Name", "email": "Email", "ssn": "SSN"}

    ciphered_df, _ = run_encrypt_op(encrypt_config_multiple_fields, sample_df.copy())
    for column, label in encrypted_labels.items():
        assert not ciphered_df[column].equals(sample_df[column]), f"{label} should be encrypted"

    restored_df, _ = run_decrypt_op(decrypt_config_multiple_fields, ciphered_df.copy())
    for column, label in encrypted_labels.items():
        assert restored_df[column].equals(
            sample_df[column]
        ), f"{label} field should be restored to original values"

    # Columns outside the encrypt config must be untouched
    for column in ("age", "salary"):
        assert restored_df[column].equals(
            sample_df[column]
        ), "Non-encrypted fields should remain unchanged"
def test_ac1_restore_partial_fields_leaves_others_encrypted(
    sample_df, encrypt_config_multiple_fields
):
    """
    AC1: Partial restoration - only some of the encrypted fields are requested

    Scenario: Restore only email while name and ssn stay encrypted
        Given: The sample dataset encrypted with the multi-field config
        And: A decrypt config that lists only the email column
        When: The dataset is decrypted with that config
        Then: email equals its original values
        And: name and ssn still differ from their original values
    """
    key_name = "test_restoration_key_multi"
    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config_multiple_fields, sample_df.copy())

    # Decrypt config restricted to the email column
    email_only = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    partial_decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=email_only)]
    )

    restored_df, _ = run_decrypt_op(partial_decrypt_config, ciphered_df.copy())

    assert restored_df["email"].equals(
        sample_df["email"]
    ), "Email field should be restored to original values"

    # The columns that were not requested must still differ from the originals
    name_restored = restored_df["name"].equals(sample_df["name"])
    assert not name_restored, "Name field should remain encrypted"
    ssn_restored = restored_df["ssn"].equals(sample_df["ssn"])
    assert not ssn_restored, "SSN field should remain encrypted"
def test_ac1_restore_preserves_data_types(sample_df):
    """
    AC1: Round-trip of string, integer and float columns

    Scenario: Encrypt and restore name, age and salary
        Given: The sample dataset with string, integer and float columns
        When: name, age and salary are encrypted and then decrypted
        Then: name values equal the originals
        And: age and salary values equal the string form of the originals
             (the assertions compare against ``astype(str)``)
    """
    key_name = "test_restoration_types"
    mixed_columns = ["name", "age", "salary"]

    encrypt_step = PseudoTechniqueConfig(
        technique=EncryptConfig(type="encrypt", columns=list(mixed_columns), key_name=key_name)
    )
    encrypt_config = AnonymisePseudonymizeStructuredConfig(used_function=[encrypt_step])

    decrypt_step = DepseudoTechniqueConfig(
        technique=DecryptConfig(type="decrypt", columns=list(mixed_columns), key_name=key_name)
    )
    decrypt_config = DepseudonymizeStructuredConfig(used_function=[decrypt_step])

    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config, sample_df.copy())
    restored_df, _ = run_decrypt_op(decrypt_config, ciphered_df.copy())

    # Numeric columns are compared against their string representation
    expected_names = sample_df["name"].tolist()
    assert restored_df["name"].tolist() == expected_names, "String values should be restored"

    expected_ages = sample_df["age"].astype(str).tolist()
    assert (
        restored_df["age"].tolist() == expected_ages
    ), "Integer values should be restored as strings"

    expected_salaries = sample_df["salary"].astype(str).tolist()
    assert (
        restored_df["salary"].tolist() == expected_salaries
    ), "Float values should be restored as strings"
def test_ac1_restore_empty_dataframe(encrypt_config_single_field, decrypt_config_single_field):
    """
    AC1: Edge case - restore an empty dataset

    Scenario: Encrypt and restore a DataFrame that has columns but no rows
        Given: An empty DataFrame with the seven sample columns
        When: It is encrypted and then decrypted
        Then: The result has zero rows
        And: The column list is identical to the input's
    """
    clear_vault_key("test_restoration_key_single")

    schema = ["id", "name", "email", "ssn", "age", "salary", "department"]
    empty_df = pd.DataFrame(columns=schema)

    # Both operations are expected to accept a zero-row frame
    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, empty_df.copy())
    restored_df, _ = run_decrypt_op(decrypt_config_single_field, ciphered_df.copy())

    assert len(restored_df) == 0, "Restored DataFrame should be empty"
    restored_columns = list(restored_df.columns)
    assert restored_columns == list(empty_df.columns), "Column schema should be preserved"
def test_ac1_restore_with_special_characters(
    encrypt_config_single_field, decrypt_config_single_field
):
    """
    AC1: Data restoration with non-ASCII and punctuation characters

    Scenario: Round-trip email values containing accents and apostrophes
        Given: A four-row dataset whose email values include accented
               letters and an apostrophe
        When: The dataset is encrypted and then decrypted
        Then: Every email value equals its original, character for character
    """
    clear_vault_key("test_restoration_key_single")

    records = {
        "id": [1, 2, 3, 4],
        "name": ["José García", "François Müller", "李明", "O'Brien"],
        "email": [
            "josé@example.com",
            "françois@example.com",
            "li@example.cn",
            "o'brien@example.ie",
        ],
        "ssn": ["123-45-6789", "234-56-7890", "345-67-8901", "456-78-9012"],
        "age": [25, 30, 35, 40],
        "salary": [50000.0, 60000.0, 70000.0, 80000.0],
        "department": ["HR", "IT", "Finance", "IT"],
    }
    special_df = pd.DataFrame(records)

    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, special_df.copy())
    restored_df, _ = run_decrypt_op(decrypt_config_single_field, ciphered_df.copy())

    expected_emails = special_df["email"]
    actual_emails = restored_df["email"]
    assert actual_emails.equals(
        expected_emails
    ), "Special characters should be preserved during restoration"

    # Row-level check so a failure names the offending value
    for row, (expected, actual) in enumerate(zip(expected_emails, actual_emails)):
        assert (
            expected == actual
        ), f"Row {row}: Special characters in '{expected}' should be preserved"
# ------------------- AC2: Restoration Denial when Key is Missing ----------------------------
def test_ac2_restore_fails_when_key_missing(sample_df, encrypt_config_single_field):
    """
    AC2: Restoration Denial when Decryption Key is missing

    Scenario: The key is removed from Vault between encryption and restoration
        Given: The sample dataset encrypted with the single-field config
        And: The key ``test_restoration_key_single`` cleared afterwards
        When: Decryption of email is requested with that key name
        Then: A ValueError is raised
        And: Its message contains "not found" or "decrypt" (case-insensitive)
        And: Its message contains the key name
    """
    key_name = "test_restoration_key_single"

    clear_vault_key(key_name)
    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Second clear removes the key the encrypt step just used
    clear_vault_key(key_name)

    technique = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=technique)]
    )

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config, ciphered_df.copy())

    message = str(failure.value)
    lowered = message.lower()
    assert any(
        fragment in lowered for fragment in ("not found", "decrypt")
    ), "Error message should indicate key not found for decrypt operation"
    assert key_name in message, "Error message should include the key name for auditing"
def test_ac2_restore_fails_with_nonexistent_key_name(sample_df, encrypt_config_single_field):
    """
    AC2: Restoration fails when using a key name that does not exist

    Scenario: Attempt to restore with a key name that is absent from Vault
        Given: The sample dataset encrypted with the single-field config
        And: No key stored under ``nonexistent_key_name``
        When: Decryption of email is requested with ``nonexistent_key_name``
        Then: A ValueError is raised
        And: Its message contains "not found" (case-insensitive)
    """
    missing_key_name = "nonexistent_key_name"

    clear_vault_key("test_restoration_key_single")
    # Make the precondition explicit: the key must be absent even if an
    # earlier run or another test left something under this name.
    clear_vault_key(missing_key_name)

    # Encrypt data with the fixture's key
    encrypted_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Decrypt config pointing at the absent key
    decrypt_config_wrong_key = DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt", columns=["email"], key_name=missing_key_name
                )
            )
        ]
    )

    with pytest.raises(ValueError) as exc_info:
        run_decrypt_op(decrypt_config_wrong_key, encrypted_df.copy())

    error_message = str(exc_info.value)
    assert "not found" in error_message.lower(), "Error message should indicate key not found"
def test_ac2_restore_fails_when_key_corrupted(sample_df, encrypt_config_single_field):
    """
    AC2: Restoration Denial when Decryption Key is corrupted

    Scenario: The stored key is overwritten with a value that is not a Fernet key
        Given: The sample dataset encrypted with the single-field config
        And: The Vault entry replaced by the string "corrupted_invalid_key_data"
        When: Decryption of email is requested
        Then: A ValueError (or subclass) or a Fernet InvalidToken is raised
    """
    # Imported locally so this test does not depend on the module import list
    from cryptography.fernet import InvalidToken

    key_name = "test_restoration_key_single"
    clear_vault_key(key_name)

    # Encrypt data first
    encrypted_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Corrupt the key by replacing it with invalid data
    set_vault_key(key_name, "corrupted_invalid_key_data")

    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt",
                    columns=["email"],
                    key_name=key_name,
                )
            )
        ]
    )

    # Match on exception classes rather than on the text of ``str(type(exc))``:
    # the substring check rejected ValueError subclasses (e.g. binascii.Error)
    # and never matched ``cryptography.fernet.InvalidToken`` because the
    # module name is lower-case.
    with pytest.raises((ValueError, InvalidToken)):
        run_decrypt_op(decrypt_config, encrypted_df.copy())
# ------------- AC3: Restoration Denial when Access is Unauthorized --------------------------
def test_ac3_restore_fails_when_access_unauthorized(sample_df, encrypt_config_single_field):
    """
    AC3: Restoration Denial when Decryption Key access is unauthorized

    Scenario: Access to the key is denied after the data was encrypted
        Given: The sample dataset encrypted with the single-field config
        And: Access to ``test_restoration_key_single`` denied via the test helper
        When: Decryption of email is requested with that key name
        Then: A ValueError is raised
        And: Its message contains "access denied" or "error while reading"
             (case-insensitive)
        And: Its message contains the key name
    """
    key_name = "test_restoration_key_single"
    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Access is revoked only after encryption has completed
    # NOTE(review): nothing in this test lifts the denial afterwards --
    # presumably conftest resets vault state between tests; TODO confirm.
    deny_vault_access(key_name)

    technique = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=technique)]
    )

    with pytest.raises(ValueError) as denied:
        run_decrypt_op(decrypt_config, ciphered_df.copy())

    message = str(denied.value)
    lowered = message.lower()
    assert any(
        fragment in lowered for fragment in ("access denied", "error while reading")
    ), "Error message should indicate access denial or error reading key"
    assert key_name in message, "Error message should include the key name for auditing"
def test_ac3_restore_multiple_keys_with_mixed_authorization(sample_df):
    """
    AC3: Restoration with mixed authorization - one key allowed, one denied

    Scenario: Two fields encrypted under different keys, one key inaccessible
        Given: email encrypted with ``authorized_key``
        And: ssn encrypted with ``unauthorized_key``
        And: Access to ``unauthorized_key`` denied via the test helper
        When: A single decrypt config requests both fields
        Then: A ValueError is raised
        And: Its message contains "access denied" or "error while reading"
             (case-insensitive)
        And: Its message contains "unauthorized_key"
    """
    allowed_key = "authorized_key"
    denied_key = "unauthorized_key"

    email_encrypt = EncryptConfig(type="encrypt", columns=["email"], key_name=allowed_key)
    encrypt_config_multi_keys = AnonymisePseudonymizeStructuredConfig(
        used_function=[PseudoTechniqueConfig(technique=email_encrypt)]
    )

    for stale_key in (allowed_key, denied_key):
        clear_vault_key(stale_key)

    # First pass: email under the accessible key
    ciphered_df, _ = run_encrypt_op(encrypt_config_multi_keys, sample_df.copy())

    # Second pass: ssn under the key that will be locked
    ssn_encrypt = EncryptConfig(type="encrypt", columns=["ssn"], key_name=denied_key)
    encrypt_config_ssn = AnonymisePseudonymizeStructuredConfig(
        used_function=[PseudoTechniqueConfig(technique=ssn_encrypt)]
    )
    ciphered_df, _ = run_encrypt_op(encrypt_config_ssn, ciphered_df.copy())

    deny_vault_access(denied_key)

    # One config, two decrypt steps: the accessible key first, then the denied one
    decrypt_steps = [
        DepseudoTechniqueConfig(
            technique=DecryptConfig(type="decrypt", columns=["email"], key_name=allowed_key)
        ),
        DepseudoTechniqueConfig(
            technique=DecryptConfig(type="decrypt", columns=["ssn"], key_name=denied_key)
        ),
    ]
    decrypt_config_both = DepseudonymizeStructuredConfig(used_function=decrypt_steps)

    with pytest.raises(ValueError) as denied:
        run_decrypt_op(decrypt_config_both, ciphered_df.copy())

    message = str(denied.value)
    lowered = message.lower()
    assert any(
        fragment in lowered for fragment in ("access denied", "error while reading")
    ), "Error message should indicate access denial"
    assert "unauthorized_key" in message, "Error message should mention the unauthorized key"
# ------------------- AC4: Restoration Denial when Key is Invalid ----------------------------
def test_ac4_restore_fails_with_wrong_key(sample_df):
    """
    AC4: Restoration Denial when Decryption Key is invalid

    Scenario: Decrypt with a well-formed key that is not the encryption key
        Given: email encrypted with ``encryption_key_a``
        And: A freshly generated Fernet key stored as ``encryption_key_b``
        When: Decryption of email is requested with ``encryption_key_b``
        Then: A ValueError is raised
        And: Its message contains "invalid" or "token" (case-insensitive)
        And: Its message contains "encryption_key_b"
    """
    key_a = "encryption_key_a"
    key_b = "encryption_key_b"

    encrypt_step = PseudoTechniqueConfig(
        technique=EncryptConfig(type="encrypt", columns=["email"], key_name=key_a)
    )
    encrypt_config_key_a = AnonymisePseudonymizeStructuredConfig(used_function=[encrypt_step])

    for stale_key in (key_a, key_b):
        clear_vault_key(stale_key)

    ciphered_df, _ = run_encrypt_op(encrypt_config_key_a, sample_df.copy())

    # Key B is a valid Fernet key, just not the one used for encryption
    unrelated_key = Fernet.generate_key().decode()
    set_vault_key(key_b, unrelated_key)

    decrypt_step = DepseudoTechniqueConfig(
        technique=DecryptConfig(type="decrypt", columns=["email"], key_name=key_b)
    )
    decrypt_config_key_b = DepseudonymizeStructuredConfig(used_function=[decrypt_step])

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config_key_b, ciphered_df.copy())

    message = str(failure.value)
    lowered = message.lower()
    assert any(
        fragment in lowered for fragment in ("invalid", "token")
    ), "Error message should indicate invalid token or decryption failure"
    assert "encryption_key_b" in message, "Error message should include the key name for auditing"
def test_ac4_restore_fails_with_key_from_different_field(sample_df):
    """
    AC4: Restoration fails when using a key intended for a different field

    Scenario: Decrypt email with the key that encrypted ssn
        Given: email encrypted with ``email_key`` and ssn with ``ssn_key``
        When: Decryption of email is requested with ``ssn_key``
        Then: A ValueError is raised
        And: Its message contains "invalid" or "token" (case-insensitive)
    """
    email_step = PseudoTechniqueConfig(
        technique=EncryptConfig(type="encrypt", columns=["email"], key_name="email_key")
    )
    encrypt_config_email = AnonymisePseudonymizeStructuredConfig(used_function=[email_step])

    ssn_step = PseudoTechniqueConfig(
        technique=EncryptConfig(type="encrypt", columns=["ssn"], key_name="ssn_key")
    )
    encrypt_config_ssn = AnonymisePseudonymizeStructuredConfig(used_function=[ssn_step])

    for stale_key in ("email_key", "ssn_key"):
        clear_vault_key(stale_key)

    # Two separate passes, one per key
    ciphered_df, _ = run_encrypt_op(encrypt_config_email, sample_df.copy())
    ciphered_df, _ = run_encrypt_op(encrypt_config_ssn, ciphered_df.copy())

    # Deliberate mismatch: the email column paired with the ssn key
    mismatched = DecryptConfig(type="decrypt", columns=["email"], key_name="ssn_key")
    decrypt_config_wrong_field = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=mismatched)]
    )

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config_wrong_field, ciphered_df.copy())

    lowered = str(failure.value).lower()
    assert any(
        fragment in lowered for fragment in ("invalid", "token")
    ), "Error message should indicate invalid token"
def test_ac4_restore_fails_with_tampered_encrypted_data(sample_df, encrypt_config_single_field):
    """
    AC4: Restoration fails when encrypted data has been tampered with

    Scenario: One ciphertext value is overwritten before restoration
        Given: The sample dataset encrypted with the single-field config
        And: The email value in row 0 replaced by an arbitrary string
        When: Decryption of email is requested with the correct key name
        Then: A ValueError is raised
        And: Its message contains "invalid" or "token" (case-insensitive)
    """
    key_name = "test_restoration_key_single"
    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Overwrite a single ciphertext cell with a non-ciphertext string
    ciphered_df.loc[0, "email"] = "tampered_invalid_encrypted_data"

    technique = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=technique)]
    )

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config, ciphered_df.copy())

    lowered = str(failure.value).lower()
    assert any(
        fragment in lowered for fragment in ("invalid", "token")
    ), "Error message should indicate invalid token due to tampering"
# ---------------- Additional Edge Cases and Integration Tests -------------------------------
@pytest.mark.integration
def test_integration_full_cycle_encrypt_decrypt_multiple_operations(sample_df):
    """
    Integration test: Full cycle of multiple encrypt/decrypt operations

    Scenario: Fields are encrypted in two passes and restored in two passes
        Given: The sample dataset
        When: email is encrypted with ``key_1``
        And: name and ssn are then encrypted with ``key_2``
        And: email is restored first, followed by name and ssn
        Then: email equals the original after the first restoration
        And: email, name and ssn all equal the originals at the end

    Carries the ``integration`` marker so marker-based selection treats it the
    same way as the other integration test in this module.
    """
    # Phase 1: Encrypt email
    encrypt_config_1 = AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(type="encrypt", columns=["email"], key_name="key_1")
            )
        ]
    )
    clear_vault_key("key_1")
    encrypted_df_1, _ = run_encrypt_op(encrypt_config_1, sample_df.copy())

    # Phase 2: Encrypt name and ssn on top of the phase-1 output
    encrypt_config_2 = AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(type="encrypt", columns=["name", "ssn"], key_name="key_2")
            )
        ]
    )
    clear_vault_key("key_2")
    encrypted_df_2, _ = run_encrypt_op(encrypt_config_2, encrypted_df_1.copy())

    # Phase 3: Restore email first
    decrypt_config_1 = DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(type="decrypt", columns=["email"], key_name="key_1")
            )
        ]
    )
    restored_df_1, _ = run_decrypt_op(decrypt_config_1, encrypted_df_2.copy())
    assert restored_df_1["email"].equals(sample_df["email"]), "Email should be restored"

    # Phase 4: Restore name and ssn from the phase-3 output
    decrypt_config_2 = DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(type="decrypt", columns=["name", "ssn"], key_name="key_2")
            )
        ]
    )
    restored_df_2, _ = run_decrypt_op(decrypt_config_2, restored_df_1.copy())

    # Verify all fields restored
    assert restored_df_2["email"].equals(sample_df["email"]), "Email should remain restored"
    assert restored_df_2["name"].equals(sample_df["name"]), "Name should be restored"
    assert restored_df_2["ssn"].equals(sample_df["ssn"]), "SSN should be restored"
def test_restore_with_null_values(encrypt_config_single_field, decrypt_config_single_field):
    """
    Edge case: Restoration of dataset with null/NaN values

    Scenario: Some cells in the dataset are None
        Given: A four-row dataset with a None in name, email and ssn
        When: The dataset is encrypted and then decrypted
        Then: The three non-null email values equal their originals

    The restored value of the null email row (row 1) is not asserted.
    """
    clear_vault_key("test_restoration_key_single")

    records = {
        "id": [1, 2, 3, 4],
        "name": ["Alice", "Bob", None, "David"],
        "email": [
            "alice@example.com",
            None,
            "charlie@example.com",
            "david@example.com",
        ],
        "ssn": ["123-45-6789", "234-56-7890", "345-67-8901", None],
        "age": [25, 30, 35, 40],
        "salary": [50000.0, 60000.0, 70000.0, 80000.0],
        "department": ["HR", "IT", "Finance", "IT"],
    }
    df_with_nulls = pd.DataFrame(records)

    # NOTE(review): the original comment states that NaN/None is stringified
    # ("nan"/"None") before encryption -- not verified by this test.
    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, df_with_nulls.copy())
    restored_df, _ = run_decrypt_op(decrypt_config_single_field, ciphered_df.copy())

    # Only the rows that held a real email are checked
    expected_by_row = {
        0: "alice@example.com",
        2: "charlie@example.com",
        3: "david@example.com",
    }
    for row, expected_email in expected_by_row.items():
        assert restored_df.loc[row, "email"] == expected_email
@pytest.mark.slow
def test_restore_large_dataset_performance():
    """
    Performance test: Restoration of large dataset

    Scenario: Round-trip the email column of a 10,000-row dataset
        Given: A generated dataset with 10,000 rows
        When: email is encrypted and then decrypted
        Then: The result has 10,000 rows
        And: The email column equals the original
        And: The first, middle and last rows hold the expected addresses

    Carries the ``slow`` marker so it can be deselected with ``-m "not slow"``.
    No time limit is asserted.
    """
    row_count = 10000
    # 1-based ids; every generated column is derived from this range
    row_ids = range(1, row_count + 1)
    departments = ["HR", "IT", "Finance", "Sales"]

    large_df = pd.DataFrame(
        {
            "id": row_ids,
            "email": [f"user{i}@example.com" for i in row_ids],
            "name": [f"User {i}" for i in row_ids],
            "ssn": [f"{i:03d}-{i:02d}-{i:04d}" for i in row_ids],
            "age": [20 + (i % 50) for i in row_ids],
            "salary": [30000 + (i * 10) for i in row_ids],
            "department": [departments[i % 4] for i in row_ids],
        }
    )

    encrypt_config = AnonymisePseudonymizeStructuredConfig(
        used_function=[
            PseudoTechniqueConfig(
                technique=EncryptConfig(
                    type="encrypt", columns=["email"], key_name="test_large_dataset"
                )
            )
        ]
    )
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[
            DepseudoTechniqueConfig(
                technique=DecryptConfig(
                    type="decrypt", columns=["email"], key_name="test_large_dataset"
                )
            )
        ]
    )

    clear_vault_key("test_large_dataset")

    # Encrypt and restore
    encrypted_df, _ = run_encrypt_op(encrypt_config, large_df.copy())
    restored_df, _ = run_decrypt_op(decrypt_config, encrypted_df.copy())

    assert len(restored_df) == row_count, "Should restore all 10,000 rows"
    assert restored_df["email"].equals(large_df["email"]), "All emails should be restored"

    # Spot check specific values (index label i holds user i+1)
    assert restored_df.loc[0, "email"] == "user1@example.com"
    assert restored_df.loc[5000, "email"] == "user5001@example.com"
    assert restored_df.loc[9999, "email"] == "user10000@example.com"
@pytest.mark.edge_case
@pytest.mark.security
def test_restore_after_key_rotation(sample_df, encrypt_config_single_field):
    """
    AC4: Restoration fails after the key stored in Vault has been replaced

    Scenario: The Vault entry is overwritten after encryption
        Given: The sample dataset encrypted with the single-field config
        And: ``test_restoration_key_single`` replaced by a newly generated
             Fernet key
        When: Decryption of email is requested with that key name
        Then: A ValueError is raised
        And: Its message contains "invalid" or "decrypt" (case-insensitive)
    """
    key_name = "test_restoration_key_single"
    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Simulate rotation: same name, different key material
    rotated_key = Fernet.generate_key().decode()
    set_vault_key(key_name, rotated_key)

    technique = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=technique)]
    )

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config, ciphered_df.copy())

    lowered = str(failure.value).lower()
    assert any(
        fragment in lowered for fragment in ("invalid", "decrypt")
    ), "Should indicate invalid token due to key rotation"
@pytest.mark.edge_case
def test_restore_partially_encrypted_column(sample_df, encrypt_config_single_field):
    """
    Edge case: Restore a column where only some rows hold ciphertext

    Scenario: Two encrypted email cells are overwritten with plaintext
        Given: The sample dataset encrypted with the single-field config
        And: The email values in rows 0 and 2 replaced by plaintext addresses
        When: Decryption of email is requested with the correct key name
        Then: A ValueError is raised
        And: Its message contains "invalid" or "decrypt" (case-insensitive)
    """
    key_name = "test_restoration_key_single"
    clear_vault_key(key_name)

    ciphered_df, _ = run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Mix plaintext into the otherwise encrypted column
    plaintext_overrides = {
        0: "plaintext@example.com",
        2: "another_plaintext@example.com",
    }
    for row, plaintext in plaintext_overrides.items():
        ciphered_df.loc[row, "email"] = plaintext

    technique = DecryptConfig(type="decrypt", columns=["email"], key_name=key_name)
    decrypt_config = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=technique)]
    )

    with pytest.raises(ValueError) as failure:
        run_decrypt_op(decrypt_config, ciphered_df.copy())

    lowered = str(failure.value).lower()
    assert any(
        fragment in lowered for fragment in ("invalid", "decrypt")
    ), "Should indicate invalid token for plaintext values"
@pytest.mark.edge_case
def test_restore_with_missing_column_in_encrypted_data(
    sample_df, encrypt_config_single_field, decrypt_config_single_field
):
    """
    AC2: Restoration fails when the requested column is absent from the input

    Scenario: The decrypt config names a column the DataFrame does not have
        Given: An encryption run on the sample dataset (its output is not used)
        And: A three-row DataFrame that has no email column
        When: That DataFrame is decrypted with the single-field decrypt config
        Then: A ValueError or KeyError is raised
        And: Its message contains "email", "not present" or "not found"
    """
    clear_vault_key("test_restoration_key_single")

    # Run the encryption only for its side effect; the returned frame is discarded
    run_encrypt_op(encrypt_config_single_field, sample_df.copy())

    # Same shape of data as the sample, minus the 'email' column
    columns_without_email = {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "age": [25, 30, 35],
        "salary": [50000.0, 60000.0, 70000.0],
        "department": ["HR", "IT", "Finance"],
    }
    incomplete_df = pd.DataFrame(columns_without_email)

    with pytest.raises((ValueError, KeyError)) as failure:
        run_decrypt_op(decrypt_config_single_field, incomplete_df)

    error_msg = str(failure.value)
    assert any(
        fragment in error_msg for fragment in ("email", "not present", "not found")
    ), f"Error should indicate missing column, got: {error_msg}"
@pytest.mark.integration
def test_restore_with_multiple_encryption_keys(sample_df):
    """
    Integration test: Restore data encrypted with multiple different keys

    Scenario: Two fields are encrypted and restored with separate keys
        Given: name encrypted with ``key_a`` and email encrypted with ``key_b``
        When: name is decrypted with ``key_a`` and email with ``key_b``
        Then: Both fields equal their original values
    """
    for stale_key in ("key_a", "key_b"):
        clear_vault_key(stale_key)

    # One single-column config per key, for each direction
    name_encrypt = EncryptConfig(type="encrypt", columns=["name"], key_name="key_a")
    encrypt_config_name = AnonymisePseudonymizeStructuredConfig(
        used_function=[PseudoTechniqueConfig(technique=name_encrypt)]
    )
    email_encrypt = EncryptConfig(type="encrypt", columns=["email"], key_name="key_b")
    encrypt_config_email = AnonymisePseudonymizeStructuredConfig(
        used_function=[PseudoTechniqueConfig(technique=email_encrypt)]
    )

    # Apply the two encrypt passes in sequence on the same frame
    working_df = sample_df.copy()
    for encrypt_config in (encrypt_config_name, encrypt_config_email):
        working_df, _ = run_encrypt_op(encrypt_config, working_df)

    name_decrypt = DecryptConfig(type="decrypt", columns=["name"], key_name="key_a")
    decrypt_config_name = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=name_decrypt)]
    )
    email_decrypt = DecryptConfig(type="decrypt", columns=["email"], key_name="key_b")
    decrypt_config_email = DepseudonymizeStructuredConfig(
        used_function=[DepseudoTechniqueConfig(technique=email_decrypt)]
    )

    # Restore in the same order, starting from a copy of the encrypted frame
    recovered_df = working_df.copy()
    for decrypt_config in (decrypt_config_name, decrypt_config_email):
        recovered_df, _ = run_decrypt_op(decrypt_config, recovered_df)

    name_matches = recovered_df["name"].equals(sample_df["name"])
    assert name_matches, "Name field should be restored with key_a"
    email_matches = recovered_df["email"].equals(sample_df["email"])
    assert email_matches, "Email field should be restored with key_b"