1120 lines
41 KiB
Python
1120 lines
41 KiB
Python
"""
|
|
Test suite for field-level pseudonymisation operations (encrypt technique).
|
|
|
|
This test suite covers the encryption pseudonymisation technique for structured dataframes,
|
|
validating the following Acceptance Criteria:
|
|
|
|
## Test Coverage Summary
|
|
|
|
### Acceptance Criteria Coverage:
|
|
- AC1 (Supported Technique Applied Correctly): 7 tests
|
|
- AC2 (Invalid Execution Handling): 7 tests
|
|
- AC3 (DataFrame Compliance): 6 tests
|
|
- AC4 (Audit Logging - Success): 2 tests
|
|
- AC5 (Audit Logging - Failure): 3 tests
|
|
- Additional Coverage: 7 tests
|
|
|
|
### Test Pattern:
|
|
- Each test uses build_op_context with config_to_dagster_dict for configuration
|
|
- Tests validate dual outputs (data, metrics)
|
|
- Vault access is mocked for isolation
|
|
|
|
"""
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
from dagster import build_op_context
|
|
from cryptography.fernet import Fernet
|
|
from hvac.exceptions import InvalidPath
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from template_code_location.field_level_pseudo_anonymisation.config_models.structured_config import (
|
|
AnonymisePseudonymizeStructuredConfig,
|
|
EncryptConfig,
|
|
HashConfig,
|
|
PseudoTechniqueConfig,
|
|
)
|
|
from template_code_location.field_level_pseudo_anonymisation.ops import anonymize_pseudonymize_structured
|
|
|
|
# Import helper functions (fixtures are auto-discovered by pytest)
|
|
from .conftest import (
|
|
run_encrypt_op,
|
|
clear_vault_key,
|
|
get_vault_key,
|
|
config_to_dagster_dict,
|
|
)
|
|
|
|
|
|
# -------------------------------- Test Markers Configuration --------------------------------
|
|
|
|
# NOTE(review): this does not register the markers with pytest. Each pass
# only reads the marker from ``pytest.mark`` and stores the same object back
# under the same name. Registration normally goes through the ``markers`` ini
# option or ``config.addinivalue_line("markers", ...)`` in a
# ``pytest_configure`` hook -- confirm where slow/security/edge_case are
# declared for this project.
for _marker_name in ("slow", "security", "edge_case"):
    setattr(pytest.mark, _marker_name, getattr(pytest.mark, _marker_name))
del _marker_name
|
|
|
|
|
|
# -------------------------------- Test-Specific Fixtures ----------------------------------------
|
|
|
|
|
|
@pytest.fixture
def encrypt_single_column_config():
    """
    Build a configuration that encrypts only the ``email`` column.

    The single encrypt technique refers to the key name ``test_email_key``.
    """
    email_encryption = EncryptConfig(
        type="encrypt",
        columns=["email"],
        key_name="test_email_key",
    )
    techniques = [PseudoTechniqueConfig(technique=email_encryption)]
    return AnonymisePseudonymizeStructuredConfig(used_function=techniques)
|
|
|
|
|
|
@pytest.fixture
def encrypt_multiple_columns_config():
    """
    Build a configuration that encrypts the ``name`` and ``email`` columns.

    Both columns are covered by one encrypt technique that refers to the key
    name ``test_multi_key``.
    """
    multi_column_encryption = EncryptConfig(
        type="encrypt",
        columns=["name", "email"],
        key_name="test_multi_key",
    )
    techniques = [PseudoTechniqueConfig(technique=multi_column_encryption)]
    return AnonymisePseudonymizeStructuredConfig(used_function=techniques)
|
|
|
|
|
|
@pytest.fixture
def encrypt_mixed_types_config():
    """
    Build a configuration that encrypts the ``id``, ``age`` and ``salary`` columns.

    Used by the tests that check how non-string columns come out of the
    encrypt technique; the key name is ``test_numeric_key``.
    """
    numeric_encryption = EncryptConfig(
        type="encrypt",
        columns=["id", "age", "salary"],
        key_name="test_numeric_key",
    )
    techniques = [PseudoTechniqueConfig(technique=numeric_encryption)]
    return AnonymisePseudonymizeStructuredConfig(used_function=techniques)
|
|
|
|
|
|
@pytest.fixture
def encrypt_with_unchanged_columns_config():
    """
    Build a configuration that encrypts ``email`` and names no other column.

    Used by the AC3 test that checks the remaining columns are left as they
    were; the key name is ``test_partial_key``.
    """
    partial_encryption = EncryptConfig(
        type="encrypt",
        columns=["email"],
        key_name="test_partial_key",
    )
    techniques = [PseudoTechniqueConfig(technique=partial_encryption)]
    return AnonymisePseudonymizeStructuredConfig(used_function=techniques)
|
|
|
|
|
|
# ----------------------- AC1: Supported Technique Applied Correctly ------------------------
|
|
|
|
|
|
def test_encrypt_single_column_applied_correctly(sample_df, encrypt_single_column_config):
    """
    AC1: Encryption is applied correctly to a single column.

    Scenario: The system applies encryption to the 'email' field
        Given: A structured dataset with an email column
        And: A valid encryption configuration for that column
        When: The op is executed
        Then: Every email value differs from its original
        And: Every encrypted value decrypts, with the stored key, to one of
             the original emails
        And: The row count is unchanged
    """
    # Remove any key left behind under this name before running the op.
    clear_vault_key("test_email_key")

    result_df, metrics = run_encrypt_op(encrypt_single_column_config, sample_df.copy())

    # Both outputs must be present.
    assert result_df is not None, "Result DataFrame should not be None"
    assert metrics is not None, "Metrics should not be None"

    original_emails = sample_df["email"]
    encrypted_emails = result_df["email"]

    # Column-level check first, then value by value.
    assert not encrypted_emails.equals(
        original_emails
    ), "Email column should be encrypted (values should change)"
    for plain, token in zip(original_emails, encrypted_emails):
        assert plain != token, f"Original value '{plain}' should be encrypted"

    # A value only counts as encrypted if the stored key can decrypt it.
    fernet = Fernet(get_vault_key("test_email_key"))
    for token in encrypted_emails:
        recovered = fernet.decrypt(token.encode()).decode()
        assert (
            recovered in original_emails.values
        ), f"Decrypted value '{recovered}' should match an original email"

    assert len(result_df) == len(sample_df), "Row count should be preserved"
|
|
|
|
|
|
def test_encrypt_multiple_columns_applied_correctly(sample_df, encrypt_multiple_columns_config):
    """
    AC1: Encryption is applied correctly to several columns at once.

    Scenario: The system applies encryption to the name and email fields
        Given: A structured dataset with name and email columns
        And: One encryption configuration covering both columns
        When: The op is executed
        Then: Both columns differ from their originals
        And: Values in both columns decrypt with the single configured key
    """
    clear_vault_key("test_multi_key")

    result_df, metrics = run_encrypt_op(encrypt_multiple_columns_config, sample_df.copy())

    encrypted_columns = ("name", "email")

    # Each configured column must no longer equal its input.
    for column in encrypted_columns:
        assert not result_df[column].equals(
            sample_df[column]
        ), f"{column.capitalize()} column should be encrypted"

    # One key serves both columns, so a single Fernet instance decrypts all.
    fernet = Fernet(get_vault_key("test_multi_key"))
    for column in encrypted_columns:
        for token in result_df[column]:
            recovered = fernet.decrypt(token.encode()).decode()
            assert recovered in sample_df[column].values
|
|
|
|
|
|
def test_encrypt_numeric_columns_applied_correctly(sample_df, encrypt_mixed_types_config):
    """
    AC1: Encryption handles numeric columns correctly.

    Scenario: The system applies encryption to numeric fields (id, age, salary)
        Given: A structured dataset with integer and float columns
        And: A valid encryption configuration for those columns
        When: The op is executed
        Then: Each encrypted column has object dtype
        And: Every encrypted value in all three columns decrypts back to one
             of the original numeric values
    """
    clear_vault_key("test_numeric_key")

    result_df, metrics = run_encrypt_op(encrypt_mixed_types_config, sample_df.copy())

    # Encrypted values are text tokens, so the columns become object dtype.
    for column in ("id", "age", "salary"):
        assert (
            result_df[column].dtype == object
        ), f"Encrypted {column} should be object/string type"

    fernet = Fernet(get_vault_key("test_numeric_key"))

    # Identifiers must round-trip as exact integers.
    for token in result_df["id"]:
        recovered_id = int(fernet.decrypt(token.encode()).decode())
        assert recovered_id in sample_df["id"].values

    # age and salary were configured for encryption as well but previously
    # went unchecked. They are parsed as float so the check holds whether the
    # plaintext was rendered as "30" or "30.0".
    # NOTE(review): assumes sample_df has no nulls in these columns -- confirm.
    for column in ("age", "salary"):
        originals = sample_df[column].values
        for token in result_df[column]:
            recovered = float(fernet.decrypt(token.encode()).decode())
            assert (
                recovered in originals
            ), f"Decrypted {column} value {recovered} should match an original value"
|
|
|
|
|
|
def test_encrypt_key_generation_on_first_use(sample_df, encrypt_single_column_config):
    """
    AC1: A key is created and stored when none exists for the key name.

    Scenario: First-time encryption generates a key automatically
        Given: A structured dataset with a valid configuration
        And: No key stored under the configured key_name
        When: The op is executed
        Then: A key can be read back under that name afterwards
        And: It has the length of a Fernet key
        And: It decrypts the values the op produced
    """
    key_name = "test_email_key"
    clear_vault_key(key_name)

    # Precondition: reading the key must fail before the op has run.
    with pytest.raises(InvalidPath):
        get_vault_key(key_name)

    result_df, _ = run_encrypt_op(encrypt_single_column_config, sample_df.copy())

    # Postcondition: the key now exists and looks like a Fernet key.
    stored_key = get_vault_key(key_name)
    assert stored_key is not None, "Encryption key should be created in Vault"
    assert len(stored_key) == 44, "Fernet key should be 44 bytes (base64 encoded 32 bytes)"

    # The stored key must be the one that was used for encryption.
    fernet = Fernet(stored_key)
    known_emails = sample_df["email"].values
    for token in result_df["email"]:
        recovered = fernet.decrypt(token.encode()).decode()
        assert recovered in known_emails
|
|
|
|
|
|
def test_encrypt_uses_existing_vault_key(sample_df, encrypt_single_column_config):
    """
    AC1: Encryption reuses a key that already exists under the key name.

    Scenario: A second run does not replace the stored key
        Given: A structured dataset
        And: A key created by a first run of the op
        When: The op is executed a second time with the same configuration
        Then: The stored key is unchanged
        And: The outputs of both runs decrypt with that one key

    Note: Fernet tokens embed a timestamp and a random IV, so the two runs are
    not expected to produce identical ciphertext. Key reuse is therefore
    verified by decryption rather than by comparing the encrypted columns.
    """
    clear_vault_key("test_email_key")

    # First run: no key exists yet, so the op has to create one.
    first_result, _ = run_encrypt_op(encrypt_single_column_config, sample_df.copy())
    key_after_first_run = get_vault_key("test_email_key")

    # Second run: the key from the first run should be picked up as is.
    second_result, _ = run_encrypt_op(encrypt_single_column_config, sample_df.copy())
    key_after_second_run = get_vault_key("test_email_key")

    assert key_after_first_run == key_after_second_run, "Encryption should reuse existing Vault key"

    # Both outputs must be readable with the single stored key.
    fernet = Fernet(key_after_second_run)
    known_emails = sample_df["email"].values
    for run_label, result_df in (("first", first_result), ("second", second_result)):
        for token in result_df["email"]:
            recovered = fernet.decrypt(token.encode()).decode()
            assert (
                recovered in known_emails
            ), f"Output of the {run_label} run should decrypt with the shared Vault key"
|
|
|
|
|
|
# ----------------------- AC2: Invalid Execution Handling ------------------------------------
|
|
|
|
|
|
def test_encrypt_missing_column_error(encrypt_single_column_config):
    """
    AC2: A configured column that is absent from the data raises a clear error.

    Scenario: The system aborts when a column is missing
        Given: A dataset without an 'email' column
        And: A configuration that asks for 'email' to be encrypted
        When: The op is executed
        Then: A ValueError is raised
        And: Its message says columns are not present and names 'email'
    """
    # Deliberately has no 'email' column.
    frame_without_email = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["Alice", "Bob", "Charlie"],
            "age": [25, 30, 35],
        }
    )

    with pytest.raises(ValueError) as exc_info:
        run_encrypt_op(encrypt_single_column_config, frame_without_email)

    assert "not present in the DataFrame" in str(
        exc_info.value
    ), "Error message should indicate missing columns"
    assert "email" in str(exc_info.value), "Error message should mention the missing 'email' column"
|
|
|
|
|
|
def test_encrypt_empty_dataframe_handled(encrypt_single_column_config):
    """
    AC2: An empty DataFrame is processed without raising.

    Scenario: The system processes a dataset with columns but no rows
        Given: A DataFrame that has the expected columns and zero rows
        And: A valid encryption configuration
        When: The op is executed
        Then: The result has zero rows
        And: The 'email' column is still part of the result
    """
    clear_vault_key("test_email_key")

    schema = ["id", "name", "email", "age", "salary", "department"]
    frame_without_rows = pd.DataFrame(columns=schema)

    result_df, metrics = run_encrypt_op(encrypt_single_column_config, frame_without_rows)

    assert len(result_df) == 0, "Result should be empty"
    assert "email" in result_df.columns, "Email column should exist in schema"
|
|
|
|
|
|
def test_encrypt_vault_connection_error():
    """
    AC2: A failure while reading from Vault surfaces as a clear error.

    Scenario: The system fails when the Vault read raises
        Given: A one-row dataset with a valid configuration
        When: The Vault client raises while reading the secret
        Then: A ValueError is raised
        And: Its message carries the underlying Vault error text

    Note: No real Vault outage is needed. ``hvac.Client`` is patched with a
    mock whose ``read_secret_version`` call raises.
    """
    # Stand-in client: any attempt to read the secret raises.
    failing_client = MagicMock()
    failing_client.secrets.kv.v2.read_secret_version.side_effect = Exception(
        "Simulated Vault connection error"
    )

    with patch("hvac.Client", return_value=failing_client):
        single_row = pd.DataFrame(
            {
                "id": [1],
                "name": ["Test"],
                "email": ["test@example.com"],
                "age": [30],
                "salary": [50000.0],
                "department": ["IT"],
            }
        )
        email_encryption = EncryptConfig(
            type="encrypt", columns=["email"], key_name="test_email_key"
        )
        config = AnonymisePseudonymizeStructuredConfig(
            used_function=[PseudoTechniqueConfig(technique=email_encryption)]
        )

        with pytest.raises(ValueError) as exc_info:
            run_encrypt_op(config, single_row)

        message = str(exc_info.value)
        assert (
            "Simulated Vault connection error" in message
        ), "Error should indicate Vault connection issue"
|
|
|
|
|
|
def test_encrypt_null_values_handled(encrypt_single_column_config):
    """
    AC2: NULL-like values in an encrypted column do not break execution.

    Scenario: The system handles null values
        Given: A dataset whose email column contains ``None`` and ``pd.NA``
        And: A valid encryption configuration
        When: The op is executed
        Then: No exception is raised and all four rows are returned
        And: Every value decrypts either to an original email or to a string
             form of a null ("None", "nan" or "<NA>")
    """
    clear_vault_key("test_email_key")

    frame_with_nulls = pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["Alice", "Bob", "Charlie", "David"],
            "email": ["alice@example.com", None, "charlie@example.com", pd.NA],
            "age": [25, 30, 35, 40],
            "salary": [50000.0, 60000.0, 70000.0, 80000.0],
            "department": ["HR", "IT", "Finance", "IT"],
        }
    )

    result_df, metrics = run_encrypt_op(encrypt_single_column_config, frame_with_nulls)

    # Reaching this point already shows that nothing was raised.
    assert result_df is not None
    assert len(result_df) == 4

    # Acceptable plaintexts: the two real emails plus the textual renderings
    # a null may take before it is encrypted.
    acceptable_plaintexts = [
        "alice@example.com",
        "charlie@example.com",
        "None",
        "nan",
        "<NA>",
    ]

    fernet = Fernet(get_vault_key("test_email_key"))
    for token in result_df["email"]:
        recovered = fernet.decrypt(token.encode()).decode()
        assert recovered in acceptable_plaintexts
|
|
|
|
|
|
def test_encrypt_duplicate_column_configuration_error():
    """
    AC2: Assigning one column to two techniques is rejected.

    Scenario: Configuration validation prevents duplicate column assignments
        Given: A configuration listing 'email' under both encrypt and hash
        When: The configuration object is built
        Then: A ValueError is raised
        And: Its message mentions a duplicate column
    """
    # Everything is built inside the ``raises`` block so that a ValueError
    # from any of the constructors is captured.
    with pytest.raises(ValueError) as exc_info:
        encrypt_email = PseudoTechniqueConfig(
            technique=EncryptConfig(type="encrypt", columns=["email"], key_name="key1")
        )
        hash_email = PseudoTechniqueConfig(
            technique=HashConfig(
                type="hash",
                columns=["email"],  # same column as the encrypt technique
                algorithm="sha256",
            )
        )
        AnonymisePseudonymizeStructuredConfig(used_function=[encrypt_email, hash_email])

    assert "Duplicate column" in str(
        exc_info.value
    ), "Error should indicate duplicate column configuration"
|
|
|
|
|
|
# ------------------ AC3: DataFrame Input and Output Compliance ------------------------------
|
|
|
|
|
|
def test_encrypt_dataframe_input_output_format(sample_df, encrypt_single_column_config):
    """
    AC3: DataFrame in, DataFrame out.

    Scenario: The system accepts a DataFrame and returns a DataFrame
        Given: A structured dataset as a pandas DataFrame
        And: A valid encryption configuration
        When: The op is executed
        Then: The data output is a pandas DataFrame
        And: Its column names and row count equal those of the input
    """
    clear_vault_key("test_email_key")

    result_df, metrics = run_encrypt_op(encrypt_single_column_config, sample_df.copy())

    assert isinstance(result_df, pd.DataFrame), "Output must be a pandas DataFrame"

    # Shape checks: same columns, same number of rows.
    expected_columns = list(sample_df.columns)
    expected_rows = len(sample_df)
    assert list(result_df.columns) == expected_columns, "Column names should be preserved"
    assert len(result_df) == expected_rows, "Row count should be preserved"
|
|
|
|
|
|
def test_encrypt_data_types_transformed_correctly(sample_df, encrypt_mixed_types_config):
    """
    AC3: Encrypted columns end up with object dtype.

    Scenario: Encrypted columns change to string/object type
        Given: A structured dataset with int and float columns
        And: An encryption configuration for id, age and salary
        When: The op is executed
        Then: All three columns have object dtype
        And: The dtype of 'id' differs from its dtype in the input
    """
    clear_vault_key("test_numeric_key")

    # Record the input dtype before the op runs.
    id_dtype_before = sample_df.dtypes.to_dict()["id"]

    result_df, _ = run_encrypt_op(encrypt_mixed_types_config, sample_df.copy())

    # Column -> kind of value it held before encryption (used in the message).
    source_kinds = {"id": "integer", "age": "integer", "salary": "float"}
    for column, kind in source_kinds.items():
        assert (
            result_df[column].dtype == object
        ), f"Encrypted {kind} column should become object type"

    assert result_df["id"].dtype != id_dtype_before, "Data type should change after encryption"
|
|
|
|
|
|
def test_encrypt_unchanged_columns_preserved(sample_df, encrypt_with_unchanged_columns_config):
    """
    AC3: Columns not named in the configuration come back untouched.

    Scenario: Non-encrypted columns remain identical
        Given: A structured dataset with several columns
        And: An encryption configuration for 'email' only
        When: The op is executed
        Then: id, name, age, salary and department equal their inputs
        And: 'email' no longer equals its input
    """
    clear_vault_key("test_partial_key")

    result_df, _ = run_encrypt_op(encrypt_with_unchanged_columns_config, sample_df.copy())

    # Column -> label used in the assertion message.
    untouched_columns = {
        "id": "ID",
        "name": "Name",
        "age": "Age",
        "salary": "Salary",
        "department": "Department",
    }
    for column, label in untouched_columns.items():
        assert result_df[column].equals(
            sample_df[column]
        ), f"{label} column should remain unchanged"

    # The one configured column must have changed.
    assert not result_df["email"].equals(
        sample_df["email"]
    ), "Email column should be encrypted (changed)"
|
|
|
|
|
|
def test_encrypt_schema_consistency(sample_df, encrypt_multiple_columns_config):
    """
    AC3: The output schema matches the input schema.

    Scenario: Output DataFrame has a consistent schema
        Given: A structured dataset
        And: A multi-column encryption configuration
        When: The op is executed
        Then: The output has the same column names as the input
        And: They appear in the same order
        And: No column has been added or removed
    """
    clear_vault_key("test_multi_key")

    result_df, _ = run_encrypt_op(encrypt_multiple_columns_config, sample_df.copy())

    expected_columns = sample_df.columns
    actual_columns = result_df.columns

    # Names, compared as plain lists.
    assert list(actual_columns) == list(expected_columns), "Column names must be identical"

    # Order, position by position, so a failure names the offending slot.
    for position, expected_name in enumerate(expected_columns):
        assert (
            actual_columns[position] == expected_name
        ), f"Column order should be preserved at position {position}"

    # Count.
    assert len(actual_columns) == len(
        expected_columns
    ), "Number of columns should remain the same"
|
|
|
|
|
|
def test_encrypt_index_preservation(sample_df, encrypt_single_column_config):
    """
    AC3: The DataFrame index survives encryption.

    Scenario: DataFrame index remains unchanged
        Given: A structured dataset with a custom, non-default index
        And: A valid encryption configuration
        When: The op is executed
        Then: The output DataFrame carries the same index as the input
    """
    clear_vault_key("test_email_key")

    # A non-default index (10, 20, 30, ...) makes a silent reset detectable.
    # It is derived from the row count, so the test does not depend on the
    # sample fixture having exactly five rows.
    indexed_df = sample_df.copy()
    indexed_df.index = [10 * (position + 1) for position in range(len(indexed_df))]
    expected_index = list(indexed_df.index)

    result_df, _ = run_encrypt_op(encrypt_single_column_config, indexed_df)

    assert list(result_df.index) == expected_index, "DataFrame index should be preserved"
|
|
|
|
|
|
# ------------- AC4: Execution Audit & Logging - Positive Scenario ---------------------------
|
|
|
|
|
|
def test_encrypt_successful_execution_logging(sample_df, encrypt_single_column_config):
    """
    AC4: A successful run yields the outputs and context needed for auditing.

    Scenario: Successful pseudonymisation execution can be logged
        Given: A structured dataset with a valid configuration
        When: The op is invoked directly with a built op context
        Then: Both a data output and a metrics output are returned
        And: The metrics value is a dict
        And: The context exposes a run id
        And: The op config names the technique and key name
        And: No original email value appears in the metrics
    """
    clear_vault_key("test_email_key")

    dagster_config = config_to_dagster_dict(encrypt_single_column_config)
    context = build_op_context(op_config=dagster_config)

    # Read the run id before executing, as an audit trail would need it.
    run_id = context.run_id

    result_df, metrics = anonymize_pseudonymize_structured(context, df=sample_df.copy())

    # Outputs.
    assert result_df is not None, "Data output should be present for logging"
    assert metrics is not None, "Metrics output should be present for logging"
    assert isinstance(metrics.value, dict), "Metrics should be a dict"

    # Run context.
    assert run_id is not None, "Run ID should be available for audit logging"

    # Configuration: the technique settings sit under the discriminator key.
    assert "used_function" in dagster_config, "Configuration should be captured for audit"
    technique_section = dagster_config["used_function"][0]["technique"]
    assert "encrypt" in technique_section, "Encrypt technique should be present"
    assert (
        technique_section["encrypt"]["key_name"] == "test_email_key"
    ), "Key name should be logged (but not key value)"

    # Compliance: raw email values must not show up in the metrics.
    rendered_metrics = str(metrics.value)
    for plain_email in sample_df["email"]:
        assert plain_email not in rendered_metrics, "PII values should not appear in metrics/logs"
|
|
|
|
|
|
def test_encrypt_configuration_parameters_logged(sample_df, encrypt_multiple_columns_config):
    """
    AC4: Configuration parameters are available for audit, without the key.

    Scenario: Configuration details are available for compliance logging
        Given: A multi-column encryption configuration
        When: The op is invoked directly with a built op context
        Then: The op config exposes the technique, its columns and key name
        And: The key value itself does not appear anywhere in the op config
    """
    clear_vault_key("test_multi_key")

    op_config_dict = config_to_dagster_dict(encrypt_multiple_columns_config)
    context = build_op_context(op_config=op_config_dict)

    # The unpacking is kept even though the outputs are unused: it is what
    # drives the op to completion if it yields its outputs lazily.
    _data, _metrics = anonymize_pseudonymize_structured(context, df=sample_df.copy())

    # Technique, columns and key name are all readable from the config.
    technique_config = op_config_dict["used_function"][0]["technique"]
    assert "encrypt" in technique_config, "Encrypt technique should be present"
    assert set(technique_config["encrypt"]["columns"]) == {"name", "email"}
    assert technique_config["encrypt"]["key_name"] == "test_multi_key"

    # Security check: the key material must never be part of the config.
    # Only the lookup is guarded. The assertion sits outside the ``try`` so a
    # leak fails the test; previously a blanket ``except Exception`` around it
    # also swallowed the AssertionError.
    config_str = str(op_config_dict)
    try:
        key = get_vault_key("test_multi_key")
    except InvalidPath:
        key = None  # nothing stored under this name, so nothing could leak

    if key is not None:
        key_text = key.decode() if isinstance(key, bytes) else str(key)
        assert (
            key_text not in config_str
        ), "Encryption key value should never be in logged configuration"
|
|
|
|
|
|
# ------------- AC5: Execution Audit & Logging - Negative Scenario ---------------------------
|
|
|
|
|
|
def test_encrypt_failed_execution_logging(encrypt_single_column_config):
    """
    AC5: A failed run exposes error details suitable for auditing.

    Scenario: Failed pseudonymisation execution can be logged
        Given: A dataset that lacks the configured 'email' column
        When: The op is invoked directly and fails
        Then: A ValueError with an explanatory message is raised
        And: The message names the problematic column
        And: Run id and configuration are still available
        And: None of the dataset's name values appear in the message
    """
    sample_names = ["Alice", "Bob", "Charlie"]
    # No 'email' column, which is what makes the op fail.
    frame_without_email = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["Alice", "Bob", "Charlie"],
        }
    )

    dagster_config = config_to_dagster_dict(encrypt_single_column_config)
    context = build_op_context(op_config=dagster_config)
    run_id = context.run_id

    with pytest.raises(ValueError) as exc_info:
        # Wrapping in list() forces the op to run if it yields lazily.
        list(anonymize_pseudonymize_structured(context, df=frame_without_email))

    # Error details.
    failure_text = str(exc_info.value)
    assert (
        "not present in the DataFrame" in failure_text
    ), "Error message should explain failure reason"
    assert "email" in failure_text, "Error message should mention the problematic column"

    # Audit context.
    assert run_id is not None, "Run ID should be available for failure audit"
    assert dagster_config is not None, "Configuration should be accessible for failure audit"

    # PII protection: row values must not leak into the message.
    for person in sample_names:
        assert person not in failure_text, "PII values should not appear in error messages"
|
|
|
|
|
|
def test_encrypt_stack_trace_available_on_failure(encrypt_single_column_config):
    """
    AC5: A failed run provides a usable stack trace.

    Scenario: Failed execution provides a stack trace for troubleshooting
        Given: A dataset that lacks the configured 'email' column
        When: The op is executed and fails
        Then: A ValueError is raised
        And: The formatted traceback contains the exception type and message
        And: The traceback points at the op code
    """
    import traceback

    df_missing_column = pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})

    # pytest.raises fails the test by itself when nothing is raised, which
    # replaces the manual try / pytest.fail / except construct and matches
    # the other failure tests in this module.
    with pytest.raises(ValueError) as exc_info:
        run_encrypt_op(encrypt_single_column_config, df_missing_column)

    # Render the captured exception the way a log handler would.
    stack_trace = "".join(
        traceback.format_exception(exc_info.type, exc_info.value, exc_info.tb)
    )

    assert "ValueError" in stack_trace, "Exception type should be in stack trace"
    assert (
        "not present in the DataFrame" in stack_trace
    ), "Error message should be in stack trace"

    # The trace must lead back to the op implementation.
    assert (
        "ops.py" in stack_trace or "anonymize_pseudonymize_structured" in stack_trace
    ), "Stack trace should indicate error location"
|
|
|
|
|
|
def test_encrypt_vault_error_logged_appropriately(sample_df):
    """
    AC5: Vault failures surface with enough detail to be logged.

    Scenario: Vault access errors are captured
        Given: A valid encryption configuration
        When: The Vault client raises while reading the secret
        Then: A ValueError is raised
        And: Its message carries the underlying Vault error text

    Note: The Vault failure is simulated by patching ``hvac.Client`` with a
    mock whose ``read_secret_version`` call raises.
    """
    # Stand-in client: any attempt to read the secret raises.
    failing_client = MagicMock()
    failing_client.secrets.kv.v2.read_secret_version.side_effect = Exception(
        "Simulated Vault authentication error"
    )

    with patch("hvac.Client", return_value=failing_client):
        email_encryption = EncryptConfig(
            type="encrypt", columns=["email"], key_name="test_email_key"
        )
        config = AnonymisePseudonymizeStructuredConfig(
            used_function=[PseudoTechniqueConfig(technique=email_encryption)]
        )

        with pytest.raises(ValueError) as exc_info:
            run_encrypt_op(config, sample_df)

        message = str(exc_info.value)
        assert (
            "Simulated Vault authentication error" in message
        ), "Error should indicate Vault-related failure"
|
|
|
|
|
|
# --------------- Additional Edge Cases & Integration Tests ----------------------------------
|
|
|
|
|
|
def test_encrypt_large_dataset_performance(encrypt_single_column_config):
    """
    Additional test: Validates encryption works with larger datasets.

    Builds a 1000-row frame, runs the encrypt op on it and checks that
    no rows are lost and that every single ``email`` value differs from its
    original plaintext. No timing is measured; despite the name this is a
    scale/correctness check only.
    """
    clear_vault_key("test_email_key")

    # Create a larger dataset (1000 rows)
    row_count = 1000
    departments = ["HR", "IT", "Finance"]
    large_df = pd.DataFrame(
        {
            "id": range(row_count),
            "name": [f"Person{i}" for i in range(row_count)],
            "email": [f"person{i}@example.com" for i in range(row_count)],
            "age": [25 + (i % 50) for i in range(row_count)],
            "salary": [50000.0 + (i * 100) for i in range(row_count)],
            "department": [departments[i % len(departments)] for i in range(row_count)],
        }
    )

    # Snapshot the plaintext as a plain list so the comparison below cannot be
    # affected by anything the op does to the frame it receives.
    original_emails = large_df["email"].tolist()

    result_df, _ = run_encrypt_op(encrypt_single_column_config, large_df)

    assert len(result_df) == row_count, "All rows should be processed"

    # ``Series.equals`` is False as soon as one element differs, so negating it
    # would accept a result in which only a single row was encrypted. Compare
    # row by row instead so the assertion matches its message.
    assert all(
        encrypted != original
        for encrypted, original in zip(result_df["email"], original_emails)
    ), "All email values should be encrypted"
|
|
|
|
|
|
def test_encrypt_special_characters_in_data(encrypt_single_column_config):
    """
    Additional test: Validates encryption handles special characters correctly.

    Encrypts e-mail values containing umlauts, accented letters, CJK characters
    and an emoji, then decrypts every ciphertext with the key returned by
    ``get_vault_key`` and checks that each row yields its own original value.
    """
    clear_vault_key("test_email_key")

    df_special = pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["Müller", "José", "李明", "🙂 John"],
            "email": [
                "test@müller.de",
                "josé@example.com",
                "李明@example.cn",
                "emoji@😀.com",
            ],
            "age": [25, 30, 35, 40],
            "salary": [50000.0, 60000.0, 70000.0, 80000.0],
            "department": ["HR", "IT", "Finance", "IT"],
        }
    )

    # Save original values for comparison (tolist() already yields a new list)
    original_emails = df_special["email"].tolist()

    result_df, _ = run_encrypt_op(encrypt_single_column_config, df_special)

    # Verify special characters are encrypted and recoverable
    key = get_vault_key("test_email_key")
    f = Fernet(key)

    decrypted_emails = [f.decrypt(enc.encode()).decode() for enc in result_df["email"]]

    # Compare as ordered lists rather than sets: a set comparison would still
    # pass if values ended up attached to the wrong rows.
    assert (
        decrypted_emails == original_emails
    ), "Special characters should be preserved through encryption/decryption"
|
|
|
|
|
|
def test_encrypt_deterministic_within_session(sample_df, encrypt_single_column_config):
    """
    Additional test: Validates that repeated encryption with the same key
    always decrypts back to the original values.

    Note: Fernet is NOT deterministic. Each token embeds a timestamp and a
    freshly generated random IV, so encrypting the same plaintext twice gives
    different ciphertexts. What this test checks is that both runs decrypt to
    the same original data, and that the two ciphertext columns differ.
    """
    clear_vault_key("test_email_key")

    def decrypt_column(fernet, column):
        # Ciphertexts are stored as text; Fernet works on bytes.
        return [fernet.decrypt(token.encode()).decode() for token in column]

    # First run, on a copy so the fixture frame stays untouched
    first_result, _ = run_encrypt_op(encrypt_single_column_config, sample_df.copy())

    # Key read back after the first run
    cipher = Fernet(get_vault_key("test_email_key"))

    first_plain = decrypt_column(cipher, first_result["email"])
    assert first_plain == sample_df["email"].tolist(), "Decryption should recover original values"

    # Second run under the same key name
    second_result, _ = run_encrypt_op(encrypt_single_column_config, sample_df.copy())

    second_plain = decrypt_column(cipher, second_result["email"])
    assert (
        second_plain == sample_df["email"].tolist()
    ), "Decryption should consistently recover original values"

    # The ciphertext columns themselves must not be identical across runs
    ciphertexts_match = first_result["email"].equals(second_result["email"])
    assert not ciphertexts_match, "Fernet encryption includes timestamp, so outputs differ"
|
|
|
|
|
|
def test_encrypt_empty_string_values(encrypt_single_column_config):
    """
    Additional test: Validates encryption handles empty strings correctly.

    One ``email`` value is the empty string. After the op runs, every value in
    the column must decrypt, and the empty string must be among the results.
    """
    clear_vault_key("test_email_key")

    columns = {
        "id": [1, 2, 3],
        "name": ["Alice", "", "Charlie"],
        "email": ["alice@example.com", "", "charlie@example.com"],
        "age": [25, 30, 35],
        "salary": [50000.0, 60000.0, 70000.0],
        "department": ["HR", "IT", "Finance"],
    }
    frame = pd.DataFrame(columns)

    encrypted_frame, _ = run_encrypt_op(encrypt_single_column_config, frame)

    # Decrypt the whole column with the key read back via get_vault_key
    cipher = Fernet(get_vault_key("test_email_key"))
    recovered = []
    for token in encrypted_frame["email"]:
        recovered.append(cipher.decrypt(token.encode()).decode())

    assert "" in recovered, "Empty strings should be encrypted and recoverable"
|
|
|
|
|
|
@pytest.mark.edge_case
def test_encrypt_very_long_strings(encrypt_single_column_config):
    """
    Edge case: Encryption of very long string values (e.g., 10KB+)

    The first row's e-mail has a 10,000-character local part. The test decrypts
    that row and checks the full value comes back, i.e. nothing was truncated.
    """
    clear_vault_key("test_email_key")

    # 10,000 characters followed by the domain
    padding = "x" * 10000
    oversized_email = f"{padding}@example.com"

    frame = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["Alice", "Bob", "Charlie"],
            "email": [oversized_email, "bob@example.com", "charlie@example.com"],
            "age": [25, 30, 35],
            "salary": [50000.0, 60000.0, 70000.0],
            "department": ["HR", "IT", "Finance"],
        }
    )

    encrypted_frame, _ = run_encrypt_op(encrypt_single_column_config, frame)

    # Decrypt only the long row (label 0) and compare with the input value
    cipher = Fernet(get_vault_key("test_email_key"))
    token = encrypted_frame.loc[0, "email"]
    recovered = cipher.decrypt(token.encode()).decode()

    assert recovered == oversized_email, "Very long strings should be encrypted and recoverable"
|
|
|
|
|
|
@pytest.mark.edge_case
def test_encrypt_column_with_all_identical_values(encrypt_single_column_config):
    """
    Edge case: Encryption when all values in a column are identical

    Validates that identical plaintexts produce distinct ciphertexts (Fernet
    tokens embed a randomly generated IV in addition to a timestamp) and that
    every ciphertext still decrypts to the shared original value.
    """
    clear_vault_key("test_email_key")

    rows = 5
    frame = pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5],
            "name": ["Alice"] * rows,
            "email": ["same@example.com"] * rows,  # every row carries the same value
            "age": [30] * rows,
            "salary": [60000.0] * rows,
            "department": ["IT"] * rows,
        }
    )

    encrypted_frame, _ = run_encrypt_op(encrypt_single_column_config, frame)

    # Five rows in, five distinct tokens out
    tokens = encrypted_frame["email"].tolist()
    distinct_tokens = set(tokens)
    assert (
        len(distinct_tokens) == 5
    ), "Fernet should produce unique ciphertexts even for identical plaintexts"

    # Each token must decrypt back to the one shared plaintext
    cipher = Fernet(get_vault_key("test_email_key"))
    recovered = [cipher.decrypt(token.encode()).decode() for token in tokens]
    assert all(
        plain == "same@example.com" for plain in recovered
    ), "All encrypted values should decrypt to same original"
|
|
|
|
|
|
@pytest.mark.edge_case
def test_encrypt_whitespace_only_values(encrypt_single_column_config):
    """
    Edge case: Encryption of whitespace-only values

    The ``email`` column holds only spaces, tabs and newlines. Each value must
    come back from decryption exactly as it went in, with no stripping or
    normalisation.
    """
    clear_vault_key("test_email_key")

    whitespace_values = ["   ", "\t\t", "\n\n"]  # spaces, tabs, newlines
    frame = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["Alice", "Bob", "Charlie"],
            "email": whitespace_values,
            "age": [25, 30, 35],
            "salary": [50000.0, 60000.0, 70000.0],
            "department": ["HR", "IT", "Finance"],
        }
    )

    # Take the expected values from the frame before the op sees it
    original_emails = frame["email"].tolist()

    encrypted_frame, _ = run_encrypt_op(encrypt_single_column_config, frame)

    cipher = Fernet(get_vault_key("test_email_key"))

    # Row-by-row round trip: the n-th ciphertext must yield the n-th original
    for orig_ws, token in zip(original_emails, encrypted_frame["email"].tolist()):
        decrypted = cipher.decrypt(token.encode()).decode()
        assert (
            decrypted == orig_ws
        ), f"Whitespace value {repr(orig_ws)} should be preserved, but got {repr(decrypted)}"
|
|
|
|
|
|
@pytest.mark.edge_case
@pytest.mark.parametrize(
    "column_type,test_values",
    [
        ("integer", [1, 2, 3, 4, 5]),
        ("float", [1.1, 2.2, 3.3, 4.4, 5.5]),
        ("string", ["a", "b", "c", "d", "e"]),
    ],
)
def test_encrypt_various_data_types(column_type, test_values):
    """
    Parameterized test: Encryption across different pandas data types

    For each parameter set, ``test_column`` is filled with ``test_values`` and
    encrypted under the key name ``test_type_key``. The test then checks that
    the column's dtype is ``object`` and that each decrypted cell equals
    ``str()`` of the value originally placed in that row.
    """
    clear_vault_key("test_type_key")

    size = len(test_values)
    frame = pd.DataFrame(
        {
            "id": range(size),
            "test_column": test_values,
            "name": ["Person"] * size,
            "email": ["test@example.com"] * size,
            "age": [30] * size,
            "salary": [60000.0] * size,
            "department": ["IT"] * size,
        }
    )

    technique = EncryptConfig(type="encrypt", columns=["test_column"], key_name="test_type_key")
    config = AnonymisePseudonymizeStructuredConfig(
        used_function=[PseudoTechniqueConfig(technique=technique)]
    )

    encrypted_frame, _ = run_encrypt_op(config, frame)

    # Whatever the input dtype, the encrypted column is expected to be object
    encrypted_dtype = encrypted_frame["test_column"].dtype
    assert encrypted_dtype == object, f"Encrypted {column_type} should become object type"

    # Round trip every cell; decrypted text is compared with str(original)
    cipher = Fernet(get_vault_key("test_type_key"))
    for row_label, source_value in enumerate(test_values):
        token = encrypted_frame.loc[row_label, "test_column"]
        plain = cipher.decrypt(token.encode()).decode()
        assert plain == str(
            source_value
        ), f"Decrypted value should match original {column_type} value"
|