############################
# SETUP
# 1) Set work area, inputs, outputs
# NOTES
# The schema for interactions is complete, but the logic will be completed as needed.
# metrics, others, and other_entity_ids will be shared across all dates since we don't know which specific date they apply to.
############################
# No work area value is set in this file.
work_area:
# NOTE(review): presumably makes the python transform below run as its own stage - confirm against the job runner documentation.
separate_transform_stage: true
inputs:
# Single input named `events`, listing two folder locations (both left blank in this file).
- name: events
locations:
# Output of this job (collate-merge)
- folder:
# Output of collate-sort
- folder:
# NOTE(review): presumably these values are taken from the input folder partitioning - confirm.
capture_partition_keys:
- tenant_id
- entity_type
- entity_domain
- entity_id_bucket
# Key fields of the input; the transform reads entity_id, tenant_id, entity_type,
# entity_domain and entity_id_bucket from every record it receives.
key:
- tenant_id
- entity_id
- entity_type
- entity_domain
- entity_id_bucket
transform:
- python:
code: |
import schema
import time
import datetime
import copy
import os
# Impression event ids that are placeholders rather than real identifiers.
invalid_ids = {'NA'}
# Wall-clock time at module load, in epoch milliseconds.
current_ts = int(time.time() * 1000)
# The batch id comes from the environment and is mandatory: fail at load time without it.
run_id = os.getenv('BATCH_ID')
if run_id is None:
    raise Exception("batch_id not found")
lookback_days = 3 # variable to be set to match input lookback window
# Oldest UTC calendar date that still counts as inside the lookback window.
lookback_date = datetime.datetime.utcnow().date() - datetime.timedelta(days=lookback_days)
def isValid(e):
    """Tell whether ``e`` holds a usable value.

    ``None``, the empty string and the ``'NA'`` placeholder are rejected;
    every other value is considered valid.
    """
    if e is None:
        return False
    return not (e == '' or e == 'NA')
def non_empty(list):
    """Return True when the given sized container holds at least one element."""
    return len(list) != 0
def sortByTimestamp(evt):
    """Sort a list of event records in place by their ``event_timestamp``.

    The sort is stable, so records with equal timestamps keep their relative
    order. Nothing is returned; the list passed in is reordered.
    """
    def _timestamp_of(record):
        return record['event_timestamp']

    evt.sort(key=_timestamp_of)
def update_key_if_valid(result, key, value):
    """Store ``value`` under ``key`` unless ``result`` already has a valid value there.

    Validity of the existing value is decided by ``isValid``; a missing key
    reads as ``None`` and is therefore overwritten.
    """
    existing = result.get(key)
    if isValid(existing):
        return
    result[key] = value
def update_events(rec, result, event_groups):
    """Fold every input bundle in ``rec['events']`` into ``result``.

    Args:
        rec: input record; ``rec['events']`` is iterated and each element is
            read for the pass-through event lists, ``'Impressions'`` and
            ``'EventGroups'``.
        result: output record being built; it receives ``'advertiser_id'``,
            one list per pass-through event type and ``'Impressions'``.
        event_groups: dict filled in place, mapping an event group's
            ``event_id`` to the first group record seen with that id.

    Impressions sharing an ``event_id`` are merged into one record. A
    non-synthetic impression replaces a synthetic one that was seen earlier;
    in every case the clicks and engagements of both are combined.
    Impressions whose id is in ``invalid_ids`` are kept as-is, unmerged.

    NOTE(review): this code reads the lower-case keys ``'clicks'`` and
    ``'engagements'`` while the output schema in this file declares
    ``"Clicks"`` and ``"Engagements"`` - confirm which casing is intended.
    """
    passthrough_types = ('Attributes', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents')

    # Fixed: the lookup key used to be the typo 'advertiser_id)', so this was always None.
    result['advertiser_id'] = rec.get('advertiser_id')
    for event_type in passthrough_types:
        result[event_type] = []

    merged = {}
    invalid_impressions = []
    for bundle in rec['events']:
        for event_type in passthrough_types:
            result[event_type].extend(bundle[event_type])

        for imp in bundle['Impressions']:
            imp_id = imp['event_id']
            if imp_id in invalid_ids:
                # Placeholder ids cannot be used to merge; keep the record untouched.
                invalid_impressions.append(imp)
                continue

            if imp_id not in merged:
                merged[imp_id] = imp
            else:
                kept = merged[imp_id]
                if kept['synthetic_event'] == True and imp['synthetic_event'] == False:
                    # A real impression supersedes the synthetic one but
                    # inherits the sub-events collected so far.
                    imp['clicks'].extend(kept['clicks'])
                    imp['engagements'].extend(kept['engagements'])
                    merged[imp_id] = imp
                else:
                    kept['clicks'].extend(imp['clicks'])
                    kept['engagements'].extend(imp['engagements'])
                    kept['others'].update(imp['others'])

            # Refresh the derived flags on whichever record is now stored.
            kept = merged[imp_id]
            kept['has_click'] = non_empty(kept['clicks'])
            kept['has_engagement'] = non_empty(kept['engagements'])
            # A falsy/None flag is overwritten by the incoming value; a true flag sticks.
            if not kept['has_click_conversion']:
                kept['has_click_conversion'] = imp['has_click_conversion']
            if not kept['viewable']:
                kept['viewable'] = imp['viewable']
            sortByTimestamp(kept['clicks'])
            sortByTimestamp(kept['engagements'])

        for event_group in bundle['EventGroups']:
            # First occurrence of a group id wins.
            event_groups.setdefault(event_group['event_id'], event_group)

    result['Impressions'] = list(merged.values())
    result['Impressions'].extend(invalid_impressions)
    sortByTimestamp(result['Impressions'])
    for event_type in passthrough_types:
        records = result[event_type]
        # The attribute schema declares no event_timestamp, so sorting them
        # unconditionally raised KeyError; only sort attributes that carry one.
        if event_type == 'Attributes' and not all('event_timestamp' in attr for attr in records):
            continue
        sortByTimestamp(records)
def split_by_date(rec, event_groups):
    """Split one merged record into one record per UTC event date.

    Args:
        rec: merged record holding a list for each of the event types below;
            every other field is copied unchanged into each per-date record.
        event_groups: dict mapping event group id to its group record.

    Returns:
        A list with one record per distinct event date. Each record carries
        ``event_date`` (``YYYYMMDD``), the ``has_*`` flags, ``beyond_lookback``
        and the event groups referenced by that date's events.
    """
    event_types = ('Impressions', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents')

    # Parent record: same fields as rec, with the event lists emptied.
    parentrec = {}
    for field in rec:
        parentrec[field] = [] if field in event_types else rec[field]

    recsdict = {}
    # Per date: ids of the event groups already attached to that date's record.
    attached_groups = {}
    for event_type in event_types:
        for evt in rec[event_type]:
            # The first 10 digits of the timestamp are read as epoch seconds.
            evt_date = datetime.datetime.utcfromtimestamp(int(str(evt['event_timestamp'])[0:10])).strftime('%Y%m%d')
            if evt_date not in recsdict:
                recsdict[evt_date] = copy.deepcopy(parentrec)
                attached_groups[evt_date] = set(
                    group['event_id']
                    for group in recsdict[evt_date]['EventGroups']
                    if group is not None
                )
            group_id = evt['event_group_id']
            if group_id not in attached_groups[evt_date]:
                attached_groups[evt_date].add(group_id)
                group = event_groups.get(group_id)
                # An unknown group id used to append None, which made
                # update_event_groups fail; such events now get no group entry.
                if group is not None:
                    # Attach a per-date copy: update_event_groups writes
                    # per-date statistics into the group, and a shared object
                    # would end up with the last date's values on every date.
                    recsdict[evt_date]['EventGroups'].append(copy.copy(group))
            recsdict[evt_date][event_type].append(evt)

    results = []
    for dt, day_rec in recsdict.items():
        day_rec['event_date'] = dt
        # set flags
        day_rec['has_impressions'] = non_empty(day_rec['Impressions'])
        day_rec['has_conversions'] = non_empty(day_rec['Conversions'])
        day_rec['has_views'] = non_empty(day_rec['Views'])
        day_rec['has_geo_events'] = non_empty(day_rec['GeoEvents'])
        day_rec['has_poi_visits'] = non_empty(day_rec['POIVisits'])
        day_rec['has_other_events'] = non_empty(day_rec['OtherEvents'])
        # A single impression with a true flag is enough to set the record-level flag.
        # `== True` is kept on purpose so that a None flag counts as False.
        impressions = day_rec['Impressions']
        day_rec['has_clicks'] = any(imp['has_click'] == True for imp in impressions)
        day_rec['has_engagements'] = any(imp['has_engagement'] == True for imp in impressions)
        day_rec['has_interactions'] = False ## TODO calculate this later as part of interaction subevent
        # True: the date is older than the lookback window (dont overwrite);
        # False: the date is within the lookback window (overwrite).
        day_rec['beyond_lookback'] = datetime.datetime.strptime(dt, "%Y%m%d").date() < lookback_date
        results.append(day_rec)

    update_event_groups(results)
    return results
def update_event_groups(results):
    """Fill in the per-record statistics of every event group, in place.

    For each record in ``results`` the events of all grouped event types are
    bucketed by ``event_group_id``. Every group listed under ``'EventGroups'``
    then receives the timestamp and id of its earliest event, the timestamp
    of its latest event and its event count. The record's
    ``event_window_start`` is lowered to the earliest group timestamp.
    """
    grouped_types = set(['Impressions', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents'])
    for result in results:
        # event_group_id -> events of this record belonging to that group
        members = {}
        for kind in grouped_types:
            for evt in result[kind]:
                members.setdefault(evt['event_group_id'], []).append(evt)

        for group in result['EventGroups']:
            events = members[group['event_id']]
            sortByTimestamp(events)
            earliest, latest = events[0], events[-1]
            group['event_timestamp'] = earliest['event_timestamp']
            group['first_event_id'] = earliest['event_id']
            group['last_timestamp'] = latest['event_timestamp']
            group['events_count'] = len(events)
            # Keep the record-level window start at the earliest group timestamp seen.
            window_start = result['event_window_start']
            if window_start is None or group['event_timestamp'] < window_start:
                result['event_window_start'] = group['event_timestamp']
def dat_hnd(rec: any) -> any:
    """Turn one keyed input record into its per-date output records.

    The raw record is printed between two marker lines, a base result is
    seeded from the record's key fields and the batch id, the input events
    are merged into it, and the merged record is split by event date.
    """
    print('start rec *********************************************')
    print(rec)
    print('end rec *********************************************')

    result = {
        'entity_id': rec['entity_id'],
        'batch_id': run_id,
        'event_window_start': None,  # calculated with event groups
        'event_window_duration': '1 Day',
        'tenant_id': rec['tenant_id'],
        'entity_type': rec['entity_type'],
        'entity_domain': rec['entity_domain'],
        'entity_id_bucket': rec['entity_id_bucket'],
        'EventGroups': [],
    }
    # Filled by update_events, then consumed by split_by_date.
    event_groups = {}
    update_events(rec, result, event_groups)
    return split_by_date(result, event_groups)
# Schema handler (v1): declares the Avro output schema of the collated
# data-subject records and pairs it with the per-record handler.
#
# The incoming schema `sch` is not referenced in the body. The return value is
# the tuple (schema.parse(out_sch), dat_hnd).
#
# NOTE(review): several nested record names are declared more than once in
# out_sch ("entity_struct_1", "url_struct_2", "url_struct_4", "url_struct_5",
# "url_struct_6", "url_struct_7", "url_struct_8", "url_struct_9"). The Avro
# specification does not allow a named type to be defined twice; whether the
# project's schema.parse tolerates this cannot be seen from here - confirm.
# NOTE(review): the impression sub-event arrays are named "Clicks" and
# "Engagements" here, while update_events reads 'clicks' and 'engagements' -
# confirm which casing is intended.
# NOTE(review): "method" in aqfer_attribute is typed boolean - confirm.
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
out_sch = """
{ "type": "record", "name": "aqfer_data_subject_collated", "fields": [
{ "name": "event_date", "type": "string" },
{ "name": "batch_id", "type": "string" },
{ "name": "tenant_id", "type": "string" },
{ "name": "entity_type", "type": "string" },
{ "name": "entity_domain", "type": "string" },
{ "name": "entity_id_bucket", "type": "string" },
{ "name": "entity_id", "type": "string" },
{ "name": "event_window_start", "type": ["long", "null"] },
{ "name": "event_window_duration", "type": ["string", "null"] },
{ "name": "has_impressions", "type": "boolean" },
{ "name": "has_clicks", "type": "boolean" },
{ "name": "has_engagements", "type": "boolean" },
{ "name": "has_conversions", "type": "boolean" },
{ "name": "has_views", "type": "boolean" },
{ "name": "has_interactions", "type": "boolean" },
{ "name": "has_geo_events", "type": "boolean" },
{ "name": "has_poi_visits", "type": "boolean" },
{ "name": "has_other_events", "type": "boolean" },
{ "name": "Attributes", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_attribute", "fields": [
{ "name": "confidence", "type": ["int", "null"] },
{ "name": "data_type", "type": "string" },
{ "name": "global_region", "type": "string" },
{ "name": "magnitude", "type": ["int", "null"] },
{ "name": "modeled", "type": "boolean" },
{ "name": "method", "type": "boolean" },
{ "name": "name", "type": "string" },
{ "name": "score", "type": ["int", "null"] },
{ "name": "source", "type": "string" },
{ "name": "source_attribute_name", "type": "string" },
{ "name": "source_attribute_value", "type": "string" },
{ "name": "value", "type": "string" },
{ "name": "others", "type": { "type": "map", "values": "string" }}
]}
}
},
{ "name": "EventGroups", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_event_group", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "first_event_id", "type": "string" },
{ "name": "last_timestamp", "type": "long" },
{ "name": "source", "type": "string" },
{ "name": "events_count", "type": ["int", "null"] },
{ "name": "browser", "type": "string" },
{ "name": "browser_version", "type": "string" },
{ "name": "device_type", "type": "string" },
{ "name": "hashed_ip", "type": "string" },
{ "name": "network", "type": "string" },
{ "name": "network_type", "type": "string" },
{ "name": "os", "type": "string" },
{ "name": "os_version", "type": "string" },
{ "name": "throughput", "type": "string" },
{ "name": "user_agent", "type": "string" },
{ "name": "area_code", "type": "string" },
{ "name": "city", "type": "string" },
{ "name": "country_code", "type": "string" },
{ "name": "dma", "type": "string" },
{ "name": "fips", "type": "string" },
{ "name": "global_region", "type": "string" },
{ "name": "latitude", "type": "string" },
{ "name": "longitude", "type": "string" },
{ "name": "msa", "type": "string" },
{ "name": "region_code", "type": "string" },
{ "name": "timezone", "type": "string" },
{ "name": "zip", "type": "string" },
{ "name": "policy", "type": { "type": "array", "items":
{ "type": "record", "name": "policy", "fields": [
{ "name": "policy_id", "type": "string" },
{ "name": "policy_effective_date", "type": "string" },
{ "name": "effectivity", "type": "string" },
{ "name": "policy_detail", "type": "string" }]}}},
{ "name": "other_geo", "type":{ "type": "map", "values": "string" }}
]}
}
},
{ "name": "Conversions", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_conversion", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "page_url", "type": [ { "type": "record", "name": "url_struct_1", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "synthetic_event", "type": "boolean" },
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_3", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type": { "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "ad_group_id", "type": "string" },
{ "name": "advertiser_id", "type": "string" },
{ "name": "agency_id", "type": "string" },
{ "name": "campaign_id", "type": "string" },
{ "name": "creative_id", "type": "string" },
{ "name": "placement_id", "type": "string" },
{ "name": "ad_url", "type": [ { "type": "record", "name": "url_struct_4", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string"}} ]}, "null"]},
{ "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
{ "name": "keywords", "type": "string" },
{ "name": "inventory_partner", "type": "string" },
{ "name": "media_partner", "type": "string" },
{ "name": "search_phase", "type": "string" },
{ "name": "search_terms", "type": "string" },
{ "name": "site_id", "type": "string" },
{ "name": "supply_vendor_publisher_id", "type": "string" },
{ "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }}
]}
}
},
{ "name": "Views", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_view", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "page_url", "type": [ { "type": "record", "name": "url_struct_5", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_6", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "synthetic_event", "type": "boolean" },
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_7", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "advertiser_id", "type": "string" },
{ "name": "agency_id", "type": "string" },
{ "name": "has_interaction", "type": [ "boolean", "null" ] },
{ "name": "Interactions", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_interaction", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "page_url", "type": [ { "type": "record", "name": "url_struct_5", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_6", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "synthetic_event", "type": "boolean" },
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_7", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "advertiser_id", "type": "string" },
{ "name": "agency_id", "type": "string" }
]}
}
}
]}
}
},
{ "name": "GeoEvents", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_geo_event", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "horizontal_accuracy", "type": ["int", "null"] },
{ "name": "device_id_type", "type": "string" }
]}
}
},
{ "name": "POIVisits", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_poi_visit", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "poi_id", "type": "string" },
{ "name": "minimum_duration", "type": ["int", "null"] },
{ "name": "device_id_type", "type": "string" }
]}
}
},
{ "name": "OtherEvents", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_event", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "page_url", "type": [ { "type": "record", "name": "url_struct_8", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "synthetic_event", "type": "boolean" },
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type": { "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "ad_group_id", "type": "string" },
{ "name": "advertiser_id", "type": "string" },
{ "name": "agency_id", "type": "string" },
{ "name": "campaign_id", "type": "string" },
{ "name": "creative_id", "type": "string" },
{ "name": "placement_id", "type": "string" },
{ "name": "ad_url", "type": [ { "type": "record", "name": "url_struct_4", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string"}} ]}, "null"]},
{ "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }}
]}
}
},
{ "name": "Impressions", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_impression", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_timestamp", "type": "long" },
{ "name": "event_group_id", "type": "string" },
{ "name": "source", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "page_url", "type": [ { "type": "record", "name": "url_struct_8", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "synthetic_event", "type": "boolean" },
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_10", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "other_entity_keys", "type":
{ "type": "array", "items":
{ "type": "record", "name": "entity_struct_1", "fields": [
{ "name": "entity_type", "type": [ "string", "null" ] },
{ "name": "entity_domain", "type": [ "string", "null" ] },
{ "name": "entity_id", "type": [ "string", "null" ] }
]}}},
{ "name": "metrics", "type": { "type": "map", "values": "string" }},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "ad_group_id", "type": "string" },
{ "name": "advertiser_id", "type": "string" },
{ "name": "agency_id", "type": "string" },
{ "name": "campaign_id", "type": "string" },
{ "name": "creative_id", "type": "string" },
{ "name": "placement_id", "type": "string" },
{ "name": "ad_url", "type": [ { "type": "record", "name": "url_struct_11", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string"}} ]}, "null"]},
{ "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
{ "name": "keywords", "type": "string" },
{ "name": "inventory_partner", "type": "string" },
{ "name": "media_partner", "type": "string" },
{ "name": "search_phase", "type": "string" },
{ "name": "search_terms", "type": "string" },
{ "name": "site_id", "type": "string" },
{ "name": "supply_vendor_publisher_id", "type": "string" },
{ "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }},
{ "name": "cpm_cost", "type": "int" },
{ "name": "cpm_currency", "type": [ "string", "null" ] },
{ "name": "cpm_paid", "type": "int" },
{ "name": "has_click_conversion", "type": [ "boolean", "null" ] },
{ "name": "has_click", "type": [ "boolean", "null" ] },
{ "name": "has_engagement", "type": [ "boolean", "null" ] },
{ "name": "viewable", "type": [ "boolean", "null" ] },
{ "name": "Clicks", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_click", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_12", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "cpc_paid", "type": [ "int", "null" ] },
{ "name": "cpc_cost", "type": [ "int", "null" ] },
{ "name": "video_pct", "type": [ "int", "null" ] },
{ "name": "destination_url", "type": [ { "type": "record", "name": "url_struct_13", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]}
]}
}
},
{ "name": "Engagements", "type":
{ "type": "array", "items":
{ "type": "record", "name": "aqfer_engagement", "fields": [
{ "name": "event_id", "type": "string" },
{ "name": "event_actions", "type":{ "type": "array", "items": "string" }},
{ "name": "event_subtype_1", "type": [ "string", "null" ]},
{ "name": "event_subtype_2", "type": [ "string", "null" ]},
{ "name": "http_status_code", "type": "string" },
{ "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
{ "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_14", "fields": [
{ "name": "scheme", "type": [ "string", "null" ] },
{ "name": "host", "type": [ "string", "null" ] },
{ "name": "path", "type": [ "string", "null" ] },
{ "name": "query", "type":
{ "type": "map", "values": "string" }}]}, "null"]},
{ "name": "others", "type":{ "type": "map", "values": "string" }},
{ "name": "metrics", "type":{ "type": "map", "values": "string" }},
{ "name": "cpe_paid", "type": [ "int", "null" ] },
{ "name": "cpe_cost", "type": [ "int", "null" ] },
{ "name": "engagement_actions", "type":
{ "type": "array", "items": "string" }
}
]}
}
}
]}
}
},
{ "name": "beyond_lookback", "type": "boolean" }
]}
"""
# Hand back the parsed output schema together with the per-record handler.
return schema.parse(out_sch), dat_hnd
# Three outputs are declared: one avro and one parquet output filtered with
# `omit_if: beyond_lookback`, and one parquet output filtered with
# `omit_unless: beyond_lookback`.
# NOTE(review): presumably omit_if drops, and omit_unless keeps, the records whose
# beyond_lookback flag is true - confirm against the job runner documentation.
outputs:
## Output Enhancements:
# 1) After AQ-7888 is completed, remove field name list from project
## UPDATE AVRO DESTINATION TO VTMS FOR PROD ##
# event date is within lookback window - avro
# NOTE(review): this path repeats the collate-merge segment, unlike the two parquet destinations - confirm it is intended.
- destination: /new-schema-pipeline-test/collate-merge/collate-merge/avro
project:
'.':
omit_if: beyond_lookback
field_names: [event_date, tenant_id, entity_type, entity_domain, entity_id_bucket, entity_id, event_window_start, event_window_duration, has_impressions, has_clicks, has_engagements, has_conversions, has_views, has_interactions, has_geo_events, has_poi_visits, has_other_events, Attributes, EventGroups, Conversions, Views, GeoEvents, POIVisits, OtherEvents, Impressions]
# Only this output lists entity_id_bucket in its field names and split key.
split_by:
key:
- tenant_id
- entity_type
- entity_domain
- event_date
- entity_id_bucket
strip_key: true
# event date is within lookback window - parquet
- destination: /new-schema-pipeline-test/collate-merge/parquet
format: parquet
project:
'.':
omit_if: beyond_lookback
field_names: [event_date, tenant_id, entity_type, entity_domain, entity_id, event_window_start, event_window_duration, has_impressions, has_clicks, has_engagements, has_conversions, has_views, has_interactions, has_geo_events, has_poi_visits, has_other_events, Attributes, EventGroups, Conversions, Views, GeoEvents, POIVisits, OtherEvents, Impressions]
split_by:
key:
- tenant_id
- entity_type
- entity_domain
- event_date
strip_key: true
# event date is beyond lookback window - parquet
- destination: /new-schema-pipeline-test/collate-merge/parquet_beyond
format: parquet
project:
'.':
omit_unless: beyond_lookback
field_names: [event_date, tenant_id, entity_type, entity_domain, batch_id, entity_id, event_window_start, event_window_duration, has_impressions, has_clicks, has_engagements, has_conversions, has_views, has_interactions, has_geo_events, has_poi_visits, has_other_events, Attributes, EventGroups, Conversions, Views, GeoEvents, POIVisits, OtherEvents, Impressions]
# Only this output lists batch_id in its field names and split key.
split_by:
key:
- tenant_id
- entity_type
- entity_domain
- event_date
- batch_id
strip_key: true
# Split and sort settings.
split:
bucket_size_bytes: 2147483648 # 2 GB
sort:
num_partition_keys: 0
# NOTE(review): batch_id is listed as a sort key here but is absent from the projected field_names below - confirm this is intended.
key_names:
- tenant_id
- entity_type
- entity_domain
- batch_id
- entity_id
project:
'.':
field_names: [tenant_id, entity_type, entity_domain, entity_id]
# Worker counts (num_workers) of the in-process executor configured for each stage.
capacity:
split_stage:
in_proc_executor:
num_workers: 12
collate_stage:
in_proc_executor:
num_workers: 32
sort_stage:
in_proc_executor:
num_workers: 4
merge_plan_stage:
in_proc_executor:
num_workers: 30
merge_do_stage:
in_proc_executor:
num_workers: 100
transform_stage:
in_proc_executor:
num_workers: 40