import schema
import aq_utils
import s3_utils
import os
import hashlib
# Batch/run identifier; dat_hnd stamps it onto every record as "runid".
# NOTE(review): thanks to the fallback below, an unset BATCH_ID silently becomes
# "test_1"; the guard only fires when BATCH_ID is exported but empty. Confirm the
# fallback is intended outside of local testing.
_batch_id_fallback = "test_1"
run_id = os.environ.get('BATCH_ID', _batch_id_fallback)
if len(run_id) == 0:
    raise Exception("BATCH_ID NOT SET")
def setup():
    """Populate the module-global ``regions_dict`` from S3.

    ``dat_hnd`` iterates ``regions_dict.items()`` as (region, countries) pairs
    to derive ``global_region``, so this must run before records are handled;
    ``schema_handler_v1`` calls it first.
    """
    global regions_dict
    # NOTE(review): the S3 location is an empty string -- presumably a
    # placeholder that must be replaced with the real regions file key.
    payload = s3_utils.get_s3_file_content("")
    # Only the first element of the returned content is used as the mapping.
    regions_dict = payload[0]
def extract_event_id(string, key, delimiter, separator=","):
    """Return the value stored under ``key`` in a list of ``name<delimiter>value`` pairs.

    ``string`` is split on ``separator``; each piece is split once on
    ``delimiter`` into a name and a value. The value of the first piece whose
    name (ignoring surrounding whitespace) equals ``key`` exactly is returned.
    For example, with ``delimiter="="``, ``"a=1,z_evid=abc,b=2"`` yields
    ``"abc"`` for key ``"z_evid"``.

    Args:
        string: The encoded pair list. ``None`` or ``""`` yields ``""``.
        key: Exact pair name to look up (not a substring match).
        delimiter: Separator between name and value; must be non-empty.
        separator: Separator between pairs; defaults to ``","``.

    Returns:
        Everything after the first ``delimiter`` of the matching pair, so values
        that themselves contain the delimiter (e.g. base64 ``=`` padding) are kept
        intact. Returns ``""`` when no pair matches.

    Raises:
        ValueError: if ``delimiter`` is empty.
    """
    if not string:
        return ""
    for pair in string.split(separator):
        # partition splits only on the first delimiter and never raises on a
        # piece that lacks one (unlike split(...)[1]).
        name, found, value = pair.partition(delimiter)
        # Exact name comparison: a substring test would wrongly match names like
        # "pz_evid", or pairs whose value merely contains the key.
        if found and name.strip() == key:
            return value
    return ""
# Query-string values treated as "absent" when building url structs.
_NA_SENTINELS = (None, "na", "NA")


def _optional_qs(query: dict, field: str) -> dict:
    """Return ``{field: query[field]}``, or ``{}`` when the field is missing, None, "na" or "NA"."""
    value = query.get(field)
    if value in _NA_SENTINELS:
        return {}
    return {field: value}


def dat_hnd(rec: dict) -> list:
    """Enrich one event record in place and return it wrapped in a one-item list.

    Fields written:
      * ``query.tid``: kept if truthy, otherwise the literal "tid".
      * ``runid``: the module-level ``run_id`` (read from BATCH_ID).
      * ``page_url``: built from query ``dmn`` (falling back to ``rec['domain']``),
        ``pn`` and ``pqs``.
      * ``page_referer_url``: built from query ``rd`` (falling back to
        ``rec['referer']``), ``rpn`` and ``rqs``.
      * ``other_media_group_levels``: copied from query ``ch``.
      * ``global_region``: the first key of ``regions_dict`` whose countries
        contain ``rec['country_code']``, else "Unknown".
      * ``query.pevid``, ``query.src``, ``query.parent_pevid``: filled with
        defaults when falsy.

    Requires ``setup()`` to have populated ``regions_dict``.

    Raises:
        KeyError: if ``rec`` lacks ``query`` or ``country_code``, or lacks
            ``aqfer_var`` when ``query.pevid`` is falsy.
    """
    query = rec['query']
    # ! tenant_id field -- falls back to the literal placeholder "tid".
    query['tid'] = query.get('tid') or "tid"
    rec['runid'] = run_id  # read from the BATCH_ID env variable at import time

    rec['page_url'] = {
        'host': query.get('dmn') or rec.get('domain'),
        'path': query.get('pn'),
        'query': _optional_qs(query, 'pqs'),
        'scheme': '',
    }
    rec['page_referer_url'] = {
        "scheme": "",
        "host": query.get('rd') or rec.get('referer'),
        "path": query.get('rpn') or "",
        "query": _optional_qs(query, 'rqs'),
    }

    rec['other_media_group_levels'] = levels = {}
    media_groups = query.get('ch')
    if media_groups:
        if isinstance(media_groups, dict):
            levels.update(media_groups)
        else:
            # A scalar media group is mapped to itself.
            # NOTE(review): an unhashable value (e.g. a list) raises TypeError here.
            levels[media_groups] = media_groups

    country_code = rec['country_code']
    rec['global_region'] = next(
        (region for region, countries in regions_dict.items() if country_code in countries),
        "Unknown",
    )

    query['pevid'] = query.get("pevid") or extract_event_id(rec['aqfer_var'], "z_evid", "=")
    query['src'] = query.get('src') or "aUT"
    if not query.get("parent_pevid"):
        # str() so a non-string pevid cannot crash .encode().
        # NOTE(review): every record whose pevid ends up "" gets the same
        # synthetic parent id (the hash of the empty string) -- confirm intended.
        query['parent_pevid'] = "SYN-" + hashlib.sha256(str(query['pevid']).encode()).hexdigest()
    return [rec]
def schema_handler_v1(sch: schema.Schema) -> tuple:
    """Register the fields added by ``dat_hnd`` on ``sch`` and return ``(sch, dat_hnd)``.

    Calls ``setup()`` first so ``regions_dict`` is loaded before ``dat_hnd``
    runs. ``page_url`` and ``page_referer_url`` share the ``url_struct`` schema
    loaded from "amdp_schemas-v6.2.yaml".

    Returns:
        A 2-tuple of the ``sch`` passed in (``schema.add_fields`` is applied to
        it, presumably in place) and the record handler ``dat_hnd``.
    """
    setup()
    url_schema = aq_utils.load_schema("amdp_schemas-v6.2.yaml", "url_struct")
    # (name, schema) of each output field, in registration order.
    new_fields = (
        ("global_region", schema.StringSchema()),
        ("runid", schema.StringSchema()),
        ("page_url", url_schema),
        ("page_referer_url", url_schema),
        ("other_media_group_levels", schema.MapSchema(schema.StringSchema())),
    )
    for field_name, field_schema in new_fields:
        schema.add_fields(sch, schema.Field(name=field_name, schema=field_schema))
    return sch, dat_hnd