############################
# SETUP
# 1) [OPS] set batch_id
# 2) [OPS] set bucket count for sharding
# 3) Set work area, inputs, outputs
# NOTES
# The output schema for Interactions is complete, but the transform logic for it will be completed as needed
############################
work_area
separate_transform_stage true
# Input keys should be among: Attributes, EventGroups, Conversions, Views,
# Interactions, GeoEvents, POIVisits, OtherEvents, Impressions, Clicks,
# Engagements.
inputs
# NOTE(review): every input entry below is commented out, so no input is
# declared in this section — TODO confirm inputs are supplied elsewhere.
# - name Clicks
# locations
# - folder
# capture_partition_keys
# - global_region
# - entity_type
# - entity_domain
# key
# - global_region
# - entity_type
# - entity_domain
# - entity_id
# - advertiser_id
# - name Conversions
# locations
# - folder
# capture_partition_keys
# - entity_type
# - entity_domain
# key
# - entity_type
# - entity_domain
# - entity_id
# - advertiser_id
# transform
# - rename_schema
# http_schema http_schema_conversions
# top_level top_level_conversions
# Python transform: schema_handler_v1 (defined in the code below) returns the
# parsed output schema together with the per-record handler dat_hnd.
transform
- python
code
import schema
import time
import hashlib
# Parent-event ids that cannot be attached to a real impression; clicks and
# engagements pointing at one of these are emitted as stand-alone synthetic
# impressions instead of being merged.
invalid_ids = {'NA'}
# Wall-clock time in epoch milliseconds, captured once when this code is
# loaded. NOTE(review): not referenced by the code in this section.
current_ts = int(time.time() * 1000)
def isValid(cc):
    """Return True when ``cc`` holds a usable value.

    A value is rejected when it is None, the empty string, or the literal
    'NA' placeholder; anything else counts as valid.
    """
    if cc is None:
        return False
    return cc not in ('', 'NA')
def non_empty(list):
    """Report whether a sized collection contains at least one element.

    The parameter name shadows the builtin ``list``; it is kept unchanged so
    existing keyword callers keep working.
    """
    item_count = len(list)
    return item_count != 0
def sortByTimestamp(evt):
    """Order a list of event dicts in place by ascending ``event_timestamp``.

    ``list.sort`` is stable, so events sharing a timestamp keep their input
    order. Returns None, like ``list.sort``.
    """
    def timestamp_of(event):
        return event['event_timestamp']

    evt.sort(key=timestamp_of)
def set_event_group(evt, result, retain_event_id: bool):
    """Derive the event group of ``evt``, register it on ``result`` and link ``evt`` to it.

    The group copies the device, network and geo attributes of ``evt``, wraps
    ``evt['policy']`` in a one-element list, and stores ``other_geo`` with its
    keys sorted. Its id is either ``evt['event_id']`` (when
    ``retain_event_id`` is truthy) or the SHA-256 hex digest of the group's
    items, computed while ``event_id`` is still ''. The id is written back
    onto ``evt['event_group_id']``. The group is appended to
    ``result['EventGroups']`` only if an equal dict is not already there.

    Args:
        evt: event dict; mutated (gains ``event_group_id``).
        result: output record; must already contain an ``EventGroups`` list.
        retain_event_id: keep the event's own id instead of hashing.
    """
    existing_groups = result['EventGroups']
    # Copied verbatim from the event. Order matters: the hashed id below is
    # computed from str(group.items()), i.e. from key insertion order.
    copied_fields = (
        'source', 'browser', 'browser_version', 'device_type', 'hashed_ip',
        'network', 'network_type', 'os', 'os_version', 'throughput',
        'user_agent', 'area_code', 'city', 'country_code', 'dma', 'fips',
        'global_region', 'latitude', 'longitude', 'msa', 'region_code',
        'timezone', 'zip',
    )
    # Placeholders; the timestamps and count are filled in at the end of
    # collate-merge.
    group = {
        'event_id': '',
        'event_timestamp': 0,
        'first_event_id': '',
        'last_timestamp': 0,
        'events_count': 0,
    }
    group.update((field, evt[field]) for field in copied_fields)
    group['policy'] = [evt['policy']]
    group['other_geo'] = dict(sorted(evt['other_geo'].items()))

    if retain_event_id:
        group_id = evt['event_id']
    else:
        group_id = hashlib.sha256(str(group.items()).encode()).hexdigest()
    group['event_id'] = group_id
    evt['event_group_id'] = group_id

    # Linear scan with dict equality: identical groups are stored once.
    if group not in existing_groups:
        existing_groups.append(group)
def update_events(rec, result, key, advertiser_id):
    """Move the ``key`` events of ``rec`` into ``result[key]``, annotated and sorted.

    Every event gets its event group registered (hashed id), is stamped with
    ``advertiser_id``, has its ``policy`` reset to an empty list, and the
    resulting list is sorted by ``event_timestamp``. The event dicts from
    ``rec`` are mutated in place, not copied.
    """
    collected = []
    result[key] = collected
    for event in rec.get(key, []):
        set_event_group(event, result, 0)
        event['advertiser_id'] = advertiser_id
        event['policy'] = []
        collected.append(event)
    sortByTimestamp(collected)
def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
    """Fold the record's clicks into placeholder parent impressions.

    Each click is re-keyed under its ``parent_event_id`` and turned into a
    synthetic impression whose ``Clicks`` list holds the click's details.
    Placeholders are merged into ``subevents_dict`` (one entry per parent
    impression id). Those whose parent id is in ``invalid_ids`` are returned
    separately because they cannot be attached to a real impression.

    Args:
        rec: input record. Clicks are read from ``rec['Clicks']``, falling
            back to the lowercase ``'clicks'`` key.
        result: output record under construction. Event groups are added via
            ``set_event_group``; ``result['has_clicks']`` becomes True as soon
            as any click is seen.
        subevents_dict: parent impression id -> placeholder impression.
            Updated in place and returned.
        advertiser_id: stamped onto every click.

    Returns:
        ``(subevents_dict, invalid_clicks)``.
    """
    invalid_clicks = []
    clicks = rec.get('Clicks')
    if clicks is None:
        # Fall back to the lowercase key this function used to read.
        clicks = rec.get('clicks') or []
    for clk in clicks:
        set_event_group(clk, result, 0)
        clk['advertiser_id'] = advertiser_id
        imp_id = clk['parent_event_id']
        clk_id = clk['event_id']
        # The click dict itself is reused as the placeholder impression
        # (aliased, not copied) and re-keyed under the parent impression id.
        orphan_imp = clk
        orphan_imp['event_id'] = imp_id
        orphan_imp['has_click'] = True
        result['has_clicks'] = True
        orphan_imp['has_engagement'] = None
        orphan_imp['viewable'] = None
        orphan_imp['synthetic_event'] = True
        # NOTE(review): aqfer_click in the output schema also declares
        # event_actions, event_subtype_1/2, http_status_code, tag_url and
        # video_pct, which are not copied here, while event_timestamp,
        # agency_id, page_url and page_referer_url are copied but not declared
        # there — confirm against how the writer applies the schema.
        clk_subevent = {
            'event_timestamp': orphan_imp['event_timestamp'],
            'agency_id': orphan_imp['agency_id'],
            'event_id': clk_id,
            'page_url': orphan_imp['page_url'],
            'page_referer_url': orphan_imp['page_referer_url'],
            'destination_url': orphan_imp['destination_url'],
            'cpc_paid': orphan_imp['cpc_paid'],
            'cpc_cost': orphan_imp['cpc_cost'],
            'other_event_ids': orphan_imp['other_event_ids'],
            'metrics': orphan_imp['metrics'],
            'others': orphan_imp['others'],
        }
        # Nested-list keys match the schema field names and what
        # subevent_aggregation_for_imp_clk_eng reads (Clicks / Engagements).
        orphan_imp['Clicks'] = [clk_subevent]
        orphan_imp['Engagements'] = []
        # Click-only fields now live on the sub-event, not on the impression.
        for moved in ('destination_url', 'cpc_paid', 'cpc_cost'):
            del orphan_imp[moved]
        if imp_id in invalid_ids:
            invalid_clicks.append(orphan_imp)
            continue
        existing = subevents_dict.get(imp_id)
        if existing is None:
            subevents_dict[imp_id] = orphan_imp
        else:
            existing['Clicks'].append(clk_subevent)
            existing['has_click'] = True
            existing['has_click_conversion'] = (
                existing['has_click_conversion']
                or orphan_imp['has_click_conversion'])
    return subevents_dict, invalid_clicks


def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
    """Fold the record's engagements into placeholder parent impressions.

    Each engagement is re-keyed under its ``parent_event_id`` and turned into
    a synthetic impression (``viewable=True``) whose ``Engagements`` list
    holds the engagement's details. Placeholders are merged into
    ``subevents_dict``. Those whose parent id is in ``invalid_ids`` are
    returned separately.

    Args:
        rec: input record. Engagements are read from ``rec['Engagements']``,
            falling back to the lowercase ``'engagements'`` key.
        result: output record under construction. Event groups are added via
            ``set_event_group``; ``result['has_engagements']`` becomes True
            as soon as any engagement is seen.
        subevents_dict: parent impression id -> placeholder impression.
            Updated in place and returned.
        advertiser_id: stamped onto every engagement.

    Returns:
        ``(subevents_dict, invalid_engagements)``.
    """
    invalid_engagements = []
    engagements = rec.get('Engagements')
    if engagements is None:
        # Fall back to the lowercase key this function used to read.
        engagements = rec.get('engagements') or []
    for eng in engagements:
        set_event_group(eng, result, 0)
        eng['advertiser_id'] = advertiser_id
        imp_id = eng['parent_event_id']
        # Keep the engagement's own id before it is overwritten below; the
        # aqfer_engagement schema requires event_id on the sub-event.
        eng_id = eng['event_id']
        # The engagement dict itself is reused (aliased) as the placeholder.
        orphan_imp = eng
        orphan_imp['event_id'] = imp_id
        orphan_imp['has_click_conversion'] = None
        orphan_imp['has_click'] = None
        orphan_imp['has_engagement'] = True
        result['has_engagements'] = True
        orphan_imp['viewable'] = True
        orphan_imp['synthetic_event'] = True
        eng_subevent = {
            'event_id': eng_id,
            'event_timestamp': orphan_imp['event_timestamp'],
            'event_subtype_1': orphan_imp['event_subtype_1'],
            'cpc_paid': orphan_imp['cpc_paid'],
            'cpc_cost': orphan_imp['cpc_cost'],
            'cpe_paid': orphan_imp['cpe_paid'],
            'cpe_cost': orphan_imp['cpe_cost'],
            'engagement_actions': orphan_imp['engagement_actions'],
            'metrics': orphan_imp['metrics'],
            'others': orphan_imp['others'],
        }
        orphan_imp['Clicks'] = []
        orphan_imp['Engagements'] = [eng_subevent]
        # NOTE(review): event_subtype_1 is removed from the placeholder
        # impression although aqfer_impression declares it — confirm intended.
        for moved in ('engagement_actions', 'event_subtype_1', 'cpc_paid',
                      'cpc_cost', 'cpe_paid', 'cpe_cost'):
            del orphan_imp[moved]
        if imp_id in invalid_ids:
            invalid_engagements.append(orphan_imp)
            continue
        existing = subevents_dict.get(imp_id)
        if existing is None:
            subevents_dict[imp_id] = orphan_imp
        else:
            existing['Engagements'].append(eng_subevent)
    return subevents_dict, invalid_engagements


def get_impressions_dict(rec, result, advertiser_id):
    """Index the record's impressions by ``event_id``.

    Every impression is stamped with ``advertiser_id`` and gets its event
    group registered. The first impression seen for an id is kept and given
    empty ``Clicks``/``Engagements`` lists and ``viewable=None``. Later
    duplicates only contribute their ``others`` map to the kept one.

    Returns:
        dict mapping impression id -> impression dict (mutated in place).
    """
    impressions_dict = {}
    for inp_imp in rec.get('Impressions', []):
        inp_imp['advertiser_id'] = advertiser_id
        set_event_group(inp_imp, result, 0)
        imp_id = inp_imp['event_id']
        if imp_id in impressions_dict:
            impressions_dict[imp_id]['others'].update(inp_imp['others'])
            continue
        inp_imp['Clicks'] = []
        inp_imp['Engagements'] = []
        inp_imp['viewable'] = None
        impressions_dict[imp_id] = inp_imp
    return impressions_dict


def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
    """Build ``result['Impressions']`` with clicks and engagements nested inside.

    Real impressions absorb the placeholder collected for their id: its
    ``Clicks``/``Engagements`` lists, plus its ``has_click_conversion`` and
    ``viewable`` flags when the impression's own value is falsy. Placeholders
    with no matching impression, and those with an invalid parent id, are
    kept as stand-alone synthetic impressions. The final list is sorted by
    ``event_timestamp``. Also sets ``result['has_clicks']`` and
    ``result['has_engagements']``.
    """
    result['has_clicks'] = False
    result['has_engagements'] = False
    impressions_dict = get_impressions_dict(rec, result, advertiser_id)
    # Engagements start from an empty mapping; clicks are merged into it.
    subevents_dict, invalid_subevents = get_engagement_dict(
        rec, result, {}, advertiser_id)
    subevents_dict, invalid_clicks = get_clicks_dict(
        rec, result, subevents_dict, advertiser_id)
    invalid_subevents.extend(invalid_clicks)

    impressions = []
    for imp_id, impression in impressions_dict.items():
        impression['synthetic_event'] = False
        # Nullable in the schema; make sure the field exists.
        impression.setdefault('has_click_conversion', None)
        se = subevents_dict.pop(imp_id, None)
        if se is not None:
            impression['Clicks'].extend(se['Clicks'])
            impression['Engagements'].extend(se['Engagements'])
            if not impression['has_click_conversion']:
                impression['has_click_conversion'] = se['has_click_conversion']
            if not impression['viewable']:
                impression['viewable'] = se['viewable']
        # Computed for every impression (not only merged ones) so these
        # schema fields are always present and reflect the nested lists.
        impression['has_click'] = non_empty(impression['Clicks'])
        impression['has_engagement'] = non_empty(impression['Engagements'])
        result['has_clicks'] = result['has_clicks'] or impression['has_click']
        result['has_engagements'] = (
            result['has_engagements'] or impression['has_engagement'])
        impressions.append(impression)
    impressions.extend(invalid_subevents)
    # Whatever is left had no real parent impression in this record.
    impressions.extend(subevents_dict.values())
    sortByTimestamp(impressions)
    result['Impressions'] = impressions
def dat_hnd(rec: any) -> any:
    """Collate one input record into one aqfer_data_subject_collated record.

    Copies ``entity_id``, rebuilds every event list (stamping
    ``advertiser_id`` and an ``event_group_id`` on each event), nests clicks
    and engagements under their parent impressions, and derives the
    ``has_*`` summary flags.

    Args:
        rec: one input record (dict-like; read via ``rec.get`` / ``rec[...]``).

    Returns:
        A single-element list holding the output record.
    """
    # NOTE(review): these debug prints dump whole records (including
    # hashed_ip, user_agent, latitude/longitude) to stdout for every row —
    # consider removing or gating them before production.
    print('start rec')
    print(rec)
    print('end rec')
    result = {}
    advertiser_id = rec.get('advertiser_id')
    result['entity_id'] = rec['entity_id']
    result['event_window_start'] = None  # handled in collate-merge
    result['event_window_duration'] = '1 Hour'
    result['EventGroups'] = []
    # Event groups supplied on the input keep their own event_id; groups for
    # the other event types are derived (hashed) from the events themselves.
    for input_event_group in rec.get('EventGroups', []):
        set_event_group(input_event_group, result, 1)
    # NOTE(review): 'Attributes' goes through update_events, which reads
    # event fields (browser, event_timestamp, ...) that the aqfer_attribute
    # schema does not declare, and 'Interactions' has no top-level field in
    # the output schema (it is nested under Views) — confirm both.
    for key in ('Conversions', 'Views', 'GeoEvents', 'POIVisits',
                'Attributes', 'Interactions', 'OtherEvents'):
        update_events(rec, result, key, advertiser_id)
    # Sets result['Impressions'], result['has_clicks'], result['has_engagements'].
    subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id)
    result['has_impressions'] = non_empty(result['Impressions'])
    result['has_conversions'] = non_empty(result['Conversions'])
    result['has_views'] = non_empty(result['Views'])
    result['has_geo_events'] = non_empty(result['GeoEvents'])
    result['has_poi_visits'] = non_empty(result['POIVisits'])
    result['has_interactions'] = non_empty(result['Interactions'])
    result['has_other_events'] = non_empty(result['OtherEvents'])
    print('start result ')
    print(result)
    print('end result ')
    return [result]
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Return the parsed output schema together with the per-record handler.

    The output is one ``aqfer_data_subject_collated`` record per input
    record, produced by ``dat_hnd``. ``sch`` (the incoming schema) is not
    used.

    Avro allows each named type (record/enum/fixed) to be defined only once
    per schema. Every later use of a shared struct (``entity_struct_1``,
    ``url_struct_2``, ``url_struct_4``, ``url_struct_5``-``url_struct_7``,
    ``url_struct_9``) therefore refers to it by name instead of repeating
    the definition.

    Returns:
        ``(schema.parse(out_sch), dat_hnd)``.
    """
    out_sch = """
{"type": "record", "name": "aqfer_data_subject_collated", "fields": [
  {"name": "entity_id", "type": "string"},
  {"name": "event_window_start", "type": ["long", "null"]},
  {"name": "event_window_duration", "type": ["string", "null"]},
  {"name": "has_impressions", "type": "boolean"},
  {"name": "has_clicks", "type": "boolean"},
  {"name": "has_engagements", "type": "boolean"},
  {"name": "has_conversions", "type": "boolean"},
  {"name": "has_views", "type": "boolean"},
  {"name": "has_interactions", "type": "boolean"},
  {"name": "has_geo_events", "type": "boolean"},
  {"name": "has_poi_visits", "type": "boolean"},
  {"name": "has_other_events", "type": "boolean"},
  {"name": "Attributes", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_attribute", "fields": [
        {"name": "confidence", "type": ["int", "null"]},
        {"name": "data_type", "type": "string"},
        {"name": "global_region", "type": "string"},
        {"name": "magnitude", "type": ["int", "null"]},
        {"name": "modeled", "type": "boolean"},
        {"name": "method", "type": "boolean"},
        {"name": "name", "type": "string"},
        {"name": "score", "type": ["int", "null"]},
        {"name": "source", "type": "string"},
        {"name": "source_attribute_name", "type": "string"},
        {"name": "source_attribute_value", "type": "string"},
        {"name": "value", "type": "string"},
        {"name": "others", "type": {"type": "map", "values": "string"}}
      ]}
    }
  },
  {"name": "EventGroups", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_event_group", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "first_event_id", "type": "string"},
        {"name": "last_timestamp", "type": "long"},
        {"name": "source", "type": "string"},
        {"name": "events_count", "type": ["int", "null"]},
        {"name": "browser", "type": "string"},
        {"name": "browser_version", "type": "string"},
        {"name": "device_type", "type": "string"},
        {"name": "hashed_ip", "type": "string"},
        {"name": "network", "type": "string"},
        {"name": "network_type", "type": "string"},
        {"name": "os", "type": "string"},
        {"name": "os_version", "type": "string"},
        {"name": "throughput", "type": "string"},
        {"name": "user_agent", "type": "string"},
        {"name": "area_code", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "country_code", "type": "string"},
        {"name": "dma", "type": "string"},
        {"name": "fips", "type": "string"},
        {"name": "global_region", "type": "string"},
        {"name": "latitude", "type": "string"},
        {"name": "longitude", "type": "string"},
        {"name": "msa", "type": "string"},
        {"name": "region_code", "type": "string"},
        {"name": "timezone", "type": "string"},
        {"name": "zip", "type": "string"},
        {"name": "policy", "type": {"type": "array", "items":
          {"type": "record", "name": "policy", "fields": [
            {"name": "policy_id", "type": "string"},
            {"name": "policy_effective_date", "type": "string"},
            {"name": "effectivity", "type": "string"},
            {"name": "policy_detail", "type": "string"}]}}},
        {"name": "other_geo", "type": {"type": "map", "values": "string"}}
      ]}
    }
  },
  {"name": "Conversions", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_conversion", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "page_url", "type": [{"type": "record", "name": "url_struct_1", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "page_referer_url", "type": [{"type": "record", "name": "url_struct_2", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "synthetic_event", "type": "boolean"},
        {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_3", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type":
          {"type": "array", "items":
            {"type": "record", "name": "entity_struct_1", "fields": [
              {"name": "entity_type", "type": ["string", "null"]},
              {"name": "entity_domain", "type": ["string", "null"]},
              {"name": "entity_id", "type": ["string", "null"]}
            ]}}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "ad_group_id", "type": "string"},
        {"name": "advertiser_id", "type": "string"},
        {"name": "agency_id", "type": "string"},
        {"name": "campaign_id", "type": "string"},
        {"name": "creative_id", "type": "string"},
        {"name": "placement_id", "type": "string"},
        {"name": "ad_url", "type": [{"type": "record", "name": "url_struct_4", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_marketing_program_levels", "type": {"type": "map", "values": "string"}},
        {"name": "keywords", "type": "string"},
        {"name": "inventory_partner", "type": "string"},
        {"name": "media_partner", "type": "string"},
        {"name": "search_phase", "type": "string"},
        {"name": "search_terms", "type": "string"},
        {"name": "site_id", "type": "string"},
        {"name": "supply_vendor_publisher_id", "type": "string"},
        {"name": "other_media_group_levels", "type": {"type": "map", "values": "string"}}
      ]}
    }
  },
  {"name": "Views", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_view", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "page_url", "type": [{"type": "record", "name": "url_struct_5", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "page_referer_url", "type": [{"type": "record", "name": "url_struct_6", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "synthetic_event", "type": "boolean"},
        {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_7", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "advertiser_id", "type": "string"},
        {"name": "agency_id", "type": "string"},
        {"name": "has_interaction", "type": ["boolean", "null"]},
        {"name": "Interactions", "type":
          {"type": "array", "items":
            {"type": "record", "name": "aqfer_interaction", "fields": [
              {"name": "event_id", "type": "string"},
              {"name": "event_timestamp", "type": "long"},
              {"name": "event_group_id", "type": "string"},
              {"name": "source", "type": "string"},
              {"name": "event_actions", "type": {"type": "array", "items": "string"}},
              {"name": "event_subtype_1", "type": ["string", "null"]},
              {"name": "event_subtype_2", "type": ["string", "null"]},
              {"name": "http_status_code", "type": "string"},
              {"name": "page_url", "type": ["url_struct_5", "null"]},
              {"name": "page_referer_url", "type": ["url_struct_6", "null"]},
              {"name": "synthetic_event", "type": "boolean"},
              {"name": "tag_url", "type": ["url_struct_7", "null"]},
              {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
              {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
              {"name": "metrics", "type": {"type": "map", "values": "string"}},
              {"name": "others", "type": {"type": "map", "values": "string"}},
              {"name": "advertiser_id", "type": "string"},
              {"name": "agency_id", "type": "string"}
            ]}
          }
        }
      ]}
    }
  },
  {"name": "GeoEvents", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_geo_event", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "horizontal_accuracy", "type": ["int", "null"]},
        {"name": "device_id_type", "type": "string"}
      ]}
    }
  },
  {"name": "POIVisits", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_poi_visit", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "poi_id", "type": "string"},
        {"name": "minimum_duration", "type": ["int", "null"]},
        {"name": "device_id_type", "type": "string"}
      ]}
    }
  },
  {"name": "OtherEvents", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_event", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "page_url", "type": [{"type": "record", "name": "url_struct_8", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "page_referer_url", "type": ["url_struct_2", "null"]},
        {"name": "synthetic_event", "type": "boolean"},
        {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_9", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "ad_group_id", "type": "string"},
        {"name": "advertiser_id", "type": "string"},
        {"name": "agency_id", "type": "string"},
        {"name": "campaign_id", "type": "string"},
        {"name": "creative_id", "type": "string"},
        {"name": "placement_id", "type": "string"},
        {"name": "ad_url", "type": ["url_struct_4", "null"]},
        {"name": "other_marketing_program_levels", "type": {"type": "map", "values": "string"}}
      ]}
    }
  },
  {"name": "Impressions", "type":
    {"type": "array", "items":
      {"type": "record", "name": "aqfer_impression", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_timestamp", "type": "long"},
        {"name": "event_group_id", "type": "string"},
        {"name": "source", "type": "string"},
        {"name": "event_actions", "type": {"type": "array", "items": "string"}},
        {"name": "event_subtype_1", "type": ["string", "null"]},
        {"name": "event_subtype_2", "type": ["string", "null"]},
        {"name": "http_status_code", "type": "string"},
        {"name": "page_url", "type": [{"type": "record", "name": "url_struct_10", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "page_referer_url", "type": ["url_struct_9", "null"]},
        {"name": "synthetic_event", "type": "boolean"},
        {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_11", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
        {"name": "other_entity_keys", "type": {"type": "array", "items": "entity_struct_1"}},
        {"name": "metrics", "type": {"type": "map", "values": "string"}},
        {"name": "others", "type": {"type": "map", "values": "string"}},
        {"name": "ad_group_id", "type": "string"},
        {"name": "advertiser_id", "type": "string"},
        {"name": "agency_id", "type": "string"},
        {"name": "campaign_id", "type": "string"},
        {"name": "creative_id", "type": "string"},
        {"name": "placement_id", "type": "string"},
        {"name": "ad_url", "type": [{"type": "record", "name": "url_struct_12", "fields": [
          {"name": "scheme", "type": ["string", "null"]},
          {"name": "host", "type": ["string", "null"]},
          {"name": "path", "type": ["string", "null"]},
          {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
        {"name": "other_marketing_program_levels", "type": {"type": "map", "values": "string"}},
        {"name": "keywords", "type": "string"},
        {"name": "inventory_partner", "type": "string"},
        {"name": "media_partner", "type":"string"},
        {"name": "search_phase", "type": "string"},
        {"name": "search_terms", "type": "string"},
        {"name": "site_id", "type": "string"},
        {"name": "supply_vendor_publisher_id", "type": "string"},
        {"name": "other_media_group_levels", "type": {"type": "map", "values": "string"}},
        {"name": "cpm_cost", "type": "int"},
        {"name": "cpm_currency", "type": ["string", "null"]},
        {"name": "cpm_paid", "type": "int"},
        {"name": "has_click_conversion", "type": ["boolean", "null"]},
        {"name": "has_click", "type": ["boolean", "null"]},
        {"name": "has_engagement", "type": ["boolean", "null"]},
        {"name": "viewable", "type": ["boolean", "null"]},
        {"name": "Clicks", "type":
          {"type": "array", "items":
            {"type": "record", "name": "aqfer_click", "fields": [
              {"name": "event_id", "type": "string"},
              {"name": "event_actions", "type": {"type": "array", "items": "string"}},
              {"name": "event_subtype_1", "type": ["string", "null"]},
              {"name": "event_subtype_2", "type": ["string", "null"]},
              {"name": "http_status_code", "type": "string"},
              {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
              {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_13", "fields": [
                {"name": "scheme", "type": ["string", "null"]},
                {"name": "host", "type": ["string", "null"]},
                {"name": "path", "type": ["string", "null"]},
                {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
              {"name": "others", "type": {"type": "map", "values": "string"}},
              {"name": "metrics", "type": {"type": "map", "values": "string"}},
              {"name": "cpc_paid", "type": ["int", "null"]},
              {"name": "cpc_cost", "type": ["int", "null"]},
              {"name": "video_pct", "type": ["int", "null"]},
              {"name": "destination_url", "type": [{"type": "record", "name": "url_struct_14", "fields": [
                {"name": "scheme", "type": ["string", "null"]},
                {"name": "host", "type": ["string", "null"]},
                {"name": "path", "type": ["string", "null"]},
                {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]}
            ]}
          }
        },
        {"name": "Engagements", "type":
          {"type": "array", "items":
            {"type": "record", "name": "aqfer_engagement", "fields": [
              {"name": "event_id", "type": "string"},
              {"name": "event_actions", "type": {"type": "array", "items": "string"}},
              {"name": "event_subtype_1", "type": ["string", "null"]},
              {"name": "event_subtype_2", "type": ["string", "null"]},
              {"name": "http_status_code", "type": "string"},
              {"name": "other_event_ids", "type": {"type": "map", "values": "string"}},
              {"name": "tag_url", "type": [{"type": "record", "name": "url_struct_15", "fields": [
                {"name": "scheme", "type": ["string", "null"]},
                {"name": "host", "type": ["string", "null"]},
                {"name": "path", "type": ["string", "null"]},
                {"name": "query", "type": {"type": "map", "values": "string"}}]}, "null"]},
              {"name": "others", "type": {"type": "map", "values": "string"}},
              {"name": "metrics", "type": {"type": "map", "values": "string"}},
              {"name": "cpe_paid", "type": ["int", "null"]},
              {"name": "cpe_cost", "type": ["int", "null"]},
              {"name": "engagement_actions", "type": {"type": "array", "items": "string"}}
            ]}
          }
        }
      ]}
    }
  }
]}
"""
    return schema.parse(out_sch), dat_hnd
outputs
- destination
split
bucket_size_bytes 2147483648 # 2 GB
sort
num_partition_keys 5
key_names
- tenant_id
- entity_type
- entity_domain
- batch_id
- entity_id_bucket
project
'.'
field_names [tenant_id, entity_type, entity_domain, entity_id]
# One template per key_names entry, in the same order. `index . N` presumably
# selects the N-th projected field (0 tenant_id, 1 entity_type,
# 2 entity_domain, 3 entity_id) — confirm against the template context.
key
- template '{{index . 0}}'
- template '{{index . 1}}'
- template '{{index . 2}}'
- template '123456' # need to add '{{env BATCH_ID}}' when productionizing
# entity_id_bucket: bucket on entity_id (index 3) to match the key name; this
# previously used index 2 (entity_domain), which would put every record of a
# domain into the same bucket. NOTE(review): assumes `bucket VALUE COUNT`,
# with COUNT (3 here) being the [OPS] bucket count — confirm.
- template '{{bucket (index . 3) 3}}'
capacity
split_stage
in_proc_executor
num_workers 12
collate_stage
in_proc_executor
num_workers 32
sort_stage
in_proc_executor
num_workers 3
merge_plan_stage
in_proc_executor
num_workers 20
merge_do_stage
in_proc_executor
num_workers 20
transform_stage
in_proc_executor
num_workers 30