collate-merge.yaml

############################
# SETUP
# 1) Set work area, inputs, outputs

# NOTES
# - The schema for Interactions is complete, but the logic that populates it will be added as needed.
# - metrics, others, and other_entity_ids are shared across all dates, since we don't know which specific date they apply to.
############################

# NOTE(review): work_area and both `folder` values below are empty, which YAML
# parses as null; presumably they are filled in per deployment -- confirm.
work_area: 
separate_transform_stage: true
inputs:
  - name: events
    locations:
      # Output of this job (collate-merge)
      - folder: 
      # Output of collate-sort
      - folder: 
    # dat_hnd (in the transform below) reads tenant_id, entity_type,
    # entity_domain and entity_id_bucket from every record; presumably they
    # are populated from these captured partition keys -- TODO confirm.
    capture_partition_keys:
      - tenant_id
      - entity_type
      - entity_domain
      - entity_id_bucket
    # Presumably records sharing these values are grouped into the single
    # `rec` (with an `events` list) that dat_hnd receives -- confirm against
    # the job framework documentation.
    key:
      - tenant_id
      - entity_id
      - entity_type
      - entity_domain
      - entity_id_bucket
transform:
  - python:
      code: |
        import schema
        import time
        import datetime
        import copy
        import os

        # Impression event ids that cannot be used to de-duplicate impressions.
        invalid_ids = {'NA'}
        # Start time of this run, in epoch milliseconds.
        current_ts = int(time.time() * 1000)

        # Batch id of the current run; every output record carries it as batch_id.
        run_id = os.environ.get('BATCH_ID')
        if run_id is None:
            raise Exception("batch_id not found")

        # Keep in sync with the input lookback window: event dates older than
        # lookback_date are marked beyond_lookback.
        lookback_days = 3
        lookback_date = (datetime.datetime.now(datetime.timezone.utc).date()
                         - datetime.timedelta(days=lookback_days))

        def isValid(e):
            """Return True if `e` holds a usable value.

            None, the empty string and the placeholder 'NA' all count as missing.
            """
            if e is None:
                return False
            return e not in ('', 'NA')
 
        def non_empty(list):
            """Return True when the sized collection `list` has at least one item."""
            return len(list) != 0
 
        def sortByTimestamp(evt):
            """Sort the list of event dicts `evt` in place, oldest event_timestamp first.

            The sort is stable, so events with equal timestamps keep their order.
            """
            def ts_of(event):
                return event['event_timestamp']
            evt.sort(key=ts_of)
 
        def update_key_if_valid(result, key, value):
            """Set result[key] = value unless result already holds a valid value there.

            Despite the name, `value` itself is not checked: the write happens when
            the existing entry is missing, None, '' or 'NA' (see isValid).
            """
            current = result.get(key)
            if isValid(current):
                return
            result[key] = value
 
        def _refresh_impression_flags(kept, other=None):
            """Recompute the derived fields of impression `kept` after a merge.

            has_click / has_engagement are recomputed from kept's (possibly merged)
            clicks / engagements, which are re-sorted by event_timestamp. When a
            distinct `other` impression was merged into `kept`, a falsy
            has_click_conversion / viewable on `kept` is replaced by other's value,
            so a flag set on either side survives the merge.
            """
            kept['has_click'] = non_empty(kept['clicks'])
            kept['has_engagement'] = non_empty(kept['engagements'])
            if other is not None and other is not kept:
                if not kept['has_click_conversion']:
                    kept['has_click_conversion'] = other['has_click_conversion']
                if not kept['viewable']:
                    kept['viewable'] = other['viewable']
            sortByTimestamp(kept['clicks'])
            sortByTimestamp(kept['engagements'])

        def update_events(rec, result, event_groups):
            """Merge the event lists found in rec['events'] into `result`.

            - Copies rec's advertiser_id into result.
            - Concatenates the Attributes, Conversions, Views, GeoEvents, POIVisits
              and OtherEvents lists of every entry of rec['events'].
            - De-duplicates Impressions by event_id. A non-synthetic impression
              replaces a synthetic one with the same id and inherits its clicks and
              engagements. Otherwise the later impression's clicks, engagements and
              `others` are folded into the first one. Impressions whose event_id is
              in invalid_ids are never merged and are kept individually.
            - Records the first occurrence of each event group, keyed by its
              event_id, into `event_groups`.
            All resulting lists are sorted by event_timestamp.

            Args:
                rec: input record; each entry of rec['events'] holds the lists named
                    above plus 'Impressions' and 'EventGroups'.
                result: output dict, mutated in place.
                event_groups: dict event_id -> event group, mutated in place.
            """
            # Tuple, not set: keeps iteration order deterministic across runs.
            event_types = ('Attributes', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents')
            result['advertiser_id'] = rec.get('advertiser_id')
            impressions_dict = {}
            invalid_impressions = []

            for e in event_types:
                result[e] = []
            for evt in rec['events']:
                for e in event_types:
                    result[e].extend(evt[e])

                for imp in evt['Impressions']:
                    imp_id = imp['event_id']
                    if imp_id in invalid_ids:
                        # No usable id to merge on: keep the impression as-is, but
                        # still derive its own has_click / has_engagement flags.
                        _refresh_impression_flags(imp)
                        invalid_impressions.append(imp)
                        continue

                    kept = impressions_dict.get(imp_id)
                    if kept is None:
                        # First occurrence of this impression id.
                        kept, other = imp, None
                    elif kept['synthetic_event'] == True and imp['synthetic_event'] == False:
                        # A real impression replaces the synthetic one and inherits
                        # its clicks and engagements.
                        # NOTE(review): the synthetic record's `others` map is not
                        # carried over here, unlike in the merge branch below.
                        imp['clicks'].extend(kept['clicks'])
                        imp['engagements'].extend(kept['engagements'])
                        kept, other = imp, kept
                    else:
                        kept['clicks'].extend(imp['clicks'])
                        kept['engagements'].extend(imp['engagements'])
                        kept['others'].update(imp['others'])
                        other = imp
                    impressions_dict[imp_id] = kept
                    _refresh_impression_flags(kept, other)

                for event_group in evt['EventGroups']:
                    # The first occurrence of each event group wins.
                    event_groups.setdefault(event_group['event_id'], event_group)

            result['Impressions'] = list(impressions_dict.values()) + invalid_impressions
            sortByTimestamp(result['Impressions'])
            for e in event_types:
                sortByTimestamp(result[e])
         
        def _event_date(ts):
            """Return the UTC day ('%Y%m%d') of an event timestamp.

            Only the leading 10 digits are used, so the value is presumably epoch
            milliseconds (13 digits) or epoch seconds (10 digits).
            """
            seconds = int(str(ts)[0:10])
            return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc).strftime('%Y%m%d')

        def split_by_date(rec, event_groups):
            """Split a merged record into one output record per UTC event date.

            Each output record is a deep copy of rec's non-event fields plus:
            - the Impressions/Conversions/Views/GeoEvents/POIVisits/OtherEvents that
              fall on its date;
            - a private copy of every event group those events reference, looked
              up in `event_groups` by event_group_id; unknown group ids are skipped;
            - event_date, the has_* flags and beyond_lookback.
            Group summaries and event_window_start are then filled in by
            update_event_groups.

            Args:
                rec: merged record produced by update_events.
                event_groups: dict event_id -> event group.
            Returns:
                list of per-date records.
            """
            # Tuple, not set: record and EventGroups order are deterministic.
            event_types = ('Impressions', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents')
            # Template for every date: all of rec's fields, with the event lists emptied.
            parentrec = {key: ([] if key in event_types else value) for key, value in rec.items()}

            recsdict = {}
            seen_group_ids = {}
            for e in event_types:
                for evt in rec[e]:
                    evt_date = _event_date(evt['event_timestamp'])
                    if evt_date not in recsdict:
                        recsdict[evt_date] = copy.deepcopy(parentrec)
                        seen_group_ids[evt_date] = {g['event_id'] for g in recsdict[evt_date]['EventGroups']}
                    day = recsdict[evt_date]
                    group_id = evt['event_group_id']
                    group = event_groups.get(group_id)
                    if group is not None and group_id not in seen_group_ids[evt_date]:
                        # Each date gets its own copy: update_event_groups writes
                        # per-date summaries into the group, so a group whose
                        # events span several dates must not share one dict.
                        day['EventGroups'].append(dict(group))
                        seen_group_ids[evt_date].add(group_id)
                    day[e].append(evt)

            results = []
            for dt, day in recsdict.items():
                day['event_date'] = dt
                day['has_impressions'] = non_empty(day['Impressions'])
                day['has_conversions'] = non_empty(day['Conversions'])
                day['has_views'] = non_empty(day['Views'])
                day['has_geo_events'] = non_empty(day['GeoEvents'])
                day['has_poi_visits'] = non_empty(day['POIVisits'])
                day['has_other_events'] = non_empty(day['OtherEvents'])
                # Day-level click/engagement flags are True if any impression has them.
                day['has_clicks'] = any(imp['has_click'] == True for imp in day['Impressions'])
                day['has_engagements'] = any(imp['has_engagement'] == True for imp in day['Impressions'])
                day['has_interactions'] = False  # TODO: derive from the Interactions sub-events
                # Dates before lookback_date are outside the lookback window and
                # should not be overwritten; dates inside it may be.
                day['beyond_lookback'] = datetime.datetime.strptime(dt, "%Y%m%d").date() < lookback_date
                results.append(day)
            update_event_groups(results)
            return results
        def update_event_groups(results):
            """Summarise the event groups of each per-date record, in place.

            For every group in result['EventGroups'], the events of that record
            whose event_group_id equals the group's event_id determine:
              event_timestamp, first_event_id -- from the earliest event,
              last_timestamp                  -- from the latest event,
              events_count                    -- the number of such events.
            result['event_window_start'] is set to the earliest group
            event_timestamp, unless it already holds an earlier value.

            Raises KeyError if a listed group has no events in its record.
            """
            # Tuple, not set: ties in timestamps resolve deterministically.
            event_types = ('Impressions', 'Conversions', 'Views', 'GeoEvents', 'POIVisits', 'OtherEvents')
            for result in results:
                events_by_group = {}
                for e in event_types:
                    for evt in result[e]:
                        events_by_group.setdefault(evt['event_group_id'], []).append(evt)

                for event_group in result['EventGroups']:
                    # Stable sort: equal timestamps keep the event-type order above.
                    events = sorted(events_by_group[event_group['event_id']],
                                    key=lambda ev: ev['event_timestamp'])
                    event_group['event_timestamp'] = events[0]['event_timestamp']
                    event_group['first_event_id'] = events[0]['event_id']
                    event_group['last_timestamp'] = events[-1]['event_timestamp']
                    event_group['events_count'] = len(events)

                    # The record-level window starts at its earliest event group,
                    # so this must be evaluated for every group, not just the last.
                    start = result['event_window_start']
                    if start is None or event_group['event_timestamp'] < start:
                        result['event_window_start'] = event_group['event_timestamp']
 
        def dat_hnd(rec: any) -> any:
            """Turn one keyed input record into a list of per-date output records.

            Builds the base record (entity ids, partition keys, this run's
            batch_id, a '1 Day' event window), merges rec['events'] into it via
            update_events and splits the result by event date via split_by_date.

            The full input record is printed to stdout only when the
            COLLATE_MERGE_DEBUG environment variable is set: records can be large
            and contain per-entity event data.
            """
            if os.environ.get('COLLATE_MERGE_DEBUG'):
                print('start rec *********************************************')
                print(rec)
                print('end rec *********************************************')
            event_groups = {}
            result = {
                'entity_id': rec['entity_id'],
                'batch_id': run_id,
                'event_window_start': None,  # calculated with event groups
                'event_window_duration': '1 Day',
                'tenant_id': rec['tenant_id'],
                'entity_type': rec['entity_type'],
                'entity_domain': rec['entity_domain'],
                'entity_id_bucket': rec['entity_id_bucket'],
                'EventGroups': [],
            }
            update_events(rec, result, event_groups)
            return split_by_date(result, event_groups)
          
 
        def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
          out_sch = """
            { "type": "record", "name": "aqfer_data_subject_collated", "fields": [
            { "name": "event_date", "type": "string" },
            { "name": "batch_id", "type": "string" }, 
            { "name": "tenant_id", "type": "string" },
            { "name": "entity_type", "type": "string" },
            { "name": "entity_domain", "type": "string" },
            { "name": "entity_id_bucket", "type": "string" },
            { "name": "entity_id", "type": "string" },  
            { "name": "event_window_start", "type": ["long", "null"] },
            { "name": "event_window_duration", "type": ["string", "null"] }, 
            { "name": "has_impressions", "type": "boolean" }, 
            { "name": "has_clicks", "type": "boolean" },
            { "name": "has_engagements", "type": "boolean" },
            { "name": "has_conversions", "type": "boolean" },  
            { "name": "has_views", "type": "boolean" },   
            { "name": "has_interactions", "type": "boolean" },
            { "name": "has_geo_events", "type": "boolean" },
            { "name": "has_poi_visits", "type": "boolean" },
            { "name": "has_other_events", "type": "boolean" },
            { "name": "Attributes", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_attribute", "fields": [
                    { "name": "confidence", "type": ["int", "null"] },
                    { "name": "data_type", "type": "string" },
                    { "name": "global_region", "type": "string" },  
                    { "name": "magnitude", "type": ["int", "null"] },
                    { "name": "modeled", "type": "boolean" },
                    { "name": "method", "type": "boolean" },
                    { "name": "name", "type": "string" },  
                    { "name": "score", "type": ["int", "null"] },
                    { "name": "source", "type": "string" },
                    { "name": "source_attribute_name", "type": "string" },
                    { "name": "source_attribute_value", "type": "string" },
                    { "name": "value", "type": "string" },
                    { "name": "others", "type": { "type": "map", "values": "string" }}
                ]}
                }
            },
            { "name": "EventGroups", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_event_group", "fields": [
                    { "name": "event_id", "type": "string" },
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "first_event_id", "type": "string" },
                    { "name": "last_timestamp", "type": "long" },                  
                    { "name": "source", "type": "string" },
                    { "name": "events_count", "type": ["int", "null"] },
                    { "name": "browser", "type": "string" },
                    { "name": "browser_version", "type": "string" },
                    { "name": "device_type", "type": "string" },
                    { "name": "hashed_ip", "type": "string" },
                    { "name": "network", "type": "string" },
                    { "name": "network_type", "type": "string" },
                    { "name": "os", "type": "string" },
                    { "name": "os_version", "type": "string" },
                    { "name": "throughput", "type": "string" },
                    { "name": "user_agent", "type": "string" },
                    { "name": "area_code", "type": "string" },
                    { "name": "city", "type": "string" },
                    { "name": "country_code", "type": "string" },
                    { "name": "dma", "type": "string" },
                    { "name": "fips", "type": "string" },
                    { "name": "global_region", "type": "string" },
                    { "name": "latitude", "type": "string" },
                    { "name": "longitude", "type": "string" },
                    { "name": "msa", "type": "string" },
                    { "name": "region_code", "type": "string" },
                    { "name": "timezone", "type": "string" },
                    { "name": "zip", "type": "string" },
                    { "name": "policy", "type": { "type": "array", "items":
                      { "type": "record", "name": "policy", "fields": [
                          { "name": "policy_id", "type": "string" },
                          { "name": "policy_effective_date", "type": "string" },
                          { "name": "effectivity", "type": "string" },
                          { "name": "policy_detail", "type": "string" }]}}},        
                    { "name": "other_geo", "type":{ "type": "map", "values": "string" }}
                ]}
                }
            },
            { "name": "Conversions", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_conversion", "fields": [
                    { "name": "event_id", "type": "string" },
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_1", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "synthetic_event", "type": "boolean" },
                    { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_3", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},                  
                    { "name": "metrics", "type": { "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "ad_group_id", "type": "string" },
                    { "name": "advertiser_id", "type": "string" },
                    { "name": "agency_id", "type": "string" },
                    { "name": "campaign_id", "type": "string" },
                    { "name": "creative_id", "type": "string" },
                    { "name": "placement_id", "type": "string" },
                    { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_4", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string"}} ]}, "null"]},
                    { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
                    { "name": "keywords", "type": "string" },
                    { "name": "inventory_partner", "type": "string" },
                    { "name": "media_partner", "type": "string" },
                    { "name": "search_phase", "type": "string" },
                    { "name": "search_terms", "type": "string" },
                    { "name": "site_id", "type": "string" },
                    { "name": "supply_vendor_publisher_id", "type": "string" },
                    { "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }}
                ]}
                }
            },
            { "name": "Views", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_view", "fields": [
                    { "name": "event_id", "type": "string" }, 
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},                 
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_5", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_6", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "synthetic_event", "type": "boolean" },
                    { "name": "tag_url", "type": [      { "type": "record", "name": "url_struct_7", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},   
                    { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "advertiser_id", "type": "string" },
                    { "name": "agency_id", "type": "string" },
                    { "name": "has_interaction", "type": [ "boolean", "null" ] },
                    { "name": "Interactions", "type":
                        { "type": "array", "items":
                        { "type": "record", "name": "aqfer_interaction", "fields": [
                          { "name": "event_id", "type": "string" }, 
                          { "name": "event_timestamp", "type": "long" },
                          { "name": "event_group_id", "type": "string" },
                          { "name": "source", "type": "string" },
                          { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                          { "name": "event_subtype_1", "type": [ "string", "null" ]},
                          { "name": "event_subtype_2", "type": [ "string", "null" ]},
                          { "name": "http_status_code", "type": "string" },
                          { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_5", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                  { "type": "map", "values": "string" }}]}, "null"]},
                          { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_6", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                  { "type": "map", "values": "string" }}]}, "null"]},
                          { "name": "synthetic_event", "type": "boolean" },
                          { "name": "tag_url", "type": [      { "type": "record", "name": "url_struct_7", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                  { "type": "map", "values": "string" }}]}, "null"]},
                          { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                          { "name": "other_entity_keys", "type":
                          { "type": "array", "items":
                                  { "type": "record", "name": "entity_struct_1", "fields": [
                                  { "name": "entity_type", "type": [ "string", "null" ] },
                                  { "name": "entity_domain", "type": [ "string", "null" ] },
                                  { "name": "entity_id", "type": [ "string", "null" ] }
                                  ]}}},   
                          { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                          { "name": "others", "type":{ "type": "map", "values": "string" }},
                          { "name": "advertiser_id", "type": "string" },
                          { "name": "agency_id", "type": "string" }
                      ]}
                }
            }
                ]}
                }
            },
            { "name": "GeoEvents", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_geo_event", "fields": [
                    { "name": "event_id", "type": "string" }, 
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},
                    { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "horizontal_accuracy", "type": ["int", "null"] },
                    { "name": "device_id_type", "type": "string" }
                ]}
                }
            },    
            { "name": "POIVisits", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_poi_visit", "fields": [
                    { "name": "event_id", "type": "string" }, 
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},   
                    { "name": "metrics", "type":{ "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "poi_id", "type": "string" },
                    { "name": "minimum_duration", "type": ["int", "null"] },
                    { "name": "device_id_type", "type": "string" }
                ]}
                }
            },     
            { "name": "OtherEvents", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_event", "fields": [
                    { "name": "event_id", "type": "string" },
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_8", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_2", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "synthetic_event", "type": "boolean" },
                    { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},                  
                    { "name": "metrics", "type": { "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "ad_group_id", "type": "string" },
                    { "name": "advertiser_id", "type": "string" },
                    { "name": "agency_id", "type": "string" },
                    { "name": "campaign_id", "type": "string" },
                    { "name": "creative_id", "type": "string" },
                    { "name": "placement_id", "type": "string" },
                    { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_4", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string"}} ]}, "null"]},
                    { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }}
                ]}
                }
            },                                  
            { "name": "Impressions", "type":
                { "type": "array", "items":
                { "type": "record", "name": "aqfer_impression", "fields": [
                    { "name": "event_id", "type": "string" },
                    { "name": "event_timestamp", "type": "long" },
                    { "name": "event_group_id", "type": "string" },
                    { "name": "source", "type": "string" },
                    { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                    { "name": "event_subtype_1", "type": [ "string", "null" ]},
                    { "name": "event_subtype_2", "type": [ "string", "null" ]},
                    { "name": "http_status_code", "type": "string" },
                    { "name": "page_url", "type": [ { "type": "record", "name": "url_struct_8", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "page_referer_url", "type": [ { "type": "record", "name": "url_struct_9", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "synthetic_event", "type": "boolean" },
                    { "name": "tag_url", "type": [ { "type": "record", "name": "url_struct_10", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type": { "type": "map", "values": "string" }}]}, "null"]},
                    { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                    { "name": "other_entity_keys", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "entity_struct_1", "fields": [
                            { "name": "entity_type", "type": [ "string", "null" ] },
                            { "name": "entity_domain", "type": [ "string", "null" ] },
                            { "name": "entity_id", "type": [ "string", "null" ] }
                            ]}}},                  
                    { "name": "metrics", "type": { "type": "map", "values": "string" }},
                    { "name": "others", "type":{ "type": "map", "values": "string" }},
                    { "name": "ad_group_id", "type": "string" },
                    { "name": "advertiser_id", "type": "string" },
                    { "name": "agency_id", "type": "string" },
                    { "name": "campaign_id", "type": "string" },
                    { "name": "creative_id", "type": "string" },
                    { "name": "placement_id", "type": "string" },
                    { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_11", "fields": [
                            { "name": "scheme", "type": [ "string", "null" ] },
                            { "name": "host", "type": [ "string", "null" ] },
                            { "name": "path", "type": [ "string", "null" ] },
                            { "name": "query", "type":
                            { "type": "map", "values": "string"}} ]}, "null"]},
                    { "name": "other_marketing_program_levels", "type":{ "type": "map", "values": "string" }},
                    { "name": "keywords", "type": "string" },
                    { "name": "inventory_partner", "type": "string" },
                    { "name": "media_partner", "type": "string" },
                    { "name": "search_phase", "type": "string" },
                    { "name": "search_terms", "type": "string" },
                    { "name": "site_id", "type": "string" },
                    { "name": "supply_vendor_publisher_id", "type": "string" },
                    { "name": "other_media_group_levels", "type":{ "type": "map", "values": "string" }},
                    { "name": "cpm_cost", "type": "int" },
                    { "name": "cpm_currency", "type": [ "string", "null" ] },
                    { "name": "cpm_paid", "type": "int" },
                    { "name": "has_click_conversion", "type": [ "boolean", "null" ] },
                    { "name": "has_click", "type": [ "boolean", "null" ] },
                    { "name": "has_engagement", "type": [ "boolean", "null" ] },
                    { "name": "viewable", "type": [ "boolean", "null" ] },
                    { "name": "Clicks", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "aqfer_click", "fields": [
                            { "name": "event_id", "type": "string" },
                            { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                           { "name": "event_subtype_1", "type": [ "string", "null" ]},
                            { "name": "event_subtype_2", "type": [ "string", "null" ]},
                            { "name": "http_status_code", "type": "string" },
                            { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                            { "name": "tag_url", "type": [          { "type": "record", "name": "url_struct_12", "fields": [
                                        { "name": "scheme", "type": [ "string", "null" ] },
                                        { "name": "host", "type": [ "string", "null" ] },
                                        { "name": "path", "type": [ "string", "null" ] },
                                        { "name": "query", "type":
                                            { "type": "map", "values": "string" }}]}, "null"]},                                                  
                            { "name": "others", "type":{ "type": "map", "values": "string" }},                         
                            { "name": "metrics", "type":{ "type": "map", "values": "string" }},                          
                            { "name": "cpc_paid", "type": [ "int", "null" ] },
                            { "name": "cpc_cost", "type": [ "int", "null" ] },
                            { "name": "video_pct", "type": [ "int", "null" ] },
                            { "name": "destination_url", "type": [          { "type": "record", "name": "url_struct_13", "fields": [
                                        { "name": "scheme", "type": [ "string", "null" ] },
                                        { "name": "host", "type": [ "string", "null" ] },
                                        { "name": "path", "type": [ "string", "null" ] },
                                        { "name": "query", "type":
                                            { "type": "map", "values": "string" }}]}, "null"]}
                            ]}
                    }
                    },
                    { "name": "Engagements", "type":
                    { "type": "array", "items":
                            { "type": "record", "name": "aqfer_engagement", "fields": [
                            { "name": "event_id", "type": "string" },
                            { "name": "event_actions", "type":{ "type": "array", "items": "string" }},
                            { "name": "event_subtype_1", "type": [ "string", "null" ]},
                            { "name": "event_subtype_2", "type": [ "string", "null" ]},
                            { "name": "http_status_code", "type": "string" },
                            { "name": "other_event_ids", "type":{ "type": "map", "values": "string" }},
                            { "name": "tag_url", "type": [          { "type": "record", "name": "url_struct_14", "fields": [
                                        { "name": "scheme", "type": [ "string", "null" ] },
                                        { "name": "host", "type": [ "string", "null" ] },
                                        { "name": "path", "type": [ "string", "null" ] },
                                        { "name": "query", "type":
                                            { "type": "map", "values": "string" }}]}, "null"]},                                                  
                            { "name": "others", "type":{ "type": "map", "values": "string" }},                         
                            { "name": "metrics", "type":{ "type": "map", "values": "string" }},                          
                            { "name": "cpe_paid", "type": [ "int", "null" ] },
                            { "name": "cpe_cost", "type": [ "int", "null" ] },
                            { "name": "engagement_actions", "type":
                                { "type": "array", "items": "string" }
                            }                       
                            ]}
                    }
                    }                  
                ]}
                }
            },
            { "name": "beyond_lookback", "type": "boolean" }
            ]}
          """
          return schema.parse(out_sch), dat_hnd

outputs:
  # Output enhancements:
  # 1) After AQ-7888 is completed, remove the field_names lists from each
  #    `project` section below.
  # TODO: update the Avro destination to VTMS for prod.
  #
  # Records are routed on the boolean `beyond_lookback` field produced by the
  # transform: outputs using `omit_if: beyond_lookback` receive records whose
  # event date is within the lookback window; the output using
  # `omit_unless: beyond_lookback` receives the rest. `beyond_lookback` is not
  # listed in any field_names, so it is presumably not written to the outputs.

  # Event date is within the lookback window - Avro.
  # NOTE(review): this path repeats `collate-merge/` while the Parquet outputs
  # below do not - confirm it matches the folder this job reads back as input.
  - destination: /new-schema-pipeline-test/collate-merge/collate-merge/avro
    project:
      '.':
        omit_if: beyond_lookback
        field_names:
          - event_date
          - tenant_id
          - entity_type
          - entity_domain
          - entity_id_bucket
          - entity_id
          - event_window_start
          - event_window_duration
          - has_impressions
          - has_clicks
          - has_engagements
          - has_conversions
          - has_views
          - has_interactions
          - has_geo_events
          - has_poi_visits
          - has_other_events
          - Attributes
          - EventGroups
          - Conversions
          - Views
          - GeoEvents
          - POIVisits
          - OtherEvents
          - Impressions
    split_by:
      key:
        - tenant_id
        - entity_type
        - entity_domain
        - event_date
        - entity_id_bucket
      strip_key: true

  # Event date is within the lookback window - Parquet.
  - destination: /new-schema-pipeline-test/collate-merge/parquet
    format: parquet
    project:
      '.':
        omit_if: beyond_lookback
        field_names:
          - event_date
          - tenant_id
          - entity_type
          - entity_domain
          - entity_id
          - event_window_start
          - event_window_duration
          - has_impressions
          - has_clicks
          - has_engagements
          - has_conversions
          - has_views
          - has_interactions
          - has_geo_events
          - has_poi_visits
          - has_other_events
          - Attributes
          - EventGroups
          - Conversions
          - Views
          - GeoEvents
          - POIVisits
          - OtherEvents
          - Impressions
    split_by:
      key:
        - tenant_id
        - entity_type
        - entity_domain
        - event_date
      strip_key: true

  # Event date is beyond the lookback window - Parquet, additionally keyed by
  # batch_id.
  - destination: /new-schema-pipeline-test/collate-merge/parquet_beyond
    format: parquet
    project:
      '.':
        omit_unless: beyond_lookback
        field_names:
          - event_date
          - tenant_id
          - entity_type
          - entity_domain
          - batch_id
          - entity_id
          - event_window_start
          - event_window_duration
          - has_impressions
          - has_clicks
          - has_engagements
          - has_conversions
          - has_views
          - has_interactions
          - has_geo_events
          - has_poi_visits
          - has_other_events
          - Attributes
          - EventGroups
          - Conversions
          - Views
          - GeoEvents
          - POIVisits
          - OtherEvents
          - Impressions
    split_by:
      key:
        - tenant_id
        - entity_type
        - entity_domain
        - event_date
        - batch_id
      strip_key: true
split:
  # 2 GiB (2^31 bytes).
  # NOTE(review): 2147483648 is one above INT32_MAX, so the consumer must read
  # this as a 64-bit integer - confirm.
  bucket_size_bytes: 2147483648
sort:
  num_partition_keys: 0
  # Sort keys; the listed order is presumably the comparison priority - confirm.
  key_names:
    - tenant_id
    - entity_type
    - entity_domain
    - batch_id
    - entity_id
  project:
    '.':
      # NOTE(review): batch_id is a sort key above but is not projected here -
      # confirm the sort stage does not need it in the projected record.
      field_names:
        - tenant_id
        - entity_type
        - entity_domain
        - entity_id
capacity:
  # Number of workers for the in-process executor of each pipeline stage.
  split_stage:
    in_proc_executor: {num_workers: 12}
  collate_stage:
    in_proc_executor: {num_workers: 32}
  sort_stage:
    in_proc_executor: {num_workers: 4}
  merge_plan_stage:
    in_proc_executor: {num_workers: 30}
  merge_do_stage:
    in_proc_executor: {num_workers: 100}
  transform_stage:
    in_proc_executor: {num_workers: 40}