import aq_utils
import schema
import time
import hashlib
import json
def get_data_type_value(type):
    """Return the placeholder value used for a schema type name.

    Numeric type names map to ``0``, ``"string"`` to ``""``, ``"map"`` to a
    new empty dict, ``"list"`` to a new empty list and ``"null"`` to ``None``.
    Any other name also yields ``None``.
    """
    # Rebuilt on every call so callers never share the mutable defaults.
    defaults = dict.fromkeys(("int", "float", "double"), 0)
    defaults["string"] = ""
    defaults["map"] = {}
    defaults["list"] = []
    defaults["null"] = None
    return defaults.get(type, None)
def get_formatted_schema(raw_schema):
    """Build a ``{field name: field type}`` lookup from ``raw_schema['fields']``.

    When two fields share a name, the later one wins.
    """
    return {field['name']: field['type'] for field in raw_schema['fields']}
def format_data(data, formatted_schema):
    """Fill in, in place, every schema field that ``data`` does not contain.

    ``formatted_schema`` maps field names to a type name or to a list of
    type names. For a list, the filler is the "null" default when "null" is
    one of the entries, otherwise the default of the first entry. Fields
    already present in ``data`` are left untouched. Returns ``None``.
    """
    for name, declared in formatted_schema.items():
        if name in data:
            continue
        if not isinstance(declared, list):
            data[name] = get_data_type_value(declared)
        elif "null" in declared:
            data[name] = get_data_type_value("null")
        else:
            data[name] = get_data_type_value(declared[0])
# Field-name -> type lookups for click and engagement sub-events. The loaded
# schema object is rendered with str() and that text is parsed as JSON.
raw_clicks_schema = json.loads(
    str(aq_utils.load_schema("amdp_schemas-v6.1.1.yaml", 'click_collated'))
)
formatted_clicks_schema = get_formatted_schema(raw_clicks_schema)
raw_engagement_schema = json.loads(
    str(aq_utils.load_schema("amdp_schemas-v6.1.1.yaml", 'engagement_collated'))
)
formatted_engagement_schema = get_formatted_schema(raw_engagement_schema)

# Parent-event ids that mark a click/engagement as having no usable parent.
invalid_ids = {'NA'}
# Milliseconds since the epoch, captured once when the module is imported.
current_ts = int(time.time() * 1000)
def isValid(cc):
    """Return True unless ``cc`` is ``None``, the empty string or ``'NA'``."""
    if cc is None:
        return False
    if cc != '':
        return cc != 'NA'
    return False
def non_empty(list):
    """Return True when the given sized collection has at least one item."""
    size = len(list)
    return size > 0
def sortByTimestamp(evt):
    """Sort the list of event dicts in place by ``'event_timestamp'``.

    The sort is stable and nothing is returned.
    """
    def _timestamp_of(event):
        return event['event_timestamp']

    evt.sort(key=_timestamp_of)
def set_event_group(evt, result, retain_event_id: bool):
    """Derive the event group of ``evt`` and register it on ``result``.

    The group is a dict of device/network/geo attributes copied from ``evt``.
    Its ``event_id`` is either taken from ``evt['event_id']`` (when
    ``retain_event_id`` is truthy) or computed as the SHA-256 hex digest of
    the group's items. The id is written back to ``evt['event_group_id']``
    and the group is appended to ``result['event_groups']`` unless an equal
    group is already there.
    """
    known_groups = result['event_groups']

    # Key order matters: the digest below is taken over str(group.items()),
    # so the keys must be inserted in exactly this sequence.
    group = {
        'event_id': '',
        # these fields will be calculated at the end of collate-merge
        'event_timestamp': 0,
        'first_event_id': '',
        'last_timestamp': 0,
        'events_count': 0,
    }
    for name in ('source', 'browser', 'browser_version', 'device_type',
                 'network', 'network_type', 'os', 'os_version', 'throughput',
                 'user_agent', 'area_code', 'city', 'country_code', 'dma',
                 'fips'):
        group[name] = evt[name]
    group['global_region'] = evt.get('global_region') or ""
    for name in ('latitude', 'longitude'):
        coordinate = evt[name]
        # Falsy coordinates (None, 0, '') are stored as the empty string.
        group[name] = str(coordinate) if coordinate else ""
    for name in ('msa', 'region_code', 'timezone', 'zip'):
        group[name] = evt[name]
    group['policy'] = None
    other_geo = evt['other_geo']
    if isinstance(other_geo, dict):
        # Sorted by key so equal contents always render identically.
        group['other_geo'] = dict(sorted(other_geo.items()))
    else:
        group['other_geo'] = {}

    if retain_event_id:
        group['event_id'] = evt['event_id']
    else:
        digest_input = str(group.items()).encode()
        group['event_id'] = hashlib.sha256(digest_input).hexdigest()
    evt['event_group_id'] = group['event_id']

    if group not in known_groups:
        known_groups.append(group)
def update_events(rec, result, key, advertiser_id):
    """Copy the events stored under ``rec[key]`` into ``result[key]``.

    Every event dict is mutated in place: its event group is registered via
    ``set_event_group``, ``advertiser_id`` is stamped on it, ``policy`` is
    reset to an empty list and ``content_bundle`` to ``None``. The collected
    events are then sorted by ``'event_timestamp'``.

    Args:
        rec: Input record; ``rec[key]`` may be missing or ``None``.
        result: Output record; receives a new list under ``key``.
        key: Name of the event collection to process.
        advertiser_id: Value written to each event's ``advertiser_id``.
    """
    events = []
    result[key] = events
    # ``rec.get(key, [])`` only falls back when the key is absent; an
    # explicit null value would come back as None and break the loop.
    for input_event in rec.get(key) or []:
        set_event_group(input_event, result, 0)
        input_event['advertiser_id'] = advertiser_id
        input_event['policy'] = []
        input_event['content_bundle'] = None
        events.append(input_event)
    # Same ordering as sortByTimestamp: ascending, stable, in place.
    events.sort(key=lambda event: event['event_timestamp'])
def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
    """Fold the clicks of ``rec`` into ``subevents_dict`` as synthetic impressions.

    Each click dict is mutated in place into a synthetic impression whose
    ``event_id`` is the click's ``parent_event_id``; the click-specific
    fields are copied into a sub-event kept in the impression's ``clicks``
    list. Clicks sharing a parent id are merged into the entry already
    present in ``subevents_dict``.

    Args:
        rec: Input record; its ``clicks`` entry may be missing or ``None``.
        result: Output record. Passed to ``set_event_group`` and gets
            ``has_clicks`` set to ``True`` once any click is processed.
        subevents_dict: Parent id -> synthetic impression, updated in place.
        advertiser_id: Value written to each click's ``advertiser_id``.

    Returns:
        ``(subevents_dict, invalid_clicks)``; ``invalid_clicks`` holds the
        synthetic impressions whose parent id is listed in ``invalid_ids``.
    """
    invalid_clicks = []
    # ``rec.get('clicks', [])`` would return None for an explicit null value.
    for clk in rec.get('clicks') or []:
        set_event_group(clk, result, 0)
        clk['advertiser_id'] = advertiser_id
        imp_id = clk['parent_event_id']
        clk_id = clk['event_id']

        # The click dict itself becomes the synthetic parent impression.
        orphan_imp = clk
        orphan_imp['event_id'] = imp_id if imp_id else ""
        orphan_imp['has_click'] = True
        result['has_clicks'] = True
        orphan_imp['has_engagement'] = None
        orphan_imp['viewable'] = None
        orphan_imp['synthetic_event'] = True

        clk_subevent = {
            'event_timestamp': orphan_imp['event_timestamp'],
            'agency_id': orphan_imp['agency_id'],
            'event_id': clk_id,
            'page_url': orphan_imp['page_url'],
            'page_referer_url': orphan_imp['page_referer_url'],
            'destination_url': orphan_imp['destination_url'],
        }
        for cost_key in ('cpc_paid', 'cpc_cost'):
            if cost_key in orphan_imp:
                clk_subevent[cost_key] = orphan_imp[cost_key]
        clk_subevent['other_event_ids'] = orphan_imp['other_event_ids']
        clk_subevent['metrics'] = orphan_imp['metrics']
        clk_subevent['others'] = orphan_imp['others']

        orphan_imp['clicks'] = [clk_subevent]
        orphan_imp['engagements'] = []
        # Click-only fields now live on the sub-event, not the impression.
        del orphan_imp['destination_url']
        orphan_imp.pop('cpc_paid', None)
        orphan_imp.pop('cpc_cost', None)

        if imp_id in invalid_ids:
            invalid_clicks.append(orphan_imp)
            continue

        existing = subevents_dict.get(imp_id)
        if existing is None:
            subevents_dict[imp_id] = orphan_imp
        else:
            existing['clicks'].append(clk_subevent)
            existing['has_click'] = True
            existing['has_click_conversion'] = (
                existing.get('has_click_conversion')
                or orphan_imp.get('has_click_conversion')
            )
    return subevents_dict, invalid_clicks
def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
    """Fold the engagements of ``rec`` into ``subevents_dict`` as synthetic impressions.

    Each engagement dict is mutated in place into a synthetic impression
    whose ``event_id`` is the engagement's ``parent_event_id``; the
    engagement-specific fields are copied into a sub-event kept in the
    impression's ``engagements`` list. Engagements sharing a parent id are
    appended to the entry already present in ``subevents_dict``.

    Args:
        rec: Input record; its ``engagements`` entry may be missing or
            ``None``.
        result: Output record. Passed to ``set_event_group`` and gets
            ``has_engagements`` set to ``True`` once any engagement is
            processed.
        subevents_dict: Parent id -> synthetic impression, updated in place.
        advertiser_id: Value written to each engagement's ``advertiser_id``.

    Returns:
        ``(subevents_dict, invalid_engagements)``; ``invalid_engagements``
        holds the synthetic impressions whose parent id is listed in
        ``invalid_ids``.
    """
    invalid_engagements = []
    # ``rec.get('engagements', [])`` would return None for an explicit null.
    for eng in rec.get('engagements') or []:
        set_event_group(eng, result, 0)
        eng['advertiser_id'] = advertiser_id
        imp_id = eng['parent_event_id']

        # The engagement dict itself becomes the synthetic parent impression.
        orphan_imp = eng
        orphan_imp['event_id'] = imp_id
        orphan_imp['has_click_conversion'] = None
        orphan_imp['has_click'] = None
        orphan_imp['has_engagement'] = True
        result['has_engagements'] = True
        orphan_imp['viewable'] = True
        orphan_imp['synthetic_event'] = True

        eng_subevent = {
            'event_timestamp': orphan_imp['event_timestamp'],
            'event_subtype_1': orphan_imp['event_subtype_1'],
            'event_subtype_2': orphan_imp.get('event_subtype_2'),
        }
        for optional_key in ('cpc_paid', 'cpc_cost', 'cpe_paid', 'cpe_cost',
                             'engagement_actions'):
            if optional_key in orphan_imp:
                eng_subevent[optional_key] = orphan_imp[optional_key]
        eng_subevent['metrics'] = orphan_imp['metrics']
        eng_subevent['others'] = orphan_imp['others']
        eng_subevent['tag_url'] = orphan_imp['tag_url']

        orphan_imp['clicks'] = []
        orphan_imp['engagements'] = [eng_subevent]
        # Engagement-only fields now live on the sub-event, not the impression.
        orphan_imp.pop('engagement_actions', None)
        del orphan_imp['event_subtype_1']
        for cost_key in ('cpc_paid', 'cpc_cost', 'cpe_paid', 'cpe_cost'):
            orphan_imp.pop(cost_key, None)

        if imp_id in invalid_ids:
            invalid_engagements.append(orphan_imp)
            continue

        existing = subevents_dict.get(imp_id)
        if existing is None:
            subevents_dict[imp_id] = orphan_imp
        else:
            existing['engagements'].append(eng_subevent)
    return subevents_dict, invalid_engagements
def get_impressions_dict(rec, result, advertiser_id):
    """Index the impressions of ``rec`` by their ``event_id``.

    Every impression gets ``advertiser_id`` stamped on it and its event
    group registered via ``set_event_group``. The first impression seen for
    an id is stored with empty ``clicks``/``engagements`` lists and
    ``viewable`` set to ``None``; later impressions with the same id only
    merge their ``others`` mapping into the stored one.

    Args:
        rec: Input record; its ``impressions`` entry may be missing or
            ``None``.
        result: Output record, passed to ``set_event_group``.
        advertiser_id: Value written to each impression's ``advertiser_id``.

    Returns:
        Dict mapping impression id to the (mutated) impression dict.
    """
    impressions_dict = {}
    # ``rec.get('impressions', [])`` would return None for an explicit null.
    for inp_imp in rec.get('impressions') or []:
        inp_imp['advertiser_id'] = advertiser_id
        set_event_group(inp_imp, result, 0)
        imp_id = inp_imp['event_id']
        if imp_id in impressions_dict:
            # Duplicate id: keep the first impression, merge extra attributes.
            impressions_dict[imp_id]['others'].update(inp_imp['others'])
            continue
        inp_imp['clicks'] = []
        inp_imp['engagements'] = []
        inp_imp['viewable'] = None
        impressions_dict[imp_id] = inp_imp
    return impressions_dict
def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
    """Attach clicks and engagements to their parent impressions.

    Builds the impression index, then the engagement and click sub-event
    indexes, and merges each sub-event entry into the impression with the
    same id. Sub-events without a matching impression, as well as those with
    an invalid parent id, are kept as synthetic impressions. The combined
    list is sorted by timestamp and stored in ``result['impressions']``;
    ``result['has_clicks']`` and ``result['has_engagements']`` are set along
    the way.

    Args:
        rec: Input record holding ``impressions``, ``engagements`` and
            ``clicks``.
        result: Output record, updated in place.
        advertiser_id: Value stamped on every processed event.
    """
    result['has_clicks'] = False
    result['has_engagements'] = False
    impressions_dict = get_impressions_dict(rec, result, advertiser_id)
    subevents_dict, invalid_subevents = get_engagement_dict(rec, result, {}, advertiser_id)
    subevents_dict, invalid_clicks = get_clicks_dict(rec, result, subevents_dict, advertiser_id)
    invalid_subevents.extend(invalid_clicks)

    # insert subevents into parent impression
    impressions = []
    for imp_id, impression in impressions_dict.items():
        impression['synthetic_event'] = False
        se = subevents_dict.pop(imp_id, None)
        if se is not None:
            impression['clicks'].extend(se['clicks'])
            impression['engagements'].extend(se['engagements'])
            impression['has_click'] = non_empty(impression['clicks'])
            result['has_clicks'] = result['has_clicks'] or impression['has_click']
            impression['has_engagement'] = non_empty(impression['engagements'])
            result['has_engagements'] = result['has_engagements'] or impression['has_engagement']
            if not impression.get('has_click_conversion'):
                # Click-derived entries only carry this key when the input
                # click had it, so read it defensively.
                impression['has_click_conversion'] = se.get('has_click_conversion')
            if impression['viewable'] is None or not impression['viewable']:
                impression['viewable'] = se['viewable']
        impressions.append(impression)

    # Whatever is left had no real parent impression in this record.
    impressions.extend(invalid_subevents)
    impressions.extend(subevents_dict.values())
    sortByTimestamp(impressions)
    result['impressions'] = impressions
def dat_hnd(rec: any) -> any:
    """Collate one input record into the output record.

    Copies every event collection of ``rec`` into a new result dict,
    registers the event groups, nests clicks and engagements under their
    parent impressions, derives the ``has_*`` flags and fills in defaults
    for fields the output expects.

    Args:
        rec: Input record; must contain ``entity_id``. Event collections
            are read by name and may be absent.

    Returns:
        A single-element list holding the result dict.
    """
    result = {}
    advertiser_id = rec.get('advertiser_id', "")
    result['entity_id'] = rec['entity_id']
    result['event_window_start'] = None  # handled in collate-merge
    result['event_window_duration'] = '1 Hour'
    result['event_groups'] = []

    # Event groups supplied on the input keep their own ids; groups for all
    # other events are derived from the event attributes.
    for input_event_group in rec.get('event_groups') or []:
        set_event_group(input_event_group, result, 1)

    for key in ('conversions', 'views', 'geo_events', 'poi_visits',
                'attributes', 'other_event', 'id_syncs', 'text_clicks'):
        update_events(rec, result, key, advertiser_id)
    # 'interactions' is only copied to the output when the input has some.
    if rec.get("interactions"):
        update_events(rec, result, 'interactions', advertiser_id)

    subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id)

    result['has_impressions'] = non_empty(result['impressions'])
    result['has_conversions'] = non_empty(result['conversions'])
    result['has_views'] = non_empty(result['views'])
    result['has_geo_events'] = non_empty(result['geo_events'])
    result['has_poi_visits'] = non_empty(result['poi_visits'])
    result['has_interactions'] = False if not rec.get("interactions") else non_empty(result['interactions'])
    result['has_other_events'] = non_empty(result['other_event'])
    result['has_id_syncs'] = non_empty(result['id_syncs'])
    result['has_text_clicks'] = non_empty(result['text_clicks'])

    for element in result['impressions']:
        element['has_click_conversion'] = element.get("has_click_conversion") or False
        element['content_bundle'] = None
        element['has_click'] = element.get("has_click") or False
        # Must read the engagement flag itself; deriving it from
        # "has_click" mislabels engagement-only and click-only impressions.
        element['has_engagement'] = element.get("has_engagement") or False
        element['event_subtype_1'] = element.get("event_subtype_1")
        for sub_elements in element['clicks']:
            format_data(sub_elements, formatted_clicks_schema)
        for engagement in element['engagements']:
            format_data(engagement, formatted_engagement_schema)

    # Remove after confirmation
    result['event_window_start'] = result.get('event_window_start', 0) or 0

    for conv in result['conversions']:
        conv['synthetic_event'] = conv.get('synthetic_event') or False
    for view in result['views']:
        view['synthetic_event'] = view.get('synthetic_event') or False
        view['has_interaction'] = view.get('has_interaction') or False
        view['interaction'] = view.get('interaction') or []
    for element in result['id_syncs']:
        element['synthetic_event'] = element.get('synthetic_event') or False
    for other in result['other_event']:
        other['synthetic_event'] = other.get('synthetic_event') or False
    return [result]
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Return the ``datasubject_collated`` output schema together with ``dat_hnd``.

    The incoming schema ``sch`` is not consulted.
    """
    return aq_utils.load_schema("amdp_schemas-v6.1.1.yaml", 'datasubject_collated'), dat_hnd