import calendar
import json
import os
import time
from typing import Any, Tuple

import aq_utils
import schema
# Batch identifier is mandatory and must come from the runtime environment.
batch_id = os.environ.get("BATCH_ID")
if batch_id is None:
    raise Exception("Not able to find 'BATCH_ID' environment variable.")

# One UTC epoch timestamp (seconds), shared by every attribute emitted in this run.
timestamp_value = calendar.timegm(time.gmtime())
def append_dict_to_attributes_list(dictionary, output_rec):
    """
    Flatten *dictionary* and append one attribute entry per flattened
    key/value pair to output_rec["attributes"].

    Args:
        dictionary (dict): Source of the (possibly nested) key-value pairs.
        output_rec (dict): Record whose 'attributes' list is extended in place.

    Returns:
        None
    """
    # Map Python types onto Avro schema type names; anything unlisted falls
    # back to the Python type's own name.
    avro_type_names = {
        bool: "boolean",
        int: "int",
        float: "float",
        str: "string",
        list: "array",
        dict: "map",
        tuple: "record",
        set: "enum",
    }
    attributes = output_rec["attributes"]
    for flat_key, flat_value in flatten_dict(dictionary).items():
        attributes.append({
            "name": str(flat_key),
            "value": str(flat_value),
            "update_ts": timestamp_value,
            "confidence": None,
            "magnitude": None,
            "data_type": avro_type_names.get(type(flat_value), str(type(flat_value).__name__)),
            "global_region": "NA",
            "modeled": None,
            "method": "",
            "score": None,
            "source": "acxiomDS",
            "others": {
                # The prefix before the first '.' identifies the source document.
                "documentType": str(flat_key).split('.')[0]
            },
        })
def flatten_dict(dictionary, parent_key='', sep='.'):
    """
    Recursively flatten a nested dictionary into a single-level dict.

    Nested dict keys are joined with *sep*; list elements contribute their
    index as a key component, e.g. {"a": [{"b": 1}]} -> {"a.0.b": 1}.

    Args:
        dictionary (dict): The dictionary to flatten.
        parent_key (str): Key prefix carried through the recursion.
        sep (str): Separator used to join key components.

    Returns:
        dict: The flattened dictionary.
    """
    result = {}
    for key, value in dictionary.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            result.update(flatten_dict(value, full_key, sep=sep))
        elif isinstance(value, list):
            for index, element in enumerate(value):
                indexed_key = f"{full_key}{sep}{index}"
                if isinstance(element, dict):
                    result.update(flatten_dict(element, indexed_key, sep=sep))
                else:
                    result[indexed_key] = element
        else:
            result[full_key] = value
    return result
def dat_hnd(rec: Any) -> Any:
    """
    Transform one input record into a single-element list containing the
    flattened data-subject-attributes output record.

    Args:
        rec: Input record; must expose rec["document"] containing
            'clientReference', plus 'inputProcessing' and 'documents'
            (both JSON-encoded strings).

    Returns:
        list: A one-element list holding the output record dict.
    """
    output_rec = {
        "tenant_id": "tenant_id",        # change tenant_id if required
        "entity_type": "prsn",           # change entity_type if required
        "entity_domain": "_z__z__z__z_",
        "run_id": batch_id,
        "entity_id": rec["document"]["clientReference"],
        "others": {},
        "attributes": [],
    }
    # Both payload fields arrive as JSON strings; decode before flattening.
    input_processing_key_dict = json.loads(rec['document']['inputProcessing'])
    documents_key_dict = json.loads(rec['document']['documents'])
    append_dict_to_attributes_list(input_processing_key_dict, output_rec)
    append_dict_to_attributes_list(documents_key_dict, output_rec)
    return [output_rec]
def schema_handler_v1(sch: schema.Schema) -> Tuple[schema.Schema, Any]:
    """
    Load the AMDP data-subject-attributes schema, add the partition fields
    it is missing, and return it together with the data-handling function.

    Args:
        sch (schema.Schema): Incoming schema (unused; the AMDP schema is
            loaded from the bundled YAML definition instead).

    Returns:
        Tuple[schema.Schema, Any]: The augmented AMDP schema and the
        record-handling function ``dat_hnd``.
    """
    amdp_schema = aq_utils.load_schema("amdp_schemas-v5.0.2.yaml",
                                       "data_subject_attributes", False)
    # ! amdp_schemas-v5.0.2 is loaded without its partition fields, so add
    # them here explicitly.
    for field_name in ("tenant_id", "entity_domain", "entity_type", "run_id"):
        schema.add_fields(
            amdp_schema,
            schema.Field(name=field_name, schema=schema.StringSchema()))
    return amdp_schema, dat_hnd