transform.py

import aq_utils
import hashlib
import os
import schema
from urllib.parse import urlparse


# Batch/run identifier, read from the environment at import time. dat_hnd
# stamps it onto every record as 'runid', so a missing or empty value aborts
# the import immediately instead of producing unlabelled output.
run_id = os.environ.get('BATCH_ID') or None
if run_id is None:
    raise Exception("BATCH_ID NOT SET")


def setup():
    """Populate the module-level ``regions_dict`` global.

    Stores the first element of the value returned by ``aq_utils.load("")``.
    dat_hnd iterates ``regions_dict.items()`` and tests country membership,
    so it is presumably a mapping of region name -> collection of country
    codes (verify against the loaded resource).

    NOTE(review): the empty path passed to ``aq_utils.load`` looks like a
    placeholder; confirm which resource is intended.
    """
    global regions_dict
    loaded = aq_utils.load("")
    regions_dict = loaded[0]


def extract_event_id(string, key, delimiter, separator=","):
    """Return the value paired with ``key`` in a delimited key/value string.

    ``string`` is split on ``separator`` into entries of the form
    ``<name><delimiter><value>``. The value of the first entry whose name
    (ignoring surrounding whitespace) equals ``key`` is returned.

    Args:
        string: Encoded key/value list, e.g. ``"a=1,z_evid=abc"``. ``None``
            or an empty string yields ``""``.
        key: Exact entry name to look for.
        delimiter: Separator between an entry's name and its value.
        separator: Separator between entries; defaults to ``","``.

    Returns:
        Everything after the first ``delimiter`` in the matching entry, or
        ``""`` when no entry matches.
    """
    if not string:
        return ""
    for entry in string.split(separator):
        # partition() splits on the first delimiter only. Values that contain
        # the delimiter themselves (e.g. base64 '=' padding) stay intact, and
        # an entry without a delimiter is skipped instead of raising IndexError.
        name, found, value = entry.partition(delimiter)
        # Compare the name exactly. A substring test would also match entries
        # like "xz_evid=..." or entries whose value merely contains `key`.
        if found and name.strip() == key:
            return value
    return ""


def _split_query_string(raw_query):
    """Split a raw ``a=1&b=2`` query string into a ``{name: value}`` dict.

    Values are kept verbatim (no percent-decoding). Only the first ``=`` of a
    pair separates name from value, so values containing ``=`` are preserved.
    A pair without ``=`` maps to ``""``, empty pairs are skipped, and later
    duplicate names overwrite earlier ones.
    """
    pairs = {}
    for pair in raw_query.split('&'):
        if pair:
            name, _, value = pair.partition('=')
            pairs[name] = value
    return pairs


def _build_destination_url(redirect_url):
    """Return the url_struct dict for the ``ru`` (redirect URL) parameter.

    A falsy ``redirect_url``, or one that ``urlparse`` rejects with ValueError
    (e.g. an unterminated IPv6 literal), yields an all-empty struct instead of
    aborting the record.
    """
    empty = {"scheme": "", "host": "", "path": "", "query": {}}
    if not redirect_url:
        return empty
    try:
        parsed = urlparse(redirect_url)
        host = parsed.hostname
    except ValueError:
        return empty
    query = _split_query_string(parsed.query)
    if not query:
        # NOTE(review): a present `ru` with no query string yields {'': ''}
        # rather than {} (unlike the missing-`ru` case). Preserved as-is;
        # confirm whether downstream relies on a non-empty map here.
        query = {'': ''}
    return {"scheme": parsed.scheme or "", "host": host or "",
            "path": parsed.path or "", "query": query}


def _optional_qs(name, value):
    """Return ``{name: value}``, or ``{}`` when value is None/"na"/"NA"."""
    return {} if value in (None, "na", "NA") else {name: value}


def _media_group_levels(media_groups):
    """Build the other_media_group_levels map from the ``ch`` parameter."""
    if not media_groups:
        return {}
    if isinstance(media_groups, dict):
        return dict(media_groups)
    # NOTE(review): a non-dict `ch` is stored as both key and value; confirm intent.
    return {media_groups: media_groups}


def _global_region(country_code):
    """Return the first region whose countries contain country_code, else "Unknown"."""
    if country_code is None:
        return "Unknown"
    for region, countries in regions_dict.items():
        if country_code in countries:
            return region
    return "Unknown"


def dat_hnd(rec: dict) -> list:
    """Enrich one record in place and return it wrapped in a single-element list.

    Mutates ``rec`` by:
    - defaulting ``query.tid``, ``query.src``, ``query.pevid`` and
      ``query.parent_pevid``;
    - setting ``runid`` from the module-level ``run_id``;
    - building the ``page_url``, ``destination_url`` and ``page_referer_url``
      url structs and the ``other_media_group_levels`` map;
    - setting ``global_region`` via the ``regions_dict`` lookup loaded by setup().

    Args:
        rec: Record dict. It must contain a ``'query'`` dict. ``country_code``,
            ``aqfer_var``, ``domain`` and ``referer`` are read when present.

    Returns:
        ``[rec]`` — the same (mutated) dict.
    """
    query = rec['query']
    # ! tenant_id field
    query['tid'] = query.get('tid') or "tid"

    rec['runid'] = run_id

    rec['page_url'] = {
        'host': query.get('dmn') or rec.get('domain'),
        'path': query.get('pn'),
        'query': _optional_qs("pqs", query.get('pqs')),
        'scheme': '',
    }
    rec['destination_url'] = _build_destination_url(query.get("ru"))
    rec['page_referer_url'] = {
        "scheme": "",
        "host": query.get('rd') or rec.get('referer'),
        "path": query.get('rpn') or "",
        "query": _optional_qs("rqs", query.get('rqs')),
    }
    rec['other_media_group_levels'] = _media_group_levels(query.get('ch'))
    rec['global_region'] = _global_region(rec.get('country_code'))

    # Fall back to the z_evid entry of aqfer_var only when pevid is missing.
    query['pevid'] = query.get("pevid") or extract_event_id(rec.get('aqfer_var') or "", "z_evid", "=")
    query['src'] = query.get('src') or "aUT"
    # NOTE(review): when no pevid is found anywhere, pevid is "" and every such
    # record receives the same synthetic parent_pevid (hash of the empty string).
    query['parent_pevid'] = query.get("parent_pevid") or "SYN-" + hashlib.sha256(str(query['pevid']).encode()).hexdigest()
    return [rec]


def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Add the fields populated by dat_hnd to ``sch`` and return it with the handler.

    Calls setup() first so the region lookup that dat_hnd reads is loaded.
    Fields are added in this order: global_region, runid, page_url,
    page_referer_url, other_media_group_levels, destination_url. The three
    URL fields share the "url_struct" schema loaded from
    amdp_schemas-v6.2.yaml.

    Returns:
        ``(sch, dat_hnd)``. The return value of ``schema.add_fields`` is
        ignored, so it presumably updates ``sch`` in place (confirm in schema).
    """
    setup()
    url_struct = aq_utils.load_schema("amdp_schemas-v6.2.yaml", "url_struct")

    def append_field(field_name, field_schema):
        schema.add_fields(sch, schema.Field(name=field_name, schema=field_schema))

    append_field("global_region", schema.StringSchema())
    append_field("runid", schema.StringSchema())
    append_field("page_url", url_struct)
    append_field("page_referer_url", url_struct)
    append_field("other_media_group_levels", schema.MapSchema(schema.StringSchema()))
    append_field("destination_url", url_struct)
    return sch, dat_hnd
  • Table of contents

Was this article helpful?