# config.yaml

############################
# SETUP
# 1) Set work area, inputs, outputs 20220726163015

# Note: metrics, others, and other_entity_ids are shared across all dates, since we don't know which specific date they apply to.
############################


# Working location for this job.
# NOTE(review): presumably scratch/intermediate storage owned by the pipeline runner - confirm.
work_area: s3://.default@com.aqfer..development/new-pipeline-test/collate-merge/work-area/100021
# NOTE(review): presumably makes the runner execute the `transform` section as its own stage - confirm.
separate_transform_stage: true
inputs:
  # One logical input, "events", read from the two folders listed below.
  - name: events
    locations:
      # Collate-sort output, limited to the entity_type=ck / entity_domain=TTD partition.
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/collate-sort/entity_type=ck/entity_domain=TTD #new sort only
      # Avro output folder of the collate-merge job.
      # NOTE(review): presumably the previously merged result being re-merged - confirm.
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/collate-merge/collate-merge/output/avro
    # These names match the key=value path segments used above (entity_type=..., entity_domain=...).
    # NOTE(review): presumably lifted from the partition path into each record - confirm.
    capture_partition_keys:
      - entity_type
      - entity_domain
      - entity_id_bucket
    # The transform below reads every one of these fields from the incoming record.
    # NOTE(review): presumably the grouping key that produces rec['events'] - confirm.
    key:
      - entity_id
      - entity_type
      - entity_domain
      - entity_id_bucket
      - advertiser_id 
transform:
  - python:
      code: |
          import schema
          import time
          import datetime
          import copy

          # Event ids that cannot be used to match impressions with each other.
          invalid_ids = {'NA'}
          # Epoch milliseconds, evaluated once when this code is loaded; every
          # record handled afterwards is stamped with this same value.
          current_ts = int(1000 * time.time())


          def isValid(e):
              """Return True unless ``e`` is None, the empty string, or the placeholder 'NA'."""
              if e is None:
                  return False
              return e != '' and e != 'NA'

          def non_empty(list):
              """Return True when the given sized container holds at least one item."""
              # The parameter keeps its original name (it shadows the builtin
              # ``list``) so that keyword callers are unaffected.
              item_count = len(list)
              return item_count > 0

          def sortByTimestamp(evt):
              """Sort the list ``evt`` in place, ascending by each item's 'event_timestamp'.

              Nothing is returned; the caller's list is reordered. The sort is
              stable, so items with equal timestamps keep their relative order.
              """
              def _timestamp_of(item):
                  return item['event_timestamp']

              evt.sort(key=_timestamp_of)

          def update_key_if_valid(result, key, value):
              """Store ``value`` under ``key`` unless ``result`` already holds a valid value there.

              "Valid" is decided by isValid() on the value currently stored (a
              missing key counts as None). The incoming ``value`` itself is not
              checked, so an invalid value may be written and later replaced.
              """
              if isValid(result.get(key)):
                  # A usable value is already present; keep it.
                  return
              result[key] = value

          def update_events(rec, result):
              """Merge every entry of rec['events'] into ``result`` (modified in place).

              Args:
                  rec: incoming record; 'advertiser_id' and 'events' are read here.
                      Each entry of rec['events'] must provide 'country_code',
                      'region_code', 'geo_id', 'impressions', 'other_entity_ids',
                      'others', 'metrics' and one list per name in ``event_types``.
                  result: output dict that is filled in.

              After the call ``result`` holds:
                  * country_code / region_code / geo_id: the first value accepted
                    by isValid(), or whatever was seen last if none is valid;
                  * one concatenated, timestamp-sorted list per event type;
                  * 'impressions': impressions merged by 'event_id', followed by
                    the impressions whose id is in ``invalid_ids``, sorted by
                    timestamp;
                  * other_entity_ids / others / metrics: accumulated over all
                    entries (for the two maps, later entries overwrite keys).

              Raises:
                  KeyError: when a required field is missing from the input.

              Note: the impression dicts inside ``rec`` are mutated (their clicks
              and engagements lists are extended and sorted).
              """
              event_types = set(['attributes', 'site_visits', 'segmentation_events', 'conversions', 'webvisits', 'page_views'])
              result['country_code'] = None
              result['region_code'] = None
              result['geo_id'] = None
              result['other_entity_ids'] = []
              result['others'] = {}
              result['metrics'] = {}
              result['advertiser_id'] = rec['advertiser_id']
              impressions_dict = {}
              invalid_impressions = []

              for e in event_types:
                  result[e] = []
              for evt in rec['events']:
                  update_key_if_valid(result, 'country_code', evt['country_code'])
                  update_key_if_valid(result, 'region_code', evt['region_code'])
                  update_key_if_valid(result, 'geo_id', evt['geo_id'])
                  for e in event_types:
                      result[e].extend(evt[e])
                  for imp in evt['impressions']:
                      imp_id = imp['event_id']
                      if imp_id in invalid_ids:
                          # No usable id: keep the impression as-is and never merge it.
                          # It is not stored in impressions_dict, so the flag and
                          # sort steps below must work on the record itself
                          # (looking it up by id used to raise KeyError).
                          invalid_impressions.append(imp)
                          merged = imp
                      else:
                          if imp_id not in impressions_dict:
                              impressions_dict[imp_id] = imp
                          else:
                              imp_event = impressions_dict[imp_id]
                              if imp_event['synthetic_event'] == True and imp['synthetic_event'] == False:
                                  # A real impression replaces the synthetic one stored
                                  # earlier; the synthetic record's clicks and engagements
                                  # are carried over, its 'others' map is not.
                                  imp['clicks'].extend(imp_event['clicks'])
                                  imp['engagements'].extend(imp_event['engagements'])
                                  impressions_dict[imp_id] = imp
                              else:
                                  imp_event['clicks'].extend(imp['clicks'])
                                  imp_event['engagements'].extend(imp['engagements'])
                                  imp_event['others'].update(imp['others'])
                          merged = impressions_dict[imp_id]
                      merged['has_click'] = non_empty(merged['clicks'])
                      # The two flags are sticky: once truthy they are never reset.
                      if not merged['has_click_conversion']:
                          merged['has_click_conversion'] = imp['has_click_conversion']
                      if not merged['viewable']:
                          merged['viewable'] = imp['viewable']
                      sortByTimestamp(merged['clicks'])
                      sortByTimestamp(merged['engagements'])
                  result['other_entity_ids'].extend(evt['other_entity_ids'])
                  result['others'].update(evt['others'])
                  result['metrics'].update(evt['metrics'])

              result['impressions'] = list(impressions_dict.values())
              result['impressions'].extend(invalid_impressions)
              sortByTimestamp(result['impressions'])
              # NOTE(review): 'attributes' is part of event_types, so attribute
              # records are also sorted by 'event_timestamp'. The output schema's
              # aqfer_attribute record declares no such field - confirm the input
              # attributes carry it, otherwise this raises KeyError.
              for e in event_types:
                  sortByTimestamp(result[e])
           
          def split_by_date(rec):
              """Split one merged record into one record per UTC calendar day.

              Args:
                  rec: dict holding a list under each name in ``event_types``;
                      every item of those lists needs an 'event_timestamp'.

              Returns:
                  A list of dicts ordered by 'event_date' (format YYYYMMDD). Each
                  dict is a deep copy of all non-event fields of ``rec`` plus only
                  the events that fall on that day, and the derived flags
                  has_conversion, has_page_view, has_webvisit, has_site_visits
                  and has_clicks. A record without any dated event yields an
                  empty list.

              Raises:
                  KeyError: when an event list or 'event_timestamp' is missing.
              """
              # A tuple rather than a set: the visiting order is then fixed instead
              # of depending on string hashing.
              event_types = ('impressions', 'site_visits', 'segmentation_events', 'conversions', 'webvisits', 'page_views')

              # Template shared by every day: same fields as rec, event lists emptied.
              parentrec = {}
              for field in rec:
                  parentrec[field] = [] if field in event_types else rec[field]

              recsdict = {}
              for e in event_types:
                  for evt in rec[e]:
                      # Only the first 10 digits of the timestamp are used, read as
                      # epoch seconds; this drops the sub-second digits of longer
                      # (e.g. 13-digit millisecond) values.
                      epoch_seconds = int(str(evt['event_timestamp'])[0:10])
                      # Aware UTC conversion; datetime.utcfromtimestamp() is
                      # deprecated since Python 3.12 and gives the same date.
                      evt_date = datetime.datetime.fromtimestamp(epoch_seconds, datetime.timezone.utc).strftime('%Y%m%d')
                      if evt_date not in recsdict:
                          recsdict[evt_date] = copy.deepcopy(parentrec)
                      recsdict[evt_date][e].append(evt)

              results = []
              for dt in sorted(recsdict):
                  day = recsdict[dt]
                  day['event_date'] = dt
                  day['has_conversion'] = non_empty(day['conversions'])
                  day['has_page_view'] = non_empty(day['page_views'])
                  day['has_webvisit'] = non_empty(day['webvisits'])
                  day['has_site_visits'] = non_empty(day['site_visits'])
                  # The day-level flag is set as soon as any impression of that
                  # day carries has_click == True.
                  day['has_clicks'] = any(i['has_click'] == True for i in day['impressions'])
                  results.append(day)
              return results

          def dat_hnd(rec: any) -> any:
              """Turn one incoming record into a list of per-day output records.

              The identifying fields are copied from ``rec``, the window label and
              the load-time ``current_ts`` are attached, update_events() merges
              rec['events'] into the result, and split_by_date() breaks it up by
              event date.

              NOTE(review): presumably the per-record entry point invoked by the
              pipeline runner - confirm.
              """
              merged = {
                  'entity_id': rec['entity_id'],
                  'event_window_duration': '1 Day',
                  'last_updated_ts': current_ts,
                  'entity_type': rec['entity_type'],
                  'entity_domain': rec['entity_domain'],
                  'entity_id_bucket': rec['entity_id_bucket'],
              }
              update_events(rec, merged)
              return split_by_date(merged)


          def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
            out_sch = """
              { "type": "record", "name": "aqfer_data_subject_collated", "fields": [
              { "name": "event_date", "type": "string" },
              { "name": "entity_type", "type": "string" },
              { "name": "entity_domain", "type": "string" },
              { "name": "entity_id_bucket", "type": "string" },
              { "name": "entity_id", "type": "string" },
              { "name": "advertiser_id", "type": "string" },
              { "name": "event_window_duration", "type": "string" },
              { "name": "country_code", "type": [ "string", "null" ] },
              { "name": "region_code", "type": [ "string", "null" ] },
              { "name": "geo_id", "type": [ "long", "null" ] },
              { "name": "attributes", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_attribute", "fields": [
                      { "name": "name", "type": "string" },
                      { "name": "value", "type": "string" },
                      { "name": "source", "type": "string" },
                      { "name": "source_attribute_name", "type": "string" },
                      { "name": "source_attribute_value", "type": "string" },
                      { "name": "modeled", "type": "boolean" },
                      { "name": "score", "type": "int" },
                      { "name": "magnitude", "type": "int" },
                      { "name": "last_updated_ts", "type": "long" },
                      { "name": "modified_run_id", "type": "long" }
                  ]}
                  }
              },
              { "name": "impressions", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_impression", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_type", "type": "string" },
                      { "name": "event_id", "type": "string" },
                      { "name": "synthetic_event", "type": "boolean" },
                      { "name": "source", "type": "string" },
                      { "name": "parent_event_id", "type": "string" },
                      { "name": "agency_id", "type": "string" },
                      { "name": "advertiser_id", "type": "string" },
                      { "name": "page_url", "type": [      { "type": "record", "name": "url_struct", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_1", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_2", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "campaign_id", "type": [ "string", "null" ] },
                      { "name": "ad_group_id", "type": [ "string", "null" ] },
                      { "name": "creative_id", "type": [ "string", "null" ] },
                      { "name": "placement_id", "type": [ "string", "null" ] },
                      { "name": "other_marketing_program_levels", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "media_partner", "type": [ "string", "null" ] },
                      { "name": "inventory_partner", "type": [ "string", "null" ] },
                      { "name": "supply_vendor_publisher_id", "type": [ "string", "null" ] },
                      { "name": "site_id", "type": [ "string", "null" ] },
                      { "name": "other_media_group_levels", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "has_click_conversion", "type": [ "boolean", "null" ] },
                      { "name": "has_click", "type": [ "boolean", "null" ] },
                      { "name": "viewable", "type": [ "boolean", "null" ] },
                      { "name": "cpm_currency", "type": [ "string", "null" ] },
                      { "name": "cpm_paid", "type": [ "int", "null" ] },
                      { "name": "cpm_cost", "type": [ "int", "null" ] },
                      { "name": "keywords", "type": [ "string", "null" ] },
                      { "name": "search_terms", "type": [ "string", "null" ] },
                      { "name": "search_phrase", "type": [ "string", "null" ] },
                      { "name": "other_entity_ids", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "entity_struct", "fields": [
                              { "name": "entity_type", "type": [ "string", "null" ] },
                              { "name": "entity_domain", "type": [ "string", "null" ] },
                              { "name": "entity_id", "type": [ "string", "null" ] }
                              ]}
                      }
                      },
                      { "name": "other_event_ids", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "http_info", "type":       { "type": "record", "name": "http_schema", "fields": [
                              { "name": "status_code", "type": [ "string", "null" ] },
                              { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_3", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                              { "name": "cookie", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct", "fields": [
                                      { "name": "ip_type", "type": [ "string", "null" ] },
                                      { "name": "ip_domain", "type": [ "string", "null" ] },
                                      { "name": "ip", "type": [ "string", "null" ] }
                                      ]}, "null"]},
                              { "name": "user_agent_info", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "headers", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}},
                      { "name": "geo", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "metrics", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "others", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "clicks", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "aqfer_click_subevent", "fields": [
                              { "name": "event_timestamp", "type": "long" },
                              { "name": "agency_id", "type": "string" },
                              { "name": "click_id", "type": "string" },
                              { "name": "page_url", "type": [          { "type": "record", "name": "url_struct_4", "fields": [
                                          { "name": "scheme", "type": [ "string", "null" ] },
                                          { "name": "host", "type": [ "string", "null" ] },
                                          { "name": "path", "type": [ "string", "null" ] },
                                          { "name": "query", "type":
                                              { "type": "map", "values": "string" }
                                          }
                                          ]}, "null"]},
                              { "name": "page_referer_url", "type": [          { "type": "record", "name": "url_struct_5", "fields": [
                                          { "name": "scheme", "type": [ "string", "null" ] },
                                          { "name": "host", "type": [ "string", "null" ] },
                                          { "name": "path", "type": [ "string", "null" ] },
                                          { "name": "query", "type":
                                              { "type": "map", "values": "string" }
                                          }
                                          ]}, "null"]},
                              { "name": "destination_url", "type": [          { "type": "record", "name": "url_struct_6", "fields": [
                                          { "name": "scheme", "type": [ "string", "null" ] },
                                          { "name": "host", "type": [ "string", "null" ] },
                                          { "name": "path", "type": [ "string", "null" ] },
                                          { "name": "query", "type":
                                              { "type": "map", "values": "string" }
                                          }
                                          ]}, "null"]},
                              { "name": "cpc_paid", "type": [ "int", "null" ] },
                              { "name": "cpc_cost", "type": [ "int", "null" ] },
                              { "name": "other_event_ids", "type":
                                  { "type": "map", "values": "string" }
                              },
                              { "name": "metrics", "type":
                                  { "type": "map", "values": "string" }
                              },
                              { "name": "others", "type":
                                  { "type": "map", "values": "string" }
                              }
                              ]}
                      }
                      },
                      { "name": "engagements", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "aqfer_engagement_subevent", "fields": [
                              { "name": "engagement_type", "type": "string" },
                              { "name": "engagement_sub_type", "type": [ "string", "null" ] },
                              { "name": "event_timestamp", "type": "long" },
                              { "name": "cpc_paid", "type": [ "int", "null" ] },
                              { "name": "cpc_cost", "type": [ "int", "null" ] },
                              { "name": "cpe_paid", "type": [ "int", "null" ] },
                              { "name": "cpe_cost", "type": [ "int", "null" ] },
                              { "name": "engagement_actions", "type":
                                  { "type": "array", "items": "string" }
                              },
                              { "name": "metrics", "type":
                                  { "type": "map", "values": "string" }
                              },
                              { "name": "others", "type":
                                  { "type": "map", "values": "string" }
                              }
                              ]}
                      }
                      }
                  ]}
                  }
              },
              { "name": "conversions", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_conversion", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_type", "type": "string" },
                      { "name": "event_id", "type": "string" },
                      { "name": "source", "type": "string" },
                      { "name": "parent_event_id", "type": "string" },
                      { "name": "agency_id", "type": "string" },
                      { "name": "advertiser_id", "type": "string" },
                      { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_7", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_8", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_9", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "conversion_actions", "type":
                      { "type": "array", "items": "string" }
                      },
                      { "name": "conversion_type", "type": [ "string", "null" ] },
                      { "name": "conversion_sub_type", "type": [ "string", "null" ] },
                      { "name": "keywords", "type": [ "string", "null" ] },
                      { "name": "search_terms", "type": [ "string", "null" ] },
                      { "name": "search_phrase", "type": [ "string", "null" ] },
                      { "name": "other_entity_ids", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "entity_struct_1", "fields": [
                              { "name": "entity_type", "type": [ "string", "null" ] },
                              { "name": "entity_domain", "type": [ "string", "null" ] },
                              { "name": "entity_id", "type": [ "string", "null" ] }
                              ]}
                      }
                      },
                      { "name": "other_event_ids", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "http_info", "type":       { "type": "record", "name": "http_schema_1", "fields": [
                              { "name": "status_code", "type": [ "string", "null" ] },
                              { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_10", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                              { "name": "cookie", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_1", "fields": [
                                      { "name": "ip_type", "type": [ "string", "null" ] },
                                      { "name": "ip_domain", "type": [ "string", "null" ] },
                                      { "name": "ip", "type": [ "string", "null" ] }
                                      ]}, "null"]},
                              { "name": "user_agent_info", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "headers", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}},
                      { "name": "geo", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "metrics", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "others", "type":
                      { "type": "map", "values": "string" }
                      }
                  ]}
                  }
              },
              { "name": "page_views", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_page_view", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_type", "type": "string" },
                      { "name": "event_id", "type": "string" },
                      { "name": "source", "type": "string" },
                      { "name": "parent_event_id", "type": "string" },
                      { "name": "agency_id", "type": "string" },
                      { "name": "advertiser_id", "type": "string" },
                      { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_11", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_12", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_13", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "keywords", "type": "string" },
                      { "name": "search_terms", "type": "string" },
                      { "name": "search_phrase", "type": "string" },
                      { "name": "other_entity_ids", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "entity_struct_2", "fields": [
                              { "name": "entity_type", "type": [ "string", "null" ] },
                              { "name": "entity_domain", "type": [ "string", "null" ] },
                              { "name": "entity_id", "type": [ "string", "null" ] }
                              ]}
                      }
                      },
                      { "name": "other_event_ids", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "http_info", "type":       { "type": "record", "name": "http_schema_2", "fields": [
                              { "name": "status_code", "type": [ "string", "null" ] },
                              { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_14", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                              { "name": "cookie", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_2", "fields": [
                                      { "name": "ip_type", "type": [ "string", "null" ] },
                                      { "name": "ip_domain", "type": [ "string", "null" ] },
                                      { "name": "ip", "type": [ "string", "null" ] }
                                      ]}, "null"]},
                              { "name": "user_agent_info", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "headers", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}},
                      { "name": "geo", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "metrics", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "others", "type":
                      { "type": "map", "values": "string" }
                      }
                  ]}
                  }
              },
              { "name": "webvisits", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_webvisit", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_type", "type": "string" },
                      { "name": "event_id", "type": "string" },
                      { "name": "source", "type": "string" },
                      { "name": "parent_event_id", "type": "string" },
                      { "name": "agency_id", "type": "string" },
                      { "name": "advertiser_id", "type": "string" },
                      { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_15", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_16", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_17", "fields": [
                              { "name": "scheme", "type": [ "string", "null" ] },
                              { "name": "host", "type": [ "string", "null" ] },
                              { "name": "path", "type": [ "string", "null" ] },
                              { "name": "query", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}, "null"]},
                      { "name": "keywords", "type": "string" },
                      { "name": "search_terms", "type": "string" },
                      { "name": "search_phrase", "type": "string" },
                      { "name": "other_entity_ids", "type":
                      { "type": "array", "items":
                              { "type": "record", "name": "entity_struct_3", "fields": [
                              { "name": "entity_type", "type": [ "string", "null" ] },
                              { "name": "entity_domain", "type": [ "string", "null" ] },
                              { "name": "entity_id", "type": [ "string", "null" ] }
                              ]}
                      }
                      },
                      { "name": "other_event_ids", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "http_info", "type":       { "type": "record", "name": "http_schema_3", "fields": [
                              { "name": "status_code", "type": [ "string", "null" ] },
                              { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_18", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                              { "name": "cookie", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_3", "fields": [
                                      { "name": "ip_type", "type": [ "string", "null" ] },
                                      { "name": "ip_domain", "type": [ "string", "null" ] },
                                      { "name": "ip", "type": [ "string", "null" ] }
                                      ]}, "null"]},
                              { "name": "user_agent_info", "type":
                              { "type": "map", "values": "string" }
                              },
                              { "name": "headers", "type":
                              { "type": "map", "values": "string" }
                              }
                          ]}},
                      { "name": "geo", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "metrics", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "others", "type":
                      { "type": "map", "values": "string" }
                      }
                  ]}
                  }
              },
              { "name": "site_visits", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_site_visit", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "event_type", "type": "string" },
                      { "name": "domain", "type": "string" },
                      { "name": "metrics", "type":
                      { "type": "map", "values": "string" }
                      },
                      { "name": "others", "type":
                      { "type": "map", "values": "string" }
                      }
                  ]}
                  }
              },
              { "name": "segmentation_events", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "aqfer_segmentation_event", "fields": [
                      { "name": "event_timestamp", "type": "long" },
                      { "name": "partner_name", "type": "string" },
                      { "name": "partner_id", "type": "string" },
                      { "name": "partner_assigned_client_id", "type": "string" },
                      { "name": "partner_user_id", "type": "string" },
                      { "name": "segment", "type": "string" }
                  ]}
                  }
              },
              { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
              },
              { "name": "others", "type":
                  { "type": "map", "values": "string" }
              },
              { "name": "other_entity_ids", "type":
                  { "type": "array", "items":
                  { "type": "record", "name": "entity_struct_4", "fields": [
                      { "name": "entity_type", "type": [ "string", "null" ] },
                      { "name": "entity_domain", "type": [ "string", "null" ] },
                      { "name": "entity_id", "type": [ "string", "null" ] }
                  ]}
                  }
              },
              { "name": "last_updated_ts", "type": "long" },
              { "name": "has_conversion", "type": "boolean" },
              { "name": "has_site_visits", "type": "boolean" },
              { "name": "has_page_view", "type": "boolean" },
              { "name": "has_webvisit", "type": "boolean" },
              { "name": "has_clicks", "type": "boolean" }
              ]}
            """
            return schema.parse(out_sch), dat_hnd

outputs:
  # Output 1: written under .../output/avro. No `format` key is set for this
  # entry (the second entry sets `format: parquet` explicitly).
  # NOTE(review): this same folder is also listed under `inputs.locations` at
  # the top of the file, so this job appears to read back its own previous
  # output -- confirm that is intended before changing the path.
  # TODO: UPDATE AVRO DESTINATION TO VTMS FOR PROD
  - destination: s3://.default@com.aqfer..development/new-pipeline-test/collate-merge/collate-merge/output/avro
    split_by:
      key:
        - entity_type
        - entity_domain
        # advertiser_id is deliberately left out of this output's split key
        # (it IS part of the parquet output's split key below).
        # - advertiser_id
        - event_date
        - entity_id_bucket
      # presumably drops the split-key fields from the written records since
      # they are carried by the partition path -- TODO confirm
      strip_key: true
  # Output 2: same data written as parquet, additionally split by
  # advertiser_id.
  - destination: s3://.default@com.aqfer..development/new-pipeline-test/collate-merge/collate-merge/output/parquet
    format: parquet
    split_by:
      key:
        - entity_type
        - entity_domain
        - advertiser_id
        - event_date
        - entity_id_bucket
      strip_key: true
# Split stage settings.
split:
  bucket_size_bytes: 2147483648 # 2 GiB (2^31 = 2 * 1024^3 bytes)
# Sort stage settings.
sort:
  num_partition_keys: 0
  # Fields the sort is keyed on, in order.
  key_names:
    - entity_type
    - entity_domain
    - advertiser_id
    - batch_id
    - entity_id
  # Fields projected for the '.' entry.
  # NOTE(review): batch_id is a sort key above but is not in this projection,
  # and the field order differs from key_names -- confirm both are intended.
  project:
    '.':
      field_names:
        - entity_type
        - entity_domain
        - entity_id
        - advertiser_id
# Per-stage worker counts. Every stage below uses an `in_proc_executor` and
# differs only in `num_workers`.
capacity:
  split_stage:
    in_proc_executor:
      num_workers: 12
  collate_stage:
    in_proc_executor:
      num_workers: 32
  sort_stage:
    in_proc_executor:
      num_workers: 4
  # The merge is configured as two stages: plan, then do.
  merge_plan_stage:
    in_proc_executor:
      num_workers: 30
  merge_do_stage:
    in_proc_executor:
      num_workers: 100
  # NOTE(review): presumably only used because `separate_transform_stage: true`
  # is set at the top of the file -- confirm.
  transform_stage:
    in_proc_executor:
      num_workers: 40
  • Table of contents

Was this article helpful?