import aq_utils
import schema
import time
import datetime
import copy
import os
# Impression event ids that update_events() never de-duplicates on.
invalid_ids = {'NA'}
# Wall-clock time at import, in milliseconds since the epoch.
current_ts = int(time.time() * 1000)
# Batch identifier from the environment; '1234' is used when BATCH_ID is unset.
run_id = os.getenv('BATCH_ID', '1234')
if run_id is None:
    # NOTE(review): with a non-None default above this branch looks
    # unreachable - confirm whether the default or the check is intended.
    raise Exception("batch_id not found")
lookback_days = 0  # variable to be set to match input lookback window
# UTC calendar date that lies `lookback_days` before the import-time date.
lookback_date = datetime.datetime.utcnow().date() - datetime.timedelta(days=lookback_days)
def isValid(e):
    """Return False for None, the empty string and the literal 'NA'; True otherwise."""
    if e is None:
        return False
    return e != '' and e != 'NA'
def non_empty(list):
    """Report whether the given sized container holds at least one element."""
    size = len(list)
    return size > 0
def sortByTimestamp(evt):
    """Sort the list *evt* in place, ascending by each item's 'event_timestamp'.

    The sort is stable, so items with equal timestamps keep their relative
    order. Nothing is returned.
    """
    def _timestamp_of(item):
        return item['event_timestamp']

    evt.sort(key=_timestamp_of)
def update_key_if_valid(result, key, value):
    """Write *value* under *key* only when the current entry does not pass isValid.

    An entry that is absent, None, '' or 'NA' is overwritten; any other
    existing entry is left as it is. *result* is modified in place.
    """
    current = result.get(key)
    if isValid(current):
        return
    result[key] = value
def update_events(rec, result, event_groups):
    """Flatten the per-entry events of *rec* into *result* and collect event groups.

    Everything is done in place; nothing is returned.

    Args:
        rec: Mapping with an 'events' list. Each entry is read for the six
            keys in ``event_types`` below plus 'impressions' and
            'event_groups'. 'advertiser_id' is optional.
        result: Dict that receives 'advertiser_id', one list per event type,
            and the merged 'impressions' list.
        event_groups: Dict filled with event_group['event_id'] -> event group;
            the first group seen for an id is kept.

    Side effects on the input: impression dicts from *rec* are mutated
    (their 'clicks' / 'engagements' lists are extended and sorted, flags are
    rewritten) and are stored in *result* by reference, not copied.

    NOTE(review): the indentation of this file was lost; the nesting below is
    reconstructed from the syntax. The placement of the block that recomputes
    the flags and sorts the sub-events (marked further down) should be
    confirmed against the original source.
    """
    event_types = {'attributes', 'conversions', 'views', 'geo_events', 'poi_visits', 'other_event'}
    result['advertiser_id'] = rec.get('advertiser_id', "")
    # Valid impressions keyed by event id, so duplicates can be merged.
    impressions_dict = {}
    # Impressions whose id is in the module-level `invalid_ids`; kept as-is, never merged.
    invalid_impressions = []
    for e in event_types:
        result[e] = []
    input_events = rec['events']
    for evt in input_events:
        # Non-impression events are simply concatenated across all entries.
        for e in event_types:
            result[e].extend(evt[e])
        for imp in evt['impressions']:
            imp_id = imp['event_id']
            if imp_id in invalid_ids:
                invalid_impressions.append(imp)
            else:
                if imp_id not in impressions_dict:
                    # First occurrence of this id: keep the incoming dict itself.
                    impressions_dict[imp_id] = imp
                else:
                    imp_event = impressions_dict[imp_id]
                    if imp_event['synthetic_event'] == True and imp['synthetic_event'] == False:
                        # Stored record is synthetic and the incoming one is not:
                        # the incoming record takes over, after absorbing the
                        # stored record's clicks and engagements.
                        imp['clicks'].extend(imp_event['clicks'])
                        imp['engagements'].extend(imp_event['engagements'])
                        imp_event = imp
                        impressions_dict[imp_id] = imp_event
                    else:
                        # Otherwise the stored record stays and absorbs the
                        # incoming clicks, engagements and 'others' entries.
                        imp_event['clicks'].extend(imp['clicks'])
                        imp_event['engagements'].extend(imp['engagements'])
                        imp_event['others'].update(imp['others'])
                # NOTE(review): from here to the engagements sort the code is
                # assumed to run for every valid impression (new or merged);
                # it may originally have been nested in the merge branch only.
                # Flags are derived from whether the sub-event lists are non-empty.
                impressions_dict[imp_id]['has_click'] = non_empty(impressions_dict[imp_id]['clicks'])
                impressions_dict[imp_id]['has_engagement'] = non_empty(impressions_dict[imp_id]['engagements'])
                # A None/falsy stored flag is replaced by the incoming record's value.
                if impressions_dict[imp_id]['has_click_conversion'] is None or not impressions_dict[imp_id][
                    'has_click_conversion']:
                    impressions_dict[imp_id]['has_click_conversion'] = imp['has_click_conversion']
                if impressions_dict[imp_id]['viewable'] is None or not impressions_dict[imp_id]['viewable']:
                    impressions_dict[imp_id]['viewable'] = imp['viewable']
                sortByTimestamp(impressions_dict[imp_id]['clicks'])
                # NOTE(review): 'engagements' is already indexed unconditionally
                # above, so this guard looks redundant - confirm.
                if "engagements" in impressions_dict[imp_id]:
                    sortByTimestamp(impressions_dict[imp_id]['engagements'])
        # Register each event group once; later duplicates of an id are ignored.
        for event_group in evt['event_groups']:
            event_id = event_group['event_id']
            if event_id not in event_groups:
                event_groups[event_id] = event_group
    # Merged impressions first, then the invalid-id ones, all ordered by timestamp.
    result['impressions'] = list(impressions_dict.values())
    result['impressions'].extend(invalid_impressions)
    sortByTimestamp(result['impressions'])
    for e in event_types:
        sortByTimestamp(result[e])
def split_by_date(rec, event_groups):
    """Split one collated record into one record per UTC event date.

    Args:
        rec: Dict holding a list under each key of ``event_types`` below and
            an 'event_groups' list. All other keys are deep-copied unchanged
            into every per-date record.
        event_groups: Dict mapping an event-group id to its event-group dict,
            looked up through each event's 'event_group_id'.

    Returns:
        A list with one dict per distinct event date (format YYYYMMDD in
        'event_date'). Each dict carries that date's events, the event groups
        they reference, the has_* flags and 'beyond_lookback'.
        update_event_groups() has been applied to the list before it is
        returned.

    Each per-date record gets its own copy of an event group. A group whose
    events fall on several dates is summarised independently per date rather
    than through one shared dict. Events whose group id is unknown in
    *event_groups* are kept, but no group entry is added for them.
    """
    # Tuple instead of set: iteration order, and with it the order of the
    # returned records and of their event groups, is identical on every run.
    event_types = ('impressions', 'conversions', 'views', 'geo_events', 'poi_visits', 'other_event')
    results = []
    recsdict = {}
    # create a parent rec without events
    parentrec = {key: [] if key in event_types else rec[key] for key in rec}
    # sort and split by event timestamp
    for event_type in event_types:
        for evt in rec[event_type]:
            # Only the first 10 characters of the timestamp are parsed, as
            # epoch seconds (presumably to drop a millisecond suffix - confirm).
            evt_date = datetime.datetime.utcfromtimestamp(
                int(str(evt['event_timestamp'])[0:10])).strftime('%Y%m%d')
            day_rec = recsdict.get(evt_date)
            if day_rec is None:
                day_rec = recsdict[evt_date] = copy.deepcopy(parentrec)
            # add event group by event_group_id if not exists
            group = event_groups.get(evt['event_group_id'])
            if group is not None and group not in day_rec['event_groups']:
                # Copy, so the per-date summary fields written later do not
                # leak into another date's record for the same group.
                day_rec['event_groups'].append(copy.deepcopy(group))
            day_rec[event_type].append(evt)
    # With a zero lookback window every date counts as beyond the lookback.
    no_lookback = lookback_days == 0
    for dt, day_rec in recsdict.items():
        day_rec['event_date'] = dt
        # set flags
        day_rec['has_impressions'] = non_empty(day_rec['impressions'])
        day_rec['has_conversions'] = non_empty(day_rec['conversions'])
        day_rec['has_views'] = non_empty(day_rec['views'])
        day_rec['has_geo_events'] = non_empty(day_rec['geo_events'])
        day_rec['has_poi_visits'] = non_empty(day_rec['poi_visits'])
        day_rec['has_other_events'] = non_empty(day_rec['other_event'])
        # The record-level flag is True as soon as one impression carries a
        # True flag. The explicit `== True` is kept so None or other
        # non-boolean values keep counting as "not set".
        day_rec['has_clicks'] = any(imp['has_click'] == True for imp in day_rec['impressions'])
        day_rec['has_engagements'] = any(imp['has_engagement'] == True for imp in day_rec['impressions'])
        day_rec['has_interactions'] = False  # TODO calculate this later as part of interaction subevent
        # True: date is beyond the lookback window (dont overwrite).
        # False: date is within the lookback window (overwrite).
        day_rec['beyond_lookback'] = (
            no_lookback or datetime.datetime.strptime(dt, "%Y%m%d").date() < lookback_date)
        results.append(day_rec)
    update_event_groups(results)
    return results
def update_event_groups(results):
    """Fill in the summary fields of every event group, record by record.

    For each record in *results*, the events of all event types are grouped
    by 'event_group_id'. Every entry of the record's 'event_groups' list then
    receives:

    * 'event_timestamp' / 'first_event_id' - from the earliest event,
    * 'last_timestamp' - the latest event timestamp,
    * 'events_count' - number of events in the group.

    The record's 'event_window_start' is lowered to the earliest group
    timestamp (or set, when it is None).

    Args:
        results: List of record dicts; each needs the six event-type lists,
            'event_groups' and 'event_window_start'. Modified in place.

    Returns:
        None.

    A group that no event of the record refers to is left untouched. When
    several events share the earliest timestamp, the first one in
    ``event_types`` order (then list order) supplies 'first_event_id'.
    """
    # Tuple instead of set: a set of strings iterates in a hash-dependent
    # order, which made the tie-break for 'first_event_id' vary between runs.
    event_types = ('impressions', 'conversions', 'views', 'geo_events', 'poi_visits', 'other_event')

    def _timestamp_of(event):
        return event['event_timestamp']

    for result in results:
        events_by_group = {}
        for event_type in event_types:
            for evt in result[event_type]:
                events_by_group.setdefault(evt['event_group_id'], []).append(evt)
        for event_group in result['event_groups']:
            events = events_by_group.get(event_group['event_id'])
            if not events:
                # Nothing in this record belongs to the group; skip it
                # rather than fail on a missing key.
                continue
            # min() returns the first minimal element, which matches what a
            # stable sort would put at index 0.
            first_event = min(events, key=_timestamp_of)
            event_group['event_timestamp'] = first_event['event_timestamp']
            event_group['first_event_id'] = first_event['event_id']
            event_group['last_timestamp'] = max(map(_timestamp_of, events))
            event_group['events_count'] = len(events)
            # if this events ts is earlier than the data_subject
            # level ts, replace data_subject with this event ts
            window_start = result['event_window_start']
            if window_start is None or event_group['event_timestamp'] < window_start:
                result['event_window_start'] = event_group['event_timestamp']
def dat_hnd(rec: any) -> any:
    """Collate one input record and return it split into per-date records.

    A fresh result dict is seeded with the entity id, the module-level batch
    id and empty event-group bookkeeping. update_events() then fills in the
    events, and split_by_date() produces the list that is returned.
    """
    event_groups = {}
    result = {
        'entity_id': rec['entity_id'],
        'batch_id': run_id,
        'event_window_start': None,  # calculated with event groups
        'event_window_duration': '1 Day',
        # 'entity_id_bucket': rec['entity_id_bucket'],
        'event_groups': [],
    }
    update_events(rec, result, event_groups)
    return split_by_date(result, event_groups)
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Return the output schema together with the record handler.

    The 'datasubject_collated' schema is loaded from
    "amdp_schemas-v5.0.1.yaml" and extended with the two fields that dat_hnd
    adds to every record. The incoming schema *sch* is not consulted.
    """
    output_schema = aq_utils.load_schema("amdp_schemas-v5.0.1.yaml", 'datasubject_collated')
    # Extra fields as (name, schema class) pairs, registered in this order.
    # ("entity_id_bucket", schema.StringSchema) is currently disabled.
    extra_fields = (
        ("beyond_lookback", schema.BooleanSchema),
        ("batch_id", schema.StringSchema),
    )
    for field_name, schema_class in extra_fields:
        schema.add_fields(output_schema, schema.Field(name=field_name, schema=schema_class()))
    return output_schema, dat_hnd