import schema
import json
import aq_utils
import os
import traceback
from urllib.parse import urlparse
import sys
sys.path.append(os.environ.get("USER_DIR"))
from py_config_vars import regions_dict, entity_domain, id_format, partner, owner, puu_entity_type, puu_entity_domain
# Identifier of the current batch run; dat_hnd stamps it onto every record.
# NOTE(review): with the "1234" fallback the guard below can only trigger when
# BATCH_ID is explicitly set to an empty string — confirm the default is intended.
run_id = os.getenv("BATCH_ID", "1234")
if run_id == "":
    raise Exception("BATCH_ID NOT SET")
def extract_event_id(string, key, delimeter):
    """Return the value of the first ``<name><delimeter><value>`` pair whose name contains *key*.

    *string* is a comma-separated list of pairs such as ``"a=1,z_evid=abc"``.
    The value is everything after the first *delimeter* in the pair, so values
    that themselves contain the delimiter are returned intact. Pairs without
    the delimiter are skipped instead of raising ``IndexError``.

    Args:
        string: Comma-separated pairs; ``None`` or ``""`` yields ``""``.
        key: Substring looked for in each pair's name part.
        delimeter: Separator between a pair's name and its value.

    Returns:
        The matched value, or ``""`` when no pair matches.
    """
    if not string:
        return ""
    for pair in string.split(","):
        name, sep, value = pair.partition(delimeter)
        # Substring match (not equality) keeps tolerating surrounding whitespace
        # or prefixes. Only the name part is checked, so a key that appears in
        # some other pair's value is not mistaken for a match.
        if sep and key in name:
            return value
    return ""
def _format_entity_domain(rec: dict, id_type, fmt) -> str:
    """Render the configured ``entity_domain`` template for *rec*.

    The record's current ``entity_domain`` value fills the ``property``
    placeholder, so this must run before ``rec['entity_domain']`` is overwritten.
    """
    return entity_domain.format(owner=owner, partner=partner,
                                property=rec.get('entity_domain', ''),
                                id_type=id_type, id_format=fmt)


def _build_other_entity_keys(rec: dict, primary_domain: str) -> list:
    """Collect the record's identifiers as entity-key dicts, primary key first.

    Order: primary entity, ``suu`` (statid), ``puu``, each non-"none" cookie,
    hashed IP, raw IP.
    """
    query = rec['query']
    items = [{"entity_type": rec.get('entity_type'),
              "entity_domain": primary_domain,
              "entity_id": rec.get('entity_id')}]
    if query.get('suu'):
        items.append({"entity_type": "statid",
                      "entity_domain": _format_entity_domain(rec, "statid", id_format),
                      "entity_id": query.get('suu')})
    if query.get('puu'):
        items.append({"entity_type": puu_entity_type,
                      "entity_domain": puu_entity_domain,
                      "entity_id": query.get('puu')})
    # `or {}` also covers an explicit `cookie: None`, which used to crash.
    for cookie_name, cookie_value in (rec.get('cookie') or {}).items():
        if cookie_name != "none":
            items.append({"entity_type": "ck",
                          "entity_domain": _format_entity_domain(rec, cookie_name, ""),
                          "entity_id": cookie_value})
    # NOTE(review): id_type is always "v4", even if the address is IPv6 — confirm.
    for ip_field, ip_format in (("hashed_ip", "md5"), ("raw_ip", "raw")):
        if rec.get(ip_field):
            items.append({"entity_type": "ip",
                          "entity_domain": _format_entity_domain(rec, "v4", ip_format),
                          "entity_id": rec.get(ip_field)})
    return items


def _build_page_url(query: dict) -> dict:
    """Build the ``page_url`` record from the ``dmn``/``pn``/``pqs`` query params."""
    page_qs = query.get('pqs')
    return {
        # NOTE(review): host/path may be None here while the referer URL uses "";
        # confirm url_schema accepts null.
        'host': query.get('dmn'),
        'path': query.get('pn'),
        # TODO: drop the isinstance guard once the source data always sends a dict.
        'query': page_qs if isinstance(page_qs, dict) else {},
        'scheme': '',
    }


def _build_referer_url(query: dict) -> dict:
    """Build the ``page_referer_url`` record from the ``rd``/``rpn``/``rqs`` query params."""
    referer_qs = query.get('rqs')
    return {
        "scheme": "",
        "host": query.get('rd') or "",
        "path": query.get('rpn') or "",
        # Check the referer's own query-string field (previously checked `pqs`).
        "query": referer_qs if isinstance(referer_qs, dict) else {},
    }


def _media_group_levels(media_groups) -> dict:
    """Map the ``ch`` query param to ``other_media_group_levels``."""
    if not media_groups:
        return {}
    if isinstance(media_groups, dict):
        return dict(media_groups)
    # NOTE(review): a scalar channel value is used as both key and value;
    # intended semantics still need confirmation.
    return {media_groups: media_groups}


def _resolve_global_region(country_code) -> str:
    """Return the first ``regions_dict`` region whose countries contain *country_code*."""
    if country_code is None:
        return "Unknown"
    # assumes each regions_dict value supports `in` (list/set of codes) — verify config.
    for region, countries in regions_dict.items():
        if country_code in countries:
            return region
    return "Unknown"


def dat_hnd(rec: dict) -> list:
    """Enrich one raw event record in place and return it as a one-element list.

    The handler:

    - stamps ``runid``;
    - builds ``other_entity_keys``, ``page_url``, ``page_referer_url``,
      ``other_media_group_levels`` and ``global_region``;
    - sets placeholder ``other_geo``, ``policy`` and ``event_actions`` values;
    - replaces ``entity_domain`` with its formatted form;
    - back-fills the ``tid``, ``pevid`` and ``src`` defaults in ``rec['query']``.

    Args:
        rec: Record dict; must contain a ``query`` dict.

    Returns:
        ``[rec]`` — the same, mutated object.
    """
    query = rec['query']
    query['tid'] = query.get('tid') or "tid"
    rec['runid'] = run_id
    # Compute before rec['entity_domain'] is overwritten below: every domain
    # template uses the record's original entity_domain as `property`.
    primary_domain = _format_entity_domain(rec, rec.get('entity_type', ''), id_format)
    rec['other_entity_keys'] = _build_other_entity_keys(rec, primary_domain)
    rec['page_url'] = _build_page_url(query)
    rec['page_referer_url'] = _build_referer_url(query)
    rec['other_media_group_levels'] = _media_group_levels(query.get('ch'))
    rec['global_region'] = _resolve_global_region(rec.get('country_code'))
    rec['other_geo'] = {}
    rec['policy'] = [{"policy_id": "", "policy_effective_date": "", "effectivity": "", "policy_detail": []}]
    rec['entity_domain'] = primary_domain
    # TODO: remove these fallbacks once upstream always populates the fields.
    query['pevid'] = query.get("pevid") or extract_event_id(rec.get('aqfer_var') or "", "z_evid", "=")
    query['src'] = query.get('src') or "aUT"
    rec["event_actions"] = []
    return [rec]
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Register the output fields written by ``dat_hnd`` on *sch*.

    Fields are appended one at a time via ``schema.add_fields`` in a fixed order.
    Both URL fields reuse the ``url_schema`` definition loaded from ``schemas.yaml``.

    Returns:
        ``(sch, dat_hnd)`` — the extended schema and the per-record handler.
    """
    url_schema = aq_utils.load_schema('schemas.yaml', 'url_schema', True)

    policy_item = schema.RecordSchema(
        name='policy',
        fields=[
            schema.Field(name="policy_id", schema=schema.StringSchema()),
            schema.Field(name="policy_effective_date", schema=schema.StringSchema()),
            schema.Field(name="effectivity", schema=schema.StringSchema()),
            schema.Field(name="policy_detail",
                         schema=schema.ArraySchema(items=schema.StringSchema())),
        ],
    )
    entity_key_item = schema.RecordSchema(
        name='other_entity_keys',
        fields=[
            schema.Field(name="entity_type", schema=schema.StringSchema()),
            schema.Field(name="entity_domain", schema=schema.StringSchema()),
            schema.Field(name="entity_id", schema=schema.StringSchema()),
        ],
    )

    # (field name, field schema) in the order they are appended to `sch`.
    new_fields = (
        ("global_region", schema.StringSchema()),
        ("runid", schema.StringSchema()),
        ("page_url", url_schema),
        ("page_referer_url", url_schema),
        ("event_actions", schema.ArraySchema(items=schema.StringSchema())),
        ("policy", schema.ArraySchema(items=policy_item)),
        ("other_entity_keys", schema.ArraySchema(items=entity_key_item)),
        ("other_media_group_levels", schema.MapSchema(schema.StringSchema())),
        ("other_geo", schema.MapSchema(schema.StringSchema())),
    )
    for field_name, field_schema in new_fields:
        schema.add_fields(sch, schema.Field(name=field_name, schema=field_schema))
    return sch, dat_hnd