feat(SIMPL-24642): consolidate all code locations into template-code-location
- Rename src/template-code-location to src/template_code_location - Copy data-processing jobs/ops/config_models - Copy dataframe-level-anonymisation jobs/ops/utils/config_models - Copy field-level-pseudo-anonymisation jobs/ops/techniques/config_models - Update all imports to template_code_location namespace - Merge all jobs into unified repository.py with sensors/resources/loggers - Update pyproject.toml with all dependencies - Update Dockerfile for consolidated image
This commit is contained in:
11
Dockerfile
11
Dockerfile
@@ -1,13 +1,16 @@
|
|||||||
FROM python:3.12-slim-bookworm
|
FROM python:3.12-slim-bookworm
|
||||||
|
|
||||||
|
# Install git for git-based dependencies
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
COPY pyproject.toml .
|
COPY pyproject.toml .
|
||||||
RUN pip install --no-cache-dir dagster dagster-webserver
|
|
||||||
|
|
||||||
COPY src/ src/
|
COPY src/ src/
|
||||||
|
|
||||||
|
# Install the package and all dependencies
|
||||||
RUN pip install --no-cache-dir .
|
RUN pip install --no-cache-dir .
|
||||||
|
|
||||||
EXPOSE 3000
|
EXPOSE 4000
|
||||||
|
|
||||||
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "template-code-location.repository"]
|
CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "src/template_code_location/repository.py"]
|
||||||
|
|||||||
@@ -4,23 +4,43 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "template-code-location"
|
name = "template-code-location"
|
||||||
version = "0.0.1"
|
version = "0.1.0"
|
||||||
description = "Template code location for data processings services"
|
description = "Consolidated code location for all data services workflows"
|
||||||
requires-python = ">=3.12"
|
requires-python = ">=3.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
# Dagster core
|
||||||
"dagster>=1.8.13",
|
"dagster>=1.8.13",
|
||||||
"dagster-webserver>=1.8.13",
|
"dagster-webserver>=1.8.13",
|
||||||
"dagster-postgres>=0.24.13",
|
"dagster-postgres>=0.24.13",
|
||||||
"pandas>=3.0",
|
# Data processing
|
||||||
|
"pandas>=2.1.4",
|
||||||
"pyarrow>=23.0",
|
"pyarrow>=23.0",
|
||||||
|
"numpy>=2.4",
|
||||||
"lxml>=6.0",
|
"lxml>=6.0",
|
||||||
"xmltodict>=1.0",
|
"xmltodict>=1.0",
|
||||||
"rdflib>=7.6",
|
"rdflib>=7.6",
|
||||||
"numpy>=2.4",
|
"openpyxl",
|
||||||
|
"xlrd>=2.0.1",
|
||||||
|
"tabulate==0.8.10",
|
||||||
|
"pyspellchecker>=0.8.4",
|
||||||
|
"PyGeodesy>=24.6.11",
|
||||||
|
# Validation
|
||||||
"great_expectations>=1.16",
|
"great_expectations>=1.16",
|
||||||
"pandera>=0.31",
|
"pandera>=0.31",
|
||||||
|
"pydantic>=2.6.0,<3.0.0",
|
||||||
|
# Scraping
|
||||||
"scrapy>=2.15",
|
"scrapy>=2.15",
|
||||||
"BeautifulSoup4>=4.14",
|
"BeautifulSoup4>=4.14",
|
||||||
|
# Anonymisation libraries
|
||||||
|
"pycanon==1.0.1.post2",
|
||||||
|
"anjana>=1.0.0",
|
||||||
|
# Field-level pseudo-anonymisation
|
||||||
|
"scrubadub",
|
||||||
|
"scrubadub_spacy",
|
||||||
|
"hvac",
|
||||||
|
"cryptography",
|
||||||
|
# Util services (git dependency)
|
||||||
|
"util-services @ git+https://code.europa.eu/simpl/simpl-open/development/data-services/util-services.git@v0.4.1",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
@@ -1,6 +0,0 @@
|
|||||||
from dagster import Definitions
|
|
||||||
from .jobs.jobs import data_processing_job
|
|
||||||
|
|
||||||
defs = Definitions(
|
|
||||||
jobs=[data_processing_job],
|
|
||||||
)
|
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
"""Configuration models for data processing."""
|
||||||
|
|
||||||
|
from .columns_select_configuration import ColumnsSelectConfiguration
|
||||||
|
from .fill_missing_config import FillMissingConfiguration
|
||||||
|
from .spell_check_configuration import SpellCheckConfiguration
|
||||||
|
from .coordinates_normalization_configuration import CoordinatesNormalizationConfiguration
|
||||||
|
from .aggregation_configuration import AggregationConfiguration
|
||||||
|
from .filter_configuration import DatasetFilterConfiguration, FilterCondition
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ColumnsSelectConfiguration",
|
||||||
|
"FillMissingConfiguration",
|
||||||
|
"SpellCheckConfiguration",
|
||||||
|
"CoordinatesNormalizationConfiguration",
|
||||||
|
"AggregationConfiguration",
|
||||||
|
"FilterCondition",
|
||||||
|
"DatasetFilterConfiguration"
|
||||||
|
]
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
from typing import List
|
||||||
|
|
||||||
|
from pydantic import Field, field_validator
|
||||||
|
|
||||||
|
from .columns_select_configuration import ColumnsSelectConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
class AggregationConfiguration(ColumnsSelectConfiguration):
|
||||||
|
|
||||||
|
operation: str = Field(
|
||||||
|
default="sum",
|
||||||
|
description="Aggregation operations: sum, mean, min, max, count"
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("operation")
|
||||||
|
@classmethod
|
||||||
|
def validate_operations(cls, value):
|
||||||
|
allowed = {"sum", "mean", "min", "max", "count"}
|
||||||
|
if value not in allowed:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid aggregation operation '{value}'. "
|
||||||
|
f"Allowed values: {allowed}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return value
|
||||||
@@ -0,0 +1,17 @@
|
|||||||
|
from typing import List
|
||||||
|
from pydantic import Field,field_validator
|
||||||
|
from dagster import Config
|
||||||
|
|
||||||
|
|
||||||
|
class ColumnsSelectConfiguration(Config):
|
||||||
|
columns: List[str] = Field(
|
||||||
|
default=["Name"], description="List of columns to process."
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("columns")
|
||||||
|
@classmethod
|
||||||
|
def ensure_unique_columns(cls, v: List[str]) -> List[str]:
|
||||||
|
|
||||||
|
unique_values = list(dict.fromkeys(v))
|
||||||
|
|
||||||
|
return unique_values
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from pydantic import Field, model_validator
|
||||||
|
from dagster import Config
|
||||||
|
|
||||||
|
|
||||||
|
class CoordinatesNormalizationConfiguration(Config):
|
||||||
|
latColumn: Optional[str] = Field(
|
||||||
|
default="lat", description="Latitude column name"
|
||||||
|
)
|
||||||
|
lonColumn: Optional[str] = Field(
|
||||||
|
default="lon", description="Longitude column name"
|
||||||
|
)
|
||||||
|
|
||||||
|
@model_validator(mode="before")
|
||||||
|
@classmethod
|
||||||
|
def replace_nulls_with_defaults(cls, values):
|
||||||
|
if values.get("latColumn") is None:
|
||||||
|
values["latColumn"] = "lat"
|
||||||
|
if values.get("lonColumn") is None:
|
||||||
|
values["lonColumn"] = "lon"
|
||||||
|
return values
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
from typing import Dict
|
||||||
|
from dagster import Config
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
|
||||||
|
class FillMissingConfiguration(Config):
|
||||||
|
fill_map: Dict[str, str] = Field(
|
||||||
|
default={"Age": "UNKNOWN_AGE"}, description="Missing values filling map."
|
||||||
|
)
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
from enum import Enum
|
||||||
|
import operator
|
||||||
|
from typing import List, Literal, Callable
|
||||||
|
from pydantic import Field, model_validator
|
||||||
|
from dagster import Config
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
class FilterOperator(str, Enum):
|
||||||
|
EQ = "=="
|
||||||
|
NE = "!="
|
||||||
|
LT = "<"
|
||||||
|
LE = "<="
|
||||||
|
GT = ">"
|
||||||
|
GE = ">="
|
||||||
|
|
||||||
|
@property
|
||||||
|
def function(self) -> Callable:
|
||||||
|
mapping = {
|
||||||
|
FilterOperator.EQ: operator.eq,
|
||||||
|
FilterOperator.NE: operator.ne,
|
||||||
|
FilterOperator.LT: operator.lt,
|
||||||
|
FilterOperator.LE: operator.le,
|
||||||
|
FilterOperator.GT: operator.gt,
|
||||||
|
FilterOperator.GE: operator.ge,
|
||||||
|
}
|
||||||
|
return mapping[self]
|
||||||
|
|
||||||
|
class FilterCondition(Config):
|
||||||
|
column: str = Field(..., description="Name of the column to filter")
|
||||||
|
type: Literal["string", "numeric"] = Field(..., description="Column type (string or numeric)")
|
||||||
|
value: str = Field(..., description="Value to compare against")
|
||||||
|
op: FilterOperator = Field(default=FilterOperator.EQ, description="Operator to apply (string supports only EQ and NE)")
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def check_operator_compatibility(self) -> "FilterCondition":
|
||||||
|
if self.type == "string" and self.op not in [FilterOperator.EQ, FilterOperator.NE]:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid operator '{self.op.name}' for type 'string'. "
|
||||||
|
"Only EQ (==) and NE (!=) are allowed."
|
||||||
|
)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def apply(self, df: pd.DataFrame) -> pd.Series:
|
||||||
|
val = float(self.value) if self.type == "numeric" else self.value
|
||||||
|
return self.op.function(df[self.column], val)
|
||||||
|
|
||||||
|
class DatasetFilterConfiguration(Config):
|
||||||
|
conditions: List[FilterCondition] = Field(
|
||||||
|
default=[],
|
||||||
|
description="List of filter conditions to apply on the dataset. "
|
||||||
|
"String columns support only 'EQ' and 'NE', numeric columns also support 'LT', 'LE', 'GT' and 'GE'."
|
||||||
|
)
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
from typing import Literal
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from .columns_select_configuration import ColumnsSelectConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
class SpellCheckConfiguration(ColumnsSelectConfiguration):
|
||||||
|
language: Literal["en", "es", "it", "fr", "pt", "de", "nl"] = Field(default="en", description="Language to use in the SpellChecker module.")
|
||||||
119
src/template_code_location/data_processing/jobs.py
Normal file
119
src/template_code_location/data_processing/jobs.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
from dagster import job
|
||||||
|
from util_services.util_ops import (
|
||||||
|
preview_dataframe,
|
||||||
|
read_csv_from_s3,
|
||||||
|
write_csv_to_s3,
|
||||||
|
)
|
||||||
|
from .ops import (
|
||||||
|
remove_duplicates,
|
||||||
|
fill_missing_values,
|
||||||
|
standardize_categorical_values,
|
||||||
|
correct_typos,
|
||||||
|
normalize_numeric_min_max,
|
||||||
|
normalize_datetime,
|
||||||
|
normalize_coordinates,
|
||||||
|
add_global_aggregations,
|
||||||
|
filter_dataset
|
||||||
|
)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def remove_duplicates_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = remove_duplicates(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def fill_missing_values_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = fill_missing_values(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def standardize_categorical_values_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = standardize_categorical_values(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def correct_typos_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = correct_typos(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def normalize_numeric_min_max_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = normalize_numeric_min_max(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def normalize_datetime_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = normalize_datetime(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def normalize_coordinates_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = normalize_coordinates(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def add_global_aggregations_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = add_global_aggregations(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "PROCESSING",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def filter_dataset_job_s3():
|
||||||
|
org_df = read_csv_from_s3()
|
||||||
|
anon_df = filter_dataset(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_csv_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
256
src/template_code_location/data_processing/ops.py
Normal file
256
src/template_code_location/data_processing/ops.py
Normal file
@@ -0,0 +1,256 @@
|
|||||||
|
import pandas as pd
|
||||||
|
from dagster import Out, op
|
||||||
|
from spellchecker import SpellChecker
|
||||||
|
|
||||||
|
from template_code_location.data_processing.config_models import (
|
||||||
|
AggregationConfiguration,
|
||||||
|
ColumnsSelectConfiguration,
|
||||||
|
CoordinatesNormalizationConfiguration,
|
||||||
|
FillMissingConfiguration,
|
||||||
|
SpellCheckConfiguration,
|
||||||
|
DatasetFilterConfiguration
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_dms_to_decimal(value):
|
||||||
|
"""Parse a DMS (degrees-minutes-seconds) string to decimal degrees using PyGeodesy.
|
||||||
|
|
||||||
|
Supported formats include (but are not limited to):
|
||||||
|
- 40°26'46"N / 40°26′46″N
|
||||||
|
- 40 26 46 N
|
||||||
|
- 40:26:46N
|
||||||
|
- 40d26m46sN
|
||||||
|
- -40.446 (already decimal – returned as-is)
|
||||||
|
|
||||||
|
Returns None if parsing fails.
|
||||||
|
"""
|
||||||
|
from pygeodesy.dms import parseDMS
|
||||||
|
|
||||||
|
if pd.isna(value):
|
||||||
|
return None
|
||||||
|
|
||||||
|
text = str(value).strip()
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
return float(parseDMS(text))
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
try:
|
||||||
|
return float(text)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def remove_duplicates(context, df: pd.DataFrame):
|
||||||
|
"""Remove duplicate rows from the input DataFrame."""
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
before = df.shape[0]
|
||||||
|
|
||||||
|
df = df.drop_duplicates()
|
||||||
|
|
||||||
|
after = df.shape[0]
|
||||||
|
|
||||||
|
logger.info(f"Removed {before - after} duplicate rows")
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def fill_missing_values(context, config: FillMissingConfiguration, df: pd.DataFrame):
|
||||||
|
"""Fill missing values in the DataFrame according to the configured column-to-value mapping."""
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
logger.info(f"Filling missing values: {config.fill_map}")
|
||||||
|
|
||||||
|
return df.fillna(config.fill_map)
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def standardize_categorical_values(context, config: ColumnsSelectConfiguration, df: pd.DataFrame):
|
||||||
|
"""Standardize categorical values in selected columns by trimming whitespace and converting text to lowercase."""
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
for col in config.columns:
|
||||||
|
if col not in df.columns:
|
||||||
|
logger.warning(f"Column '{col}' not found in DataFrame, skipping.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
original = df[col]
|
||||||
|
|
||||||
|
standardized = (
|
||||||
|
df[col]
|
||||||
|
.fillna("")
|
||||||
|
.astype(str)
|
||||||
|
.str.strip()
|
||||||
|
.str.lower()
|
||||||
|
)
|
||||||
|
|
||||||
|
changed_count = (original != standardized).sum()
|
||||||
|
df[col] = standardized
|
||||||
|
|
||||||
|
logger.info(f"Standardized '{col}' column – {changed_count} values modified")
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def correct_typos(context, config: SpellCheckConfiguration, df: pd.DataFrame):
|
||||||
|
"""Correct spelling mistakes in the specified text columns."""
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
for column in config.columns:
|
||||||
|
if column not in df.columns:
|
||||||
|
logger.warning(f"Column '{column}' not found in DataFrame, skipping.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
spell = SpellChecker(language=config.language)
|
||||||
|
|
||||||
|
original = df[column].astype(str)
|
||||||
|
corrected = original.apply(lambda x, spell_checker=spell: spell_checker.correction(x) if x else x)
|
||||||
|
|
||||||
|
changed_count = (original != corrected).sum()
|
||||||
|
logger.info(f"Corrected typos in '{column}' – {changed_count} values modified")
|
||||||
|
|
||||||
|
df[column] = corrected
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def normalize_datetime(context, config: ColumnsSelectConfiguration, df: pd.DataFrame):
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
for col in config.columns:
|
||||||
|
if col not in df.columns:
|
||||||
|
logger.warning(f"Column '{col}' not found, skipping normalization.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
normalized = pd.to_datetime(df[col], utc=True, format="mixed", dayfirst=True, errors="coerce")
|
||||||
|
|
||||||
|
if normalized.notna().sum() == 0:
|
||||||
|
logger.warning(
|
||||||
|
f"Column '{col}' has no normalizable datetime values, skipping."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
iso_col = f"{col}_iso"
|
||||||
|
|
||||||
|
formatted = normalized.dt.strftime("%Y-%m-%dT%H:%M:%SZ").fillna("")
|
||||||
|
non_empty = formatted[formatted != ""]
|
||||||
|
if len(non_empty) > 0 and non_empty.str.startswith("1970-01-01").all():
|
||||||
|
logger.warning(
|
||||||
|
f"Column '{col}' all normalized values are '1970-01-01', likely bad input — skipping."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
df[iso_col] = formatted
|
||||||
|
|
||||||
|
logger.info(f"Normalized datetime column '{col}' into '{iso_col}'")
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def normalize_numeric_min_max(context, config: ColumnsSelectConfiguration, df: pd.DataFrame):
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
for col in config.columns:
|
||||||
|
if col not in df.columns:
|
||||||
|
logger.warning(f"Column '{col}' not found, skipping normalization.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
min_val = df[col].min()
|
||||||
|
max_val = df[col].max()
|
||||||
|
|
||||||
|
if min_val == max_val:
|
||||||
|
logger.warning(f"Column '{col}' has constant values, skipping normalization.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
df[col + "_norm"] = (df[col] - min_val) / (max_val - min_val)
|
||||||
|
logger.info(f"Normalized numeric column '{col}'")
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def normalize_coordinates(context, config: CoordinatesNormalizationConfiguration, df: pd.DataFrame):
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
lat = config.latColumn
|
||||||
|
lon = config.lonColumn
|
||||||
|
|
||||||
|
for col in [lat, lon]:
|
||||||
|
if pd.api.types.is_numeric_dtype(df[col]):
|
||||||
|
logger.info(f"Column '{col}' is numeric — coercing directly")
|
||||||
|
df[col] = pd.to_numeric(df[col], errors="coerce")
|
||||||
|
else:
|
||||||
|
logger.info(f"Column '{col}' is non-numeric — parsing as DMS with PyGeodesy")
|
||||||
|
df[col] = df[col].apply(_parse_dms_to_decimal)
|
||||||
|
|
||||||
|
invalid_lat = df[lat].isnull().sum()
|
||||||
|
invalid_lon = df[lon].isnull().sum()
|
||||||
|
logger.info(f"Found {invalid_lat} invalid latitudes and {invalid_lon} invalid longitudes")
|
||||||
|
|
||||||
|
df[lat] = df[lat].round(4)
|
||||||
|
df[lon] = df[lon].round(4)
|
||||||
|
|
||||||
|
before_filter_rows = len(df)
|
||||||
|
df = df[(df[lat].between(-90, 90)) & (df[lon].between(-180, 180))]
|
||||||
|
after_filter_rows = len(df)
|
||||||
|
logger.info(f"Filtered coordinates out of range: removed {before_filter_rows - after_filter_rows} rows")
|
||||||
|
|
||||||
|
logger.info(f"Coordinate normalization completed: resulting dataframe has {after_filter_rows} rows")
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def add_global_aggregations(context, config: AggregationConfiguration, df: pd.DataFrame):
|
||||||
|
logger = context.log
|
||||||
|
|
||||||
|
group_by_cols = []
|
||||||
|
|
||||||
|
for col in config.columns:
|
||||||
|
if col not in df.columns:
|
||||||
|
logger.warning(f"Column '{col}' not found, skipping aggregation.")
|
||||||
|
continue
|
||||||
|
group_by_cols.append(col)
|
||||||
|
|
||||||
|
if config.operation not in {"sum", "mean", "min", "max", "count"}:
|
||||||
|
logger.warning(f"Unsupported aggregation '{config.operation}'")
|
||||||
|
|
||||||
|
numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
|
||||||
|
cols_to_keep = list(set(numeric_cols + group_by_cols))
|
||||||
|
df = df[[c for c in cols_to_keep if c in df.columns]]
|
||||||
|
df = df.groupby(group_by_cols).agg(config.operation).reset_index()
|
||||||
|
return df
|
||||||
|
|
||||||
|
@op(out={"data": Out()})
|
||||||
|
def filter_dataset(context, config: DatasetFilterConfiguration, df: pd.DataFrame):
|
||||||
|
logger = context.log
|
||||||
|
total_rows_before = len(df)
|
||||||
|
|
||||||
|
logger.info(f"Starting dataset filtering: initial dataframe has {total_rows_before} rows")
|
||||||
|
|
||||||
|
combined_mask = pd.Series([True] * total_rows_before, index=df.index)
|
||||||
|
|
||||||
|
for condition in config.conditions:
|
||||||
|
if condition.column not in df.columns:
|
||||||
|
logger.warning(f"Column '{condition.column}' not found, skipping filtering.")
|
||||||
|
continue
|
||||||
|
if df[condition.column].isna().all():
|
||||||
|
logger.warning(f"Column '{condition.column}' is empty (all NaN), skipping filtering.")
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
current_mask = condition.apply(df)
|
||||||
|
combined_mask &= current_mask
|
||||||
|
|
||||||
|
logger.info(f"Applied filter: {condition.column} {condition.op.value} '{condition.value}'")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error applying filter on column '{condition.column}': {e}")
|
||||||
|
|
||||||
|
filtered_df = df[combined_mask]
|
||||||
|
total_rows_after = len(filtered_df)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Filtering completed: {total_rows_after} rows remain "
|
||||||
|
f"(removed {total_rows_before - total_rows_after} rows in total)"
|
||||||
|
)
|
||||||
|
|
||||||
|
return filtered_df
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
"""Configuration models for dataframe-level anonymization."""
|
||||||
|
|
||||||
|
from .k_anonymity_configuration import KAnonymityConfiguration
|
||||||
|
from .l_diversity_configuration import LDiversityConfiguration
|
||||||
|
from .t_closeness_configuration import TClosenessConfiguration
|
||||||
|
from .base_config import BaseConfiguration
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"BaseConfiguration",
|
||||||
|
"KAnonymityConfiguration",
|
||||||
|
"LDiversityConfiguration",
|
||||||
|
"TClosenessConfiguration",
|
||||||
|
]
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
from typing import Dict, List
|
||||||
|
from dagster import Config
|
||||||
|
from pydantic import Field, field_validator, model_validator
|
||||||
|
|
||||||
|
|
||||||
|
class BaseConfiguration(Config):
|
||||||
|
ident: List[str] = Field(default=["Name"], description="List of identifier column names.")
|
||||||
|
quasi_identifiers: List[str] = Field(default=["Age"], description="List of quasi-identifier column names.")
|
||||||
|
supp_level: float = Field(default=50.0, ge=0.0, le=100.0, description="Max suppression allowed (0–100).")
|
||||||
|
generalisation_hierarchies: Dict[str, str] = Field(
|
||||||
|
default={"Age": "simpl_age"}, description="Hierarchies used to generalize quasi-identifiers."
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("quasi_identifiers")
|
||||||
|
def validate_quasi_identifiers(cls, value):
|
||||||
|
if not value:
|
||||||
|
raise ValueError("At least one quasi-identifier must be provided.")
|
||||||
|
return value
|
||||||
|
|
||||||
|
@field_validator("ident")
|
||||||
|
def validate_ident(cls, value):
|
||||||
|
if not value:
|
||||||
|
raise ValueError("At least one identifier must be provided.")
|
||||||
|
return value
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def check_no_overlap(self):
|
||||||
|
ident = set(self.ident)
|
||||||
|
quasi = set(self.quasi_identifiers)
|
||||||
|
overlap = ident & quasi
|
||||||
|
if overlap:
|
||||||
|
raise ValueError(f"Fields cannot be both identifiers and quasi-identifiers: {overlap}")
|
||||||
|
return self
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
from anjana.anonymity.utils import utils
|
||||||
|
|
||||||
|
simpl_age = {
|
||||||
|
0: [age for age in range(0, 100)],
|
||||||
|
1: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 5),
|
||||||
|
2: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 10),
|
||||||
|
3: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 20),
|
||||||
|
4: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 100),
|
||||||
|
}
|
||||||
|
simpl_age2 = {
|
||||||
|
0: [age for age in range(0, 100)],
|
||||||
|
1: utils.generate_intervals([age for age in range(0, 100)], 0, 100, 5),
|
||||||
|
}
|
||||||
|
simpl_gender = {0: ["M", "F", "O"], 1: ["*", "*", "*"]}
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_hierarchies():
|
||||||
|
return {name: obj for name, obj in globals().items() if isinstance(obj, dict)}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
from typing import List
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from .base_config import BaseConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
class KAnonymityConfiguration(BaseConfiguration):
|
||||||
|
k: int = Field(default=3, ge=2, description="Desired level of k-anonymity (must be >= 2).")
|
||||||
|
sensitive_attributes: List[str] = Field(
|
||||||
|
default=["Disease"], description="List of sensitive attribute column names."
|
||||||
|
)
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
from pydantic import Field
|
||||||
|
from .base_config import BaseConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
class LDiversityConfiguration(BaseConfiguration):
|
||||||
|
k: int = Field(default=2, ge=2, description="Desired level of k-anonymity (must be >= 2).")
|
||||||
|
l: int = Field(default=3, ge=1, description="L-diversity level (must be >= 1)")
|
||||||
|
sensitive_attribute: str = Field(default="Disease", description="Sensitive attribute name.")
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
from pydantic import Field
|
||||||
|
from .base_config import BaseConfiguration
|
||||||
|
|
||||||
|
|
||||||
|
class TClosenessConfiguration(BaseConfiguration):
|
||||||
|
k: int = Field(default=2, ge=2, description="Desired level of k-anonymity (must be >= 2).")
|
||||||
|
t: float = Field(default=0.5, ge=0.0, le=1.0, description="Maximum t-distance threshold.")
|
||||||
|
sensitive_attribute: str = Field(default="Disease", description="Sensitive attribute name.")
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
from dagster import job
|
||||||
|
from util_services.util_ops import (
|
||||||
|
preview_dataframe,
|
||||||
|
read_structured_to_df,
|
||||||
|
write_df_to_local,
|
||||||
|
read_structured_from_s3,
|
||||||
|
write_df_to_s3,
|
||||||
|
write_semistructured_to_s3,
|
||||||
|
read_semistructured_from_s3
|
||||||
|
)
|
||||||
|
|
||||||
|
from .ops import apply_k_anonymity, apply_l_diversity, apply_t_closeness
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION"
|
||||||
|
})
|
||||||
|
def k_anonymity_job():
|
||||||
|
org_df = read_structured_to_df()
|
||||||
|
anon_df, _ = apply_k_anonymity(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_local(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION"
|
||||||
|
})
|
||||||
|
def l_diversity_job():
|
||||||
|
org_df = read_structured_to_df()
|
||||||
|
anon_df, _ = apply_l_diversity(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_local(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION"
|
||||||
|
})
|
||||||
|
def t_closeness_job():
|
||||||
|
org_df = read_structured_to_df()
|
||||||
|
anon_df, _ = apply_t_closeness(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_local(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def k_anonymity_job_s3():
|
||||||
|
org_df = read_structured_from_s3()
|
||||||
|
anon_df, _ = apply_k_anonymity(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def l_diversity_job_s3():
|
||||||
|
org_df = read_structured_from_s3()
|
||||||
|
anon_df, _ = apply_l_diversity(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def t_closeness_job_s3():
|
||||||
|
org_df = read_structured_from_s3()
|
||||||
|
anon_df, _ = apply_t_closeness(org_df)
|
||||||
|
preview_dataframe(org_df)
|
||||||
|
write_df_to_s3(anon_df)
|
||||||
|
preview_dataframe(anon_df)
|
||||||
|
|
||||||
|
@job()
|
||||||
|
def read_write_semistructured_job_s3():
|
||||||
|
semistruct_data = read_semistructured_from_s3()
|
||||||
|
write_semistructured_to_s3(semistruct_data)
|
||||||
187
src/template_code_location/dataframe_level_anonymisation/ops.py
Normal file
187
src/template_code_location/dataframe_level_anonymisation/ops.py
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
import json
|
||||||
|
from textwrap import dedent
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from anjana.anonymity import k_anonymity, l_diversity, t_closeness
|
||||||
|
from dagster import (
|
||||||
|
DagsterInvalidInvocationError,
|
||||||
|
MarkdownMetadataValue,
|
||||||
|
Out,
|
||||||
|
Output,
|
||||||
|
get_dagster_logger,
|
||||||
|
op,
|
||||||
|
)
|
||||||
|
from pycanon import anonymity
|
||||||
|
|
||||||
|
from template_code_location.dataframe_level_anonymisation.config_models import (
|
||||||
|
KAnonymityConfiguration,
|
||||||
|
LDiversityConfiguration,
|
||||||
|
TClosenessConfiguration,
|
||||||
|
)
|
||||||
|
from template_code_location.dataframe_level_anonymisation.config_models.hierarchies import get_all_hierarchies
|
||||||
|
|
||||||
|
|
||||||
|
def _calc_dataframe_metrics(df_anon, df_org, quasi_identifiers, sensitive_atttributes):
|
||||||
|
# --- Metrics ---
|
||||||
|
# Anonymization metrics
|
||||||
|
k_anon = anonymity.k_anonymity(df_anon, quasi_identifiers)
|
||||||
|
l_div = anonymity.l_diversity(df_anon, quasi_identifiers, sensitive_atttributes, True)
|
||||||
|
t_clos = anonymity.t_closeness(df_anon, quasi_identifiers, sensitive_atttributes, True)
|
||||||
|
|
||||||
|
# Data Utilization metrics
|
||||||
|
supression_rate = 1 - len(df_anon) / len(df_org)
|
||||||
|
grouped = df_anon.groupby(quasi_identifiers)
|
||||||
|
mean_equivalence_class_size = len(df_anon) / len(grouped) if len(grouped) else 0
|
||||||
|
|
||||||
|
# flake8: noqa
|
||||||
|
anon_report = dedent(
|
||||||
|
f"""
|
||||||
|
### Anonymization & Data Utilization Metrics
|
||||||
|
|
||||||
|
| Metric | Value | Description |
|
||||||
|
|--------|-------|-------------|
|
||||||
|
| **k-anonymity** | `k = {k_anon}` | Minimum number of records sharing the same quasi-identifier values. |
|
||||||
|
| **l-diversity** | `l = {l_div}` | Diversity of sensitive attributes within each equivalence class. |
|
||||||
|
| **t-closeness** | `t = {round(t_clos, 2)}` | Distance between sensitive attribute distribution in a group and the overall dataset. |
|
||||||
|
| **Suppression rate** | `{round(supression_rate, 2)}` | Fraction of records or attributes suppressed to meet privacy requirements. |
|
||||||
|
| **Mean equivalence class size** | `{round(mean_equivalence_class_size, 2)}` | Average size of equivalence classes for quasi-identifiers, indicates data grouping. |
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
# flake8: enable
|
||||||
|
metrics = {
|
||||||
|
"k_anon": k_anon,
|
||||||
|
"l_div": l_div,
|
||||||
|
"t_clos": t_clos,
|
||||||
|
"supp_rate": supression_rate,
|
||||||
|
"mean_equivalence_class": mean_equivalence_class_size,
|
||||||
|
}
|
||||||
|
return anon_report, metrics
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_and_get_hierarchies(config, df: pd.DataFrame):
|
||||||
|
hierarchies = get_all_hierarchies()
|
||||||
|
|
||||||
|
# Dataset smaller than k
|
||||||
|
if len(df) < config.k:
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
f"Cannot apply k-anonymity: dataset has {len(df)} records, but k={config.k}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Missing or incomplete generalisation hierarchies
|
||||||
|
for qi in config.quasi_identifiers:
|
||||||
|
if qi not in config.generalisation_hierarchies or not config.generalisation_hierarchies[qi]:
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
f"Generalisation hierarchy for quasi-identifier '{qi}' is missing or incomplete"
|
||||||
|
)
|
||||||
|
if config.generalisation_hierarchies[qi] not in hierarchies:
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
f"Generalisation hierarchy '{config.generalisation_hierarchies[qi]}' is missing in the code basis"
|
||||||
|
)
|
||||||
|
|
||||||
|
hier = {
|
||||||
|
qi: hierarchies[config.generalisation_hierarchies[qi]] for qi in config.quasi_identifiers
|
||||||
|
}
|
||||||
|
return hier
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def apply_k_anonymity(context, config: KAnonymityConfiguration, df: pd.DataFrame):
|
||||||
|
|
||||||
|
hier = _validate_and_get_hierarchies(config, df)
|
||||||
|
|
||||||
|
data_anon = k_anonymity(
|
||||||
|
df, config.ident, config.quasi_identifiers, config.k, config.supp_level, hier
|
||||||
|
)
|
||||||
|
if "index" in data_anon.columns and "index" not in df.columns:
|
||||||
|
data_anon.drop(columns="index", inplace=True)
|
||||||
|
anon_report, metrics = _calc_dataframe_metrics(
|
||||||
|
data_anon, df, config.quasi_identifiers, config.sensitive_attributes
|
||||||
|
)
|
||||||
|
yield Output(
|
||||||
|
value=data_anon,
|
||||||
|
metadata={
|
||||||
|
"metric_report": MarkdownMetadataValue(anon_report),
|
||||||
|
"metric_json": json.dumps(metrics),
|
||||||
|
},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value=metrics, output_name="metrics")
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def apply_l_diversity(context, config: LDiversityConfiguration, df: pd.DataFrame):
|
||||||
|
|
||||||
|
hier = _validate_and_get_hierarchies(config, df)
|
||||||
|
|
||||||
|
data_anon = l_diversity(
|
||||||
|
df,
|
||||||
|
config.ident,
|
||||||
|
config.quasi_identifiers,
|
||||||
|
config.sensitive_attribute,
|
||||||
|
config.k,
|
||||||
|
config.l,
|
||||||
|
config.supp_level,
|
||||||
|
hier,
|
||||||
|
)
|
||||||
|
if data_anon.empty:
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
"Could not tranform the data to l-diversity, empty dataset returned!"
|
||||||
|
)
|
||||||
|
anon_report, metrics = _calc_dataframe_metrics(
|
||||||
|
data_anon, df, config.quasi_identifiers, [config.sensitive_attribute]
|
||||||
|
)
|
||||||
|
yield Output(
|
||||||
|
value=data_anon,
|
||||||
|
metadata={
|
||||||
|
"metric_report": MarkdownMetadataValue(anon_report),
|
||||||
|
"metric_json": json.dumps(metrics),
|
||||||
|
},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value=metrics, output_name="metrics")
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def apply_t_closeness(context, config: TClosenessConfiguration, df: pd.DataFrame):
|
||||||
|
|
||||||
|
hier = _validate_and_get_hierarchies(config, df)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data_anon = t_closeness(
|
||||||
|
df,
|
||||||
|
config.ident,
|
||||||
|
config.quasi_identifiers,
|
||||||
|
config.sensitive_attribute,
|
||||||
|
config.k,
|
||||||
|
config.t,
|
||||||
|
config.supp_level,
|
||||||
|
hier,
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
if "Cannot be quasi-identifiers" in str(e):
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
f"T-closeness failed: k-anonymity parameter = {config.k} is too small "
|
||||||
|
f"for existing hierarchies of {config.quasi_identifiers} in inner k-anonymity call."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Re-raise other ValueError types with context
|
||||||
|
raise DagsterInvalidInvocationError(f"T-closeness failed with error: {str(e)}")
|
||||||
|
|
||||||
|
if data_anon.empty:
|
||||||
|
raise DagsterInvalidInvocationError(
|
||||||
|
f"Could not transform the data to t-closeness, empty dataset returned! "
|
||||||
|
f"This may indicate that the t-closeness constraint (t={config.t}) is too strict for the given data."
|
||||||
|
)
|
||||||
|
|
||||||
|
anon_report, metrics = _calc_dataframe_metrics(
|
||||||
|
data_anon, df, config.quasi_identifiers, [config.sensitive_attribute]
|
||||||
|
)
|
||||||
|
yield Output(
|
||||||
|
value=data_anon,
|
||||||
|
metadata={
|
||||||
|
"metric_report": MarkdownMetadataValue(anon_report),
|
||||||
|
"metric_json": json.dumps(metrics),
|
||||||
|
},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value=metrics, output_name="metrics")
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def parse_value_list(values):
|
||||||
|
return [int(v) if isinstance(v, str) and v.isdigit() else v for v in values]
|
||||||
|
|
||||||
|
|
||||||
|
# Hierarchy normalization for Anjana
|
||||||
|
def normalize_hierarchy_levels(hierarchy_dict):
|
||||||
|
normalized = {}
|
||||||
|
for column, levels in hierarchy_dict.items():
|
||||||
|
normalized[column] = {}
|
||||||
|
for level_str, mapping_list in levels.items():
|
||||||
|
level = int(level_str)
|
||||||
|
if level == 0:
|
||||||
|
normalized[column][level] = np.array(parse_value_list(mapping_list))
|
||||||
|
else:
|
||||||
|
normalized[column][level] = mapping_list
|
||||||
|
return normalized
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
from .structured_config import ( # noqa: F401
|
||||||
|
HashConfig,
|
||||||
|
EncryptConfig,
|
||||||
|
RedactConfig,
|
||||||
|
ReplaceConfig,
|
||||||
|
PseudoTechniqueConfig,
|
||||||
|
AnonymisePseudonymizeStructuredConfig,
|
||||||
|
DecryptConfig,
|
||||||
|
DepseudoTechniqueConfig,
|
||||||
|
DepseudonymizeStructuredConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
from .unstructured_config import ( # noqa: F401, F811
|
||||||
|
HashConfig,
|
||||||
|
EncryptConfig,
|
||||||
|
RedactConfig,
|
||||||
|
ReplaceConfig,
|
||||||
|
RetainConfig,
|
||||||
|
PseudoTechniqueConfig,
|
||||||
|
AnonymisePseudonymizeUnstructuredConfig,
|
||||||
|
DecryptConfig,
|
||||||
|
DepseudoTechniqueConfig,
|
||||||
|
DepseudonymizeUnstructuredConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
from .languages import SupportedLanguages, LanguageEnum # noqa: F401
|
||||||
|
|
||||||
|
from .pii_entities import PIIEntityEnum, PII_MAPPING # noqa: F401
|
||||||
@@ -0,0 +1,72 @@
|
|||||||
|
from enum import Enum
|
||||||
|
from typing import ClassVar
|
||||||
|
|
||||||
|
|
||||||
|
class SupportedLanguages:
|
||||||
|
LANGUAGES: ClassVar[dict[str, str]] = {
|
||||||
|
"hr": "hr_HR", # Croatian
|
||||||
|
"da": "da_DK", # Danish
|
||||||
|
"nl": "nl_NL", # Dutch
|
||||||
|
"en": "en_US", # English
|
||||||
|
"fi": "fi_FI", # Finnish
|
||||||
|
"fr": "fr_FR", # French
|
||||||
|
"de": "de_DE", # German
|
||||||
|
"el": "el_GR", # Greek
|
||||||
|
"it": "it_IT", # Italian
|
||||||
|
"lt": "lt_LT", # Lithuanian
|
||||||
|
"pl": "pl_PL", # Polish
|
||||||
|
"pt": "pt_PT", # Portuguese
|
||||||
|
"ro": "ro_RO", # Romanian
|
||||||
|
"sl": "sl_SI", # Slovenian
|
||||||
|
"es": "es_ES", # Spanish
|
||||||
|
"sv": "sv_SE", # Swedish
|
||||||
|
}
|
||||||
|
LANGUAGE_MODELS = {
|
||||||
|
"en": "en_core_web_sm",
|
||||||
|
"it": "it_core_news_sm",
|
||||||
|
"de": "de_core_news_sm",
|
||||||
|
"fr": "fr_core_news_sm",
|
||||||
|
"es": "es_core_news_sm",
|
||||||
|
"nl": "nl_core_news_sm",
|
||||||
|
"da": "da_core_news_sm",
|
||||||
|
"sv": "sv_core_news_sm",
|
||||||
|
"fi": "fi_core_news_sm",
|
||||||
|
"pl": "pl_core_news_sm",
|
||||||
|
"el": "el_core_news_sm",
|
||||||
|
"hr": "hr_core_news_sm",
|
||||||
|
"lt": "lt_core_news_sm",
|
||||||
|
"pt": "pt_core_news_sm",
|
||||||
|
"ro": "ro_core_news_sm",
|
||||||
|
"sl": "sl_core_news_sm",
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def codes(cls) -> list[str]:
|
||||||
|
return list(cls.LANGUAGES.keys())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_locale(cls, code: str) -> str:
|
||||||
|
return cls.LANGUAGES[code]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_language_model(cls, code: str) -> str:
|
||||||
|
return cls.LANGUAGE_MODELS[code]
|
||||||
|
|
||||||
|
|
||||||
|
class LanguageEnum(str, Enum):
|
||||||
|
hr = "hr"
|
||||||
|
da = "da"
|
||||||
|
nl = "nl"
|
||||||
|
en = "en"
|
||||||
|
fi = "fi"
|
||||||
|
fr = "fr"
|
||||||
|
de = "de"
|
||||||
|
el = "el"
|
||||||
|
it = "it"
|
||||||
|
lt = "lt"
|
||||||
|
pl = "pl"
|
||||||
|
pt = "pt"
|
||||||
|
ro = "ro"
|
||||||
|
sl = "sl"
|
||||||
|
es = "es"
|
||||||
|
sv = "sv"
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
class PIIEntityEnum(str, Enum):
|
||||||
|
PERSON = "Person"
|
||||||
|
EMAIL = "Email"
|
||||||
|
CREDIT_CARD = "Credit card"
|
||||||
|
DATE_OF_BIRTH = "Date of birth"
|
||||||
|
URL = "URLs"
|
||||||
|
PHONE_NUMBERS = "Phone numbers"
|
||||||
|
CREDENTIALS = "Credentials"
|
||||||
|
X_SOCIAL = "X (formally known as Twitter) username"
|
||||||
|
|
||||||
|
|
||||||
|
PII_MAPPING: dict[PIIEntityEnum, str] = {
|
||||||
|
PIIEntityEnum.PERSON: "NameFilth",
|
||||||
|
PIIEntityEnum.EMAIL: "EmailFilth",
|
||||||
|
PIIEntityEnum.CREDIT_CARD: "CreditCardFilth",
|
||||||
|
PIIEntityEnum.DATE_OF_BIRTH: "DateOfBirthFilth",
|
||||||
|
PIIEntityEnum.URL: "UrlFilth",
|
||||||
|
PIIEntityEnum.PHONE_NUMBERS: "PhoneFilth",
|
||||||
|
PIIEntityEnum.CREDENTIALS: "CredentialFilth",
|
||||||
|
PIIEntityEnum.X_SOCIAL: "TwitterFilth",
|
||||||
|
}
|
||||||
@@ -0,0 +1,110 @@
|
|||||||
|
from typing import List, Literal, Optional, Union
|
||||||
|
|
||||||
|
from dagster import Config
|
||||||
|
from pydantic import Field as PydanticField, model_validator, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class HashConfig(Config):
|
||||||
|
type: Literal["hash"] = "hash"
|
||||||
|
columns: List[str] = PydanticField(default=["example_column"], description="Columns to hash")
|
||||||
|
algorithm: str = PydanticField(default="sha256", description="Hashing algorithm")
|
||||||
|
|
||||||
|
class EncryptConfig(Config):
|
||||||
|
type: Literal["encrypt"] = "encrypt"
|
||||||
|
columns: List[str] = PydanticField(default=["example_column"], description="Columns to encrypt")
|
||||||
|
key_name: str = PydanticField(default="my_key", description="Key identifier used for encryption")
|
||||||
|
|
||||||
|
class RedactConfig(Config):
|
||||||
|
type: Literal["redact"] = "redact"
|
||||||
|
columns: List[str] = PydanticField(default=["example_column"], description="Columns to redact")
|
||||||
|
|
||||||
|
class ReplaceConfig(Config):
|
||||||
|
type: Literal["replace"] = "replace"
|
||||||
|
columns: List[str] = PydanticField(default=["example_column"], description="Columns to replace")
|
||||||
|
new_value: str = PydanticField(default="REPLACED", description="Replacement value")
|
||||||
|
|
||||||
|
class PseudoTechniqueConfig(Config):
|
||||||
|
technique: Union[HashConfig, EncryptConfig, RedactConfig, ReplaceConfig] = PydanticField(
|
||||||
|
default={"hash": HashConfig().model_dump(exclude={"type"})},
|
||||||
|
discriminator="type"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AnonymisePseudonymizeStructuredConfig(Config):
|
||||||
|
used_function: List[PseudoTechniqueConfig] = PydanticField(
|
||||||
|
default=[{"technique": {"hash": HashConfig().model_dump(exclude={"type"})}}],
|
||||||
|
description=("List of functions to be used on column"),
|
||||||
|
)
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def ensure_unique_columns(self):
|
||||||
|
column_to_techniques = self._collect_column_to_techniques()
|
||||||
|
duplicates = {
|
||||||
|
col: techs for col, techs in column_to_techniques.items() if len(techs) > 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if duplicates:
|
||||||
|
formatted = "; ".join(
|
||||||
|
f"{col} -> {', '.join(techs)}" for col, techs in duplicates.items()
|
||||||
|
)
|
||||||
|
raise ValueError(f"Duplicate column(s) across techniques not allowed:\n{formatted}")
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _collect_column_to_techniques(self):
|
||||||
|
"""Extract column-to-techniques mapping from used_function list."""
|
||||||
|
column_to_techniques = {}
|
||||||
|
for f in self.used_function:
|
||||||
|
technique_type, cols = self._extract_technique_and_columns(f)
|
||||||
|
for col in cols:
|
||||||
|
column_to_techniques.setdefault(col, []).append(technique_type)
|
||||||
|
return column_to_techniques
|
||||||
|
|
||||||
|
def _extract_technique_and_columns(self, item):
|
||||||
|
"""Extract technique type and columns list from a PseudoTechniqueConfig item (dict or model instance)."""
|
||||||
|
if isinstance(item, dict):
|
||||||
|
tech = item.get("technique") or {}
|
||||||
|
if isinstance(tech, dict):
|
||||||
|
if "type" in tech:
|
||||||
|
return tech.get("type"), tech.get("columns") or []
|
||||||
|
elif len(tech) == 1:
|
||||||
|
# variant-key mapping: {'hash': {...}}
|
||||||
|
technique_type, inner = next(iter(tech.items()))
|
||||||
|
return technique_type, inner.get("columns") or []
|
||||||
|
return None, []
|
||||||
|
else:
|
||||||
|
# item is a PseudoTechniqueConfig instance
|
||||||
|
technique_type = item.technique.type
|
||||||
|
cols = getattr(item.technique, "columns", [])
|
||||||
|
return technique_type, cols
|
||||||
|
|
||||||
|
class DecryptConfig(Config):
|
||||||
|
type: Literal["decrypt"] = "decrypt"
|
||||||
|
columns: List[str] = PydanticField(default=["example_column"], description="Columns to decrypt")
|
||||||
|
key_name: str = PydanticField(default="my_key", description="Key identifier used for decryption")
|
||||||
|
|
||||||
|
class DepseudoTechniqueConfig(Config):
|
||||||
|
technique: DecryptConfig = PydanticField(default={"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})})
|
||||||
|
|
||||||
|
|
||||||
|
class DepseudonymizeStructuredConfig(Config):
|
||||||
|
used_function: List[DepseudoTechniqueConfig] = PydanticField(
|
||||||
|
default=[{"technique": {"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}}],
|
||||||
|
description=("Decryption functions to be used on column"),
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("used_function", mode="before")
|
||||||
|
def _normalize_depseudo_used_function(cls, v):
|
||||||
|
normalized = []
|
||||||
|
for item in v:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
normalized.append(DepseudoTechniqueConfig.model_validate(item))
|
||||||
|
else:
|
||||||
|
normalized.append(item)
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def ensure_unique_columns(self):
|
||||||
|
# For depseudonymize, we don't have per-column uniqueness constraints,
|
||||||
|
# but keep a no-op validator to preserve API parity.
|
||||||
|
return self
|
||||||
@@ -0,0 +1,115 @@
|
|||||||
|
from typing import List, Literal, Optional, Union
|
||||||
|
|
||||||
|
from dagster import Config
|
||||||
|
from pydantic import Field as PydanticField, model_validator, field_validator
|
||||||
|
from .languages import LanguageEnum
|
||||||
|
from .pii_entities import PIIEntityEnum
|
||||||
|
|
||||||
|
|
||||||
|
class HashConfig(Config):
|
||||||
|
type: Literal["hash"] = "hash"
|
||||||
|
pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to hash")
|
||||||
|
algorithm: str = PydanticField(default="sha256", description="Hashing algorithm")
|
||||||
|
|
||||||
|
class EncryptConfig(Config):
|
||||||
|
type: Literal["encrypt"] = "encrypt"
|
||||||
|
pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to encrypt")
|
||||||
|
key_name: str = PydanticField(default="my_key", description="Key identifier used for encryption")
|
||||||
|
|
||||||
|
|
||||||
|
class RedactConfig(Config):
|
||||||
|
type: Literal["redact"] = "redact"
|
||||||
|
pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to redact")
|
||||||
|
|
||||||
|
class ReplaceConfig(Config):
|
||||||
|
type: Literal["replace"] = "replace"
|
||||||
|
pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to replace")
|
||||||
|
new_value: str = PydanticField(default="REPLACED", description="Replacement value")
|
||||||
|
|
||||||
|
class RetainConfig(Config):
|
||||||
|
type: Literal["retain"] = "retain"
|
||||||
|
pii: List[PIIEntityEnum] = PydanticField(default=[PIIEntityEnum.EMAIL.name], description="PII entities to retain")
|
||||||
|
|
||||||
|
class PseudoTechniqueConfig(Config):
|
||||||
|
technique: Union[HashConfig, EncryptConfig, RedactConfig, ReplaceConfig, RetainConfig] = PydanticField(
|
||||||
|
default={"hash": HashConfig().model_dump(exclude={"type"})},
|
||||||
|
discriminator="type"
|
||||||
|
)
|
||||||
|
|
||||||
|
class AnonymisePseudonymizeUnstructuredConfig(Config):
|
||||||
|
language: LanguageEnum = PydanticField(
|
||||||
|
default=LanguageEnum.en,
|
||||||
|
description="Language code (must be one of: hr, da, nl, en, fi, fr, de, el, it, lt, pl, pt, ro, sl, es, sv)"
|
||||||
|
|
||||||
|
)
|
||||||
|
used_function: List[PseudoTechniqueConfig] = PydanticField(
|
||||||
|
default=[{"technique": {"hash": HashConfig().model_dump(exclude={"type"})}}],
|
||||||
|
description=("List of functions to be used on PIIs"),
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("used_function", mode="before")
|
||||||
|
def _normalize_used_function(cls, v):
|
||||||
|
normalized = []
|
||||||
|
for item in v:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
normalized.append(PseudoTechniqueConfig.model_validate(item))
|
||||||
|
else:
|
||||||
|
normalized.append(item)
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
@model_validator(mode="after")
|
||||||
|
def ensure_unique_pii(self):
|
||||||
|
pii_to_techniques = self._collect_pii_to_techniques()
|
||||||
|
duplicates = {
|
||||||
|
pii: techs for pii, techs in pii_to_techniques.items() if len(techs) > 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if duplicates:
|
||||||
|
formatted = "; ".join(
|
||||||
|
f"{pii} -> {', '.join(techs)}" for pii, techs in duplicates.items()
|
||||||
|
)
|
||||||
|
raise ValueError(f"Duplicate PII(s) across techniques not allowed:\n{formatted}")
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _collect_pii_to_techniques(self):
|
||||||
|
"""Extract PII-to-techniques mapping from used_function list."""
|
||||||
|
pii_to_techniques = {}
|
||||||
|
for f in self.used_function:
|
||||||
|
technique_type, piis = self._extract_technique_and_pii(f)
|
||||||
|
for pii in piis:
|
||||||
|
pii_to_techniques.setdefault(pii, []).append(technique_type)
|
||||||
|
return pii_to_techniques
|
||||||
|
|
||||||
|
def _extract_technique_and_pii(self, item):
|
||||||
|
"""Extract technique type and PII list from a PseudoTechniqueConfig item (dict or model instance)."""
|
||||||
|
if isinstance(item, dict):
|
||||||
|
tech = item.get("technique") or {}
|
||||||
|
if isinstance(tech, dict):
|
||||||
|
if "type" in tech:
|
||||||
|
return tech.get("type"), tech.get("pii") or tech.get("columns") or []
|
||||||
|
elif len(tech) == 1:
|
||||||
|
# variant-key mapping: {'hash': {...}}
|
||||||
|
technique_type, inner = next(iter(tech.items()))
|
||||||
|
return technique_type, inner.get("pii") or inner.get("columns") or []
|
||||||
|
return None, []
|
||||||
|
else:
|
||||||
|
# item is a PseudoTechniqueConfig instance
|
||||||
|
technique_type = item.technique.type
|
||||||
|
piis = getattr(item.technique, "pii", []) or getattr(item.technique, "columns", [])
|
||||||
|
return technique_type, piis
|
||||||
|
|
||||||
|
class DecryptConfig(Config):
|
||||||
|
type: Literal["decrypt"] = "decrypt"
|
||||||
|
key_name: str = PydanticField(default="my_key", description="Key identifier used for decryption")
|
||||||
|
|
||||||
|
class DepseudoTechniqueConfig(Config):
|
||||||
|
technique: DecryptConfig = PydanticField(
|
||||||
|
default={"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})},
|
||||||
|
)
|
||||||
|
|
||||||
|
class DepseudonymizeUnstructuredConfig(Config):
|
||||||
|
used_function: List[DepseudoTechniqueConfig] = PydanticField(
|
||||||
|
default=[{"technique": {"type": "decrypt", **DecryptConfig().model_dump(exclude={"type"})}}],
|
||||||
|
description=("Decryption function"),
|
||||||
|
)
|
||||||
@@ -0,0 +1,126 @@
|
|||||||
|
from dagster import job
|
||||||
|
from util_services.util_ops import (
|
||||||
|
preview_dataframe,
|
||||||
|
read_structured_to_df,
|
||||||
|
write_df_to_local,
|
||||||
|
write_string_to_txt,
|
||||||
|
read_txt_to_string,
|
||||||
|
preview_txt,
|
||||||
|
read_structured_from_s3,
|
||||||
|
write_df_to_s3,
|
||||||
|
read_txt_from_s3,
|
||||||
|
write_text_to_s3,
|
||||||
|
)
|
||||||
|
from .ops import (
|
||||||
|
anonymize_pseudonymize_structured,
|
||||||
|
depseudonymize_structured,
|
||||||
|
)
|
||||||
|
from .unstructured_ops import (
|
||||||
|
anonymize_pseudonymize_unstructured,
|
||||||
|
depseudonymize_unstructured,
|
||||||
|
)
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION_PSEUDONYMISATION"
|
||||||
|
})
|
||||||
|
def anonymize_pseudonymize_structured_job():
|
||||||
|
df = read_structured_to_df()
|
||||||
|
preview_dataframe(df)
|
||||||
|
df_anon, metrics = anonymize_pseudonymize_structured(df)
|
||||||
|
preview_dataframe(df_anon)
|
||||||
|
write_df_to_local(df_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION_PSEUDONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def anonymize_pseudonymize_structured_job_s3():
|
||||||
|
df = read_structured_from_s3()
|
||||||
|
preview_dataframe(df)
|
||||||
|
df_anon, metrics = anonymize_pseudonymize_structured(df)
|
||||||
|
preview_dataframe(df_anon)
|
||||||
|
write_df_to_s3(df_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "DEPSEUDONYMISATION"
|
||||||
|
})
|
||||||
|
def depseudonymize_structured_job():
|
||||||
|
df = read_structured_to_df()
|
||||||
|
preview_dataframe(df)
|
||||||
|
df_anon, metrics = depseudonymize_structured(df)
|
||||||
|
preview_dataframe(df_anon)
|
||||||
|
write_df_to_local(df_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "DEPSEUDONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def depseudonymize_structured_job_s3():
|
||||||
|
df = read_structured_from_s3()
|
||||||
|
preview_dataframe(df)
|
||||||
|
df_anon, metrics = depseudonymize_structured(df)
|
||||||
|
preview_dataframe(df_anon)
|
||||||
|
write_df_to_s3(df_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION_PSEUDONYMISATION"
|
||||||
|
})
|
||||||
|
def anonymize_pseudonymize_depseudonymize_structured_job():
|
||||||
|
df = read_structured_to_df()
|
||||||
|
preview_dataframe(df)
|
||||||
|
df_pseduo, metrics = anonymize_pseudonymize_structured(df)
|
||||||
|
preview_dataframe(df_pseduo)
|
||||||
|
df_depseduo, metrics = depseudonymize_structured(df_pseduo)
|
||||||
|
preview_dataframe(df_depseduo)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION_PSEUDONYMISATION"
|
||||||
|
})
|
||||||
|
def anonymize_pseudonymize_unstructured_job():
|
||||||
|
text = read_txt_to_string()
|
||||||
|
preview_txt(text)
|
||||||
|
text_anon, metrics = anonymize_pseudonymize_unstructured(text)
|
||||||
|
preview_txt(text_anon)
|
||||||
|
preview_txt(metrics)
|
||||||
|
write_string_to_txt(text_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "ANONYMISATION_PSEUDONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def anonymize_pseudonymize_unstructured_job_s3():
|
||||||
|
text = read_txt_from_s3()
|
||||||
|
preview_txt(text)
|
||||||
|
text_anon, metrics = anonymize_pseudonymize_unstructured(text)
|
||||||
|
preview_txt(text_anon)
|
||||||
|
preview_txt(metrics)
|
||||||
|
write_text_to_s3(text_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "DEPSEUDONYMISATION"
|
||||||
|
})
|
||||||
|
def depseudonymize_unstructured_job():
|
||||||
|
text = read_txt_to_string()
|
||||||
|
preview_txt(text)
|
||||||
|
text_anon, metrics = depseudonymize_unstructured(text)
|
||||||
|
preview_txt(text_anon)
|
||||||
|
write_string_to_txt(text_anon)
|
||||||
|
|
||||||
|
|
||||||
|
@job(tags={
|
||||||
|
"business_operation": "DEPSEUDONYMISATION",
|
||||||
|
"resource_type": "RD_DATA"
|
||||||
|
})
|
||||||
|
def depseudonymize_unstructured_job_s3():
|
||||||
|
text = read_txt_from_s3()
|
||||||
|
preview_txt(text)
|
||||||
|
text_anon, metrics = depseudonymize_unstructured(text)
|
||||||
|
preview_txt(text_anon)
|
||||||
|
write_text_to_s3(text_anon)
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
from dagster import Out, Output, op
|
||||||
|
from cryptography.fernet import InvalidToken
|
||||||
|
from template_code_location.field_level_pseudo_anonymisation.config_models import (
|
||||||
|
AnonymisePseudonymizeStructuredConfig,
|
||||||
|
DepseudonymizeStructuredConfig,
|
||||||
|
)
|
||||||
|
from template_code_location.field_level_pseudo_anonymisation.techniques import (
|
||||||
|
anonymisation_pseudonymisation_techniques as anon_pseudo_funcs,
|
||||||
|
)
|
||||||
|
import template_code_location.field_level_pseudo_anonymisation.techniques.depseudonymisation_techniques as depseudo_funcs
|
||||||
|
from .utils import create_get_encryption_key
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_column_wise_function(config, df, funcs):
|
||||||
|
for used_function in config.used_function:
|
||||||
|
func_name = used_function.technique.type
|
||||||
|
columns = used_function.technique.columns
|
||||||
|
func = getattr(funcs, func_name)
|
||||||
|
params = used_function.technique.model_dump()
|
||||||
|
del params["type"]
|
||||||
|
del params["columns"]
|
||||||
|
|
||||||
|
if func_name in ["encrypt", "decrypt"]:
|
||||||
|
key_name = used_function.technique.key_name
|
||||||
|
del params["key_name"]
|
||||||
|
params["key"] = create_get_encryption_key(func_name, key_name)
|
||||||
|
|
||||||
|
missing = [col for col in columns if col not in df.columns]
|
||||||
|
if missing:
|
||||||
|
raise ValueError(
|
||||||
|
f"The following columns required by technique '{func_name}' "
|
||||||
|
f"are not present in the DataFrame: {', '.join(missing)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Skip processing if DataFrame is empty
|
||||||
|
if len(df) == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for column in columns:
|
||||||
|
try:
|
||||||
|
vectorized_func = np.vectorize(lambda x: func(x, **params))
|
||||||
|
df[column] = vectorized_func(df[column].to_numpy())
|
||||||
|
except InvalidToken:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid Fernet token while decrypting column '{column}' "
|
||||||
|
f"using key '{key_name}'. The data may not be encrypted "
|
||||||
|
f"or the key may be incorrect. "
|
||||||
|
)
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def anonymize_pseudonymize_structured(
|
||||||
|
context, config: AnonymisePseudonymizeStructuredConfig, df: pd.DataFrame
|
||||||
|
):
|
||||||
|
|
||||||
|
df = _apply_column_wise_function(config, df, anon_pseudo_funcs)
|
||||||
|
yield Output(
|
||||||
|
value=df,
|
||||||
|
metadata={},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value={}, output_name="metrics")
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def depseudonymize_structured(context, config: DepseudonymizeStructuredConfig, df: pd.DataFrame):
|
||||||
|
|
||||||
|
df = _apply_column_wise_function(config, df, depseudo_funcs)
|
||||||
|
yield Output(
|
||||||
|
value=df,
|
||||||
|
metadata={},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value={}, output_name="metrics")
|
||||||
@@ -0,0 +1,3 @@
|
|||||||
|
from .anonymisation_pseudonymisation_techniques import hash, redact, replace, encrypt # noqa: F401
|
||||||
|
|
||||||
|
from .depseudonymisation_techniques import decrypt # noqa: F401
|
||||||
@@ -0,0 +1,42 @@
|
|||||||
|
import hashlib
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
|
||||||
|
def hash(value: str, algorithm: str = "sha256") -> str:
|
||||||
|
"""
|
||||||
|
Hash the value using the specified algorithm (default: SHA-256).
|
||||||
|
"""
|
||||||
|
value = str(value)
|
||||||
|
hash_func = hashlib.new(algorithm)
|
||||||
|
hash_func.update(value.encode("utf-8"))
|
||||||
|
return hash_func.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def redact(value: str) -> str:
|
||||||
|
"""
|
||||||
|
Redact the column and return an empty string
|
||||||
|
"""
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def replace(value: str, new_value) -> str:
|
||||||
|
"""
|
||||||
|
Replace the value column with the provided value
|
||||||
|
"""
|
||||||
|
return new_value
|
||||||
|
|
||||||
|
|
||||||
|
def encrypt(value: str, key: bytes) -> str:
|
||||||
|
"""
|
||||||
|
Encrypt the value using the provided Fernet key.
|
||||||
|
"""
|
||||||
|
value = str(value)
|
||||||
|
f = Fernet(key)
|
||||||
|
return f.encrypt(value.encode()).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def retain(value: str) -> str:
|
||||||
|
"""
|
||||||
|
Retain the original value without any changes.
|
||||||
|
"""
|
||||||
|
return value
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt(value: str, key: bytes) -> str:
|
||||||
|
"""
|
||||||
|
Decrypt a string using the provided Fernet key.
|
||||||
|
"""
|
||||||
|
f = Fernet(key)
|
||||||
|
return f.decrypt(value.encode()).decode()
|
||||||
@@ -0,0 +1,428 @@
|
|||||||
|
import importlib
|
||||||
|
import importlib.abc
|
||||||
|
import importlib.machinery
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import types
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Stub out the `transformers` and `spacy_transformers` packages before any
|
||||||
|
# other import triggers spaCy's entry-point scan or scrubadub_spacy's runtime
|
||||||
|
# import of spacy_transformers.pipeline_component.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_STUB_PACKAGES = ("transformers", "spacy_transformers")
|
||||||
|
|
||||||
|
|
||||||
|
class _StubModule(types.ModuleType):
|
||||||
|
"""Module that returns a dummy class for any attribute access."""
|
||||||
|
|
||||||
|
def __getattr__(self, name: str):
|
||||||
|
return type(name, (), {})
|
||||||
|
|
||||||
|
|
||||||
|
class _StubFinder(importlib.abc.MetaPathFinder):
|
||||||
|
"""Intercept any import under the stubbed packages and return a stub module."""
|
||||||
|
|
||||||
|
def find_spec(self, fullname, path=None, target=None): # noqa: ANN001
|
||||||
|
for pkg in _STUB_PACKAGES:
|
||||||
|
if fullname == pkg or fullname.startswith(pkg + "."):
|
||||||
|
return importlib.machinery.ModuleSpec(fullname, _StubLoader())
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class _StubLoader(importlib.abc.Loader):
|
||||||
|
def create_module(self, spec): # noqa: ANN001
|
||||||
|
mod = _StubModule(spec.name)
|
||||||
|
mod.__path__ = [] # mark as package
|
||||||
|
mod.__spec__ = spec
|
||||||
|
return mod
|
||||||
|
|
||||||
|
def exec_module(self, module): # noqa: ANN001
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Install the finder once, before scrubadub / spacy are imported.
|
||||||
|
if not any(isinstance(f, _StubFinder) for f in sys.meta_path):
|
||||||
|
sys.meta_path.insert(0, _StubFinder())
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
import scrubadub # noqa: E402
|
||||||
|
import scrubadub_spacy # noqa: E402
|
||||||
|
from cryptography.fernet import InvalidToken # noqa: E402
|
||||||
|
from dagster import Out, Output, get_dagster_logger, op # noqa: E402
|
||||||
|
from scrubadub.detectors import RegexDetector # noqa: E402
|
||||||
|
from scrubadub.filth import CredentialFilth, NameFilth # noqa: E402
|
||||||
|
|
||||||
|
from template_code_location.field_level_pseudo_anonymisation.techniques import (
|
||||||
|
anonymisation_pseudonymisation_techniques as anon_pseudo_funcs,
|
||||||
|
)
|
||||||
|
from template_code_location.field_level_pseudo_anonymisation.techniques import (
|
||||||
|
depseudonymisation_techniques as depseudo_funcs,
|
||||||
|
)
|
||||||
|
|
||||||
|
from .config_models import (
|
||||||
|
PII_MAPPING,
|
||||||
|
AnonymisePseudonymizeUnstructuredConfig,
|
||||||
|
DepseudonymizeUnstructuredConfig,
|
||||||
|
PIIEntityEnum,
|
||||||
|
PseudoTechniqueConfig,
|
||||||
|
SupportedLanguages,
|
||||||
|
)
|
||||||
|
from .utils import create_get_encryption_key
|
||||||
|
|
||||||
|
|
||||||
|
def _initialize_scrubber(language: str) -> scrubadub.Scrubber:
|
||||||
|
class SIMPLCredentialDetector(RegexDetector):
|
||||||
|
"""
|
||||||
|
Remove username/password combinations from dirty ``text``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
filth_cls = CredentialFilth
|
||||||
|
name = "credential"
|
||||||
|
autoload = True
|
||||||
|
|
||||||
|
regex = re.compile(
|
||||||
|
r"""
|
||||||
|
(?:username|login|u:)\s*(?::\s*)?
|
||||||
|
(?P<username>[\w.\-@+]+)
|
||||||
|
[\s\S]{0,500}?
|
||||||
|
(?:password|pw|p:)\s*(?::\s*)?
|
||||||
|
(?P<password>[^\s]+)
|
||||||
|
""",
|
||||||
|
re.MULTILINE | re.VERBOSE | re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
locale = SupportedLanguages.get_locale(language)
|
||||||
|
scrubber = scrubadub.Scrubber(locale=locale)
|
||||||
|
|
||||||
|
model_name = SupportedLanguages.get_language_model(language)
|
||||||
|
spacy_detector = scrubadub_spacy.detectors.SpacyEntityDetector(model=model_name)
|
||||||
|
spacy_detector.named_entities = {
|
||||||
|
"PERSON",
|
||||||
|
"PER",
|
||||||
|
"ORG",
|
||||||
|
"persName",
|
||||||
|
"PRS",
|
||||||
|
} # Need to set it after the constructor because scrubadub_spacy uses upper on all entries
|
||||||
|
spacy_detector.filth_cls_map["persName"] = NameFilth # Required because PL uses persName
|
||||||
|
spacy_detector.filth_cls_map["PRS"] = NameFilth # Required for swedish that uses PRS
|
||||||
|
scrubber.add_detector(spacy_detector)
|
||||||
|
if language in ["en", "de"]:
|
||||||
|
scrubber.add_detector(
|
||||||
|
scrubadub.detectors.DateOfBirthDetector
|
||||||
|
) # add optional data of birth detector
|
||||||
|
scrubber.remove_detector(
|
||||||
|
scrubadub.detectors.CredentialDetector
|
||||||
|
) # remove the not so great credentials detector and replace with custom SIMPL one
|
||||||
|
scrubber.add_detector(SIMPLCredentialDetector())
|
||||||
|
return scrubber
|
||||||
|
|
||||||
|
|
||||||
|
def _map_filth_to_pii_enum(filth) -> PIIEntityEnum | None:
|
||||||
|
cls_name = filth.__class__.__name__
|
||||||
|
for pii_enum, filth_name in PII_MAPPING.items():
|
||||||
|
if filth_name == cls_name:
|
||||||
|
return pii_enum
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_metrics(metrics_dict: dict, language: str) -> str:
|
||||||
|
# Format metrics as Markdown table
|
||||||
|
metrics_report = f"""
|
||||||
|
## PII Anonymization Report
|
||||||
|
|
||||||
|
### Summary
|
||||||
|
- **Total PII Detected**: {metrics_dict['total_pii_detected']}
|
||||||
|
- **Original Length**: {metrics_dict['text_length_original']} chars
|
||||||
|
- **Anonymized Length**: {metrics_dict['text_length_anonymised']} chars
|
||||||
|
- **Language**: {language}
|
||||||
|
|
||||||
|
### PII by Type
|
||||||
|
| Entity Type | Count |
|
||||||
|
|-------------|-------|
|
||||||
|
"""
|
||||||
|
for pii_type, count in metrics_dict["pii_by_type"].items():
|
||||||
|
metrics_report += f"| {pii_type} | {count} |\n"
|
||||||
|
|
||||||
|
metrics_report += "\n### Techniques Applied\n"
|
||||||
|
for pii, technique in metrics_dict["techniques_applied"].items():
|
||||||
|
metrics_report += f"- **{pii}**: {technique}\n"
|
||||||
|
|
||||||
|
return metrics_report
|
||||||
|
|
||||||
|
|
||||||
|
def _build_metrics_dict(
|
||||||
|
pii_counts: dict[str, int],
|
||||||
|
text: str,
|
||||||
|
anon_text: str,
|
||||||
|
technique_map: dict[PIIEntityEnum, PseudoTechniqueConfig],
|
||||||
|
) -> dict:
|
||||||
|
metrics_dict = {
|
||||||
|
"total_pii_detected": sum(pii_counts.values()),
|
||||||
|
"pii_by_type": pii_counts,
|
||||||
|
"text_length_original": len(text),
|
||||||
|
"text_length_anonymised": len(anon_text),
|
||||||
|
"techniques_applied": {
|
||||||
|
pii.name: technique_map[pii].technique.type for pii in technique_map.keys()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics_dict
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def anonymize_pseudonymize_unstructured(
|
||||||
|
context, config: AnonymisePseudonymizeUnstructuredConfig, text: str
|
||||||
|
):
|
||||||
|
logger = get_dagster_logger()
|
||||||
|
|
||||||
|
if text is None or not text.strip():
|
||||||
|
raise ValueError("Input text cannot be None or empty")
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"Starting unstructured PII anonymization | lang={config.language.value} "
|
||||||
|
f"| input_chars={len(text)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# --- Filth detection ---
|
||||||
|
try:
|
||||||
|
scrubber = _initialize_scrubber(config.language.value)
|
||||||
|
filths = list(scrubber.iter_filth(text))
|
||||||
|
logger.info(f"Detected {len(filths)} potential PII entities before filtering.")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Scrubber initialization/detection failed | lang={config.language.value}")
|
||||||
|
raise RuntimeError(f"PII detection failed for language '{config.language.value}'") from e
|
||||||
|
|
||||||
|
# --- Build technique routing map ---
|
||||||
|
technique_map = _build_technique_map(config)
|
||||||
|
logger.debug(
|
||||||
|
"Technique map constructed: "
|
||||||
|
+ ", ".join(f"{pii.name}->{cfg.technique.type}" for pii, cfg in technique_map.items())
|
||||||
|
)
|
||||||
|
|
||||||
|
replacements = []
|
||||||
|
key_cache = {}
|
||||||
|
pii_counts = {}
|
||||||
|
|
||||||
|
# --- Process filths ---
|
||||||
|
for idx, filth in enumerate(filths, start=1):
|
||||||
|
pii_enum = _map_filth_to_pii_enum(filth)
|
||||||
|
|
||||||
|
if pii_enum is None:
|
||||||
|
logger.debug(f"[{idx}] Skipping unknown filth class={filth.__class__.__name__}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
start_idx, end_idx = _extract_span(filth, logger, idx)
|
||||||
|
if start_idx is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
original_value = text[start_idx:end_idx]
|
||||||
|
technique_cfg = technique_map.get(pii_enum)
|
||||||
|
|
||||||
|
# No technique configured
|
||||||
|
if technique_cfg is None:
|
||||||
|
_handle_missing_technique(
|
||||||
|
pii_enum,
|
||||||
|
start_idx,
|
||||||
|
end_idx,
|
||||||
|
text,
|
||||||
|
pii_counts,
|
||||||
|
replacements,
|
||||||
|
logger,
|
||||||
|
idx,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Apply configured technique
|
||||||
|
t = technique_cfg.technique
|
||||||
|
params = _prepare_params(t, key_cache, idx, logger)
|
||||||
|
replacement = _apply_technique(original_value, t.type, params, pii_enum, idx, logger)
|
||||||
|
|
||||||
|
replacements.append((start_idx, end_idx, replacement))
|
||||||
|
pii_counts[pii_enum.name] = pii_counts.get(pii_enum.name, 0) + 1
|
||||||
|
|
||||||
|
# --- Apply replacements ---
|
||||||
|
anon_text = _apply_replacements(text, replacements, logger)
|
||||||
|
|
||||||
|
logger.info(f"Anonymisation completed, total PII counts: {pii_counts}")
|
||||||
|
|
||||||
|
metrics_report = _get_metrics(
|
||||||
|
_build_metrics_dict(pii_counts, text, anon_text, technique_map),
|
||||||
|
config.language.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield Output(value=anon_text, output_name="data")
|
||||||
|
yield Output(value=metrics_report, output_name="metrics")
|
||||||
|
|
||||||
|
|
||||||
|
@op(out={"data": Out(), "metrics": Out()})
|
||||||
|
def depseudonymize_unstructured(context, config: DepseudonymizeUnstructuredConfig, input_text: str):
|
||||||
|
|
||||||
|
input_restored, metrics = _apply_depseudonimisation_function(config, input_text, depseudo_funcs)
|
||||||
|
yield Output(
|
||||||
|
value=input_restored,
|
||||||
|
metadata={},
|
||||||
|
output_name="data",
|
||||||
|
)
|
||||||
|
yield Output(value=metrics, output_name="metrics")
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_depseudonimisation_function(config, input_text: str, funcs_module):
|
||||||
|
"""
|
||||||
|
Searches and depseudonymizes text segments formatted as:
|
||||||
|
{technique:pseudonymized_value}
|
||||||
|
"""
|
||||||
|
|
||||||
|
total_depseudo_count = 0
|
||||||
|
depseudonimized_text = input_text # Initialize with input text
|
||||||
|
|
||||||
|
# Loop through each depseudonymisation technique defined in the config
|
||||||
|
for used_function in config.used_function:
|
||||||
|
func_name = used_function.technique.type
|
||||||
|
func = getattr(funcs_module, func_name)
|
||||||
|
pseudo_anon_func = ""
|
||||||
|
|
||||||
|
# Prepare parameters
|
||||||
|
params = used_function.technique.model_dump()
|
||||||
|
del params["type"]
|
||||||
|
|
||||||
|
if func_name == "decrypt":
|
||||||
|
key_name = used_function.technique.key_name
|
||||||
|
del params["key_name"]
|
||||||
|
pseudo_anon_func = "encrypt"
|
||||||
|
params["key"] = create_get_encryption_key(func_name, key_name)
|
||||||
|
|
||||||
|
# Regex pattern for this technique, e.g. {encrypt:...}
|
||||||
|
pattern = rf"\{{{pseudo_anon_func}:([^}}]+)\}}"
|
||||||
|
|
||||||
|
def replace_match(match):
|
||||||
|
nonlocal total_depseudo_count
|
||||||
|
pseudovalue = match.group(1)
|
||||||
|
total_depseudo_count += 1
|
||||||
|
try:
|
||||||
|
return func(pseudovalue, **params)
|
||||||
|
except InvalidToken:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid Fernet token while decrypting value using key '{key_name}'. "
|
||||||
|
f"The data may not be encrypted or the key may be incorrect."
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f"Error during depseudonymisation with '{func_name}': {e}")
|
||||||
|
|
||||||
|
# Apply replacements for this technique
|
||||||
|
depseudonimized_text = re.sub(pattern, replace_match, depseudonimized_text)
|
||||||
|
|
||||||
|
yield depseudonimized_text
|
||||||
|
yield {"total_depseudo_count": total_depseudo_count}
|
||||||
|
|
||||||
|
|
||||||
|
def _build_technique_map(config):
|
||||||
|
technique_map = {}
|
||||||
|
for func_cfg in config.used_function:
|
||||||
|
for pii in func_cfg.technique.pii:
|
||||||
|
technique_map[pii] = func_cfg
|
||||||
|
return technique_map
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_span(filth, logger, idx):
|
||||||
|
start_idx = getattr(filth, "beg", getattr(filth, "start", None))
|
||||||
|
end_idx = getattr(filth, "end", None)
|
||||||
|
if start_idx is None or end_idx is None:
|
||||||
|
logger.debug(f"[{idx}] Filth missing span attributes; skipping.")
|
||||||
|
return None, None
|
||||||
|
return start_idx, end_idx
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_missing_technique(
|
||||||
|
pii_enum, start_idx, end_idx, text, pii_counts, replacements, logger, idx
|
||||||
|
):
|
||||||
|
original_value = text[start_idx:end_idx]
|
||||||
|
logger.debug(
|
||||||
|
f"[{idx}] PII={pii_enum.name} span=({start_idx},{end_idx}) value={original_value} "
|
||||||
|
f"- No technique configured, using placeholder"
|
||||||
|
)
|
||||||
|
placeholder = f"{{{{{pii_enum.name}}}}}"
|
||||||
|
replacements.append((start_idx, end_idx, placeholder))
|
||||||
|
pii_counts[pii_enum.name] = pii_counts.get(pii_enum.name, 0) + 1
|
||||||
|
|
||||||
|
|
||||||
|
def _prepare_params(t, key_cache, idx, logger):
|
||||||
|
params = t.model_dump()
|
||||||
|
del params["type"]
|
||||||
|
del params["pii"]
|
||||||
|
|
||||||
|
if t.type == "encrypt":
|
||||||
|
try:
|
||||||
|
if t.key_name not in key_cache:
|
||||||
|
logger.debug(
|
||||||
|
f"[{idx}] Retrieving/generating Vault key name={t.key_name} for encryption"
|
||||||
|
)
|
||||||
|
key_cache[t.key_name] = create_get_encryption_key("encrypt", t.key_name)
|
||||||
|
params["key"] = key_cache[t.key_name]
|
||||||
|
del params["key_name"]
|
||||||
|
logger.debug(f"[{idx}] Encryption key prepared")
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Encryption key retrieval failed for key '{t.key_name}': {type(e).__name__}"
|
||||||
|
) from e
|
||||||
|
|
||||||
|
return params
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_technique(original_value, t_type, params, pii_enum, idx, logger):
|
||||||
|
try:
|
||||||
|
func = getattr(anon_pseudo_funcs, t_type)
|
||||||
|
replacement = func(original_value, **params)
|
||||||
|
|
||||||
|
if t_type == "encrypt":
|
||||||
|
replacement = f"{{encrypt:{replacement}}}"
|
||||||
|
|
||||||
|
logger.debug(f"[{idx}] {t_type.capitalize()} complete")
|
||||||
|
return replacement
|
||||||
|
|
||||||
|
except AttributeError:
|
||||||
|
logger.warning(f"[{idx}] Technique '{t_type}' not recognized; inserting placeholder.")
|
||||||
|
return f"{{UNIMPL_{t_type}_{pii_enum.name}}}"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Technique '{t_type}' failed for PII type '{pii_enum.name}': {type(e).__name__}"
|
||||||
|
) from e
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_replacements(text, replacements, logger):
|
||||||
|
if not replacements:
|
||||||
|
logger.info("No PII detected; returning original text.")
|
||||||
|
return text
|
||||||
|
|
||||||
|
logger.debug(f"Applying {len(replacements)} replacements to text body.")
|
||||||
|
replacements.sort(key=lambda r: r[0])
|
||||||
|
|
||||||
|
# Detect overlaps
|
||||||
|
for i in range(len(replacements) - 1):
|
||||||
|
if replacements[i][1] > replacements[i + 1][0]:
|
||||||
|
logger.warning(
|
||||||
|
f"Overlapping PII detected at positions "
|
||||||
|
f"({replacements[i][0]},{replacements[i][1]}) "
|
||||||
|
f"and ({replacements[i+1][0]},{replacements[i+1][1]}). "
|
||||||
|
f"Using first match."
|
||||||
|
)
|
||||||
|
replacements[i + 1] = (
|
||||||
|
replacements[i][1],
|
||||||
|
replacements[i + 1][1],
|
||||||
|
replacements[i + 1][2],
|
||||||
|
)
|
||||||
|
|
||||||
|
result_parts = []
|
||||||
|
last = 0
|
||||||
|
for start, end, repl in replacements:
|
||||||
|
if start < last:
|
||||||
|
continue
|
||||||
|
result_parts.append(text[last:start])
|
||||||
|
result_parts.append(repl)
|
||||||
|
last = end
|
||||||
|
|
||||||
|
result_parts.append(text[last:])
|
||||||
|
return "".join(result_parts)
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
import os
|
||||||
|
import hvac
|
||||||
|
from hvac.exceptions import InvalidPath
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
|
||||||
|
def create_get_encryption_key(func_name: str, key_name: str) -> bytes:
|
||||||
|
client = hvac.Client(url=os.getenv("OPENBAO_URL"), token=os.getenv("OPENBAO_TOKEN"))
|
||||||
|
|
||||||
|
secret_folder = os.getenv("ENCRYPTION_KEYS_PATH")
|
||||||
|
secret_path = f"{secret_folder}/{key_name}" if secret_folder else key_name
|
||||||
|
mount_point = os.getenv("ENCRYPTION_KEYS_MOUNT_POINT")
|
||||||
|
|
||||||
|
try:
|
||||||
|
secret_response = client.secrets.kv.v2.read_secret_version(
|
||||||
|
path=secret_path, mount_point=mount_point
|
||||||
|
)
|
||||||
|
key_value = secret_response["data"]["data"]["value"]
|
||||||
|
|
||||||
|
except InvalidPath:
|
||||||
|
if func_name == "encrypt":
|
||||||
|
new_key = Fernet.generate_key().decode()
|
||||||
|
client.secrets.kv.v2.create_or_update_secret(
|
||||||
|
path=secret_path, mount_point=mount_point, secret={"value": new_key}
|
||||||
|
)
|
||||||
|
key_value = new_key
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Fernet key '{key_name}' not found in Vault for decrypt.")
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"Error while reading Fernet key '{key_name}': {e}")
|
||||||
|
|
||||||
|
return key_value.encode()
|
||||||
0
src/template_code_location/jobs/__init__.py
Normal file
0
src/template_code_location/jobs/__init__.py
Normal file
0
src/template_code_location/ops/__init__.py
Normal file
0
src/template_code_location/ops/__init__.py
Normal file
65
src/template_code_location/repository.py
Normal file
65
src/template_code_location/repository.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
from dagster import Definitions
|
||||||
|
from util_services.resources import s3_resource
|
||||||
|
from util_services.sensors import (
|
||||||
|
notify_success,
|
||||||
|
notify_failure,
|
||||||
|
notify_canceled
|
||||||
|
)
|
||||||
|
from util_services.custom_json_logger import simpl_json_logger
|
||||||
|
|
||||||
|
# Data processing jobs
|
||||||
|
from template_code_location.data_processing.jobs import (
|
||||||
|
remove_duplicates_job_s3,
|
||||||
|
fill_missing_values_job_s3,
|
||||||
|
standardize_categorical_values_job_s3,
|
||||||
|
correct_typos_job_s3,
|
||||||
|
normalize_numeric_min_max_job_s3,
|
||||||
|
normalize_datetime_job_s3,
|
||||||
|
normalize_coordinates_job_s3,
|
||||||
|
add_global_aggregations_job_s3,
|
||||||
|
filter_dataset_job_s3,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Dataframe-level anonymisation jobs
|
||||||
|
from template_code_location.dataframe_level_anonymisation.jobs import (
|
||||||
|
k_anonymity_job_s3,
|
||||||
|
l_diversity_job_s3,
|
||||||
|
t_closeness_job_s3,
|
||||||
|
read_write_semistructured_job_s3,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Field-level pseudo-anonymisation jobs
|
||||||
|
from template_code_location.field_level_pseudo_anonymisation.jobs import (
|
||||||
|
anonymize_pseudonymize_structured_job_s3,
|
||||||
|
depseudonymize_structured_job_s3,
|
||||||
|
anonymize_pseudonymize_unstructured_job_s3,
|
||||||
|
depseudonymize_unstructured_job_s3,
|
||||||
|
)
|
||||||
|
|
||||||
|
defs = Definitions(
|
||||||
|
jobs=[
|
||||||
|
# Data processing
|
||||||
|
remove_duplicates_job_s3,
|
||||||
|
fill_missing_values_job_s3,
|
||||||
|
standardize_categorical_values_job_s3,
|
||||||
|
correct_typos_job_s3,
|
||||||
|
normalize_numeric_min_max_job_s3,
|
||||||
|
normalize_datetime_job_s3,
|
||||||
|
normalize_coordinates_job_s3,
|
||||||
|
add_global_aggregations_job_s3,
|
||||||
|
filter_dataset_job_s3,
|
||||||
|
# Dataframe-level anonymisation
|
||||||
|
k_anonymity_job_s3,
|
||||||
|
l_diversity_job_s3,
|
||||||
|
t_closeness_job_s3,
|
||||||
|
read_write_semistructured_job_s3,
|
||||||
|
# Field-level pseudo-anonymisation
|
||||||
|
anonymize_pseudonymize_structured_job_s3,
|
||||||
|
depseudonymize_structured_job_s3,
|
||||||
|
anonymize_pseudonymize_unstructured_job_s3,
|
||||||
|
depseudonymize_unstructured_job_s3,
|
||||||
|
],
|
||||||
|
sensors=[notify_success, notify_failure, notify_canceled],
|
||||||
|
resources={"s3": s3_resource.configured({"resource_name": "selfS3"})},
|
||||||
|
loggers={"simpl": simpl_json_logger},
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user