# transform.py

import schema, random
import aq_utils
import hashlib
import os

# Batch identifier for this run, read once at import time from the BATCH_ID
# environment variable; it is None when the variable is not set.
batchid = os.environ.get("BATCH_ID")


def setup() -> any:
    """Load ``options.yaml`` from ``$USER_DIR`` and publish its settings as module globals.

    The first element of the value returned by ``aq_utils.load`` is used as
    the options mapping. The following globals are (re)assigned:

    * ``event_types``   - set built from ``filter_event_types`` (default: empty)
    * ``gen_event_id``  - ``gen_event_id`` option (default: ``False``)
    * ``entity_tuple``  - ``infer_entity_tuple`` mapping (default: ``{}``)
    * ``ck_name``       - ``cookie_name`` inside ``entity_tuple`` (default: ``None``)
    * ``statid_algo``   - ``statid_algo`` inside ``entity_tuple`` (default: ``'algo1'``)
    * ``invalid_uus`` / ``invalid_suus`` / ``invalid_cks`` - lists read from
      ``invalid_uus`` / ``invalid_suus`` / ``invalid_cookies`` (default: ``['NA']``)

    ``USER_DIR`` must be set: the path is built by string concatenation, which
    fails with ``TypeError`` when ``os.getenv`` returns ``None``.
    """
    global event_types, gen_event_id, entity_tuple
    global ck_name, statid_algo
    global invalid_uus, invalid_suus, invalid_cks

    # Reset the filter before loading so it exists even if loading fails.
    event_types = set()

    options_path = os.getenv("USER_DIR") + '/options.yaml'
    settings = aq_utils.load(options_path)[0]

    event_types.update(settings.get('filter_event_types', []))
    gen_event_id = settings.get('gen_event_id', False)

    entity_tuple = settings.get('infer_entity_tuple', {})
    read_entity_option = entity_tuple.get
    ck_name = read_entity_option('cookie_name')
    statid_algo = read_entity_option('statid_algo', 'algo1')
    invalid_uus = read_entity_option('invalid_uus', ['NA'])
    invalid_suus = read_entity_option('invalid_suus', ['NA'])
    invalid_cks = read_entity_option('invalid_cookies', ['NA'])


def get_long(value):
    """Coerce *value* to an ``int``, returning ``0`` when that is not possible.

    Only ``int`` (which includes ``bool``) and ``str`` inputs are converted.
    ``None``, floats and every other type yield ``0``. Strings that do not
    parse as an integer - for example the ``"NA"`` and ``""`` placeholders
    used elsewhere in this module - also yield ``0`` instead of raising
    ``ValueError``.

    Args:
        value: The raw value to convert.

    Returns:
        The integer value, or ``0`` as the "missing / unusable" fallback.
    """
    if isinstance(value, (int, str)):
        try:
            return int(value)
        except ValueError:
            # Non-numeric text is treated like any other unusable value.
            return 0

    return 0


def process_partner_url(partner_url):
    """Reduce one partner-URL entry to the four fields kept on the output record.

    ``status``, ``beacon_id`` and ``domain`` are copied as-is (``None`` when
    absent); ``latency_ms`` is converted with ``get_long`` and stored under
    ``latency``.
    """
    read = partner_url.get

    status = read("status")
    latency = get_long(read("latency_ms"))
    beacon_id = read("beacon_id")
    domain = read("domain")

    return dict(status=status, latency=latency, beacon_id=beacon_id, domain=domain)


def statid_algo1(res):
    """Derive the "algo1" statistical id for a record.

    The id is the upper-case SHA-256 hex digest of the concatenation
    ``ip + user_agent + accept_encoding_header + accept_language_header``.

    * ``ip`` is ``raw_ip``, unless ``raw_ip`` is ``None`` / ``""`` / ``"NA"``
      and ``hashed_ip`` is not ``None``, in which case ``hashed_ip`` is used.
    * ``user_agent`` is the top-level value when truthy, otherwise the one
      inside ``user_agent_unwrapped_components``.

    Returns ``None`` when any of the four components is ``None``.
    """
    raw_ip = res.get("raw_ip")
    hashed_ip = res.get("hashed_ip")
    raw_ip_unusable = raw_ip in (None, "", "NA")
    ip = hashed_ip if (raw_ip_unusable and hashed_ip is not None) else raw_ip

    # Prefer the top-level user agent; fall back to the unwrapped components.
    user_agent = res.get("user_agent")
    if not user_agent:
        user_agent = res.get("user_agent_unwrapped_components", {}).get("user_agent")

    components = (
        ip,
        user_agent,
        res.get("accept_encoding_header"),
        res.get("accept_language_header"),
    )
    if None in components:
        return None

    fingerprint = "{}{}{}{}".format(*components)
    digest = hashlib.sha256(fingerprint.encode('utf-8'))
    return digest.hexdigest().upper()


def compute_event_id(res):
    """Choose an existing event id for *res*, or synthesize a random one.

    Candidates are tried in this order; a candidate that is ``None``, ``""``
    or ``"NA"`` is skipped:

    1. ``res["event_id"]``
    2. ``res["query"]["z_evid"]``
    3. the stripped value of the first ``key=value`` item in the
       comma-separated ``res["aqfer_var"]`` string whose text contains
       ``z_evid``
    4. a lower-case SHA-256 hex digest of ``<ip>.<event_timestamp>.<rand>``,
       where ``<ip>`` is ``hashed_ip`` if usable, else ``raw_ip`` if usable,
       else omitted; the timestamp is omitted as well when it is ``None``/``0``
       and no IP is usable.

    A null ``aqfer_var`` or ``query`` contributes no candidate rather than
    raising.

    Args:
        res: Mapping with the keys ``event_id``, ``query``, ``aqfer_var``,
            ``hashed_ip``, ``raw_ip`` and ``event_timestamp``.

    Returns:
        A tuple ``(event_id, is_rand, rand)``. ``is_rand`` is ``True`` only
        for case 4, where ``rand`` is the string form of the random number
        mixed into the digest; otherwise ``rand`` is ``""``.
    """
    unusable = (None, "", "NA")

    # aqfer_var is parsed up front, as before; a null value yields no candidate.
    aqfer_var = res["aqfer_var"] or ""
    aq_evt_id = next(
        (av.split('=')[1].strip() for av in aqfer_var.split(',') if '=' in av and 'z_evid' in av),
        "",
    )

    if res["event_id"] not in unusable:
        return res["event_id"], False, ""

    query = res["query"] or {}
    # .get() makes an absent key and a null value behave the same: both skip.
    if query.get("z_evid") not in unusable:
        return query["z_evid"], False, ""

    if aq_evt_id not in unusable:
        return aq_evt_id, False, ""

    # Nothing usable on the record: hash whatever identifying data exists
    # together with a random component.
    rand = str(random.random())
    if res["hashed_ip"] not in unusable:
        seed = f"{res['hashed_ip']}.{res['event_timestamp']}.{rand}"
    elif res["raw_ip"] not in unusable:
        seed = f"{res['raw_ip']}.{res['event_timestamp']}.{rand}"
    elif res["event_timestamp"] not in (None, 0):
        seed = f"{res['event_timestamp']}.{rand}"
    else:
        seed = rand

    return hashlib.sha256(seed.encode('utf-8')).hexdigest(), True, rand


def infer_entity_tup(rec):
    """Pick the (entity_type, entity_domain, entity_id) that identifies *rec*.

    Identifiers are tried in priority order and the first usable one wins:

    ======== =========== ==========================================
    field    entity_type usable when
    ======== =========== ==========================================
    prsn     prsn        not ``None``
    uu       prsn        not in ``invalid_uus`` and not ``"NA"``
    hh       hh          not ``None``
    org      org         not ``None``
    rdid     rdid        not ``None``
    dev      dev         not ``None``
    ck       ck          not ``None`` and not in ``invalid_cks``
    suu      ck          not in ``invalid_suus`` and not ``"NA"``
    statid   statid      not ``None``
    ======== =========== ==========================================

    For these, ``entity_domain`` comes from
    ``entity_tuple["entity_domain_config"][<field>]``. When nothing matches,
    the result is a ``statid`` entity whose id is ``statid_algo1(rec)`` and
    whose domain is built from ``rec``'s ``host``.

    A ``ck`` value listed in ``invalid_cks`` is skipped, the same way invalid
    ``uu`` / ``suu`` values are, so a placeholder cookie never becomes the
    entity id.

    Relies on the module globals ``entity_tuple``, ``invalid_uus``,
    ``invalid_suus`` and ``invalid_cks`` assigned by ``setup()``.

    Args:
        rec: Record mapping; ``uu`` and ``suu`` must be present, the other
            identifier fields are optional.

    Returns:
        A tuple ``(entity_type, entity_domain, entity_id)``.
    """
    uu = rec["uu"]
    suu = rec["suu"]
    ck = rec.get("ck")

    uu_usable = uu not in invalid_uus and uu != "NA"
    suu_usable = suu not in invalid_suus and suu != "NA"
    ck_usable = ck is not None and ck not in invalid_cks

    # The domain config is looked up only for the branch that is taken, so a
    # missing config entry matters only when that identifier is selected.
    prsn = rec.get("prsn")
    if prsn is not None:
        return "prsn", entity_tuple["entity_domain_config"]["prsn"], prsn

    if uu_usable:
        return "prsn", entity_tuple["entity_domain_config"]["uu"], uu

    # These four share one rule: entity type and config key equal the field name.
    for field in ("hh", "org", "rdid", "dev"):
        value = rec.get(field)
        if value is not None:
            return field, entity_tuple["entity_domain_config"][field], value

    if ck_usable:
        return "ck", entity_tuple["entity_domain_config"]["ck"], ck

    if suu_usable:
        return "ck", entity_tuple["entity_domain_config"]["suu"], suu

    statid = rec.get("statid")
    if statid is not None:
        return "statid", entity_tuple["entity_domain_config"]["statid"], statid

    # Last resort: derive a statistical id from request attributes.
    return "statid", f"_z__z_{rec.get('host', '')}_z_algo1_z_", statid_algo1(rec)


def generate_other_entity_keys(rec):
    """List the statid entity keys for *rec* other than the one already chosen.

    One entry is produced per known statid algorithm. When the record's
    ``entity_type`` is ``"statid"``, the algorithm configured in the global
    ``statid_algo`` is left out; for any other entity type every algorithm is
    included.

    Each entry is a dict with ``entity_type`` (always ``"statid"``),
    ``entity_domain`` (built from the record's ``host`` and the algorithm
    name) and ``entity_id`` (the result of the module-level function
    ``statid_<algo>`` applied to *rec*).
    """
    chosen_algo = statid_algo
    known_algos = ("algo1",)

    def describe(algo):
        # The algorithm function is resolved by name from the module globals.
        return {
            "entity_type": "statid",
            "entity_domain": f"_z__z_{rec.get('host', '')}_z_{algo}_z_",
            "entity_id": globals()[f"statid_{algo}"](rec),
        }

    record_is_statid = rec.get("entity_type") == "statid"

    return [
        describe(algo)
        for algo in known_algos
        if not (record_is_statid and algo == chosen_algo)
    ]


def dat_hnd(input_rec: any) -> any:
    """Build the output record for one input record.

    Steps, in order:

    1. Copy the listed input fields, converting ``event_timestamp`` with
       ``get_long``, ``status_code`` with ``int`` (``None`` is kept), each
       entry of ``partner_urls`` with ``process_partner_url``, and renaming
       ``class`` to ``cls``.
    2. Flatten ``user_agent_unwrapped_components`` into the top level
       (``device_type`` is stored as ``device``).
    3. Ensure ``query["adv"]`` holds a non-empty value or ``""``.
    4. Add ``entity_type`` / ``entity_domain`` / ``entity_id`` from
       ``infer_entity_tup`` (computed from the *input* record).
    5. When the global ``gen_event_id`` is set, replace ``event_id`` with the
       result of ``compute_event_id``.
    6. Add ``event_type``, ``other_entity_keys``, an empty ``others`` dict and
       ``run_id`` (the global ``batchid``).

    Note: ``output_rec["query"]`` is the same object as ``input_rec["query"]``,
    so the ``adv`` and ``aqfer_rand`` entries written here also appear on the
    input record.

    Relies on the module globals ``gen_event_id`` and ``event_types`` assigned
    by ``setup()``.

    Returns:
        A one-element list containing the output record dict.
    """
    output_rec = {
        "hashed_ip": input_rec["hashed_ip"],
        "raw_ip": input_rec["raw_ip"],
        "event_timestamp": get_long(input_rec["event_timestamp"]),
        "event_date": input_rec["event_date"],
        "event_hour": input_rec["event_hour"],
        "host": input_rec["host"],
        "path": input_rec["path"],
        "query": input_rec["query"],
        "referer": input_rec["referer"],
        "cookie": input_rec["cookie"],
        "status_code": int(input_rec["status_code"]) if input_rec["status_code"] is not None else None,
        "aqfer_var": input_rec["aqfer_var"],
        "dnt": input_rec["dnt"],
        "accept_header": input_rec["accept_header"],
        "accept_language_header": input_rec["accept_language_header"],
        "accept_encoding_header": input_rec["accept_encoding_header"],
        "country_code": input_rec["country_code"],
        "region_code": input_rec["region_code"],
        "latitude": input_rec["latitude"],
        "longitude": input_rec["longitude"],
        "dma": input_rec["dma"],
        "msa": input_rec["msa"],
        "timezone": input_rec["timezone"],
        "area_code": input_rec["area_code"],
        "fips": input_rec["fips"],
        "city": input_rec["city"],
        "zip": input_rec["zip"],
        "network": input_rec["network"],
        "network_type": input_rec["network_type"],
        "throughput": input_rec["throughput"],
        "tag_type": input_rec["tag_type"],
        # The input field is named "class"; it is emitted as "cls".
        "cls": input_rec["class"],
        "event_id": input_rec["event_id"],
        "uu": input_rec["uu"],
        "suu": input_rec["suu"],
        "puu": input_rec["puu"],
        "domain": input_rec["domain"],
        "redirect_domain": input_rec["redirect_domain"],
        "partner_urls": [process_partner_url(partner_url) for partner_url in input_rec["partner_urls"]],
        "version": input_rec["version"],
    }

    # Unwrapping user_agent_unwrapped_components to the top-level record.
    # A component whose key matches a field set above overwrites that field.
    for key, value in input_rec["user_agent_unwrapped_components"].items():
        # Rename the "device_type" key to "device"; the value is copied
        # unchanged (a null value stays null).
        if key == "device_type":
            output_rec["device"] = value
        else:
            output_rec[key] = value

    # Normalize the advertiser id: when query["adv"] is missing or falsy
    # (None, "", 0, ...), store an empty string instead.
    if not output_rec["query"].get("adv"):
        output_rec["query"]["adv"] = ""

    # Generating entity_type, entity_domain, entity_id (from the input record,
    # which still carries the nested user_agent_unwrapped_components).
    output_rec["entity_type"], output_rec["entity_domain"], output_rec["entity_id"] = infer_entity_tup(input_rec)

    # Generating event_id (only when enabled through the gen_event_id option)
    if gen_event_id:
        output_rec["event_id"], is_rand, rand_num = compute_event_id(output_rec)
        if is_rand:
            # Record the random component that went into a synthesized id.
            output_rec["query"]["aqfer_rand"] = rand_num

    # Generating event_type: keep the query's "aqet" value ("NA" when absent)
    # if no filter is configured or the value is in the filter; otherwise
    # report it as "othev".
    event_type_value = output_rec["query"].get("aqet", "NA")
    output_rec["event_type"] = event_type_value if not event_types or event_type_value in event_types else "othev"

    # Generating other_entity_keys (reads entity_type and host from output_rec)
    output_rec["other_entity_keys"] = generate_other_entity_keys(output_rec)

    output_rec["others"] = {}
    output_rec["run_id"] = batchid

    return [output_rec]


def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Initialise the module configuration and return the output schema with its record handler.

    ``sch`` is accepted but not used. The output schema is loaded via
    ``aq_utils.load_schema`` from ``internal_schemas-v1.0.yaml`` under the
    name ``parse_receipt``; the handler returned alongside it is ``dat_hnd``.
    """
    # Must run first: dat_hnd reads the globals that setup() assigns.
    setup()

    return aq_utils.load_schema("internal_schemas-v1.0.yaml", 'parse_receipt'), dat_hnd
# • Table of contents
#
# Was this article helpful?