# acxiom_mapper_transform.py

import schema
import json
import time
import calendar
import aq_utils
import os

# Identifier of the current batch run; every output record is tagged with it as "run_id".
batch_id = os.environ.get("BATCH_ID")
if batch_id is None:
    # Fail fast at import time: records cannot be tagged without a batch id.
    raise Exception("Not able to find 'BATCH_ID' environment variable.")

# Current UTC time in whole epoch seconds, captured once when the module is
# imported and reused as "update_ts" for every attribute this module emits.
timestamp_value = calendar.timegm(time.gmtime())


# Python type -> Avro type name used for each attribute's "data_type" label.
# Built once at import time instead of on every call.
# NOTE(review): Python floats are 64-bit (Avro "double") and ints are unbounded
# (Avro "long" at most); the existing "float"/"int" labels are kept unchanged for
# downstream compatibility — confirm they are intended.
_AVRO_TYPE_MAPPING = {
    type(None): "null",  # JSON null -> Avro "null" (previously fell back to "NoneType")
    bool: "boolean",
    int: "int",
    float: "float",
    str: "string",
    list: "array",
    dict: "map",
    tuple: "record",
    set: "enum",
}


def append_dict_to_attributes_list(dictionary, output_rec):
    """
    Flattens ``dictionary`` and appends one attribute entry per leaf to output_rec['attributes'].

    Each key/value pair produced by ``flatten_dict`` becomes an attribute dict whose
    "name" is the dotted key, "value" is ``str(value)`` and "data_type" is the Avro
    type name looked up in ``_AVRO_TYPE_MAPPING``. Unmapped types fall back to the
    Python type name. "others.documentType" is the first dot-separated segment of
    the key, which is normally the top-level key the leaf came from.

    Args:
        dictionary (dict): The (possibly nested) dictionary to flatten.
        output_rec (dict): The output record to extend. Its 'attributes' list is
            created if it does not exist yet.

    Returns:
        None: ``output_rec`` is modified in place.
    """
    attributes = output_rec.setdefault("attributes", [])

    for key, value in flatten_dict(dictionary).items():
        name = str(key)
        attributes.append({
            "name": name,
            # Values are always serialized as strings; "data_type" keeps the original type.
            "value": str(value),
            "update_ts": timestamp_value,
            "confidence": None,
            "magnitude": None,
            # Exact-type lookup (not isinstance), so bool maps to "boolean", not "int".
            "data_type": _AVRO_TYPE_MAPPING.get(type(value), type(value).__name__),
            "global_region": "NA",
            "modeled": None,
            "method": "",
            "score": None,
            "source": "acxiomDS",
            "others": {
                "documentType": name.split('.')[0]
            }
        })


def flatten_dict(dictionary, parent_key='', sep='.'):
    """
    Collapse a nested dictionary into a single-level dict with joined keys.

    Nested dict keys are joined with ``sep`` (e.g. {"a": {"b": 1}} -> {"a.b": 1}).
    List elements are addressed by their index (e.g. {"a": [5]} -> {"a.0": 5}).
    Dicts inside a list are flattened further, but any other element, including a
    nested list, is stored as-is. Empty dicts and empty lists produce no entries.
    When two paths produce the same flattened key, the later one wins.

    Args:
        dictionary (dict): The dictionary to flatten.
        parent_key (str): Prefix joined (with ``sep``) in front of every key;
            a falsy value means no prefix.
        sep (str): Separator placed between key segments.
    Returns:
        dict: The flattened mapping, in traversal order.
    """
    flat = {}

    def walk(mapping, prefix):
        # Writes every leaf of ``mapping`` into ``flat`` under ``prefix``.
        for name, child in mapping.items():
            path = f"{prefix}{sep}{name}" if prefix else name
            if isinstance(child, dict):
                walk(child, path)
                continue
            if not isinstance(child, list):
                flat[path] = child
                continue
            for idx, element in enumerate(child):
                element_path = f"{path}{sep}{idx}"
                if isinstance(element, dict):
                    walk(element, element_path)
                else:
                    flat[element_path] = element

    walk(dictionary, parent_key)
    return flat


def dat_hnd(rec: dict) -> list:
    """
    Transforms one input record into a single output record for the AMDP schema.

    The JSON strings stored under rec['document']['inputProcessing'] and
    rec['document']['documents'] are decoded. Each is flattened into attribute
    entries and appended to the output record's 'attributes' list, in that order.

    Args:
        rec (dict): The input record. Must contain rec['document'] with the keys
            'clientReference', 'inputProcessing' (JSON string) and
            'documents' (JSON string).
    Returns:
        list: A one-element list holding the output record dict.
    Raises:
        KeyError: If any of the required keys is missing.
        json.JSONDecodeError: If 'inputProcessing' or 'documents' is not valid JSON.
    """
    document = rec["document"]

    output_rec = {
        "tenant_id": "tenant_id",  # change tenant_id if required
        "entity_type": "prsn",  # change entity_type if required
        "entity_domain": "_z__z__z__z_",
        "run_id": batch_id,
        "entity_id": document["clientReference"],
        "others": {},
        "attributes": [],
    }

    # Decode both payloads before appending, so a malformed second payload fails
    # the record before any attributes are built.
    input_processing_key_dict = json.loads(document["inputProcessing"])
    documents_key_dict = json.loads(document["documents"])

    append_dict_to_attributes_list(input_processing_key_dict, output_rec)
    append_dict_to_attributes_list(documents_key_dict, output_rec)

    # The caller expects a list of output records; this mapper always emits one.
    return [output_rec]


def schema_handler_v1(sch: schema.Schema) -> tuple:
    """
    Builds the output schema for this mapper and returns it with the record handler.

    Loads the "data_subject_attributes" schema from amdp_schemas-v5.0.2.yaml and
    adds the partition fields that this schema file is loaded without.

    Args:
        sch (schema.Schema): The input schema. It is not used by this handler.
            Presumably it is kept so all schema handlers share one signature;
            confirm against the caller.
    Returns:
        tuple: ``(amdp_schema, dat_hnd)``, i.e. the extended output schema and the
        per-record data handling function.
    """
    # NOTE(review): the meaning of the third load_schema argument (False) is not
    # visible here; confirm in aq_utils.
    amdp_schema = aq_utils.load_schema("amdp_schemas-v5.0.2.yaml", "data_subject_attributes",
                                       False)

    # amdp_schemas-v5.0.2 is loaded without its partition columns, so add them here
    # as string fields. These are the keys dat_hnd fills on every output record.
    for partition_name in ("tenant_id", "entity_domain", "entity_type", "run_id"):
        schema.add_fields(amdp_schema,
                          schema.Field(name=partition_name, schema=schema.StringSchema()))

    return amdp_schema, dat_hnd
  • Table of contents

Was this article helpful?