diff --git a/Dockerfile b/Dockerfile index b61745b..fd4e780 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,13 +1,16 @@ FROM python:3.12-slim-bookworm +# Install git for git-based dependencies +RUN apt-get update && apt-get install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/* + WORKDIR /app COPY pyproject.toml . -RUN pip install --no-cache-dir dagster dagster-webserver - COPY src/ src/ + +# Install the package and all dependencies RUN pip install --no-cache-dir . -EXPOSE 3000 +EXPOSE 4000 -CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "template-code-location.repository"] +CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "src/template_code_location/repository.py"] diff --git a/pyproject.toml b/pyproject.toml index ca2cdc0..3b2741f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,23 +4,43 @@ build-backend = "setuptools.build_meta" [project] name = "template-code-location" -version = "0.0.1" -description = "Template code location for data processings services" +version = "0.1.0" +description = "Consolidated code location for all data services workflows" requires-python = ">=3.12" dependencies = [ + # Dagster core "dagster>=1.8.13", "dagster-webserver>=1.8.13", "dagster-postgres>=0.24.13", - "pandas>=3.0", + # Data processing + "pandas>=2.1.4", "pyarrow>=23.0", + "numpy>=2.4", "lxml>=6.0", "xmltodict>=1.0", "rdflib>=7.6", - "numpy>=2.4", + "openpyxl", + "xlrd>=2.0.1", + "tabulate==0.8.10", + "pyspellchecker>=0.8.4", + "PyGeodesy>=24.6.11", + # Validation "great_expectations>=1.16", "pandera>=0.31", + "pydantic>=2.6.0,<3.0.0", + # Scraping "scrapy>=2.15", "BeautifulSoup4>=4.14", + # Anonymisation libraries + "pycanon==1.0.1.post2", + "anjana>=1.0.0", + # Field-level pseudo-anonymisation + "scrubadub", + "scrubadub_spacy", + "hvac", + "cryptography", + # Util services (git dependency) + "util-services @ git+https://code.europa.eu/simpl/simpl-open/development/data-services/util-services.git@v0.4.1", ] [project.optional-dependencies] diff --git a/src/template-code-location/repository.py b/src/template-code-location/repository.py deleted file mode 100644 index 10c73e6..0000000 --- a/src/template-code-location/repository.py +++ /dev/null @@ -1,6 +0,0 @@ -from dagster import Definitions -from .jobs.jobs import data_processing_job - -defs = Definitions( - jobs=[data_processing_job], -) diff --git a/src/template-code-location/__init__.py b/src/template_code_location/__init__.py similarity index 100% rename from src/template-code-location/__init__.py rename to src/template_code_location/__init__.py diff --git a/src/template-code-location/jobs/__init__.py b/src/template_code_location/data_processing/__init__.py similarity index 100% rename from src/template-code-location/jobs/__init__.py rename to src/template_code_location/data_processing/__init__.py diff --git a/src/template_code_location/data_processing/config_models/__init__.py b/src/template_code_location/data_processing/config_models/__init__.py new file mode 100644 index 0000000..5833cab --- /dev/null +++ b/src/template_code_location/data_processing/config_models/__init__.py @@ -0,0 +1,18 @@ +"""Configuration models for data processing.""" + +from .columns_select_configuration import ColumnsSelectConfiguration +from .fill_missing_config import FillMissingConfiguration +from .spell_check_configuration import SpellCheckConfiguration +from .coordinates_normalization_configuration import CoordinatesNormalizationConfiguration +from .aggregation_configuration import AggregationConfiguration +from .filter_configuration import DatasetFilterConfiguration, FilterCondition + +__all__ = [ + "ColumnsSelectConfiguration", + "FillMissingConfiguration", + "SpellCheckConfiguration", + "CoordinatesNormalizationConfiguration", + "AggregationConfiguration", + "FilterCondition", + "DatasetFilterConfiguration" +] diff --git a/src/template_code_location/data_processing/config_models/aggregation_configuration.py b/src/template_code_location/data_processing/config_models/aggregation_configuration.py new file mode 100644 index 0000000..553740f --- /dev/null +++ b/src/template_code_location/data_processing/config_models/aggregation_configuration.py @@ -0,0 +1,25 @@ +from typing import List + +from pydantic import Field, field_validator + +from .columns_select_configuration import ColumnsSelectConfiguration + + +class AggregationConfiguration(ColumnsSelectConfiguration): + + operation: str = Field( + default="sum", + description="Aggregation operations: sum, mean, min, max, count" + ) + + @field_validator("operation") + @classmethod + def validate_operations(cls, value): + allowed = {"sum", "mean", "min", "max", "count"} + if value not in allowed: + raise ValueError( + f"Invalid aggregation operation '{value}'. " + f"Allowed values: {allowed}" + ) + + return value diff --git a/src/template_code_location/data_processing/config_models/columns_select_configuration.py b/src/template_code_location/data_processing/config_models/columns_select_configuration.py new file mode 100644 index 0000000..658450d --- /dev/null +++ b/src/template_code_location/data_processing/config_models/columns_select_configuration.py @@ -0,0 +1,17 @@ +from typing import List +from pydantic import Field,field_validator +from dagster import Config + + +class ColumnsSelectConfiguration(Config): + columns: List[str] = Field( + default=["Name"], description="List of columns to process." + ) + + @field_validator("columns") + @classmethod + def ensure_unique_columns(cls, v: List[str]) -> List[str]: + + unique_values = list(dict.fromkeys(v)) + + return unique_values diff --git a/src/template_code_location/data_processing/config_models/coordinates_normalization_configuration.py b/src/template_code_location/data_processing/config_models/coordinates_normalization_configuration.py new file mode 100644 index 0000000..64342e4 --- /dev/null +++ b/src/template_code_location/data_processing/config_models/coordinates_normalization_configuration.py @@ -0,0 +1,22 @@ +from typing import Optional + +from pydantic import Field, model_validator +from dagster import Config + + +class CoordinatesNormalizationConfiguration(Config): + latColumn: Optional[str] = Field( + default="lat", description="Latitude column name" + ) + lonColumn: Optional[str] = Field( + default="lon", description="Longitude column name" + ) + + @model_validator(mode="before") + @classmethod + def replace_nulls_with_defaults(cls, values): + if values.get("latColumn") is None: + values["latColumn"] = "lat" + if values.get("lonColumn") is None: + values["lonColumn"] = "lon" + return values diff --git a/src/template_code_location/data_processing/config_models/fill_missing_config.py b/src/template_code_location/data_processing/config_models/fill_missing_config.py new file mode 100644 index 0000000..4c9e5b2 --- /dev/null +++ b/src/template_code_location/data_processing/config_models/fill_missing_config.py @@ -0,0 +1,9 @@ +from typing import Dict +from dagster import Config +from pydantic import Field + + +class FillMissingConfiguration(Config): + fill_map: Dict[str, str] = Field( + default={"Age": "UNKNOWN_AGE"}, description="Missing values filling map." + ) diff --git a/src/template_code_location/data_processing/config_models/filter_configuration.py b/src/template_code_location/data_processing/config_models/filter_configuration.py new file mode 100644 index 0000000..86bde37 --- /dev/null +++ b/src/template_code_location/data_processing/config_models/filter_configuration.py @@ -0,0 +1,52 @@ +from enum import Enum +import operator +from typing import List, Literal, Callable +from pydantic import Field, model_validator +from dagster import Config +import pandas as pd + +class FilterOperator(str, Enum): + EQ = "==" + NE = "!=" + LT = "<" + LE = "<=" + GT = ">" + GE = ">=" + + @property + def function(self) -> Callable: + mapping = { + FilterOperator.EQ: operator.eq, + FilterOperator.NE: operator.ne, + FilterOperator.LT: operator.lt, + FilterOperator.LE: operator.le, + FilterOperator.GT: operator.gt, + FilterOperator.GE: operator.ge, + } + return mapping[self] + +class FilterCondition(Config): + column: str = Field(..., description="Name of the column to filter") + type: Literal["string", "numeric"] = Field(..., description="Column type (string or numeric)") + value: str = Field(..., description="Value to compare against") + op: FilterOperator = Field(default=FilterOperator.EQ, description="Operator to apply (string supports only EQ and NE)") + + @model_validator(mode="after") + def check_operator_compatibility(self) -> "FilterCondition": + if self.type == "string" and self.op not in [FilterOperator.EQ, FilterOperator.NE]: + raise ValueError( + f"Invalid operator '{self.op.name}' for type 'string'. " + "Only EQ (==) and NE (!=) are allowed." + ) + return self + + def apply(self, df: pd.DataFrame) -> pd.Series: + val = float(self.value) if self.type == "numeric" else self.value + return self.op.function(df[self.column], val) + +class DatasetFilterConfiguration(Config): + conditions: List[FilterCondition] = Field( + default=[], + description="List of filter conditions to apply on the dataset. " + "String columns support only 'EQ' and 'NE', numeric columns also support 'LT', 'LE', 'GT' and 'GE'." + ) diff --git a/src/template_code_location/data_processing/config_models/spell_check_configuration.py b/src/template_code_location/data_processing/config_models/spell_check_configuration.py new file mode 100644 index 0000000..7a12f87 --- /dev/null +++ b/src/template_code_location/data_processing/config_models/spell_check_configuration.py @@ -0,0 +1,8 @@ +from typing import Literal +from pydantic import Field + +from .columns_select_configuration import ColumnsSelectConfiguration + + +class SpellCheckConfiguration(ColumnsSelectConfiguration): + language: Literal["en", "es", "it", "fr", "pt", "de", "nl"] = Field(default="en", description="Language to use in the SpellChecker module.") diff --git a/src/template_code_location/data_processing/jobs.py b/src/template_code_location/data_processing/jobs.py new file mode 100644 index 0000000..54fb939 --- /dev/null +++ b/src/template_code_location/data_processing/jobs.py @@ -0,0 +1,119 @@ +from dagster import job +from util_services.util_ops import ( + preview_dataframe, + read_csv_from_s3, + write_csv_to_s3, +) +from .ops import ( + remove_duplicates, + fill_missing_values, + standardize_categorical_values, + correct_typos, + normalize_numeric_min_max, + normalize_datetime, + normalize_coordinates, + add_global_aggregations, + filter_dataset +) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def remove_duplicates_job_s3(): + org_df = read_csv_from_s3() + anon_df = remove_duplicates(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def fill_missing_values_job_s3(): + org_df = read_csv_from_s3() + anon_df = fill_missing_values(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def standardize_categorical_values_job_s3(): + org_df = read_csv_from_s3() + anon_df = standardize_categorical_values(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def correct_typos_job_s3(): + org_df = read_csv_from_s3() + anon_df = correct_typos(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def normalize_numeric_min_max_job_s3(): + org_df = read_csv_from_s3() + anon_df = normalize_numeric_min_max(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def normalize_datetime_job_s3(): + org_df = read_csv_from_s3() + anon_df = normalize_datetime(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def normalize_coordinates_job_s3(): + org_df = read_csv_from_s3() + anon_df = normalize_coordinates(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def add_global_aggregations_job_s3(): + org_df = read_csv_from_s3() + anon_df = add_global_aggregations(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) + +@job(tags={ + "business_operation": "PROCESSING", + "resource_type": "RD_DATA" +}) +def filter_dataset_job_s3(): + org_df = read_csv_from_s3() + anon_df = filter_dataset(org_df) + preview_dataframe(org_df) + write_csv_to_s3(anon_df) + preview_dataframe(anon_df) diff --git a/src/template_code_location/data_processing/ops.py b/src/template_code_location/data_processing/ops.py new file mode 100644 index 0000000..e380cb8 --- /dev/null +++ b/src/template_code_location/data_processing/ops.py @@ -0,0 +1,256 @@ +import pandas as pd +from dagster import Out, op +from spellchecker import SpellChecker + +from template_code_location.data_processing.config_models import ( + AggregationConfiguration, + ColumnsSelectConfiguration, + CoordinatesNormalizationConfiguration, + FillMissingConfiguration, + SpellCheckConfiguration, + DatasetFilterConfiguration +) + + +def _parse_dms_to_decimal(value): + """Parse a DMS (degrees-minutes-seconds) string to decimal degrees using PyGeodesy. + + Supported formats include (but are not limited to): + - 40°26'46"N / 40°26′46″N + - 40 26 46 N + - 40:26:46N + - 40d26m46sN + - -40.446 (already decimal – returned as-is) + + Returns None if parsing fails. + """ + from pygeodesy.dms import parseDMS + + if pd.isna(value): + return None + + text = str(value).strip() + if not text: + return None + + try: + return float(parseDMS(text)) + except (ValueError, TypeError): + try: + return float(text) + except (ValueError, TypeError): + return None + + +@op(out={"data": Out()}) +def remove_duplicates(context, df: pd.DataFrame): + """Remove duplicate rows from the input DataFrame.""" + logger = context.log + + before = df.shape[0] + + df = df.drop_duplicates() + + after = df.shape[0] + + logger.info(f"Removed {before - after} duplicate rows") + + return df + +@op(out={"data": Out()}) +def fill_missing_values(context, config: FillMissingConfiguration, df: pd.DataFrame): + """Fill missing values in the DataFrame according to the configured column-to-value mapping.""" + logger = context.log + + logger.info(f"Filling missing values: {config.fill_map}") + + return df.fillna(config.fill_map) + +@op(out={"data": Out()}) +def standardize_categorical_values(context, config: ColumnsSelectConfiguration, df: pd.DataFrame): + """Standardize categorical values in selected columns by trimming whitespace and converting text to lowercase.""" + logger = context.log + + for col in config.columns: + if col not in df.columns: + logger.warning(f"Column '{col}' not found in DataFrame, skipping.") + continue + + original = df[col] + + standardized = ( + df[col] + .fillna("") + .astype(str) + .str.strip() + .str.lower() + ) + + changed_count = (original != standardized).sum() + df[col] = standardized + + logger.info(f"Standardized '{col}' column – {changed_count} values modified") + + return df + +@op(out={"data": Out()}) +def correct_typos(context, config: SpellCheckConfiguration, df: pd.DataFrame): + """Correct spelling mistakes in the specified text columns.""" + logger = context.log + + for column in config.columns: + if column not in df.columns: + logger.warning(f"Column '{column}' not found in DataFrame, skipping.") + continue + + spell = SpellChecker(language=config.language) + + original = df[column].astype(str) + corrected = original.apply(lambda x, spell_checker=spell: spell_checker.correction(x) if x else x) + + changed_count = (original != corrected).sum() + logger.info(f"Corrected typos in '{column}' – {changed_count} values modified") + + df[column] = corrected + + return df + +@op(out={"data": Out()}) +def normalize_datetime(context, config: ColumnsSelectConfiguration, df: pd.DataFrame): + logger = context.log + + for col in config.columns: + if col not in df.columns: + logger.warning(f"Column '{col}' not found, skipping normalization.") + continue + + normalized = pd.to_datetime(df[col], utc=True, format="mixed", dayfirst=True, errors="coerce") + + if normalized.notna().sum() == 0: + logger.warning( + f"Column '{col}' has no normalizable datetime values, skipping." + ) + continue + + iso_col = f"{col}_iso" + + formatted = normalized.dt.strftime("%Y-%m-%dT%H:%M:%SZ").fillna("") + non_empty = formatted[formatted != ""] + if len(non_empty) > 0 and non_empty.str.startswith("1970-01-01").all(): + logger.warning( + f"Column '{col}' all normalized values are '1970-01-01', likely bad input — skipping." + ) + continue + + df[iso_col] = formatted + + logger.info(f"Normalized datetime column '{col}' into '{iso_col}'") + + return df + +@op(out={"data": Out()}) +def normalize_numeric_min_max(context, config: ColumnsSelectConfiguration, df: pd.DataFrame): + logger = context.log + + for col in config.columns: + if col not in df.columns: + logger.warning(f"Column '{col}' not found, skipping normalization.") + continue + + min_val = df[col].min() + max_val = df[col].max() + + if min_val == max_val: + logger.warning(f"Column '{col}' has constant values, skipping normalization.") + continue + + df[col + "_norm"] = (df[col] - min_val) / (max_val - min_val) + logger.info(f"Normalized numeric column '{col}'") + + return df + +@op(out={"data": Out()}) +def normalize_coordinates(context, config: CoordinatesNormalizationConfiguration, df: pd.DataFrame): + logger = context.log + + lat = config.latColumn + lon = config.lonColumn + + for col in [lat, lon]: + if pd.api.types.is_numeric_dtype(df[col]): + logger.info(f"Column '{col}' is numeric — coercing directly") + df[col] = pd.to_numeric(df[col], errors="coerce") + else: + logger.info(f"Column '{col}' is non-numeric — parsing as DMS with PyGeodesy") + df[col] = df[col].apply(_parse_dms_to_decimal) + + invalid_lat = df[lat].isnull().sum() + invalid_lon = df[lon].isnull().sum() + logger.info(f"Found {invalid_lat} invalid latitudes and {invalid_lon} invalid longitudes") + + df[lat] = df[lat].round(4) + df[lon] = df[lon].round(4) + + before_filter_rows = len(df) + df = df[(df[lat].between(-90, 90)) & (df[lon].between(-180, 180))] + after_filter_rows = len(df) + logger.info(f"Filtered coordinates out of range: removed {before_filter_rows - after_filter_rows} rows") + + logger.info(f"Coordinate normalization completed: resulting dataframe has {after_filter_rows} rows") + + return df + +@op(out={"data": Out()}) +def add_global_aggregations(context, config: AggregationConfiguration, df: pd.DataFrame): + logger = context.log + + group_by_cols = [] + + for col in config.columns: + if col not in df.columns: + logger.warning(f"Column '{col}' not found, skipping aggregation.") + continue + group_by_cols.append(col) + + if config.operation not in {"sum", "mean", "min", "max", "count"}: + logger.warning(f"Unsupported aggregation '{config.operation}'") + + numeric_cols = df.select_dtypes(include=['number']).columns.tolist() + cols_to_keep = list(set(numeric_cols + group_by_cols)) + df = df[[c for c in cols_to_keep if c in df.columns]] + df = df.groupby(group_by_cols).agg(config.operation).reset_index() + return df + +@op(out={"data": Out()}) +def filter_dataset(context, config: DatasetFilterConfiguration, df: pd.DataFrame): + logger = context.log + total_rows_before = len(df) + + logger.info(f"Starting dataset filtering: initial dataframe has {total_rows_before} rows") + + combined_mask = pd.Series([True] * total_rows_before, index=df.index) + + for condition in config.conditions: + if condition.column not in df.columns: + logger.warning(f"Column '{condition.column}' not found, skipping filtering.") + continue + if df[condition.column].isna().all(): + logger.warning(f"Column '{condition.column}' is empty (all NaN), skipping filtering.") + continue + try: + current_mask = condition.apply(df) + combined_mask &= current_mask + + logger.info(f"Applied filter: {condition.column} {condition.op.value} '{condition.value}'") + except Exception as e: + logger.error(f"Error applying filter on column '{condition.column}': {e}") + + filtered_df = df[combined_mask] + total_rows_after = len(filtered_df) + + logger.info( + f"Filtering completed: {total_rows_after} rows remain " + f"(removed {total_rows_before - total_rows_after} rows in total)" + ) + + return filtered_df diff --git a/src/template-code-location/ops/__init__.py b/src/template_code_location/dataframe_level_anonymisation/__init__.py similarity index 100% rename from src/template-code-location/ops/__init__.py rename to src/template_code_location/dataframe_level_anonymisation/__init__.py diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/__init__.py b/src/template_code_location/dataframe_level_anonymisation/config_models/__init__.py new file mode 100644 index 0000000..0f490b5 --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/__init__.py @@ -0,0 +1,13 @@ +"""Configuration models for dataframe-level anonymization.""" + +from .k_anonymity_configuration import KAnonymityConfiguration +from .l_diversity_configuration import LDiversityConfiguration +from .t_closeness_configuration import TClosenessConfiguration +from .base_config import BaseConfiguration + +__all__ = [ + "BaseConfiguration", + "KAnonymityConfiguration", + "LDiversityConfiguration", + "TClosenessConfiguration", +] diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/base_config.py b/src/template_code_location/dataframe_level_anonymisation/config_models/base_config.py new file mode 100644 index 0000000..4abf451 --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/base_config.py @@ -0,0 +1,33 @@ +from typing import Dict, List +from dagster import Config +from pydantic import Field, field_validator, model_validator + + +class BaseConfiguration(Config): + ident: List[str] = Field(default=["Name"], description="List of identifier column names.") + quasi_identifiers: List[str] = Field(default=["Age"], description="List of quasi-identifier column names.") + supp_level: float = Field(default=50.0, ge=0.0, le=100.0, description="Max suppression allowed (0–100).") + generalisation_hierarchies: Dict[str, str] = Field( + default={"Age": "simpl_age"}, description="Hierarchies used to generalize quasi-identifiers." + ) + + @field_validator("quasi_identifiers") + def validate_quasi_identifiers(cls, value): + if not value: + raise ValueError("At least one quasi-identifier must be provided.") + return value + + @field_validator("ident") + def validate_ident(cls, value): + if not value: + raise ValueError("At least one identifier must be provided.") + return value + + @model_validator(mode="after") + def check_no_overlap(self): + ident = set(self.ident) + quasi = set(self.quasi_identifiers) + overlap = ident & quasi + if overlap: + raise ValueError(f"Fields cannot be both identifiers and quasi-identifiers: {overlap}") + return self diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/hierarchies.py b/src/template_code_location/dataframe_level_anonymisation/config_models/hierarchies.py new file mode 100644 index 0000000..65105a0 --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/hierarchies.py @@ -0,0 +1,18 @@ +from anjana.anonymity.utils import utils + +simpl_age = { + 0: [age for age in range(0, 100)], + 1: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 5), + 2: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 10), + 3: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 20), + 4: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 100), +} +simpl_age2 = { + 0: [age for age in range(0, 100)], + 1: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 5), +} +simpl_gender = {0: ["M", "F", "O"], 1: ["*", "*", "*"]} + + +def get_all_hierarchies(): + return {name: obj for name, obj in globals().items() if isinstance(obj, dict)} diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/k_anonymity_configuration.py b/src/template_code_location/dataframe_level_anonymisation/config_models/k_anonymity_configuration.py new file mode 100644 index 0000000..0ddd88f --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/k_anonymity_configuration.py @@ -0,0 +1,11 @@ +from typing import List +from pydantic import Field + +from .base_config import BaseConfiguration + + +class KAnonymityConfiguration(BaseConfiguration): + k: int = Field(default=3, ge=2, description="Desired level of k-anonymity (must be >= 2).") + sensitive_attributes: List[str] = Field( + default=["Disease"], description="List of sensitive attribute column names." + ) diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/l_diversity_configuration.py b/src/template_code_location/dataframe_level_anonymisation/config_models/l_diversity_configuration.py new file mode 100644 index 0000000..c764f1d --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/l_diversity_configuration.py @@ -0,0 +1,8 @@ +from pydantic import Field +from .base_config import BaseConfiguration + + +class LDiversityConfiguration(BaseConfiguration): + k: int = Field(default=2, ge=2, description="Desired level of k-anonymity (must be >= 2).") + l: int = Field(default=3, ge=1, description="L-diversity level (must be >= 1)") + sensitive_attribute: str = Field(default="Disease", description="Sensitive attribute name.") diff --git a/src/template_code_location/dataframe_level_anonymisation/config_models/t_closeness_configuration.py b/src/template_code_location/dataframe_level_anonymisation/config_models/t_closeness_configuration.py new file mode 100644 index 0000000..4461539 --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/config_models/t_closeness_configuration.py @@ -0,0 +1,8 @@ +from pydantic import Field +from .base_config import BaseConfiguration + + +class TClosenessConfiguration(BaseConfiguration): + k: int = Field(default=2, ge=2, description="Desired level of k-anonymity (must be >= 2).") + t: float = Field(default=0.5, ge=0.0, le=1.0, description="Maximum t-distance threshold.") + sensitive_attribute: str = Field(default="Disease", description="Sensitive attribute name.") diff --git a/src/template_code_location/dataframe_level_anonymisation/jobs.py b/src/template_code_location/dataframe_level_anonymisation/jobs.py new file mode 100644 index 0000000..35c76f7 --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/jobs.py @@ -0,0 +1,86 @@ +from dagster import job +from util_services.util_ops import ( + preview_dataframe, + read_structured_to_df, + write_df_to_local, + read_structured_from_s3, + write_df_to_s3, + write_semistructured_to_s3, + read_semistructured_from_s3 +) + +from .ops import apply_k_anonymity, apply_l_diversity, apply_t_closeness + + +@job(tags={ + "business_operation": "ANONYMISATION" +}) +def k_anonymity_job(): + org_df = read_structured_to_df() + anon_df, _ = apply_k_anonymity(org_df) + preview_dataframe(org_df) + write_df_to_local(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "ANONYMISATION" +}) +def l_diversity_job(): + org_df = read_structured_to_df() + anon_df, _ = apply_l_diversity(org_df) + preview_dataframe(org_df) + write_df_to_local(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "ANONYMISATION" +}) +def t_closeness_job(): + org_df = read_structured_to_df() + anon_df, _ = apply_t_closeness(org_df) + preview_dataframe(org_df) + write_df_to_local(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "ANONYMISATION", + "resource_type": "RD_DATA" +}) +def k_anonymity_job_s3(): + org_df = read_structured_from_s3() + anon_df, _ = apply_k_anonymity(org_df) + preview_dataframe(org_df) + write_df_to_s3(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "ANONYMISATION", + "resource_type": "RD_DATA" +}) +def l_diversity_job_s3(): + org_df = read_structured_from_s3() + anon_df, _ = apply_l_diversity(org_df) + preview_dataframe(org_df) + write_df_to_s3(anon_df) + preview_dataframe(anon_df) + + +@job(tags={ + "business_operation": "ANONYMISATION", + "resource_type": "RD_DATA" +}) +def t_closeness_job_s3(): + org_df = read_structured_from_s3() + anon_df, _ = apply_t_closeness(org_df) + preview_dataframe(org_df) + write_df_to_s3(anon_df) + preview_dataframe(anon_df) + +@job() +def read_write_semistructured_job_s3(): + semistruct_data = read_semistructured_from_s3() + write_semistructured_to_s3(semistruct_data) diff --git a/src/template_code_location/dataframe_level_anonymisation/ops.py b/src/template_code_location/dataframe_level_anonymisation/ops.py new file mode 100644 index 0000000..93682bf --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/ops.py @@ -0,0 +1,187 @@ +import json +from textwrap import dedent + +import pandas as pd +from anjana.anonymity import k_anonymity, l_diversity, t_closeness +from dagster import ( + DagsterInvalidInvocationError, + MarkdownMetadataValue, + Out, + Output, + get_dagster_logger, + op, +) +from pycanon import anonymity + +from template_code_location.dataframe_level_anonymisation.config_models import ( + KAnonymityConfiguration, + LDiversityConfiguration, + TClosenessConfiguration, +) +from template_code_location.dataframe_level_anonymisation.config_models.hierarchies import get_all_hierarchies + + +def _calc_dataframe_metrics(df_anon, df_org, quasi_identifiers, sensitive_atttributes): + # --- Metrics --- + # Anonymization metrics + k_anon = anonymity.k_anonymity(df_anon, quasi_identifiers) + l_div = anonymity.l_diversity(df_anon, quasi_identifiers, sensitive_atttributes, True) + t_clos = anonymity.t_closeness(df_anon, quasi_identifiers, sensitive_atttributes, True) + + # Data Utilization metrics + supression_rate = 1 - len(df_anon) / len(df_org) + grouped = df_anon.groupby(quasi_identifiers) + mean_equivalence_class_size = len(df_anon) / len(grouped) if len(grouped) else 0 + + # flake8: noqa + anon_report = dedent( + f""" + ### Anonymization & Data Utilization Metrics + + | Metric | Value | Description | + |--------|-------|-------------| + | **k-anonymity** | `k = {k_anon}` | Minimum number of records sharing the same quasi-identifier values. | + | **l-diversity** | `l = {l_div}` | Diversity of sensitive attributes within each equivalence class. | + | **t-closeness** | `t = {round(t_clos, 2)}` | Distance between sensitive attribute distribution in a group and the overall dataset. | + | **Suppression rate** | `{round(supression_rate, 2)}` | Fraction of records or attributes suppressed to meet privacy requirements. | + | **Mean equivalence class size** | `{round(mean_equivalence_class_size, 2)}` | Average size of equivalence classes for quasi-identifiers, indicates data grouping. | + """ + ) + # flake8: enable + metrics = { + "k_anon": k_anon, + "l_div": l_div, + "t_clos": t_clos, + "supp_rate": supression_rate, + "mean_equivalence_class": mean_equivalence_class_size, + } + return anon_report, metrics + + +def _validate_and_get_hierarchies(config, df: pd.DataFrame): + hierarchies = get_all_hierarchies() + + # Dataset smaller than k + if len(df) < config.k: + raise DagsterInvalidInvocationError( + f"Cannot apply k-anonymity: dataset has {len(df)} records, but k={config.k}" + ) + + # Missing or incomplete generalisation hierarchies + for qi in config.quasi_identifiers: + if qi not in config.generalisation_hierarchies or not config.generalisation_hierarchies[qi]: + raise DagsterInvalidInvocationError( + f"Generalisation hierarchy for quasi-identifier '{qi}' is missing or incomplete" + ) + if config.generalisation_hierarchies[qi] not in hierarchies: + raise DagsterInvalidInvocationError( + f"Generalisation hierarchy '{config.generalisation_hierarchies[qi]}' is missing in the code basis" + ) + + hier = { + qi: hierarchies[config.generalisation_hierarchies[qi]] for qi in config.quasi_identifiers + } + return hier + + +@op(out={"data": Out(), "metrics": Out()}) +def apply_k_anonymity(context, config: KAnonymityConfiguration, df: pd.DataFrame): + + hier = _validate_and_get_hierarchies(config, df) + + data_anon = k_anonymity( + df, config.ident, config.quasi_identifiers, config.k, config.supp_level, hier + ) + if "index" in data_anon.columns and "index" not in df.columns: + data_anon.drop(columns="index", inplace=True) + anon_report, metrics = _calc_dataframe_metrics( + data_anon, df, config.quasi_identifiers, config.sensitive_attributes + ) + yield Output( + value=data_anon, + metadata={ + "metric_report": MarkdownMetadataValue(anon_report), + "metric_json": json.dumps(metrics), + }, + output_name="data", + ) + yield Output(value=metrics, output_name="metrics") + + +@op(out={"data": Out(), "metrics": Out()}) +def apply_l_diversity(context, config: LDiversityConfiguration, df: pd.DataFrame): + + hier = _validate_and_get_hierarchies(config, df) + + data_anon = l_diversity( + df, + config.ident, + config.quasi_identifiers, + config.sensitive_attribute, + config.k, + config.l, + config.supp_level, + hier, + ) + if data_anon.empty: + raise DagsterInvalidInvocationError( + "Could not tranform the data to l-diversity, empty dataset returned!" + ) + anon_report, metrics = _calc_dataframe_metrics( + data_anon, df, config.quasi_identifiers, [config.sensitive_attribute] + ) + yield Output( + value=data_anon, + metadata={ + "metric_report": MarkdownMetadataValue(anon_report), + "metric_json": json.dumps(metrics), + }, + output_name="data", + ) + yield Output(value=metrics, output_name="metrics") + + +@op(out={"data": Out(), "metrics": Out()}) +def apply_t_closeness(context, config: TClosenessConfiguration, df: pd.DataFrame): + + hier = _validate_and_get_hierarchies(config, df) + + try: + data_anon = t_closeness( + df, + config.ident, + config.quasi_identifiers, + config.sensitive_attribute, + config.k, + config.t, + config.supp_level, + hier, + ) + except ValueError as e: + if "Cannot be quasi-identifiers" in str(e): + raise DagsterInvalidInvocationError( + f"T-closeness failed: k-anonymity parameter = {config.k} is too small " + f"for existing hierarchies of {config.quasi_identifiers} in inner k-anonymity call." + ) + else: + # Re-raise other ValueError types with context + raise DagsterInvalidInvocationError(f"T-closeness failed with error: {str(e)}") + + if data_anon.empty: + raise DagsterInvalidInvocationError( + f"Could not transform the data to t-closeness, empty dataset returned! " + f"This may indicate that the t-closeness constraint (t={config.t}) is too strict for the given data." + ) + + anon_report, metrics = _calc_dataframe_metrics( + data_anon, df, config.quasi_identifiers, [config.sensitive_attribute] + ) + yield Output( + value=data_anon, + metadata={ + "metric_report": MarkdownMetadataValue(anon_report), + "metric_json": json.dumps(metrics), + }, + output_name="data", + ) + yield Output(value=metrics, output_name="metrics") diff --git a/src/template_code_location/dataframe_level_anonymisation/utils.py b/src/template_code_location/dataframe_level_anonymisation/utils.py new file mode 100644 index 0000000..c233c4e --- /dev/null +++ b/src/template_code_location/dataframe_level_anonymisation/utils.py @@ -0,0 +1,19 @@ +import numpy as np + + +def parse_value_list(values): + return [int(v) if isinstance(v, str) and v.isdigit() else v for v in values] + + +# Hierarchy normalization for Anjana +def normalize_hierarchy_levels(hierarchy_dict): + normalized = {} + for column, levels in hierarchy_dict.items(): + normalized[column] = {} + for level_str, mapping_list in levels.items(): + level = int(level_str) + if level == 0: + normalized[column][level] = np.array(parse_value_list(mapping_list)) + else: + normalized[column][level] = mapping_list + return normalized diff --git a/src/template_code_location/field_level_pseudo_anonymisation/__init__.py b/src/template_code_location/field_level_pseudo_anonymisation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/template_code_location/field_level_pseudo_anonymisation/config_models/__init__.py b/src/template_code_location/field_level_pseudo_anonymisation/config_models/__init__.py new file mode 100644 index 0000000..60944be --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/config_models/__init__.py @@ -0,0 +1,28 @@ +from .structured_config import ( # noqa: F401 + HashConfig, + EncryptConfig, + RedactConfig, + ReplaceConfig, + PseudoTechniqueConfig, + AnonymisePseudonymizeStructuredConfig, + DecryptConfig, + DepseudoTechniqueConfig, + DepseudonymizeStructuredConfig, +) + +from .unstructured_config import ( # noqa: F401, F811 + HashConfig, + EncryptConfig, + RedactConfig, + ReplaceConfig, + RetainConfig, + PseudoTechniqueConfig, + AnonymisePseudonymizeUnstructuredConfig, + DecryptConfig, + DepseudoTechniqueConfig, + DepseudonymizeUnstructuredConfig, +) + +from .languages import SupportedLanguages, LanguageEnum # noqa: F401 + +from .pii_entities import PIIEntityEnum, PII_MAPPING # noqa: F401 diff --git a/src/template_code_location/field_level_pseudo_anonymisation/config_models/languages.py b/src/template_code_location/field_level_pseudo_anonymisation/config_models/languages.py new file mode 100644 index 0000000..e3ba89e --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/config_models/languages.py @@ -0,0 +1,72 @@ +from enum import Enum +from typing import ClassVar + + +class SupportedLanguages: + LANGUAGES: ClassVar[dict[str, str]] = { + "hr": "hr_HR", # Croatian + "da": "da_DK", # Danish + "nl": "nl_NL", # Dutch + "en": "en_US", # English + "fi": "fi_FI", # Finnish + "fr": "fr_FR", # French + "de": "de_DE", # German + "el": "el_GR", # Greek + "it": "it_IT", # Italian + "lt": "lt_LT", # Lithuanian + "pl": "pl_PL", # Polish + "pt": "pt_PT", # Portuguese + "ro": "ro_RO", # Romanian + "sl": "sl_SI", # Slovenian + "es": "es_ES", # Spanish + "sv": "sv_SE", # Swedish + } + LANGUAGE_MODELS = { + "en": "en_core_web_sm", + "it": "it_core_news_sm", + "de": "de_core_news_sm", + "fr": "fr_core_news_sm", + "es": "es_core_news_sm", + "nl": "nl_core_news_sm", + "da": "da_core_news_sm", + "sv": "sv_core_news_sm", + "fi": "fi_core_news_sm", + "pl": "pl_core_news_sm", + "el": "el_core_news_sm", + "hr": "hr_core_news_sm", + "lt": "lt_core_news_sm", + "pt": "pt_core_news_sm", + "ro": "ro_core_news_sm", + "sl": "sl_core_news_sm", + } + + @classmethod + def codes(cls) -> list[str]: + return list(cls.LANGUAGES.keys()) + + @classmethod + def get_locale(cls, code: str) -> str: + return cls.LANGUAGES[code] + + @classmethod + def get_language_model(cls, code: str) -> str: + return cls.LANGUAGE_MODELS[code] + + +class LanguageEnum(str, Enum): + hr = "hr" + da = "da" + nl = "nl" + en = "en" + fi = "fi" + fr = "fr" + de = "de" + el = "el" + it = "it" + lt = "lt" + pl = "pl" + pt = "pt" + ro = "ro" + sl = "sl" + es = "es" + sv = "sv" diff --git a/src/template_code_location/field_level_pseudo_anonymisation/config_models/pii_entities.py b/src/template_code_location/field_level_pseudo_anonymisation/config_models/pii_entities.py new file mode 100644 index 0000000..e730b6d --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/config_models/pii_entities.py @@ -0,0 +1,24 @@ +from enum import Enum + + +class PIIEntityEnum(str, Enum): + PERSON = "Person" + EMAIL = "Email" + CREDIT_CARD = "Credit card" + DATE_OF_BIRTH = "Date of birth" + URL = "URLs" + PHONE_NUMBERS = "Phone numbers" + CREDENTIALS = "Credentials" + X_SOCIAL = "X (formally known as Twitter) username" + + +PII_MAPPING: dict[PIIEntityEnum, str] = { + PIIEntityEnum.PERSON: "NameFilth", + PIIEntityEnum.EMAIL: "EmailFilth", + PIIEntityEnum.CREDIT_CARD: "CreditCardFilth", + PIIEntityEnum.DATE_OF_BIRTH: "DateOfBirthFilth", + PIIEntityEnum.URL: "UrlFilth", + PIIEntityEnum.PHONE_NUMBERS: "PhoneFilth", + PIIEntityEnum.CREDENTIALS: "CredentialFilth", + PIIEntityEnum.X_SOCIAL: "TwitterFilth", +} diff --git a/src/template_code_location/field_level_pseudo_anonymisation/config_models/structured_config.py b/src/template_code_location/field_level_pseudo_anonymisation/config_models/structured_config.py new file mode 100644 index 0000000..af8abf6 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/config_models/structured_config.py @@ -0,0 +1,110 @@ +from typing import List, Literal, Optional, Union + +from dagster import Config +from pydantic import Field as PydanticField, model_validator, field_validator + + +class HashConfig(Config): + type: Literal["hash"] = "hash" + columns: List[str] = PydanticField(default=["example_column"], description="Columns to hash") + algorithm: str = PydanticField(default="sha256", description="Hashing algorithm") + +class EncryptConfig(Config): + type: Literal["encrypt"] = "encrypt" + columns: List[str] = PydanticField(default=["example_column"], description="Columns to encrypt") + key_name: str = PydanticField(default="my_key", description="Key identifier used for encryption") + +class RedactConfig(Config): + type: Literal["redact"] = "redact" + columns: List[str] = PydanticField(default=["example_column"], description="Columns to redact") + +class ReplaceConfig(Config): + type: Literal["replace"] = "replace" + columns: List[str] = PydanticField(default=["example_column"], description="Columns to replace") + new_value: str = PydanticField(default="REPLACED", description="Replacement value") + +class PseudoTechniqueConfig(Config): + technique: Union[HashConfig, EncryptConfig, RedactConfig, ReplaceConfig] = PydanticField( + default={"hash": HashConfig().model_dump(exclude={"type"})}, + discriminator="type" + ) + + +class AnonymisePseudonymizeStructuredConfig(Config): + used_function: List[PseudoTechniqueConfig] = PydanticField( + default=[{"technique": {"hash": HashConfig().model_dump(exclude={"type"})}}], + description=("List of functions to be used on column"), + ) + + @model_validator(mode="after") + def ensure_unique_columns(self): + column_to_techniques = self._collect_column_to_techniques() + duplicates = { + col: techs for col, techs in column_to_techniques.items() if len(techs) > 1 + } + + if duplicates: + formatted = "; ".join( + f"{col} -> {', '.join(techs)}" for col, techs in duplicates.items() + ) + raise ValueError(f"Duplicate column(s) across techniques not allowed:\n{formatted}") + + return self + + def _collect_column_to_techniques(self): + """Extract column-to-techniques mapping from used_function list.""" + column_to_techniques = {} + for f in self.used_function: + technique_type, cols = self._extract_technique_and_columns(f) + for col in cols: + column_to_techniques.setdefault(col, []).append(technique_type) + return column_to_techniques + + def _extract_technique_and_columns(self, item): + """Extract technique type and columns list from a PseudoTechniqueConfig item (dict or model instance).""" + if isinstance(item, dict): + tech = item.get("technique") or {} + if isinstance(tech, dict): + if "type" in tech: + return tech.get("type"), tech.get("columns") or [] + elif len(tech) == 1: + # variant-key mapping: {'hash': {...}} + technique_type, inner = next(iter(tech.items())) + return technique_type, inner.get("columns") or [] + return None, [] + else: + # item is a PseudoTechniqueConfig instance + technique_type = item.technique.type + cols = getattr(item.technique, "columns", []) + return technique_type, cols + +class DecryptConfig(Config): + type: Literal["decrypt"] = "decrypt" + columns: List[str] = PydanticField(default=["example_column"], description="Columns to decrypt") + key_name: str = PydanticField(default="my_key", description="Key identifier used for decryption") + +class DepseudoTechniqueConfig(Config): + technique: DecryptConfig = PydanticField(default={"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}) + + +class DepseudonymizeStructuredConfig(Config): + used_function: List[DepseudoTechniqueConfig] = PydanticField( + default=[{"technique": {"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}}], + description=("Decryption functions to be used on column"), + ) + + @field_validator("used_function", mode="before") + def _normalize_depseudo_used_function(cls, v): + normalized = [] + for item in v: + if isinstance(item, dict): + normalized.append(DepseudoTechniqueConfig.model_validate(item)) + else: + normalized.append(item) + return normalized + + @model_validator(mode="after") + def ensure_unique_columns(self): + # For depseudonymize, we don't have per-column uniqueness constraints, + # but keep a no-op validator to preserve API parity. + return self diff --git a/src/template_code_location/field_level_pseudo_anonymisation/config_models/unstructured_config.py b/src/template_code_location/field_level_pseudo_anonymisation/config_models/unstructured_config.py new file mode 100644 index 0000000..abea0b0 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/config_models/unstructured_config.py @@ -0,0 +1,115 @@ +from typing import List, Literal, Optional, Union + +from dagster import Config +from pydantic import Field as PydanticField, model_validator, field_validator +from .languages import LanguageEnum +from .pii_entities import PIIEntityEnum + + +class HashConfig(Config): + type: Literal["hash"] = "hash" + pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to hash") + algorithm: str = PydanticField(default="sha256", description="Hashing algorithm") + +class EncryptConfig(Config): + type: Literal["encrypt"] = "encrypt" + pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to encrypt") + key_name: str = PydanticField(default="my_key", description="Key identifier used for encryption") + + +class RedactConfig(Config): + type: Literal["redact"] = "redact" + pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to redact") + +class ReplaceConfig(Config): + type: Literal["replace"] = "replace" + pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to replace") + new_value: str = PydanticField(default="REPLACED", description="Replacement value") + +class RetainConfig(Config): + type: Literal["retain"] = "retain" + pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to retain") + +class PseudoTechniqueConfig(Config): + technique: Union[HashConfig, EncryptConfig, RedactConfig, ReplaceConfig, RetainConfig] = PydanticField( + default={"hash": HashConfig().model_dump(exclude={"type"})}, + discriminator="type" + ) + +class AnonymisePseudonymizeUnstructuredConfig(Config): + language: LanguageEnum = PydanticField( + default=LanguageEnum.en, + description="Language code (must be one of: hr, da, nl, en, fi, fr, de, el, it, lt, pl, pt, ro, sl, es, sv)" + + ) + used_function: List[PseudoTechniqueConfig] = PydanticField( + default=[{"technique": {"hash": HashConfig().model_dump(exclude={"type"})}}], + description=("List of functions to be used on PIIs"), + ) + + @field_validator("used_function", mode="before") + def _normalize_used_function(cls, v): + normalized = [] + for item in v: + if isinstance(item, dict): + normalized.append(PseudoTechniqueConfig.model_validate(item)) + else: + normalized.append(item) + return normalized + + @model_validator(mode="after") + def ensure_unique_pii(self): + pii_to_techniques = self._collect_pii_to_techniques() + duplicates = { + pii: techs for pii, techs in pii_to_techniques.items() if len(techs) > 1 + } + + if duplicates: + formatted = "; ".join( + f"{pii} -> {', '.join(techs)}" for pii, techs in duplicates.items() + ) + raise ValueError(f"Duplicate PII(s) across techniques not allowed:\n{formatted}") + + return self + + def _collect_pii_to_techniques(self): + """Extract PII-to-techniques mapping from used_function list.""" + pii_to_techniques = {} + for f in self.used_function: + technique_type, piis = self._extract_technique_and_pii(f) + for pii in piis: + pii_to_techniques.setdefault(pii, []).append(technique_type) + return pii_to_techniques + + def _extract_technique_and_pii(self, item): + """Extract technique type and PII list from a PseudoTechniqueConfig item (dict or model instance).""" + if isinstance(item, dict): + tech = item.get("technique") or {} + if isinstance(tech, dict): + if "type" in tech: + return tech.get("type"), tech.get("pii") or tech.get("columns") or [] + elif len(tech) == 1: + # variant-key mapping: {'hash': {...}} + technique_type, inner = next(iter(tech.items())) + return technique_type, inner.get("pii") or inner.get("columns") or [] + return None, [] + else: + # item is a PseudoTechniqueConfig instance + technique_type = item.technique.type + piis = getattr(item.technique, "pii", []) or getattr(item.technique, "columns", []) + return technique_type, piis + +class DecryptConfig(Config): + type: Literal["decrypt"] = "decrypt" + key_name: str = PydanticField(default="my_key", description="Key identifier used for decryption") + +class DepseudoTechniqueConfig(Config): + technique: DecryptConfig = PydanticField( + default={"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}, + ) + +class DepseudonymizeUnstructuredConfig(Config): + used_function: List[DepseudoTechniqueConfig] = PydanticField( + default=[{"technique": {"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}}], + description=("Decryption function"), + ) diff --git a/src/template_code_location/field_level_pseudo_anonymisation/jobs.py b/src/template_code_location/field_level_pseudo_anonymisation/jobs.py new file mode 100644 index 0000000..56baf11 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/jobs.py @@ -0,0 +1,126 @@ +from dagster import job +from util_services.util_ops import ( + preview_dataframe, + read_structured_to_df, + write_df_to_local, + write_string_to_txt, + read_txt_to_string, + preview_txt, + read_structured_from_s3, + write_df_to_s3, + read_txt_from_s3, + write_text_to_s3, +) +from .ops import ( + anonymize_pseudonymize_structured, + depseudonymize_structured, +) +from .unstructured_ops import ( + anonymize_pseudonymize_unstructured, + depseudonymize_unstructured, +) + +@job(tags={ + "business_operation": "ANONYMISATION_PSEUDONYMISATION" +}) +def anonymize_pseudonymize_structured_job(): + df = read_structured_to_df() + preview_dataframe(df) + df_anon, metrics = anonymize_pseudonymize_structured(df) + preview_dataframe(df_anon) + write_df_to_local(df_anon) + + +@job(tags={ + "business_operation": "ANONYMISATION_PSEUDONYMISATION", + "resource_type": "RD_DATA" +}) +def anonymize_pseudonymize_structured_job_s3(): + df = read_structured_from_s3() + preview_dataframe(df) + df_anon, metrics = anonymize_pseudonymize_structured(df) + preview_dataframe(df_anon) + write_df_to_s3(df_anon) + + +@job(tags={ + "business_operation": "DEPSEUDONYMISATION" +}) +def depseudonymize_structured_job(): + df = read_structured_to_df() + preview_dataframe(df) + df_anon, metrics = depseudonymize_structured(df) + preview_dataframe(df_anon) + write_df_to_local(df_anon) + + +@job(tags={ + "business_operation": "DEPSEUDONYMISATION", + "resource_type": "RD_DATA" +}) +def depseudonymize_structured_job_s3(): + df = read_structured_from_s3() + preview_dataframe(df) + df_anon, metrics = depseudonymize_structured(df) + preview_dataframe(df_anon) + write_df_to_s3(df_anon) + + +@job(tags={ + "business_operation": "ANONYMISATION_PSEUDONYMISATION" +}) +def anonymize_pseudonymize_depseudonymize_structured_job(): + df = read_structured_to_df() + preview_dataframe(df) + df_pseduo, metrics = anonymize_pseudonymize_structured(df) + preview_dataframe(df_pseduo) + df_depseduo, metrics = depseudonymize_structured(df_pseduo) + preview_dataframe(df_depseduo) + + +@job(tags={ + "business_operation": "ANONYMISATION_PSEUDONYMISATION" +}) +def anonymize_pseudonymize_unstructured_job(): + text = read_txt_to_string() + preview_txt(text) + text_anon, metrics = anonymize_pseudonymize_unstructured(text) + preview_txt(text_anon) + preview_txt(metrics) + write_string_to_txt(text_anon) + + +@job(tags={ + "business_operation": "ANONYMISATION_PSEUDONYMISATION", + "resource_type": "RD_DATA" +}) +def anonymize_pseudonymize_unstructured_job_s3(): + text = read_txt_from_s3() + preview_txt(text) + text_anon, metrics = anonymize_pseudonymize_unstructured(text) + preview_txt(text_anon) + preview_txt(metrics) + write_text_to_s3(text_anon) + + +@job(tags={ + "business_operation": "DEPSEUDONYMISATION" +}) +def depseudonymize_unstructured_job(): + text = read_txt_to_string() + preview_txt(text) + text_anon, metrics = depseudonymize_unstructured(text) + preview_txt(text_anon) + write_string_to_txt(text_anon) + + +@job(tags={ + "business_operation": "DEPSEUDONYMISATION", + "resource_type": "RD_DATA" +}) +def depseudonymize_unstructured_job_s3(): + text = read_txt_from_s3() + preview_txt(text) + text_anon, metrics = depseudonymize_unstructured(text) + preview_txt(text_anon) + write_text_to_s3(text_anon) diff --git a/src/template_code_location/field_level_pseudo_anonymisation/ops.py b/src/template_code_location/field_level_pseudo_anonymisation/ops.py new file mode 100644 index 0000000..a485ff9 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/ops.py @@ -0,0 +1,77 @@ +import pandas as pd +import numpy as np +from dagster import Out, Output, op +from cryptography.fernet import InvalidToken +from template_code_location.field_level_pseudo_anonymisation.config_models import ( + AnonymisePseudonymizeStructuredConfig, + DepseudonymizeStructuredConfig, +) +from template_code_location.field_level_pseudo_anonymisation.techniques import ( + anonymisation_pseudonymisation_techniques as anon_pseudo_funcs, +) +import template_code_location.field_level_pseudo_anonymisation.techniques.depseudonymisation_techniques as depseudo_funcs +from .utils import create_get_encryption_key + + +def _apply_column_wise_function(config, df, funcs): + for used_function in config.used_function: + func_name = used_function.technique.type + columns = used_function.technique.columns + func = getattr(funcs, func_name) + params = used_function.technique.model_dump() + del params["type"] + del params["columns"] + + if func_name in ["encrypt", "decrypt"]: + key_name = used_function.technique.key_name + del params["key_name"] + params["key"] = create_get_encryption_key(func_name, key_name) + + missing = [col for col in columns if col not in df.columns] + if missing: + raise ValueError( + f"The following columns required by technique '{func_name}' " + f"are not present in the DataFrame: {', '.join(missing)}" + ) + + # Skip processing if DataFrame is empty + if len(df) == 0: + continue + + for column in columns: + try: + vectorized_func = np.vectorize(lambda x: func(x, **params)) + df[column] = vectorized_func(df[column].to_numpy()) + except InvalidToken: + raise ValueError( + f"Invalid Fernet token while decrypting column '{column}' " + f"using key '{key_name}'. The data may not be encrypted " + f"or the key may be incorrect. " + ) + return df + + +@op(out={"data": Out(), "metrics": Out()}) +def anonymize_pseudonymize_structured( + context, config: AnonymisePseudonymizeStructuredConfig, df: pd.DataFrame +): + + df = _apply_column_wise_function(config, df, anon_pseudo_funcs) + yield Output( + value=df, + metadata={}, + output_name="data", + ) + yield Output(value={}, output_name="metrics") + + +@op(out={"data": Out(), "metrics": Out()}) +def depseudonymize_structured(context, config: DepseudonymizeStructuredConfig, df: pd.DataFrame): + + df = _apply_column_wise_function(config, df, depseudo_funcs) + yield Output( + value=df, + metadata={}, + output_name="data", + ) + yield Output(value={}, output_name="metrics") diff --git a/src/template_code_location/field_level_pseudo_anonymisation/techniques/__init__.py b/src/template_code_location/field_level_pseudo_anonymisation/techniques/__init__.py new file mode 100644 index 0000000..128c371 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/techniques/__init__.py @@ -0,0 +1,3 @@ +from .anonymisation_pseudonymisation_techniques import hash, redact, replace, encrypt # noqa: F401 + +from .depseudonymisation_techniques import decrypt # noqa: F401 diff --git a/src/template_code_location/field_level_pseudo_anonymisation/techniques/anonymisation_pseudonymisation_techniques.py b/src/template_code_location/field_level_pseudo_anonymisation/techniques/anonymisation_pseudonymisation_techniques.py new file mode 100644 index 0000000..ce15613 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/techniques/anonymisation_pseudonymisation_techniques.py @@ -0,0 +1,42 @@ +import hashlib +from cryptography.fernet import Fernet + + +def hash(value: str, algorithm: str = "sha256") -> str: + """ + Hash the value using the specified algorithm (default: SHA-256). + """ + value = str(value) + hash_func = hashlib.new(algorithm) + hash_func.update(value.encode("utf-8")) + return hash_func.hexdigest() + + +def redact(value: str) -> str: + """ + Redact the column and return an empty string + """ + return "" + + +def replace(value: str, new_value) -> str: + """ + Replace the value column with the provided value + """ + return new_value + + +def encrypt(value: str, key: bytes) -> str: + """ + Encrypt the value using the provided Fernet key. + """ + value = str(value) + f = Fernet(key) + return f.encrypt(value.encode()).decode() + + +def retain(value: str) -> str: + """ + Retain the original value without any changes. + """ + return value diff --git a/src/template_code_location/field_level_pseudo_anonymisation/techniques/depseudonymisation_techniques.py b/src/template_code_location/field_level_pseudo_anonymisation/techniques/depseudonymisation_techniques.py new file mode 100644 index 0000000..4e0937c --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/techniques/depseudonymisation_techniques.py @@ -0,0 +1,9 @@ +from cryptography.fernet import Fernet + + +def decrypt(value: str, key: bytes) -> str: + """ + Decrypt a string using the provided Fernet key. + """ + f = Fernet(key) + return f.decrypt(value.encode()).decode() diff --git a/src/template_code_location/field_level_pseudo_anonymisation/unstructured_ops.py b/src/template_code_location/field_level_pseudo_anonymisation/unstructured_ops.py new file mode 100644 index 0000000..f8f0ffe --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/unstructured_ops.py @@ -0,0 +1,428 @@ +import importlib +import importlib.abc +import importlib.machinery +import re +import sys +import types + + +# --------------------------------------------------------------------------- +# Stub out the `transformers` and `spacy_transformers` packages before any +# other import triggers spaCy's entry-point scan or scrubadub_spacy's runtime +# import of spacy_transformers.pipeline_component. +# --------------------------------------------------------------------------- +_STUB_PACKAGES = ("transformers", "spacy_transformers") + + +class _StubModule(types.ModuleType): + """Module that returns a dummy class for any attribute access.""" + + def __getattr__(self, name: str): + return type(name, (), {}) + + +class _StubFinder(importlib.abc.MetaPathFinder): + """Intercept any import under the stubbed packages and return a stub module.""" + + def find_spec(self, fullname, path=None, target=None): # noqa: ANN001 + for pkg in _STUB_PACKAGES: + if fullname == pkg or fullname.startswith(pkg + "."): + return importlib.machinery.ModuleSpec(fullname, _StubLoader()) + return None + + +class _StubLoader(importlib.abc.Loader): + def create_module(self, spec): # noqa: ANN001 + mod = _StubModule(spec.name) + mod.__path__ = [] # mark as package + mod.__spec__ = spec + return mod + + def exec_module(self, module): # noqa: ANN001 + pass + + +# Install the finder once, before scrubadub / spacy are imported. +if not any(isinstance(f, _StubFinder) for f in sys.meta_path): + sys.meta_path.insert(0, _StubFinder()) +# --------------------------------------------------------------------------- + + +import scrubadub # noqa: E402 +import scrubadub_spacy # noqa: E402 +from cryptography.fernet import InvalidToken # noqa: E402 +from dagster import Out, Output, get_dagster_logger, op # noqa: E402 +from scrubadub.detectors import RegexDetector # noqa: E402 +from scrubadub.filth import CredentialFilth, NameFilth # noqa: E402 + +from template_code_location.field_level_pseudo_anonymisation.techniques import ( + anonymisation_pseudonymisation_techniques as anon_pseudo_funcs, +) +from template_code_location.field_level_pseudo_anonymisation.techniques import ( + depseudonymisation_techniques as depseudo_funcs, +) + +from .config_models import ( + PII_MAPPING, + AnonymisePseudonymizeUnstructuredConfig, + DepseudonymizeUnstructuredConfig, + PIIEntityEnum, + PseudoTechniqueConfig, + SupportedLanguages, +) +from .utils import create_get_encryption_key + + +def _initialize_scrubber(language: str) -> scrubadub.Scrubber: + class SIMPLCredentialDetector(RegexDetector): + """ + Remove username/password combinations from dirty ``text``. + """ + + filth_cls = CredentialFilth + name = "credential" + autoload = True + + regex = re.compile( + r""" + (?:username|login|u:)\s*(?::\s*)? + (?P[\w.\-@+]+) + [\s\S]{0,500}? + (?:password|pw|p:)\s*(?::\s*)? + (?P[^\s]+) + """, + re.MULTILINE | re.VERBOSE | re.IGNORECASE, + ) + + locale = SupportedLanguages.get_locale(language) + scrubber = scrubadub.Scrubber(locale=locale) + + model_name = SupportedLanguages.get_language_model(language) + spacy_detector = scrubadub_spacy.detectors.SpacyEntityDetector(model=model_name) + spacy_detector.named_entities = { + "PERSON", + "PER", + "ORG", + "persName", + "PRS", + } # Need to set it after the constructor because scrubadub_spacy uses upper on all entries + spacy_detector.filth_cls_map["persName"] = NameFilth # Required because PL uses persName + spacy_detector.filth_cls_map["PRS"] = NameFilth # Required for swedish that uses PRS + scrubber.add_detector(spacy_detector) + if language in ["en", "de"]: + scrubber.add_detector( + scrubadub.detectors.DateOfBirthDetector + ) # add optional data of birth detector + scrubber.remove_detector( + scrubadub.detectors.CredentialDetector + ) # remove the not so great credentials detector and replace with custom SIMPL one + scrubber.add_detector(SIMPLCredentialDetector()) + return scrubber + + +def _map_filth_to_pii_enum(filth) -> PIIEntityEnum | None: + cls_name = filth.__class__.__name__ + for pii_enum, filth_name in PII_MAPPING.items(): + if filth_name == cls_name: + return pii_enum + return None + + +def _get_metrics(metrics_dict: dict, language: str) -> str: + # Format metrics as Markdown table + metrics_report = f""" +## PII Anonymization Report + +### Summary +- **Total PII Detected**: {metrics_dict['total_pii_detected']} +- **Original Length**: {metrics_dict['text_length_original']} chars +- **Anonymized Length**: {metrics_dict['text_length_anonymised']} chars +- **Language**: {language} + +### PII by Type +| Entity Type | Count | +|-------------|-------| +""" + for pii_type, count in metrics_dict["pii_by_type"].items(): + metrics_report += f"| {pii_type} | {count} |\n" + + metrics_report += "\n### Techniques Applied\n" + for pii, technique in metrics_dict["techniques_applied"].items(): + metrics_report += f"- **{pii}**: {technique}\n" + + return metrics_report + + +def _build_metrics_dict( + pii_counts: dict[str, int], + text: str, + anon_text: str, + technique_map: dict[PIIEntityEnum, PseudoTechniqueConfig], +) -> dict: + metrics_dict = { + "total_pii_detected": sum(pii_counts.values()), + "pii_by_type": pii_counts, + "text_length_original": len(text), + "text_length_anonymised": len(anon_text), + "techniques_applied": { + pii.name: technique_map[pii].technique.type for pii in technique_map.keys() + }, + } + + return metrics_dict + + +@op(out={"data": Out(), "metrics": Out()}) +def anonymize_pseudonymize_unstructured( + context, config: AnonymisePseudonymizeUnstructuredConfig, text: str +): + logger = get_dagster_logger() + + if text is None or not text.strip(): + raise ValueError("Input text cannot be None or empty") + + logger.debug( + f"Starting unstructured PII anonymization | lang={config.language.value} " + f"| input_chars={len(text)}" + ) + + # --- Filth detection --- + try: + scrubber = _initialize_scrubber(config.language.value) + filths = list(scrubber.iter_filth(text)) + logger.info(f"Detected {len(filths)} potential PII entities before filtering.") + except Exception as e: + logger.error(f"Scrubber initialization/detection failed | lang={config.language.value}") + raise RuntimeError(f"PII detection failed for language '{config.language.value}'") from e + + # --- Build technique routing map --- + technique_map = _build_technique_map(config) + logger.debug( + "Technique map constructed: " + + ", ".join(f"{pii.name}->{cfg.technique.type}" for pii, cfg in technique_map.items()) + ) + + replacements = [] + key_cache = {} + pii_counts = {} + + # --- Process filths --- + for idx, filth in enumerate(filths, start=1): + pii_enum = _map_filth_to_pii_enum(filth) + + if pii_enum is None: + logger.debug(f"[{idx}] Skipping unknown filth class={filth.__class__.__name__}") + continue + + start_idx, end_idx = _extract_span(filth, logger, idx) + if start_idx is None: + continue + + original_value = text[start_idx:end_idx] + technique_cfg = technique_map.get(pii_enum) + + # No technique configured + if technique_cfg is None: + _handle_missing_technique( + pii_enum, + start_idx, + end_idx, + text, + pii_counts, + replacements, + logger, + idx, + ) + continue + + # Apply configured technique + t = technique_cfg.technique + params = _prepare_params(t, key_cache, idx, logger) + replacement = _apply_technique(original_value, t.type, params, pii_enum, idx, logger) + + replacements.append((start_idx, end_idx, replacement)) + pii_counts[pii_enum.name] = pii_counts.get(pii_enum.name, 0) + 1 + + # --- Apply replacements --- + anon_text = _apply_replacements(text, replacements, logger) + + logger.info(f"Anonymisation completed, total PII counts: {pii_counts}") + + metrics_report = _get_metrics( + _build_metrics_dict(pii_counts, text, anon_text, technique_map), + config.language.value, + ) + + yield Output(value=anon_text, output_name="data") + yield Output(value=metrics_report, output_name="metrics") + + +@op(out={"data": Out(), "metrics": Out()}) +def depseudonymize_unstructured(context, config: DepseudonymizeUnstructuredConfig, input_text: str): + + input_restored, metrics = _apply_depseudonimisation_function(config, input_text, depseudo_funcs) + yield Output( + value=input_restored, + metadata={}, + output_name="data", + ) + yield Output(value=metrics, output_name="metrics") + + +def _apply_depseudonimisation_function(config, input_text: str, funcs_module): + """ + Searches and depseudonymizes text segments formatted as: + {technique:pseudonymized_value} + """ + + total_depseudo_count = 0 + depseudonimized_text = input_text # Initialize with input text + + # Loop through each depseudonymisation technique defined in the config + for used_function in config.used_function: + func_name = used_function.technique.type + func = getattr(funcs_module, func_name) + pseudo_anon_func = "" + + # Prepare parameters + params = used_function.technique.model_dump() + del params["type"] + + if func_name == "decrypt": + key_name = used_function.technique.key_name + del params["key_name"] + pseudo_anon_func = "encrypt" + params["key"] = create_get_encryption_key(func_name, key_name) + + # Regex pattern for this technique, e.g. {encrypt:...} + pattern = rf"\{{{pseudo_anon_func}:([^}}]+)\}}" + + def replace_match(match): + nonlocal total_depseudo_count + pseudovalue = match.group(1) + total_depseudo_count += 1 + try: + return func(pseudovalue, **params) + except InvalidToken: + raise ValueError( + f"Invalid Fernet token while decrypting value using key '{key_name}'. " + f"The data may not be encrypted or the key may be incorrect." + ) + except Exception as e: + raise RuntimeError(f"Error during depseudonymisation with '{func_name}': {e}") + + # Apply replacements for this technique + depseudonimized_text = re.sub(pattern, replace_match, depseudonimized_text) + + yield depseudonimized_text + yield {"total_depseudo_count": total_depseudo_count} + + +def _build_technique_map(config): + technique_map = {} + for func_cfg in config.used_function: + for pii in func_cfg.technique.pii: + technique_map[pii] = func_cfg + return technique_map + + +def _extract_span(filth, logger, idx): + start_idx = getattr(filth, "beg", getattr(filth, "start", None)) + end_idx = getattr(filth, "end", None) + if start_idx is None or end_idx is None: + logger.debug(f"[{idx}] Filth missing span attributes; skipping.") + return None, None + return start_idx, end_idx + + +def _handle_missing_technique( + pii_enum, start_idx, end_idx, text, pii_counts, replacements, logger, idx +): + original_value = text[start_idx:end_idx] + logger.debug( + f"[{idx}] PII={pii_enum.name} span=({start_idx},{end_idx}) value={original_value} " + f"- No technique configured, using placeholder" + ) + placeholder = f"{{{{{pii_enum.name}}}}}" + replacements.append((start_idx, end_idx, placeholder)) + pii_counts[pii_enum.name] = pii_counts.get(pii_enum.name, 0) + 1 + + +def _prepare_params(t, key_cache, idx, logger): + params = t.model_dump() + del params["type"] + del params["pii"] + + if t.type == "encrypt": + try: + if t.key_name not in key_cache: + logger.debug( + f"[{idx}] Retrieving/generating Vault key name={t.key_name} for encryption" + ) + key_cache[t.key_name] = create_get_encryption_key("encrypt", t.key_name) + params["key"] = key_cache[t.key_name] + del params["key_name"] + logger.debug(f"[{idx}] Encryption key prepared") + except Exception as e: + raise RuntimeError( + f"Encryption key retrieval failed for key '{t.key_name}': {type(e).__name__}" + ) from e + + return params + + +def _apply_technique(original_value, t_type, params, pii_enum, idx, logger): + try: + func = getattr(anon_pseudo_funcs, t_type) + replacement = func(original_value, **params) + + if t_type == "encrypt": + replacement = f"{{encrypt:{replacement}}}" + + logger.debug(f"[{idx}] {t_type.capitalize()} complete") + return replacement + + except AttributeError: + logger.warning(f"[{idx}] Technique '{t_type}' not recognized; inserting placeholder.") + return f"{{UNIMPL_{t_type}_{pii_enum.name}}}" + + except Exception as e: + raise RuntimeError( + f"Technique '{t_type}' failed for PII type '{pii_enum.name}': {type(e).__name__}" + ) from e + + +def _apply_replacements(text, replacements, logger): + if not replacements: + logger.info("No PII detected; returning original text.") + return text + + logger.debug(f"Applying {len(replacements)} replacements to text body.") + replacements.sort(key=lambda r: r[0]) + + # Detect overlaps + for i in range(len(replacements) - 1): + if replacements[i][1] > replacements[i + 1][0]: + logger.warning( + f"Overlapping PII detected at positions " + f"({replacements[i][0]},{replacements[i][1]}) " + f"and ({replacements[i+1][0]},{replacements[i+1][1]}). " + f"Using first match." + ) + replacements[i + 1] = ( + replacements[i][1], + replacements[i + 1][1], + replacements[i + 1][2], + ) + + result_parts = [] + last = 0 + for start, end, repl in replacements: + if start < last: + continue + result_parts.append(text[last:start]) + result_parts.append(repl) + last = end + + result_parts.append(text[last:]) + return "".join(result_parts) diff --git a/src/template_code_location/field_level_pseudo_anonymisation/utils.py b/src/template_code_location/field_level_pseudo_anonymisation/utils.py new file mode 100644 index 0000000..25ebd75 --- /dev/null +++ b/src/template_code_location/field_level_pseudo_anonymisation/utils.py @@ -0,0 +1,32 @@ +import os +import hvac +from hvac.exceptions import InvalidPath +from cryptography.fernet import Fernet + + +def create_get_encryption_key(func_name: str, key_name: str) -> bytes: + client = hvac.Client(url=os.getenv("OPENBAO_URL"), token=os.getenv("OPENBAO_TOKEN")) + + secret_folder = os.getenv("ENCRYPTION_KEYS_PATH") + secret_path = f"{secret_folder}/{key_name}" if secret_folder else key_name + mount_point = os.getenv("ENCRYPTION_KEYS_MOUNT_POINT") + + try: + secret_response = client.secrets.kv.v2.read_secret_version( + path=secret_path, mount_point=mount_point + ) + key_value = secret_response["data"]["data"]["value"] + + except InvalidPath: + if func_name == "encrypt": + new_key = Fernet.generate_key().decode() + client.secrets.kv.v2.create_or_update_secret( + path=secret_path, mount_point=mount_point, secret={"value": new_key} + ) + key_value = new_key + else: + raise ValueError(f"Fernet key '{key_name}' not found in Vault for decrypt.") + except Exception as e: + raise ValueError(f"Error while reading Fernet key '{key_name}': {e}") + + return key_value.encode() diff --git a/src/template_code_location/jobs/__init__.py b/src/template_code_location/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/template-code-location/jobs/jobs.py b/src/template_code_location/jobs/jobs.py similarity index 100% rename from src/template-code-location/jobs/jobs.py rename to src/template_code_location/jobs/jobs.py diff --git a/src/template_code_location/ops/__init__.py b/src/template_code_location/ops/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/template-code-location/ops/ops.py b/src/template_code_location/ops/ops.py similarity index 100% rename from src/template-code-location/ops/ops.py rename to src/template_code_location/ops/ops.py diff --git a/src/template_code_location/repository.py b/src/template_code_location/repository.py new file mode 100644 index 0000000..cf97606 --- /dev/null +++ b/src/template_code_location/repository.py @@ -0,0 +1,65 @@ +from dagster import Definitions +from util_services.resources import s3_resource +from util_services.sensors import ( + notify_success, + notify_failure, + notify_canceled +) +from util_services.custom_json_logger import simpl_json_logger + +# Data processing jobs +from template_code_location.data_processing.jobs import ( + remove_duplicates_job_s3, + fill_missing_values_job_s3, + standardize_categorical_values_job_s3, + correct_typos_job_s3, + normalize_numeric_min_max_job_s3, + normalize_datetime_job_s3, + normalize_coordinates_job_s3, + add_global_aggregations_job_s3, + filter_dataset_job_s3, +) + +# Dataframe-level anonymisation jobs +from template_code_location.dataframe_level_anonymisation.jobs import ( + k_anonymity_job_s3, + l_diversity_job_s3, + t_closeness_job_s3, + read_write_semistructured_job_s3, +) + +# Field-level pseudo-anonymisation jobs +from template_code_location.field_level_pseudo_anonymisation.jobs import ( + anonymize_pseudonymize_structured_job_s3, + depseudonymize_structured_job_s3, + anonymize_pseudonymize_unstructured_job_s3, + depseudonymize_unstructured_job_s3, +) + +defs = Definitions( + jobs=[ + # Data processing + remove_duplicates_job_s3, + fill_missing_values_job_s3, + standardize_categorical_values_job_s3, + correct_typos_job_s3, + normalize_numeric_min_max_job_s3, + normalize_datetime_job_s3, + normalize_coordinates_job_s3, + add_global_aggregations_job_s3, + filter_dataset_job_s3, + # Dataframe-level anonymisation + k_anonymity_job_s3, + l_diversity_job_s3, + t_closeness_job_s3, + read_write_semistructured_job_s3, + # Field-level pseudo-anonymisation + anonymize_pseudonymize_structured_job_s3, + depseudonymize_structured_job_s3, + anonymize_pseudonymize_unstructured_job_s3, + depseudonymize_unstructured_job_s3, + ], + sensors=[notify_success, notify_failure, notify_canceled], + resources={"s3": s3_resource.configured({"resource_name": "selfS3"})}, + loggers={"simpl": simpl_json_logger}, +)