# collate-sort.yaml

############################
# SETUP
# 1) [OPS] set batch_id
# 2) [OPS] set bucket count for sharding
# 3) Set work area, inputs, outputs

# NOTES
# The output schema for Interactions is complete, but the transform logic for it will be added as needed.
############################

work_area:
separate_transform_stage: true
# Input keys must be among: Attributes, EventGroups, Conversions, Views, GeoEvents,
# POIVisits, Interactions, OtherEvents, Impressions, Clicks, Engagements
inputs:
  # - name: Clicks
    # locations:
      # - folder:
    # capture_partition_keys:
      # - global_region
      # - entity_type
      # - entity_domain
    # key:
      # - global_region
      # - entity_type
      # - entity_domain
      # - entity_id
      # - advertiser_id
  # - name: Conversions
    # locations:
      # - folder:
    # capture_partition_keys:
      # - entity_type
      # - entity_domain
    # key:
      # - entity_type
      # - entity_domain
      # - entity_id
      # - advertiser_id
    # transform:
    # - rename_schema:
        # http_schema: http_schema_conversions
        # top_level: top_level_conversions
        
transform
- python
    code 
      import schema
      import time
      import hashlib

      # Parent event ids that mark a click/engagement as having no usable parent
      # impression; such records are kept as stand-alone synthetic impressions.
      invalid_ids = {'NA'}
      # Wall-clock time when this code is loaded, in epoch milliseconds.
      current_ts = int(time.time() * 1000)
      
      def isValid(cc):
          """Return True when ``cc`` is usable: not None, not '' and not the 'NA' placeholder."""
          return cc is not None and cc != '' and cc != 'NA'
      
      def non_empty(list):
          """Return True if the sequence ``list`` has at least one element.

          None is treated as empty (returns False) instead of raising TypeError.
          The parameter name shadows the builtin but is kept for caller compatibility.
          """
          return list is not None and len(list) > 0
      
      def sortByTimestamp(evt):
          """Sort the list of event dicts ``evt`` in place by ``event_timestamp``.

          The sort is stable, so events with equal timestamps keep their relative
          order. Returns None, like ``list.sort``.
          """
          evt.sort(key=lambda e: e['event_timestamp'])
      
      def set_event_group(evt, result, retain_event_id: bool):
          """Derive an event-group record from ``evt`` and register it in ``result``.

          Copies the device/network/geo attributes of ``evt`` into a new
          event-group dict and assigns it an id. It then stamps that id onto
          ``evt['event_group_id']``. Finally it appends the group to
          ``result['EventGroups']`` unless an equal dict is already present.

          Args:
              evt: event dict. It must contain every key in ``copied_fields``
                  plus ``policy`` and ``other_geo``. It also needs ``event_id``
                  when ``retain_event_id`` is truthy. It is modified in place.
              result: output record; ``result['EventGroups']`` must be a list.
              retain_event_id: when truthy, reuse ``evt['event_id']`` as the
                  group id. Otherwise the id is the SHA-256 hex digest of the
                  group's contents.
          """
          # Copied verbatim from evt. The ORDER matters: the hashed id below is
          # computed from str(event_group.items()), so reordering these keys
          # would change every generated event_group_id.
          copied_fields = (
              'source', 'browser', 'browser_version', 'device_type', 'hashed_ip',
              'network', 'network_type', 'os', 'os_version', 'throughput',
              'user_agent', 'area_code', 'city', 'country_code', 'dma', 'fips',
              'global_region', 'latitude', 'longitude', 'msa', 'region_code',
              'timezone', 'zip',
          )
          # event_id is inserted first as '' so that it keeps its leading position
          # and is always hashed as '' (the real id is only filled in afterwards).
          event_group = {
              'event_id': '',
              # these fields will be calculated at the end of collate-merge
              'event_timestamp': 0,
              'first_event_id': '',
              'last_timestamp': 0,
              'events_count': 0,
          }
          for field in copied_fields:
              event_group[field] = evt[field]
          # NOTE(review): policy is wrapped in a one-element list; if evt['policy']
          # is already a list (e.g. an input EventGroups record) this nests it -- confirm.
          event_group['policy'] = [evt['policy']]
          # Sort other_geo keys so equal maps hash identically; treat None as empty.
          event_group['other_geo'] = dict(sorted((evt['other_geo'] or {}).items()))
          if retain_event_id:
              event_group['event_id'] = evt['event_id']
          else:
              event_group['event_id'] = hashlib.sha256(str(event_group.items()).encode()).hexdigest()
          evt['event_group_id'] = event_group['event_id']
          event_groups = result['EventGroups']
          if event_group not in event_groups:
              event_groups.append(event_group)
      
      def update_events(rec, result, key, advertiser_id):
          """Copy the events in ``rec[key]`` into ``result[key]``, annotated and time-sorted.

          Each event is linked to an event group via set_event_group (which
          sets ``event_group_id``), gets ``advertiser_id`` set, and has its
          ``policy`` cleared. The policy was already copied into the event
          group. The input event dicts are modified in place and reused in
          ``result[key]``. A missing or None ``rec[key]`` yields an empty list.
          """
          events = []
          for event in rec.get(key) or []:
              set_event_group(event, result, False)
              event['advertiser_id'] = advertiser_id
              event['policy'] = []
              events.append(event)
          sortByTimestamp(events)
          result[key] = events
      
      def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
          """Turn each click in ``rec`` into a synthetic parent-impression record.

          Each click dict is converted IN PLACE into an impression-shaped record
          whose ``event_id`` is the click's ``parent_event_id``. The
          click-specific fields move into the single entry of that record's
          ``clicks`` list.

          Records whose parent id is in ``invalid_ids`` are returned separately.
          The rest are merged into ``subevents_dict``. A click whose parent is
          already present only appends its click sub-event to the existing
          record and updates its click flags.

          Returns:
              (subevents_dict, invalid_clicks): the dict that was passed in
              (updated), and the records whose parent id is invalid.
          """
          invalid_clicks = []
          # Input keys are capitalised ('Impressions', 'Conversions', ...); the
          # legacy lowercase 'clicks' key is still accepted.
          clicks = rec.get('Clicks') or rec.get('clicks') or []
          for clk in clicks:
              set_event_group(clk, result, False)
              clk['advertiser_id'] = advertiser_id
              imp_id = clk['parent_event_id']
              clk_id = clk['event_id']

              # The click dict itself is reused as the synthetic impression.
              orphan_imp = clk
              # orphan_imp['event_type'] = 'imp'
              orphan_imp['event_id'] = imp_id
              orphan_imp['has_click'] = True
              result['has_clicks'] = True
              orphan_imp['has_engagement'] = None
              orphan_imp['viewable'] = None
              orphan_imp['synthetic_event'] = True
              # Read when merging below and during subevent aggregation; the
              # engagement path sets it to None, so default it the same way here.
              orphan_imp.setdefault('has_click_conversion', None)

              clk_subevent = {
                  'event_timestamp': orphan_imp['event_timestamp'],
                  'agency_id': orphan_imp['agency_id'],
                  'event_id': clk_id,  # the click's own id, not the parent's
                  'page_url': orphan_imp['page_url'],
                  'page_referer_url': orphan_imp['page_referer_url'],
                  'destination_url': orphan_imp['destination_url'],
                  'cpc_paid': orphan_imp['cpc_paid'],
                  'cpc_cost': orphan_imp['cpc_cost'],
                  'other_event_ids': orphan_imp['other_event_ids'],
                  'metrics': orphan_imp['metrics'],
                  'others': orphan_imp['others'],
              }

              orphan_imp['clicks'] = [clk_subevent]
              orphan_imp['engagements'] = []

              # These now live only on the click sub-event.
              del orphan_imp['destination_url']
              del orphan_imp['cpc_paid']
              del orphan_imp['cpc_cost']

              if imp_id in invalid_ids:
                  invalid_clicks.append(orphan_imp)
                  continue
              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = orphan_imp
              else:
                  existing['clicks'].append(clk_subevent)
                  existing['has_click'] = True
                  existing['has_click_conversion'] = existing['has_click_conversion'] or orphan_imp['has_click_conversion']

          return subevents_dict, invalid_clicks
      
      def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
          """Turn each engagement in ``rec`` into a synthetic parent-impression record.

          Each engagement dict is converted IN PLACE into an impression-shaped
          record whose ``event_id`` is the engagement's ``parent_event_id``. The
          engagement-specific fields move into the single entry of that record's
          ``engagements`` list.

          Records whose parent id is in ``invalid_ids`` are returned separately.
          The rest are merged into ``subevents_dict``. An engagement whose parent
          is already present only appends its sub-event to the existing record.

          Returns:
              (subevents_dict, invalid_engagements): the dict that was passed in
              (updated), and the records whose parent id is invalid.
          """
          invalid_engagements = []
          # Input keys are capitalised ('Impressions', 'Conversions', ...); the
          # legacy lowercase 'engagements' key is still accepted.
          engagements = rec.get('Engagements') or rec.get('engagements') or []
          for eng in engagements:
              set_event_group(eng, result, False)
              eng['advertiser_id'] = advertiser_id
              imp_id = eng['parent_event_id']

              # The engagement dict itself is reused as the synthetic impression.
              orphan_imp = eng
              # orphan_imp['event_type'] = 'impr'
              orphan_imp['event_id'] = imp_id
              orphan_imp['has_click_conversion'] = None
              orphan_imp['has_click'] = None
              orphan_imp['has_engagement'] = True
              result['has_engagements'] = True
              orphan_imp['viewable'] = True
              orphan_imp['synthetic_event'] = True

              eng_subevent = {
                  'event_timestamp': orphan_imp['event_timestamp'],
                  # 'event_type': orphan_imp['event_type'],
                  'event_subtype_1': orphan_imp['event_subtype_1'],
                  'cpc_paid': orphan_imp['cpc_paid'],
                  'cpc_cost': orphan_imp['cpc_cost'],
                  'cpe_paid': orphan_imp['cpe_paid'],
                  'cpe_cost': orphan_imp['cpe_cost'],
                  'engagement_actions': orphan_imp['engagement_actions'],
                  'metrics': orphan_imp['metrics'],
                  'others': orphan_imp['others'],
              }

              orphan_imp['clicks'] = []
              orphan_imp['engagements'] = [eng_subevent]

              # These now live only on the engagement sub-event.
              for moved in ('engagement_actions', 'event_subtype_1', 'cpc_paid',
                            'cpc_cost', 'cpe_paid', 'cpe_cost'):
                  del orphan_imp[moved]

              if imp_id in invalid_ids:
                  invalid_engagements.append(orphan_imp)
                  continue
              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = orphan_imp
              else:
                  existing['engagements'].append(eng_subevent)

          return subevents_dict, invalid_engagements
      
      def get_impressions_dict(rec, result, advertiser_id):
          """Index ``rec['Impressions']`` by ``event_id``, merging duplicates.

          Every impression gets ``advertiser_id`` set and is linked to an event
          group via set_event_group. The first impression seen for an id is kept.
          It is given empty ``clicks``/``engagements`` lists and ``viewable`` set
          to None. Later duplicates only contribute their ``others`` map to it. A
          missing or None ``rec['Impressions']`` yields an empty dict.

          Returns:
              dict mapping impression event_id -> impression dict (insertion-ordered).
          """
          impressions_dict = {}
          for inp_imp in rec.get('Impressions') or []:
              inp_imp['advertiser_id'] = advertiser_id
              set_event_group(inp_imp, result, False)
              imp_id = inp_imp['event_id']
              kept = impressions_dict.get(imp_id)
              if kept is not None:
                  kept['others'].update(inp_imp['others'])
                  continue
              inp_imp['clicks'] = []
              inp_imp['engagements'] = []
              # NOTE(review): this discards any viewable value present on the
              # input impression; it is later filled only from sub-events -- confirm intended.
              inp_imp['viewable'] = None
              impressions_dict[imp_id] = inp_imp
          return impressions_dict
      
      def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
          """Build ``result['Impressions']`` by attaching clicks/engagements to their impressions.

          Real impressions are marked ``synthetic_event = False``. When clicks
          or engagements reference them, their sub-events and flags are merged
          in. ``result['has_clicks']`` and ``result['has_engagements']`` are
          also set. Output order before sorting:
          1. real impressions;
          2. synthetic records with an invalid parent id (engagements, then clicks);
          3. synthetic records whose parent impression is absent.
          The combined list is then stably sorted by ``event_timestamp``.
          """
          result['has_clicks'] = False
          result['has_engagements'] = False
          impressions_dict = get_impressions_dict(rec, result, advertiser_id)
          subevents_dict, invalid_subevents = get_engagement_dict(rec, result, {}, advertiser_id)
          subevents_dict, invalid_clicks = get_clicks_dict(rec, result, subevents_dict, advertiser_id)
          invalid_subevents.extend(invalid_clicks)

          # insert subevents into parent impression
          impressions = []
          for imp_id, impression in impressions_dict.items():
              impression['synthetic_event'] = False
              se = subevents_dict.pop(imp_id, None)
              if se is not None:
                  # get_impressions_dict and the click/engagement builders all use
                  # lowercase 'clicks'/'engagements' keys; the capitalised keys
                  # previously used here raised KeyError.
                  impression['clicks'].extend(se['clicks'])
                  impression['engagements'].extend(se['engagements'])
                  impression['has_click'] = non_empty(impression['clicks'])
                  result['has_clicks'] = result['has_clicks'] or impression['has_click']
                  impression['has_engagement'] = non_empty(impression['engagements'])
                  result['has_engagements'] = result['has_engagements'] or impression['has_engagement']
                  # Only fill these from the sub-event when the impression lacks a truthy value.
                  if not impression.get('has_click_conversion'):
                      impression['has_click_conversion'] = se['has_click_conversion']
                  if not impression['viewable']:
                      impression['viewable'] = se['viewable']
              impressions.append(impression)
          impressions.extend(invalid_subevents)
          # Whatever is left had no matching real impression.
          impressions.extend(subevents_dict.values())
          sortByTimestamp(impressions)
          result['Impressions'] = impressions
      
      def dat_hnd(rec: any) -> any:
          """Collate one input record into a single output record.

          The input record and the produced result are printed before and after
          processing. ``result['EventGroups']`` is filled first from any input
          ``EventGroups`` (keeping their own ids). Every processed event then
          adds its derived group. The per-type event lists are copied via
          update_events, and Impressions are built by
          subevent_aggregation_for_imp_clk_eng. Finally one ``has_*`` flag is
          set per event list.

          Returns:
              a one-element list containing the collated record.
          """
          print('start rec')
          print(rec)
          print('end rec')
          result = {}
          # Previously rec.get('advertiser_id)'), a key that never exists.
          advertiser_id = rec.get('advertiser_id')
          result['entity_id'] = rec['entity_id']
          result['event_window_start'] = None  # handled in collate-merge
          result['event_window_duration'] = '1 Hour'

          result['EventGroups'] = []
          # if event_groups are an input we use that data,
          # otherwise calculate based on other inputs
          for input_event_group in rec.get('EventGroups') or []:
              # Previously passed the undefined name `input_event` (NameError).
              set_event_group(input_event_group, result, True)

          # NOTE(review): the output Attributes schema has no event_timestamp or
          # device/geo fields, yet update_events sorts by event_timestamp and
          # set_event_group reads those fields -- confirm input attribute records carry them.
          for key in ('Conversions', 'Views', 'GeoEvents', 'POIVisits',
                      'Attributes', 'Interactions', 'OtherEvents'):
              update_events(rec, result, key, advertiser_id)

          subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id)
          for flag, key in (('has_impressions', 'Impressions'),
                            ('has_conversions', 'Conversions'),
                            ('has_views', 'Views'),
                            ('has_geo_events', 'GeoEvents'),
                            ('has_poi_visits', 'POIVisits'),
                            ('has_interactions', 'Interactions'),
                            ('has_other_events', 'OtherEvents')):
              result[flag] = non_empty(result[key])

          print('start result ')
          print(result)
          print('end result ')
          return [result]

      def schema_handler_v1(sch schema.Schema) - (schema.Schema, any)
        out_sch = 
          { type record, name aqfer_data_subject_collated, fields [
          { name entity_id, type string }, 
          { name event_window_start, type [long, null] },
          { name event_window_duration, type [string, null] },   
          { name has_impressions, type boolean }, 
          { name has_clicks, type boolean },
          { name has_engagements, type boolean },
          { name has_conversions, type boolean },  
          { name has_views, type boolean },   
          { name has_interactions, type boolean }, 
          { name has_geo_events, type boolean },
          { name has_poi_visits, type boolean },
          { name has_other_events, type boolean },         
          { name Attributes, type
              { type array, items
              { type record, name aqfer_attribute, fields [
                  { name confidence, type [int, null] },
                  { name data_type, type string },
                  { name global_region, type string },  
                  { name magnitude, type [int, null] },
                  { name modeled, type boolean },
                  { name method, type boolean },
                  { name name, type string },  
                  { name score, type [int, null] },
                  { name source, type string },
                  { name source_attribute_name, type string },
                  { name source_attribute_value, type string },
                  { name value, type string },
                  { name others, type { type map, values string }}
              ]}
              }
          },
          { name EventGroups, type
              { type array, items
              { type record, name aqfer_event_group, fields [
                  { name event_id, type string },
                  { name event_timestamp, type long },
                  { name first_event_id, type string },
                  { name last_timestamp, type long },                  
                  { name source, type string },
                  { name events_count, type [int, null] },
                  { name browser, type string },
                  { name browser_version, type string },
                  { name device_type, type string },
                  { name hashed_ip, type string },
                  { name network, type string },
                  { name network_type, type string },
                  { name os, type string },
                  { name os_version, type string },
                  { name throughput, type string },
                  { name user_agent, type string },
                  { name area_code, type string },
                  { name city, type string },
                  { name country_code, type string },
                  { name dma, type string },
                  { name fips, type string },
                  { name global_region, type string },
                  { name latitude, type string },
                  { name longitude, type string },
                  { name msa, type string },
                  { name region_code, type string },
                  { name timezone, type string },
                  { name zip, type string },
                  { name policy, type { type array, items
                    { type record, name policy, fields [
                        { name policy_id, type string },
                        { name policy_effective_date, type string },
                        { name effectivity, type string },
                        { name policy_detail, type string }]}}},        
                  { name other_geo, type{ type map, values string }}
              ]}
              }
          },
          { name Conversions, type
              { type array, items
              { type record, name aqfer_conversion, fields [
                  { name event_id, type string },
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name page_url, type [ { type record, name url_struct_1, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name page_referer_url, type [ { type record, name url_struct_2, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name synthetic_event, type boolean },
                  { name tag_url, type [ { type record, name url_struct_3, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},                  
                  { name metrics, type { type map, values string }},
                  { name others, type{ type map, values string }},
                  { name ad_group_id, type string },
                  { name advertiser_id, type string },
                  { name agency_id, type string },
                  { name campaign_id, type string },
                  { name creative_id, type string },
                  { name placement_id, type string },
                  { name ad_url, type [      { type record, name url_struct_4, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string}} ]}, null]},
                  { name other_marketing_program_levels, type{ type map, values string }},
                  { name keywords, type string },
                  { name inventory_partner, type string },
                  { name media_partner, type string },
                  { name search_phase, type string },
                  { name search_terms, type string },
                  { name site_id, type string },
                  { name supply_vendor_publisher_id, type string },
                  { name other_media_group_levels, type{ type map, values string }}
              ]}
              }
          },
          { name Views, type
              { type array, items
              { type record, name aqfer_view, fields [
                  { name event_id, type string }, 
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name page_url, type [      { type record, name url_struct_5, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string }}]}, null]},
                  { name page_referer_url, type [      { type record, name url_struct_6, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string }}]}, null]},
                  { name synthetic_event, type boolean },
                  { name tag_url, type [      { type record, name url_struct_7, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string }}]}, null]},
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},   
                  { name metrics, type{ type map, values string }},
                  { name others, type{ type map, values string }},
                  { name advertiser_id, type string },
                  { name agency_id, type string },
                  { name has_interaction, type [ boolean, null ] },
                  { name Interactions, type
                  { type array, items
                  { type record, name aqfer_interaction, fields [
                      { name event_id, type string }, 
                      { name event_timestamp, type long },
                      { name event_group_id, type string },
                      { name source, type string },
                      { name event_actions, type{ type array, items string }},
                      { name event_subtype_1, type [ string, null ]},
                      { name event_subtype_2, type [ string, null ]},
                      { name http_status_code, type string },
                      { name page_url, type [      { type record, name url_struct_5, fields [
                              { name scheme, type [ string, null ] },
                              { name host, type [ string, null ] },
                              { name path, type [ string, null ] },
                              { name query, type
                              { type map, values string }}]}, null]},
                      { name page_referer_url, type [      { type record, name url_struct_6, fields [
                              { name scheme, type [ string, null ] },
                              { name host, type [ string, null ] },
                              { name path, type [ string, null ] },
                              { name query, type
                              { type map, values string }}]}, null]},
                      { name synthetic_event, type boolean },
                      { name tag_url, type [      { type record, name url_struct_7, fields [
                              { name scheme, type [ string, null ] },
                              { name host, type [ string, null ] },
                              { name path, type [ string, null ] },
                              { name query, type
                              { type map, values string }}]}, null]},
                      { name other_event_ids, type{ type map, values string }},
                      { name other_entity_keys, type
                      { type array, items
                              { type record, name entity_struct_1, fields [
                              { name entity_type, type [ string, null ] },
                              { name entity_domain, type [ string, null ] },
                              { name entity_id, type [ string, null ] }
                              ]}}},   
                      { name metrics, type{ type map, values string }},
                      { name others, type{ type map, values string }},
                      { name advertiser_id, type string },
                      { name agency_id, type string }
                  ]}
              }
          }
              ]}
              }
          },          
          { name GeoEvents, type
              { type array, items
              { type record, name aqfer_geo_event, fields [
                  { name event_id, type string }, 
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},
                  { name metrics, type{ type map, values string }},
                  { name others, type{ type map, values string }},
                  { name horizontal_accuracy, type [int, null] },
                  { name device_id_type, type string }
              ]}
              }
          },    
          { name POIVisits, type
              { type array, items
              { type record, name aqfer_poi_visit, fields [
                  { name event_id, type string }, 
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},   
                  { name metrics, type{ type map, values string }},
                  { name others, type{ type map, values string }},
                  { name poi_id, type string },
                  { name minimum_duration, type [int, null] },
                  { name device_id_type, type string }
              ]}
              }
          },      
          { name OtherEvents, type
              { type array, items
              { type record, name aqfer_event, fields [
                  { name event_id, type string },
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name page_url, type [ { type record, name url_struct_8, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name page_referer_url, type [ { type record, name url_struct_2, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name synthetic_event, type boolean },
                  { name tag_url, type [ { type record, name url_struct_9, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},                  
                  { name metrics, type { type map, values string }},
                  { name others, type{ type map, values string }},
                  { name ad_group_id, type string },
                  { name advertiser_id, type string },
                  { name agency_id, type string },
                  { name campaign_id, type string },
                  { name creative_id, type string },
                  { name placement_id, type string },
                  { name ad_url, type [      { type record, name url_struct_4, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string}} ]}, null]},
                  { name other_marketing_program_levels, type{ type map, values string }}
              ]}
              }
          },                     
          { name Impressions, type
              { type array, items
              { type record, name aqfer_impression, fields [
                  { name event_id, type string },
                  { name event_timestamp, type long },
                  { name event_group_id, type string },
                  { name source, type string },
                  { name event_actions, type{ type array, items string }},
                  { name event_subtype_1, type [ string, null ]},
                  { name event_subtype_2, type [ string, null ]},
                  { name http_status_code, type string },
                  { name page_url, type [ { type record, name url_struct_10, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name page_referer_url, type [ { type record, name url_struct_9, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name synthetic_event, type boolean },
                  { name tag_url, type [ { type record, name url_struct_11, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type { type map, values string }}]}, null]},
                  { name other_event_ids, type{ type map, values string }},
                  { name other_entity_keys, type
                  { type array, items
                          { type record, name entity_struct_1, fields [
                          { name entity_type, type [ string, null ] },
                          { name entity_domain, type [ string, null ] },
                          { name entity_id, type [ string, null ] }
                          ]}}},                  
                  { name metrics, type { type map, values string }},
                  { name others, type{ type map, values string }},
                  { name ad_group_id, type string },
                  { name advertiser_id, type string },
                  { name agency_id, type string },
                  { name campaign_id, type string },
                  { name creative_id, type string },
                  { name placement_id, type string },
                  { name ad_url, type [      { type record, name url_struct_12, fields [
                          { name scheme, type [ string, null ] },
                          { name host, type [ string, null ] },
                          { name path, type [ string, null ] },
                          { name query, type
                          { type map, values string}} ]}, null]},
                  { name other_marketing_program_levels, type{ type map, values string }},
                  { name keywords, type string },
                  { name inventory_partner, type string },
                  { name media_partner, type string },
                  { name search_phase, type string },
                  { name search_terms, type string },
                  { name site_id, type string },
                  { name supply_vendor_publisher_id, type string },
                  { name other_media_group_levels, type{ type map, values string }},
                  { name cpm_cost, type int },
                  { name cpm_currency, type [ string, null ] },
                  { name cpm_paid, type int },
                  { name has_click_conversion, type [ boolean, null ] },
                  { name has_click, type [ boolean, null ] },
                  { name has_engagement, type [ boolean, null ] },
                  { name viewable, type [ boolean, null ] },
              { name Clicks, type
                  { type array, items
                          { type record, name aqfer_click, fields [
                          { name event_id, type string },
                          { name event_actions, type{ type array, items string }},
                          { name event_subtype_1, type [ string, null ]},
                          { name event_subtype_2, type [ string, null ]},
                          { name http_status_code, type string },
                          { name other_event_ids, type{ type map, values string }},
                          { name tag_url, type [          { type record, name url_struct_13, fields [
                                      { name scheme, type [ string, null ] },
                                      { name host, type [ string, null ] },
                                      { name path, type [ string, null ] },
                                      { name query, type
                                          { type map, values string }}]}, null]},                                                  
                          { name others, type{ type map, values string }},                         
                          { name metrics, type{ type map, values string }},                          
                          { name cpc_paid, type [ int, null ] },
                          { name cpc_cost, type [ int, null ] },
                          { name video_pct, type [ int, null ] },
                          { name destination_url, type [          { type record, name url_struct_14, fields [
                                      { name scheme, type [ string, null ] },
                                      { name host, type [ string, null ] },
                                      { name path, type [ string, null ] },
                                      { name query, type
                                          { type map, values string }}]}, null]}
                          ]}
                  }
                  },
                  { name Engagements, type
                  { type array, items
                          { type record, name aqfer_engagement, fields [
                          { name event_id, type string },
                          { name event_actions, type{ type array, items string }},
                          { name event_subtype_1, type [ string, null ]},
                          { name event_subtype_2, type [ string, null ]},
                          { name http_status_code, type string },
                          { name other_event_ids, type{ type map, values string }},
                           { name tag_url, type [          { type record, name url_struct_15, fields [
                                      { name scheme, type [ string, null ] },
                                      { name host, type [ string, null ] },
                                      { name path, type [ string, null ] },
                                      { name query, type
                                          { type map, values string }}]}, null]},                                                  
                          { name others, type{ type map, values string }},                         
                          { name metrics, type{ type map, values string }},                          
                          { name cpe_paid, type [ int, null ] },
                          { name cpe_cost, type [ int, null ] },
                          { name engagement_actions, type
                              { type array, items string }
                          }
                          ]}
                  }
                  }                  
              ]}
              }
          }
          ]}
        
        return schema.parse(out_sch), dat_hnd

# Output location(s) for this job.
# NOTE(review): the destination value is blank in this listing — fill it in
# before running.
outputs
  - destination 
# Split stage settings.
# bucket_size_bytes is presumably the target/maximum size of each split bucket —
# confirm against the job framework's documentation.
split
  bucket_size_bytes 2147483648 # 2 GiB (2^31 bytes)
# Sort stage: records are partitioned by five keys, in this order:
#   tenant_id / entity_type / entity_domain / batch_id / entity_id_bucket
# `project` extracts [tenant_id, entity_type, entity_domain, entity_id] from each
# record. In the key templates, `index . N` presumably selects the N-th projected
# field (0-based), so tenant_id=0, entity_type=1, entity_domain=2, entity_id=3.
# Each `key` template produces the value for the key_name at the same position.
sort
  num_partition_keys 5
  key_names
    - tenant_id
    - entity_type
    - entity_domain
    - batch_id
    - entity_id_bucket
  project
    '.'
      field_names [tenant_id, entity_type, entity_domain, entity_id]
  key
    - template '{{index . 0}}' # tenant_id
    - template '{{index . 1}}' # entity_type
    - template '{{index . 2}}' # entity_domain
    # batch_id is hard-coded for testing. When productionizing, replace it with
    # '{{env BATCH_ID}}' (see SETUP step 1 at the top of this file).
    - template '123456'
    # entity_id_bucket: shard on entity_id (projected index 3). Bucketing on
    # entity_domain (index 2) would be constant within each entity_domain
    # partition and send every record to one bucket. The trailing 3 is
    # presumably the bucket count (SETUP step 2) — confirm `bucket` semantics.
    - template '{{bucket (index . 3) 3}}'

# Parallelism settings: the worker count (`num_workers`) of the in-process
# executor for each pipeline stage.
# NOTE(review): sort_stage uses 3 workers while the other stages use 12-32 —
# confirm this is intentional.
capacity
  split_stage
    in_proc_executor
      num_workers 12
  collate_stage
    in_proc_executor
      num_workers 32
  sort_stage
    in_proc_executor
      num_workers 3
  merge_plan_stage
    in_proc_executor
      num_workers 20
  merge_do_stage
    in_proc_executor
      num_workers 20
  transform_stage
    in_proc_executor
      num_workers 30
  • Table of contents

Was this article helpful?