transform.py

import os
from typing import Any, Callable, Dict, List, Tuple

import aq_utils
import schema

# Batch identifier stamped onto every record (as rec['runid']) by dat_hnd.
# Fail fast at import time when BATCH_ID is unset or set to an empty string.
run_id = os.environ.get('BATCH_ID')
if run_id is None or run_id == "":
    raise Exception("BATCH_ID NOT SET")


def setup():
    """Populate the module-level ``regions_dict`` lookup table.

    Keeps element ``[0]`` of ``aq_utils.load("")``. ``dat_hnd`` uses the
    result as a mapping of region name -> collection of country codes (it
    iterates ``.items()`` and tests ``country_code in countries``).

    NOTE(review): what ``load("")`` reads and what its other elements hold is
    defined in ``aq_utils`` -- confirm there.
    """
    global regions_dict
    loaded = aq_utils.load("")
    regions_dict = loaded[0]


def extract_event_id(string, key, delimiter):
    """Return the value paired with ``key`` in a comma-separated list of pairs.

    ``string`` is split on ``","``. Each segment is split once on
    ``delimiter`` into a name and a value. The value of the first segment
    whose name (ignoring surrounding whitespace) equals ``key`` is returned.
    Everything after the first delimiter is kept, so a value that itself
    contains ``delimiter`` (e.g. ``"z_evid=a=b"``) is returned whole.
    Segments that contain no ``delimiter`` are skipped.

    Args:
        string: Text such as ``"a=1,z_evid=abc"``. ``None`` or ``""`` yields ``""``.
        key: Exact pair name to look for.
        delimiter: Separator between name and value inside a segment.

    Returns:
        The matching value, or ``""`` when no segment matches.
    """
    if not string:
        return ""
    for segment in string.split(","):
        name, sep, value = segment.partition(delimiter)
        # Compare the name part exactly; a substring test would also match
        # segments whose *value* happens to contain the key.
        if sep and name.strip() == key:
            return value
    return ""


def dat_hnd(rec: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Enrich one record in place and return it wrapped in a one-item list.

    Fields written:
      * ``query['tid']``: kept if truthy, else the placeholder ``"tid"``.
      * ``runid``: the module-level ``run_id`` (read from ``BATCH_ID``).
      * ``global_region``: first region in ``regions_dict`` whose country
        collection contains ``rec['country_code']``, else ``"Unknown"``.
      * ``query['pevid']``: kept if truthy, else the ``z_evid`` value parsed
        from ``rec['aqfer_var']``.
      * ``query['src']``: kept if truthy, else ``"aUT"``.

    ``regions_dict`` must already have been populated by ``setup()``.

    Raises:
        KeyError: if ``rec`` lacks ``query`` or ``country_code`` (or
            ``aqfer_var`` when ``query['pevid']`` is missing/empty).
    """
    query = rec['query']
    # tid is the tenant_id field; default it when absent or empty.
    query['tid'] = query.get('tid') or "tid"
    rec['runid'] = run_id

    country_code = rec['country_code']
    rec['global_region'] = next(
        (region for region, countries in regions_dict.items() if country_code in countries),
        "Unknown",
    )

    # The `or` short-circuits, so the event id is only parsed when pevid is missing/empty.
    query['pevid'] = query.get('pevid') or extract_event_id(rec['aqfer_var'], "z_evid", "=")
    query['src'] = query.get('src') or "aUT"

    return [rec]


def schema_handler_v1(
    sch: schema.Schema,
) -> Tuple[schema.Schema, Callable[[Dict[str, Any]], List[Dict[str, Any]]]]:
    """Declare the fields this transform adds and return its record handler.

    Calls ``setup()`` first so the region lookup used by ``dat_hnd`` is
    loaded before any record is handled.

    Args:
        sch: Input schema. ``global_region`` and ``runid`` string fields are
            added to it via ``schema.add_fields`` (the return value of
            ``add_fields`` is not used, so presumably it updates ``sch`` in
            place -- confirm in the ``schema`` module).

    Returns:
        ``(sch, dat_hnd)``: the schema object and the per-record handler.
    """
    setup()
    # Same fields, same order as the records produced by dat_hnd.
    for field_name in ("global_region", "runid"):
        schema.add_fields(sch, schema.Field(name=field_name, schema=schema.StringSchema()))
    return sch, dat_hnd
  • Table of contents

Was this article helpful?