# collate-sort.yaml

############################
# SETUP
# 1) [OPS] set batch_id
# 2) [OPS] set bucket count for sharding
# 3) Set work area, inputs, outputs

# NOTES
# The output schema for Interactions is complete; the processing logic will be completed as needed.
############################

# Work area for this job (SETUP step 3); empty by default.
work_area: ''
separate_transform_stage: true
# Input keys should be among: Attributes, EventGroups, Conversions, Views,
# GeoEvents, POIVisits, Interactions, OtherEvents, Impressions, Clicks,
# Engagements.
# NOTE(review): with every entry below commented out, `inputs:` parses as
# null rather than an empty list; confirm the consumer accepts that.
inputs:
  # Example entries (uncomment and replace the placeholders):
  # - name: Clicks
    # locations:
      # - folder
    # capture_partition_keys:
      # - global_region
      # - entity_type
      # - entity_domain
    # key:
      # - global_region
      # - entity_type
      # - entity_domain
      # - entity_id
      # - advertiser_id
  # - name: Conversions
    # locations:
      # - folder
    # capture_partition_keys:
      # - entity_type
      # - entity_domain
    # key:
      # - entity_type
      # - entity_domain
      # - entity_id
      # - advertiser_id
    # transform:
    # - rename_schema:
        # http_schema: http_schema_conversions
        # top_level: top_level_conversions

transform:
- python:
    code: |
      import schema
      import time
      import hashlib

      # Parent ids that mark a click/engagement as having no joinable parent impression.
      invalid_ids = {'NA'}
      # Time this module was loaded, in milliseconds since the Unix epoch.
      # NOTE(review): not referenced by the visible code; confirm it is still needed.
      current_ts = int(time.time() * 1000)
      
      def isValid(cc):
          """Return True when *cc* is a usable id: not None, not '' and not 'NA'."""
          if cc is None:
              return False
          return cc not in ('', 'NA')
      
      def non_empty(list):
          """Return True if the sized collection *list* holds at least one element.

          The parameter keeps its original name (it shadows the builtin only inside
          this function) so that any keyword callers keep working.
          """
          return len(list) > 0
      
      def sortByTimestamp(evt):
          """Sort the list of event dicts *evt* in place by 'event_timestamp', ascending.

          list.sort is stable, so events with equal timestamps keep their input order.
          Returns None, like list.sort().
          """
          evt.sort(key=lambda e: e['event_timestamp'])
      
      # Fields copied verbatim from an event into its event group. Order matters: the
      # synthetic group id is a hash of the group's items in insertion order.
      _EVENT_GROUP_COPY_FIELDS = (
          'source', 'browser', 'browser_version', 'device_type', 'hashed_ip',
          'network', 'network_type', 'os', 'os_version', 'throughput',
          'user_agent', 'area_code', 'city', 'country_code', 'dma', 'fips',
          'global_region', 'latitude', 'longitude', 'msa', 'region_code',
          'timezone', 'zip',
      )

      def set_event_group(evt, result, retain_event_id: bool):
          """Derive an event-group record from *evt* and link *evt* to it.

          The group copies the device/network/geo fields of *evt*, wraps evt['policy']
          in a one-element list, and stores evt['other_geo'] sorted by key so the
          group's content (and therefore its hash) does not depend on input key order.

          Args:
              evt: event dict; mutated in place with evt['event_group_id'].
              result: output record; the group is appended to result['EventGroups']
                  unless an equal group dict is already present.
              retain_event_id: if truthy, the group id is evt['event_id'];
                  otherwise it is the SHA-256 hex digest of the group's items
                  (computed while the group's event_id is still ''), so identical
                  groups get identical ids.
          """
          event_groups = result['EventGroups']
          event_group = {
              'event_id': '',
              # these fields will be calculated at the end of collate-merge
              'event_timestamp': 0,
              'first_event_id': '',
              'last_timestamp': 0,
              'events_count': 0,
          }
          for field in _EVENT_GROUP_COPY_FIELDS:
              event_group[field] = evt[field]
          event_group['policy'] = [evt['policy']]
          event_group['other_geo'] = dict(sorted(evt['other_geo'].items()))
          if retain_event_id:
              event_group['event_id'] = evt['event_id']
          else:
              event_group['event_id'] = hashlib.sha256(str(event_group.items()).encode()).hexdigest()
          evt['event_group_id'] = event_group['event_id']
          if event_group not in event_groups:
              event_groups.append(event_group)
      
      def update_events(rec, result, key, advertiser_id):
          """Copy the events under *key* in *rec* into result[key], sorted by timestamp.

          Each event is mutated in place: it is registered with an event group
          (set_event_group sets its 'event_group_id'), stamped with *advertiser_id*,
          and its 'policy' is cleared because set_event_group has already copied it
          onto the event group. A missing *key* yields an empty list.
          """
          events = result[key] = []
          for input_event in rec.get(key, []):
              set_event_group(input_event, result, False)
              input_event['advertiser_id'] = advertiser_id
              input_event['policy'] = []
              events.append(input_event)
          sortByTimestamp(events)
      
      def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
          """Fold click records into synthetic parent impressions keyed by parent id.

          Each click record is reused, in place, as a synthetic ("orphan") impression
          whose event_id is the click's parent_event_id and whose 'clicks' list holds
          a single click sub-event. Clicks sharing a parent are merged into one entry
          of *subevents_dict*, which may already hold entries built from engagements.
          Clicks whose parent id is missing ('' / None) or listed in invalid_ids
          cannot be joined to an impression and are returned separately.

          Args:
              rec: input record. Clicks are read from rec['Clicks'], falling back to
                  the lowercase rec['clicks'] for older inputs.
              result: output record; receives event groups and has_clicks = True.
              subevents_dict: parent impression id -> synthetic impression; updated
                  in place and returned.
              advertiser_id: stamped onto every click.

          Returns:
              (subevents_dict, invalid_clicks)
          """
          invalid_clicks = []
          # Other input keys are capitalised ('Impressions', 'Conversions', ...).
          clicks = rec.get('Clicks', rec.get('clicks', []))
          for clk in clicks:
              set_event_group(clk, result, False)
              clk['advertiser_id'] = advertiser_id
              imp_id = clk['parent_event_id']
              clk_id = clk['event_id']

              # The click dict itself becomes the synthetic parent impression.
              orphan_imp = clk
              orphan_imp['event_id'] = imp_id
              orphan_imp['has_click'] = True
              result['has_clicks'] = True
              orphan_imp['has_engagement'] = None
              orphan_imp['viewable'] = None
              orphan_imp['synthetic_event'] = True

              clk_subevent = {
                  'event_timestamp': orphan_imp['event_timestamp'],
                  'agency_id': orphan_imp['agency_id'],
                  'event_id': clk_id,
                  'page_url': orphan_imp['page_url'],
                  'page_referer_url': orphan_imp['page_referer_url'],
                  'destination_url': orphan_imp['destination_url'],
                  'cpc_paid': orphan_imp['cpc_paid'],
                  'cpc_cost': orphan_imp['cpc_cost'],
                  'other_event_ids': orphan_imp['other_event_ids'],
                  'metrics': orphan_imp['metrics'],
                  'others': orphan_imp['others'],
              }
              orphan_imp['clicks'] = [clk_subevent]
              orphan_imp['engagements'] = []

              # These click-only fields now live on the sub-event only.
              del orphan_imp['destination_url']
              del orphan_imp['cpc_paid']
              del orphan_imp['cpc_cost']

              # Without a usable parent id, all such clicks would otherwise collapse
              # into a single synthetic impression keyed by None / ''.
              if imp_id in invalid_ids or not isValid(imp_id):
                  invalid_clicks.append(orphan_imp)
                  continue

              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = orphan_imp
              else:
                  existing['clicks'].append(clk_subevent)
                  existing['has_click'] = True
                  existing['has_click_conversion'] = (
                      existing['has_click_conversion'] or orphan_imp['has_click_conversion'])

          return subevents_dict, invalid_clicks
      
      def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
          """Fold engagement records into synthetic parent impressions keyed by parent id.

          Each engagement record is reused, in place, as a synthetic ("orphan")
          impression whose event_id is the engagement's parent_event_id and whose
          'engagements' list holds a single engagement sub-event. Engagements sharing
          a parent are merged into one entry of *subevents_dict*. Engagements whose
          parent id is missing ('' / None) or listed in invalid_ids are returned
          separately.

          Args:
              rec: input record. Engagements are read from rec['Engagements'],
                  falling back to the lowercase rec['engagements'] for older inputs.
              result: output record; receives event groups and has_engagements = True.
              subevents_dict: parent impression id -> synthetic impression; updated
                  in place and returned.
              advertiser_id: stamped onto every engagement.

          Returns:
              (subevents_dict, invalid_engagements)
          """
          invalid_engagements = []
          # Other input keys are capitalised ('Impressions', 'Conversions', ...).
          engagements = rec.get('Engagements', rec.get('engagements', []))
          for eng in engagements:
              set_event_group(eng, result, False)
              eng['advertiser_id'] = advertiser_id
              imp_id = eng['parent_event_id']

              # The engagement dict itself becomes the synthetic parent impression.
              orphan_imp = eng
              orphan_imp['event_id'] = imp_id
              orphan_imp['has_click_conversion'] = None
              orphan_imp['has_click'] = None
              orphan_imp['has_engagement'] = True
              result['has_engagements'] = True
              orphan_imp['viewable'] = True
              orphan_imp['synthetic_event'] = True

              eng_subevent = {
                  'event_timestamp': orphan_imp['event_timestamp'],
                  'event_subtype_1': orphan_imp['event_subtype_1'],
                  'cpc_paid': orphan_imp['cpc_paid'],
                  'cpc_cost': orphan_imp['cpc_cost'],
                  'cpe_paid': orphan_imp['cpe_paid'],
                  'cpe_cost': orphan_imp['cpe_cost'],
                  'engagement_actions': orphan_imp['engagement_actions'],
                  'metrics': orphan_imp['metrics'],
                  'others': orphan_imp['others'],
              }
              orphan_imp['clicks'] = []
              orphan_imp['engagements'] = [eng_subevent]

              # These engagement-only fields now live on the sub-event only.
              for moved_field in ('engagement_actions', 'event_subtype_1', 'cpc_paid',
                                  'cpc_cost', 'cpe_paid', 'cpe_cost'):
                  del orphan_imp[moved_field]

              # Without a usable parent id, all such engagements would otherwise
              # collapse into a single synthetic impression keyed by None / ''.
              if imp_id in invalid_ids or not isValid(imp_id):
                  invalid_engagements.append(orphan_imp)
                  continue

              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = orphan_imp
              else:
                  existing['engagements'].append(eng_subevent)

          return subevents_dict, invalid_engagements
      
      def get_impressions_dict(rec, result, advertiser_id):
          """Index rec['Impressions'] by event_id, registering each with an event group.

          The first impression seen for an id is kept (mutated in place with
          advertiser_id and empty 'clicks' / 'engagements' lists and viewable = None).
          A later duplicate only contributes its 'others' map, merged into the kept one.
          """
          by_id = {}
          for imp in rec.get('Impressions', []):
              imp['advertiser_id'] = advertiser_id
              set_event_group(imp, result, 0)
              key = imp['event_id']
              kept = by_id.get(key)
              if kept is not None:
                  kept['others'].update(imp['others'])
                  continue
              imp['clicks'] = []
              imp['engagements'] = []
              imp['viewable'] = None
              by_id[key] = imp
          return by_id
      
      def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
          """Build result['Impressions'] by attaching clicks/engagements to their parents.

          Real impressions get the click/engagement sub-events whose parent id matches
          their event_id, and their has_click / has_engagement flags are recomputed.
          Sub-events with an invalid parent id, or whose parent impression is not in
          the input, are emitted as standalone synthetic impressions. The final list
          is sorted by event_timestamp. Also sets result['has_clicks'] and
          result['has_engagements'].
          """
          result['has_clicks'] = False
          result['has_engagements'] = False
          impressions_dict = get_impressions_dict(rec, result, advertiser_id)
          subevents_dict, invalid_subevents = get_engagement_dict(rec, result, {}, advertiser_id)
          subevents_dict, invalid_clicks = get_clicks_dict(rec, result, subevents_dict, advertiser_id)
          invalid_subevents.extend(invalid_clicks)

          # insert subevents into parent impression
          impressions = []
          for imp_id, impression in impressions_dict.items():
              impression['synthetic_event'] = False
              se = subevents_dict.pop(imp_id, None)
              if se is not None:
                  # Sub-event lists use the lowercase keys set by get_impressions_dict,
                  # get_clicks_dict and get_engagement_dict.
                  # NOTE(review): confirm against the Impressions output schema.
                  impression['clicks'].extend(se['clicks'])
                  impression['engagements'].extend(se['engagements'])
                  impression['has_click'] = non_empty(impression['clicks'])
                  result['has_clicks'] = result['has_clicks'] or impression['has_click']
                  impression['has_engagement'] = non_empty(impression['engagements'])
                  result['has_engagements'] = result['has_engagements'] or impression['has_engagement']
                  # Only take the sub-event's flags when the impression's own are falsy.
                  if not impression.get('has_click_conversion'):
                      impression['has_click_conversion'] = se.get('has_click_conversion')
                  if not impression['viewable']:
                      impression['viewable'] = se['viewable']
              impressions.append(impression)

          # Sub-events still in subevents_dict had no matching parent impression.
          impressions.extend(invalid_subevents)
          impressions.extend(subevents_dict.values())
          sortByTimestamp(impressions)
          result['Impressions'] = impressions
      
      def dat_hnd(rec: any) -> any:
          """Collate one input record into a single output record.

          Sets entity_id, a placeholder event window (start is filled in by
          collate-merge), EventGroups, every per-type event list, the aggregated
          Impressions and the has_* flags.

          Returns:
              A one-element list holding the collated record.
          """
          result = {}
          advertiser_id = rec.get('advertiser_id')
          result['entity_id'] = rec['entity_id']
          result['event_window_start'] = None  # handled in collate-merge
          result['event_window_duration'] = '1 Hour'

          result['EventGroups'] = []
          # Event groups supplied in the input keep their own event_id; further groups
          # are derived from every other event type processed below.
          # NOTE(review): set_event_group wraps evt['policy'] in a list; if input
          # EventGroups already carry a policy array this nests it — confirm.
          for input_event_group in rec.get('EventGroups', []):
              set_event_group(input_event_group, result, True)

          # NOTE(review): update_events calls set_event_group and sorts by
          # 'event_timestamp', but the Attributes output schema has neither event
          # fields nor a timestamp — confirm Attribute inputs carry them.
          for key in ('Conversions', 'Views', 'GeoEvents', 'POIVisits',
                      'Attributes', 'Interactions', 'OtherEvents'):
              update_events(rec, result, key, advertiser_id)

          subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id)
          result['has_impressions'] = non_empty(result['Impressions'])
          result['has_conversions'] = non_empty(result['Conversions'])
          result['has_views'] = non_empty(result['Views'])
          result['has_geo_events'] = non_empty(result['GeoEvents'])
          result['has_poi_visits'] = non_empty(result['POIVisits'])
          result['has_interactions'] = non_empty(result['Interactions'])
          result['has_other_events'] = non_empty(result['OtherEvents'])
          return [result]

      def schema_handler_v1(sch schema.Schema) -> (schema.Schema, any):
        out_sch =  """
          { "type": "record", "name": "aqfer_data_subject_collated", "fields": [
          { "name": "entity_id", "type": "string" }, 
          { "name": "event_window_start", "type": ["long", "null"] },
          { "name": "event_window_duration", "type": ["string", "null"] },   
          { "name": "has_impressions", "type": "boolean" }, 
          { "name": "has_clicks", "type": "boolean" },
          { "name": "has_engagements", "type": "boolean" },
          { "name": "has_conversions", "type": "boolean" },  
          { "name": "has_views", "type": "boolean" },   
          { "name": "has_interactions", "type": "boolean" }, 
          { "name": "has_geo_events", "type": "boolean" },
          { "name": "has_poi_visits", "type": "boolean" },
          { "name": "has_other_events", "type": "boolean" },         
          { "name": "Attributes", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_attribute", "fields": [
                  { "name": "confidence", "type": ["int", "null"] },
                  { "name": "data_type", "type": "string" },
                  { "name": "global_region", "type": "string" },  
                  { "name": "magnitude", "type": ["int", "null"] },
                  { "name": "modeled", "type": "boolean" },
                  { "name": "method", "type": "boolean" },
                  { "name": "name", "type": "string" },  
                  { "name": "score", "type": ["int", "null"] },
                  { "name": "source", "type": "string" },
                  { "name": "source_attribute_name", "type": "string" },
                  { "name": "source_attribute_value", "type": "string" },
                  { "name": "value", "type": "string" },
                  { "name": "others", "type": { "type": "map", "values": "string" }}
              ]}
              }
          },
          { "name": "EventGroups", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_event_group", "fields": [
                  { "name": "event_id", "type": "string" },
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "first_event_id", "type": "string" },
                  { "name": "last_timestamp", "type": "long" },                  
                  { "name": "source", "type": "string" },
                  { "name": "events_count", "type": ["int", "null"] },
                  { "name": "browser", "type": "string" },
                  { "name": "browser_version", "type": "string" },
                  { "name": "device_type", "type": "string" },
                  { "name": "hashed_ip", "type": "string" },
                  { "name": "network", "type": "string" },
                  { "name": "network_type", "type": "string" },
                  { "name": "os", "type": "string" },
                  { "name": "os_version", "type": "string" },
                  { "name": "throughput", "type": "string" },
                  { "name": "user_agent", "type": "string" },
                  { "name": "area_code", "type": "string" },
                  { "name": "city", "type": "string" },
                  { "name": "country_code", "type": "string" },
                  { "name": "dma", "type": "string" },
                  { "name": "fips", "type": "string" },
                  { "name": "global_region", "type": "string" },
                  { "name": "latitude", "type": "string" },
                  { "name": "longitude", "type": "string" },
                  { "name": "msa", "type": "string" },
                  { "name": "region_code", "type": "string" },
                  { "name": "timezone", "type": "string" },
                  { "name": "zip", "type": "string" },
                  { "name": "policy", "type": { "type": "array", "items":
                    { "type": "record", "name": "policy", "fields": [
                        { "name": "policy_id", "type": "string" },
                        { "name": "policy_effective_date", "type": "string" },
                        { "name": "effectivity", "type": "string" },
                        { "name": "policy_detail", "type": "string" }]}}},        
                  { "name": "other_geo", "type":{ "type": "map", "values": "string" }}
              ]}
              }
          },
          { "name": "Conversions", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_conversion", "fields": [
                  { "name": "event_id", "type": "string" },
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_1", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "synthetic_event", "type": "boolean" },
                  { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_3", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},                  
                  { "name": "metrics", "type": { "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "ad_group_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "campaign_id", "type": "string" },
                  { "name": "creative_id", "type": "string" },
                  { "name": "placement_id", "type": "string" },
                  { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_4", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string"}} ]}, "null"]},
                  { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
                  { "name": "keywords", "type": "string" },
                  { "name": "inventory_partner", "type": "string" },
                  { "name": "media_partner", "type": "string" },
                  { "name": "search_phase", "type": "string" },
                  { "name": "search_terms", "type": "string" },
                  { "name": "site_id", "type": "string" },
                  { "name": "supply_vendor_publisher_id", "type": "string" },
                  { "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }}
              ]}
              }
          },
          { "name": "Views", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_view", "fields": [
                  { "name": "event_id", "type": "string" }, 
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "page_url", "type": [{ "type": "record", "name": "url_struct_5", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_6", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "synthetic_event", "type": "boolean" },
                  { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_7", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},   
                  { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "has_interaction", "type": [ "boolean", "null" ] },
                  { "name": "Interactions", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_interaction", "fields": [
                      { "name": "event_id", "type": "string" }, 
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_group_id", "type": "string" },
                      { "name": "source", "type": "string" },
                      { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                      { "name": "event_subtype_1", "type": [ "string", "null" ]},
                      { "name": "event_subtype_2", "type": [ "string", "null" ]},
                      { "name": "http_status_code", "type": "string" },
                      { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_5", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }}]}, "null"]},
                      { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_6", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }}]}, "null"]},
                      { "name": "synthetic_event", "type": "boolean" },
                      { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_7", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }}]}, "null"]},
                      { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                      { "name": "other_entity_keys", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "entity_struct_1", "fields": [
                              { "name": "entity_type", "type": [ "string", "null" ] },
                              { "name": "entity_domain", "type": [ "string", "null" ] },
                              { "name": "entity_id", "type": [ "string", "null" ] }
                              ]}}},   
                      { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                      { "name": "others", "type":{ "type": "map", "values": "string" }},
                      { "name": "advertiser_id", "type": "string" },
                      { "name": "agency_id", "type": "string" }
                  ]}
              }
          }
              ]}
              }
          },          
          { "name": "GeoEvents", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_geo_event", "fields": [
                  { "name": "event_id", "type": "string" }, 
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},
                  { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "horizontal_accuracy", "type": ["int", "null"] },
                  { "name": "device_id_type", "type": "string" }
              ]}
              }
          },    
          { "name": "POIVisits", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_poi_visit", "fields": [
                  { "name": "event_id", "type": "string" }, 
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},   
                  { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "poi_id", "type": "string" },
                  { "name": "minimum_duration", "type": ["int", "null"] },
                  { "name": "device_id_type", "type": "string" }
              ]}
              }
          },      
          { "name": "OtherEvents", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_event", "fields": [
                  { "name": "event_id", "type": "string" },
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_8", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "synthetic_event", "type": "boolean" },
                  { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},                  
                  { "name": "metrics", "type": { "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "ad_group_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "campaign_id", "type": "string" },
                  { "name": "creative_id", "type": "string" },
                  { "name": "placement_id", "type": "string" },
                  { "name": "ad_url", "type": [ { "type": "record", "name": "url_struct_4", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string"}} ]}, "null"]},
                  { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }}
              ]}
              }
          },                     
          { "name": "Impressions", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_impression", "fields": [
                  { "name": "event_id", "type": "string" },
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_group_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                  { "name": "event_subtype_1", "type": [ "string", "null" ]},
                  { "name": "event_subtype_2", "type": [ "string", "null" ]},
                  { "name": "http_status_code", "type": "string" },
                  { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_10", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "synthetic_event", "type": "boolean" },
                  { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_11", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                  { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                  { "name": "other_entity_keys", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}}},                  
                  { "name": "metrics", "type": { "type": "map", "values": "string" }},
                  { "name": "others", "type":{ "type": "map", "values": "string" }},
                  { "name": "ad_group_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "campaign_id", "type": "string" },
                  { "name": "creative_id", "type": "string" },
                  { "name": "placement_id", "type": "string" },
                  { "name": "ad_url", "type": [ { "type": "record", "name": "url_struct_12", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string"}} ]}, "null"]},
                  { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
                  { "name": "keywords", "type": "string" },
                  { "name": "inventory_partner", "type": "string" },
                  { "name": "media_partner", "type": "string" },
                  { "name": "search_phase", "type": "string" },
                  { "name": "search_terms", "type": "string" },
                  { "name": "site_id", "type": "string" },
                  { "name": "supply_vendor_publisher_id", "type": "string" },
                  { "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }},
                  { "name": "cpm_cost", "type": "int" },
                  { "name": "cpm_currency", "type": [ "string", "null" ] },
                  { "name": "cpm_paid", "type": "int" },
                  { "name": "has_click_conversion", "type": [ "boolean", "null" ] },
                  { "name": "has_click", "type": [ "boolean", "null" ] },
                  { "name": "has_engagement", "type": [ "boolean", "null" ] },
                  { "name": "viewable", "type": [ "boolean", "null" ] },
              { "name": "Clicks", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "aqfer_click", "fields": [
                          { "name": "event_id", "type": "string" },
                          { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                          { "name": "event_subtype_1", "type": [ "string", "null" ]},
                          { "name": "event_subtype_2", "type": [ "string", "null" ]},
                          { "name": "http_status_code", "type": "string" },
                          { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                          { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_13", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }}]}, "null"]},                                                  
                          { "name": "others", "type":{ "type": "map", "values": "string" }},                         
                          { "name": "metrics", "type":{ "type": "map", "values": "string" }},                          
                          { "name": "cpc_paid", "type": [ "int", "null" ] },
                          { "name": "cpc_cost", "type": [ "int", "null" ] },
                          { "name": "video_pct", "type": [ "int", "null" ] },
                          { "name": "destination_url", "type": [ { "type": "record", "name": "url_struct_14", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }}]}, "null"]}
                          ]}
                  }
                  },
                  { "name": "Engagements", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "aqfer_engagement", "fields": [
                          { "name": "event_id", "type": "string" },
                          { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                          { "name": "event_subtype_1", "type": [ "string", "null" ]},
                          { "name": "event_subtype_2", "type": [ "string", "null" ]},
                          { "name": "http_status_code", "type": "string" },
                          { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                          { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_15", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }}]}, "null"]},                                                  
                          { "name": "others", "type":{ "type": "map", "values": "string" }},                         
                          { "name": "metrics", "type":{ "type": "map", "values": "string" }},                          
                          { "name": "cpe_paid", "type": [ "int", "null" ] },
                          { "name": "cpe_cost", "type": [ "int", "null" ] },
                          { "name": "engagement_actions", "type":
                              { "type": "array", "items": "string" }
                          }
                          ]}
                  }
                  }                  
              ]}
              }
          }
          ]}
        """
        return schema.parse(out_sch), dat_hnd

# [OPS] Set the output destination before running.
# It is deliberately left unset here; an empty value and `null` both parse as
# null, so the explicit form below is equivalent to the original placeholder.
outputs:
  - destination: null
# Split stage settings.
split:
  # presumably the target size of each split bucket; 2147483648 bytes is
  # 2^31 = 2 GiB (about 2.15 GB), not 2 GB
  bucket_size_bytes: 2147483648  # 2 GiB (2^31 bytes)
# Sort / partition key for collated records.
# The five `key` templates line up (presumably positionally) with the five
# `key_names`. Each template indexes into the fields selected by `project`:
#   0 = tenant_id, 1 = entity_type, 2 = entity_domain, 3 = entity_id
sort:
  # presumably all five key names above are used as partition keys
  num_partition_keys: 5
  key_names:
    - tenant_id
    - entity_type
    - entity_domain
    - batch_id
    - entity_id_bucket
  project:
    '.':
      field_names: [tenant_id, entity_type, entity_domain, entity_id]
  key:
    - template: '{{index . 0}}'  # tenant_id
    - template: '{{index . 1}}'  # entity_type
    - template: '{{index . 2}}'  # entity_domain
    # batch_id: hard-coded placeholder.
    # [OPS] need to add '{{env BATCH_ID}}' when productionizing.
    - template: '123456'
    # entity_id_bucket: shard on entity_id (index 3). The original template used
    # index 2 (entity_domain), which put every record of a domain into one
    # bucket and left the projected entity_id unused.
    # The trailing 3 is presumably the bucket count ([OPS] set for sharding).
    - template: '{{bucket (index . 3) 3}}'

# Per-stage parallelism: number of in-process executor workers for each
# pipeline stage, listed in the same order as the original configuration.
capacity:
  split_stage:
    in_proc_executor: {num_workers: 12}
  collate_stage:
    in_proc_executor: {num_workers: 32}
  sort_stage:
    in_proc_executor: {num_workers: 3}
  merge_plan_stage:
    in_proc_executor: {num_workers: 20}
  merge_do_stage:
    in_proc_executor: {num_workers: 20}
  transform_stage:
    in_proc_executor: {num_workers: 30}
  • Table of contents

Was this article helpful?