transform.py

import schema
import json
import aq_utils
import os
import traceback
from urllib.parse import urlparse
import sys

# Add the directory named by the USER_DIR environment variable to the import
# search path; presumably py_config_vars (imported below) lives there.
# NOTE(review): when USER_DIR is unset this appends None to sys.path — confirm
# the runtime always provides it.
sys.path.append(os.environ.get("USER_DIR"))
from py_config_vars import regions_dict, entity_domain_attributes, puu_entity_type, puu_entity_domain

# Batch identifier stamped onto every record as 'runid'. Read from the BATCH_ID
# environment variable, falling back to "test_1" when the variable is absent.
run_id = os.environ.get('BATCH_ID', "test_1")
# Because of the default above, this only fires when BATCH_ID is defined but
# empty.
if not run_id:
    raise Exception("BATCH_ID NOT SET")


def extract_event_id(string, key, delimeter):
    """Return the value paired with ``key`` in a comma-separated string.

    ``string`` is split on commas. The first segment that contains ``key``
    and also contains ``delimeter`` supplies the result: the text between the
    first and second occurrence of ``delimeter`` in that segment.

    Args:
        string: Comma-separated text such as ``"a=1,z_evid=42"``. ``None`` or
            an empty string yields ``""``.
        key: Substring that identifies the wanted segment.
        delimeter: Separator between the key and its value inside a segment.

    Returns:
        The extracted value, or ``""`` when no usable segment is found.
    """
    if not string:
        return ""
    for segment in string.split(","):
        if key not in segment:
            continue
        pieces = segment.split(delimeter)
        # A segment that mentions the key but has no delimiter carries no
        # value; indexing it directly would raise IndexError, so skip it.
        if len(pieces) > 1:
            return pieces[1]
    return ""


def process_entity_domain(rec, id_type, id_format):
    """Build the qualified entity-domain string for a record.

    The result has the shape
    ``{owner}_z_{partner}_z_{property}_z_{id_type}_z_{id_format}``.

    ``owner`` and ``partner`` come from ``entity_domain_attributes``. When the
    record's ``entity_domain`` is already in the five-part ``_z_`` form, its
    parts are used as fallbacks for any of owner, partner, id type and id
    format that are empty, and its third part becomes the property. Otherwise
    the record's ``entity_domain`` is used as the property unchanged.

    Args:
        rec: Record mapping; only its ``entity_domain`` entry is read. A
            missing or ``None`` entry is treated as an empty property.
        id_type: Identifier type to embed; falls back to the type found in an
            already-qualified ``entity_domain`` when empty.
        id_format: Identifier format to embed; same fallback rule as
            ``id_type``.

    Returns:
        The formatted entity-domain string.

    Raises:
        ValueError: If ``entity_domain`` contains ``_z_`` but does not split
            into exactly five parts.
    """
    entity_domain_format = "{owner}_z_{partner}_z_{property}_z_{id_type}_z_{id_format}"
    # A record without an entity_domain would otherwise fail on the "in" test
    # below with a TypeError.
    entity_domain_value = rec.get('entity_domain') or ""

    owner = entity_domain_attributes.get("owner", "")
    partner = entity_domain_attributes.get("partner", "")
    resolved_type = id_type
    resolved_format = id_format

    if "_z_" in entity_domain_value:
        parts = entity_domain_value.split("_z_")
        if len(parts) != 5:
            raise ValueError(
                "entity_domain must have 5 '_z_'-separated parts, got "
                f"{len(parts)}: {entity_domain_value!r}"
            )
        split_owner, split_partner, split_property, split_id_type, split_id_format = parts

        # Configured / caller-supplied values win; the parsed parts only fill
        # in what is empty.
        owner = owner or split_owner
        partner = partner or split_partner
        entity_domain_value = split_property
        resolved_type = id_type or split_id_type
        resolved_format = id_format or split_id_format

    return entity_domain_format.format(
        owner=owner,
        partner=partner,
        property=entity_domain_value,
        id_type=resolved_type,
        id_format=resolved_format,
    )


def dat_hnd(rec: dict) -> list:
    """Enrich one event record in place and return it wrapped in a list.

    The handler:
      * defaults ``query.tid`` / ``query.src`` and stamps ``runid``;
      * appends entity keys derived from the record's own entity, the
        ``suu`` / ``puu`` query values, each cookie (except ``"none"``) and
        the hashed / raw IP to ``other_entity_keys``;
      * builds ``page_url`` and ``page_referer_url`` from query values;
      * fills ``other_media_group_levels`` from ``query.ch``;
      * maps ``country_code`` to ``global_region`` via ``regions_dict``;
      * sets placeholder ``other_geo``, ``policy`` and ``event_actions``;
      * rewrites ``entity_domain`` into its qualified form;
      * back-fills ``query.pevid`` from ``aqfer_var`` when it is empty.

    Args:
        rec: Record mapping. ``query`` and ``country_code`` must be present;
            ``aqfer_var`` must be present when ``query.pevid`` is empty.

    Returns:
        A one-element list containing the mutated ``rec``.
    """
    query = rec['query']

    # ! tenant_id field
    query['tid'] = query.get('tid') or "tid"
    rec['runid'] = run_id  # module-level value read from the BATCH_ID env variable

    # ! Generating additional other_entity_keys
    # All process_entity_domain calls below read the record's original
    # entity_domain; it is only overwritten near the end of this function.
    other_entity_items = [{
        "entity_type": rec.get('entity_type'),
        "entity_domain": process_entity_domain(rec, rec.get('entity_type', ''), "raw"),
        "entity_id": rec.get('entity_id'),
    }]

    suu = query.get('suu')
    if suu:
        other_entity_items.append({
            "entity_type": "statid",
            "entity_domain": process_entity_domain(rec, "statid", "raw"),
            "entity_id": suu,
        })

    puu = query.get('puu')
    if puu:
        other_entity_items.append({
            "entity_type": puu_entity_type,
            "entity_domain": puu_entity_domain,
            "entity_id": puu,
        })

    # `or {}` also covers a cookie field that is present but None.
    for cookie_name, cookie_value in (rec.get('cookie') or {}).items():
        if cookie_name != "none":
            other_entity_items.append({
                "entity_type": "ck",
                "entity_domain": process_entity_domain(rec, cookie_name, ""),
                "entity_id": cookie_value,
            })

    hashed_ip = rec.get("hashed_ip")
    if hashed_ip:
        other_entity_items.append({
            "entity_type": "ip",
            "entity_domain": process_entity_domain(rec, "v4", "md5"),
            "entity_id": hashed_ip,
        })

    raw_ip = rec.get("raw_ip")
    if raw_ip:
        other_entity_items.append({
            "entity_type": "ip",
            "entity_domain": process_entity_domain(rec, "v4", "raw"),
            "entity_id": raw_ip,
        })

    # Appending new generated entity keys to "other_entity_keys"; an existing
    # list is extended in place, a missing/None one is created first.
    existing_keys = rec.get('other_entity_keys')
    if existing_keys is None:
        existing_keys = rec['other_entity_keys'] = []
    existing_keys.extend(other_entity_items)

    page_query = query.get('pqs')
    rec['page_url'] = {
        "host": query.get('dmn'),
        "path": query.get('pn'),
        # REMOVE IF ELSE AFTER FIXING THE SOURCE DATA
        "query": page_query if isinstance(page_query, dict) else {},
        "scheme": '',
    }

    # The referrer query must be validated against 'rqs' itself, not the
    # page's 'pqs', so a non-dict 'rqs' never reaches the output.
    referer_query = query.get('rqs')
    rec['page_referer_url'] = {
        "scheme": "",
        "host": query.get('rd') or "",
        "path": query.get('rpn') or "",
        "query": referer_query if isinstance(referer_query, dict) else {},
    }

    rec['other_media_group_levels'] = {}
    media_groups = query.get('ch')
    if media_groups:
        if isinstance(media_groups, dict):
            rec['other_media_group_levels'].update(media_groups)
        # GET CONFIRMATION ON THIS.
        else:
            rec['other_media_group_levels'][media_groups] = media_groups

    country_code = rec['country_code']
    rec['global_region'] = "Unknown"
    for region, countries in regions_dict.items():
        if country_code in countries:
            rec['global_region'] = region
            break

    rec['other_geo'] = {}
    rec['policy'] = [{"policy_id": "", "policy_effective_date": "", "effectivity": "", "policy_detail": []}]

    rec['entity_domain'] = process_entity_domain(rec, rec.get('entity_type', ''), "raw")
    # REMOVE IF CERTAIN THAT FIELDS WILL ALWAYS BE PRESENT.
    query['pevid'] = query.get("pevid") or extract_event_id(rec['aqfer_var'], "z_evid", "=")
    query['src'] = query.get('src') or "aUT"
    rec["event_actions"] = []

    return [rec]


def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Register the fields written by ``dat_hnd`` on ``sch``.

    Adds, in order: ``global_region``, ``runid``, ``page_url``,
    ``page_referer_url``, ``event_actions``, ``policy``,
    ``other_media_group_levels`` and ``other_geo``. The URL fields use the
    ``url_schema`` definition loaded from ``schemas.yaml``.

    Args:
        sch: Schema object that receives the additional fields.

    Returns:
        A ``(sch, dat_hnd)`` pair: the same schema object that was passed in
        and the record handler function.
    """

    def _register(field_name, field_schema):
        # One add_fields call per field, exactly as many as there are fields.
        schema.add_fields(sch, schema.Field(name=field_name, schema=field_schema))

    url_schema = aq_utils.load_schema('schemas.yaml', 'url_schema', True)

    _register("global_region", schema.StringSchema())
    _register("runid", schema.StringSchema())
    _register("page_url", url_schema)
    _register("page_referer_url", url_schema)
    _register("event_actions", schema.ArraySchema(items=schema.StringSchema()))

    # Each policy entry is a record of three strings plus a list of strings.
    policy_record = schema.RecordSchema(name='policy', fields=[
        schema.Field(name="policy_id", schema=schema.StringSchema()),
        schema.Field(name="policy_effective_date", schema=schema.StringSchema()),
        schema.Field(name="effectivity", schema=schema.StringSchema()),
        schema.Field(name="policy_detail", schema=schema.ArraySchema(items=schema.StringSchema())),
    ])
    _register("policy", schema.ArraySchema(items=policy_record))

    _register("other_media_group_levels", schema.MapSchema(schema.StringSchema()))
    _register("other_geo", schema.MapSchema(schema.StringSchema()))

    return sch, dat_hnd
  • Table of contents

Was this article helpful?