# config.yaml

############################
# SETUP
# 1) [OPS] set batch_id
# 2) [OPS] set bucket count for sharding, default is 100
# 3) Set work area, inputs, outputs
############################

work_area: s3://.default@com.aqfer..development/new-pipeline-test/collate-sort/work-area/100011
separate_transform_stage: true
# Each input `name` must match one of the event columns read by the transform
# below: clicks, impressions, engagements, page_views, conversions or webvisits.
inputs:
  - name: clicks
    locations:
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/ingest/aqfer_click/valid_data=true/entity_type=ck/
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - entity_type
      - entity_domain
      - entity_id
      - advertiser_id
  - name: conversions
    locations:
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/ingest/aqfer_conversion/valid_data=true/entity_type=ck/
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - entity_type
      - entity_domain
      - entity_id
      - advertiser_id
    transform:
      - rename_schema:
          http_schema: http_schema_conversions
          top_level: top_level_conversions
  - name: impressions
    locations:
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/ingest/aqfer_impression/valid_data=true/entity_type=ck/
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - entity_type
      - entity_domain
      - entity_id
      - advertiser_id
    transform:
      - rename_schema:
          http_schema: http_schema_impressions
          top_level: top_level_impressions
  - name: engagements
    locations:
      - folder: s3://.default@com.aqfer..development/new-pipeline-test/entity-id-collation/output/with_event_id/entity_type=ck/
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - entity_type
      - entity_domain
      - entity_id
      - advertiser_id
    transform:
      - rename_schema:
          http_schema: http_schema_engagements
          # top_level: top_level_engagements
transform:
- python:
    code: |
      import schema
      import time

      # Each incoming record carries entity_id plus the per-type event arrays:
      # clicks, impressions, engagements, page_views, conversions and webvisits.
      # Any of those array columns may be empty or missing from the record.
      invalid_ids = {'NA'}  # parent_event_id values that cannot be grouped on
      # Milliseconds since the epoch. This is evaluated once, when this code is
      # loaded (module level), not once per record.
      current_ts = int(1000 * time.time())

      def isValid(cc):
          """Return True unless ``cc`` is one of the placeholder codes None, '' or 'NA'."""
          return cc not in (None, '', 'NA')

      def non_empty(list):
          """Return True when the sized container ``list`` holds at least one item.

          The parameter name shadows the builtin but is kept for caller
          compatibility. A None argument raises TypeError (len(None)).
          """
          return len(list) != 0

      def sortByTimestamp(evt):
          """Sort the event list ``evt`` in place by ascending 'event_timestamp'.

          list.sort is stable, so events with equal timestamps keep their
          relative order. Returns None.
          """
          def _timestamp_of(event):
              return event['event_timestamp']

          evt.sort(key=_timestamp_of)

      def update_cc_and_rc(evt, result):
          """Strip the geo codes off an event and use them to backfill ``result``.

          'country_code' and 'region_code' are always removed from ``evt``.
          Each popped value is written to ``result`` only while the value held
          there fails isValid (None, '' or 'NA'). Once a valid code is stored,
          it is never replaced.

          NOTE(review): the two codes are backfilled independently, so the
          stored country and region may come from different events. Confirm
          that this is acceptable.
          """
          popped = {field: evt.pop(field, None) for field in ('country_code', 'region_code')}
          for field, value in popped.items():
              if not isValid(result.get(field)):
                  result[field] = value

      def update_events(rec, result, key, advertiser_id):
          """Copy the event array ``rec[key]`` into ``result[key]``, sorted by timestamp.

          Each event dict is mutated in place:
          - its country_code/region_code are popped, and may backfill the
            record-level codes via update_cc_and_rc;
          - its advertiser_id is overwritten with ``advertiser_id``.

          A missing or null column yields an empty list.
          """
          result[key] = []
          # `or []` also covers a column that is present but null; the previous
          # `rec.get(key, [])` only handled a missing key.
          for event in rec.get(key) or []:
              update_cc_and_rc(event, result)
              event['advertiser_id'] = advertiser_id
              result[key].append(event)
          sortByTimestamp(result[key])

      def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
          """Turn each click in rec['clicks'] into a synthetic parent impression.

          Each click dict is mutated in place into an impression-shaped event:
          - event_type 'imp' and event_id set to the click's parent_event_id;
          - has_click True, viewable None and synthetic_event True;
          - the click itself becomes a one-element 'clicks' sub-event list;
          - destination_url, cpc_paid and cpc_cost are removed from the
            impression level.

          Clicks sharing a valid parent id are folded into one entry of
          ``subevents_dict``, which may already hold a synthetic impression
          built from engagements. Clicks whose parent id is None, '' or in
          invalid_ids cannot be grouped and are returned separately.

          Side effects on ``result``: sets result['has_clicks'] = True when any
          click is present, and may backfill country/region codes via
          update_cc_and_rc.

          Returns:
              (subevents_dict, invalid_clicks): the updated mapping of parent
              impression id -> synthetic impression, and the list of
              ungroupable synthetic impressions.
          """
          invalid_clicks = []
          # `or []` also tolerates a column that is present but null.
          for clk in rec.get('clicks') or []:
              update_cc_and_rc(clk, result)
              clk['advertiser_id'] = advertiser_id
              imp_id = clk['parent_event_id']
              clk_id = clk['event_id']

              # The click dict itself (not a copy) becomes the synthetic impression.
              synthetic_imp = clk
              synthetic_imp['event_type'] = 'imp'
              synthetic_imp['event_id'] = imp_id
              synthetic_imp['has_click'] = True
              result['has_clicks'] = True
              synthetic_imp['viewable'] = None
              synthetic_imp['synthetic_event'] = True

              clk_subevent = {
                  'event_timestamp': synthetic_imp['event_timestamp'],
                  'agency_id': synthetic_imp['agency_id'],
                  'click_id': clk_id,
                  'page_url': synthetic_imp['page_url'],
                  'page_referer_url': synthetic_imp['page_referer_url'],
                  'destination_url': synthetic_imp['destination_url'],
                  'cpc_paid': synthetic_imp['cpc_paid'],
                  'cpc_cost': synthetic_imp['cpc_cost'],
                  'other_event_ids': synthetic_imp['other_event_ids'],
                  'metrics': synthetic_imp['metrics'],
                  'others': synthetic_imp['others'],
              }
              synthetic_imp['clicks'] = [clk_subevent]
              synthetic_imp['engagements'] = []

              # Click-only fields now live solely on the sub-event.
              for field in ('destination_url', 'cpc_paid', 'cpc_cost'):
                  del synthetic_imp[field]

              # An empty or None parent id is as unusable as 'NA'. Grouping on it
              # would merge unrelated clicks into one synthetic impression.
              if imp_id in invalid_ids or not isValid(imp_id):
                  invalid_clicks.append(synthetic_imp)
                  continue

              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = synthetic_imp
              else:
                  existing['clicks'].append(clk_subevent)
                  existing['has_click'] = True
                  existing['has_click_conversion'] = (
                      existing['has_click_conversion']
                      or synthetic_imp['has_click_conversion'])

          return subevents_dict, invalid_clicks

      def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
          """Turn each engagement in rec['engagements'] into a synthetic parent impression.

          Each engagement dict is mutated in place into an impression-shaped event:
          - event_id set to its parent_event_id;
          - viewable True and synthetic_event True;
          - has_click and has_click_conversion set to None;
          - the engagement becomes a one-element 'engagements' sub-event list,
            and engagement-only fields are moved off the impression level.

          Engagements sharing a valid parent id are folded into one entry of
          ``subevents_dict``. Those whose parent id is None, '' or in
          invalid_ids cannot be grouped and are returned separately. May
          backfill result country/region codes via update_cc_and_rc.

          Returns:
              (subevents_dict, invalid_engagements): the updated mapping of
              parent impression id -> synthetic impression, and the list of
              ungroupable synthetic impressions.
          """
          invalid_engagements = []
          # `or []` also tolerates a column that is present but null.
          for eng in rec.get('engagements') or []:
              update_cc_and_rc(eng, result)
              eng['advertiser_id'] = advertiser_id
              imp_id = eng['parent_event_id']

              # The engagement dict itself (not a copy) becomes the synthetic impression.
              synthetic_imp = eng
              # NOTE(review): get_clicks_dict tags its synthetic impressions 'imp'
              # while this uses 'impr'. Confirm which value downstream consumers expect.
              synthetic_imp['event_type'] = 'impr'
              synthetic_imp['event_id'] = imp_id
              synthetic_imp['has_click_conversion'] = None
              synthetic_imp['has_click'] = None
              synthetic_imp['viewable'] = True
              synthetic_imp['synthetic_event'] = True

              eng_subevent = {
                  'event_timestamp': synthetic_imp['event_timestamp'],
                  'engagement_type': synthetic_imp['engagement_type'],
                  'engagement_sub_type': synthetic_imp['engagement_sub_type'],
                  'cpc_paid': synthetic_imp['cpc_paid'],
                  'cpc_cost': synthetic_imp['cpc_cost'],
                  'cpe_paid': synthetic_imp['cpe_paid'],
                  'cpe_cost': synthetic_imp['cpe_cost'],
                  'engagement_actions': synthetic_imp['engagement_actions'],
                  'metrics': synthetic_imp['metrics'],
                  'others': synthetic_imp['others'],
              }
              synthetic_imp['clicks'] = []
              synthetic_imp['engagements'] = [eng_subevent]

              # Engagement-only fields now live solely on the sub-event.
              for field in ('engagement_actions', 'engagement_type', 'engagement_sub_type',
                            'cpc_paid', 'cpc_cost', 'cpe_paid', 'cpe_cost'):
                  del synthetic_imp[field]

              # An empty or None parent id is as unusable as 'NA'. Grouping on it
              # would merge unrelated engagements into one synthetic impression.
              if imp_id in invalid_ids or not isValid(imp_id):
                  invalid_engagements.append(synthetic_imp)
                  continue

              existing = subevents_dict.get(imp_id)
              if existing is None:
                  subevents_dict[imp_id] = synthetic_imp
              else:
                  existing['engagements'].append(eng_subevent)
                  # An engagement implies the impression was viewable. This mirrors
                  # get_clicks_dict setting has_click when it merges a click.
                  existing['viewable'] = True

          return subevents_dict, invalid_engagements

      def get_impressions_dict(rec, result, advertiser_id):
          """Index the real impressions in rec['impressions'] by event_id.

          Every impression is stamped with ``advertiser_id`` and has its geo
          codes popped via update_cc_and_rc, which may backfill ``result``.
          The first impression seen for an event_id is kept, with empty
          'clicks'/'engagements' lists and viewable reset to None. Later
          duplicates only merge their 'others' map into the kept one.

          Returns:
              dict mapping event_id -> impression (the input dicts, mutated in place).
          """
          impressions_dict = {}
          # `or []` also tolerates a column that is present but null.
          for inp_imp in rec.get('impressions') or []:
              inp_imp['advertiser_id'] = advertiser_id
              update_cc_and_rc(inp_imp, result)
              imp_id = inp_imp['event_id']
              if imp_id in impressions_dict:
                  impressions_dict[imp_id]['others'].update(inp_imp['others'])
                  continue
              inp_imp['clicks'] = []
              inp_imp['engagements'] = []
              inp_imp['viewable'] = None
              impressions_dict[imp_id] = inp_imp
          return impressions_dict

      def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
          """Fold click and engagement sub-events into their parent impressions.

          Steps:
          - Build the real-impression index and the synthetic impressions
            derived from engagements and clicks.
          - Attach each synthetic entry's sub-events to the real impression
            with the same event_id.
          - Write the combined list, sorted by event_timestamp, to
            result['impressions'].

          Synthetic impressions without a matching real impression, and those
          with ungroupable parent ids, are emitted as standalone entries.
          Also sets result['has_clicks'].
          """
          result['has_clicks'] = False
          impressions_dict = get_impressions_dict(rec, result, advertiser_id)
          # Engagements are indexed first; clicks are then merged into the same mapping.
          subevents_dict, invalid_subevents = get_engagement_dict(rec, result, {}, advertiser_id)
          subevents_dict, invalid_clicks = get_clicks_dict(rec, result, subevents_dict, advertiser_id)
          invalid_subevents.extend(invalid_clicks)

          impressions = []
          for imp_id, impression in impressions_dict.items():
              impression['synthetic_event'] = False
              se = subevents_dict.pop(imp_id, None)
              if se is not None:
                  impression['clicks'].extend(se['clicks'])
                  impression['engagements'].extend(se['engagements'])
                  impression['has_click'] = non_empty(impression['clicks'])
                  result['has_clicks'] = result['has_clicks'] or impression['has_click']
                  # Upgrade a flag only when the synthetic side actually knows it.
                  # A None there (e.g. an engagement-only entry's
                  # has_click_conversion) must not replace a known False on
                  # the real impression.
                  for flag in ('has_click_conversion', 'viewable'):
                      if not impression[flag] and se[flag] is not None:
                          impression[flag] = se[flag]
              impressions.append(impression)
          # Ungroupable and unmatched synthetic impressions are kept as-is.
          impressions.extend(invalid_subevents)
          impressions.extend(subevents_dict.values())
          sortByTimestamp(impressions)
          result['impressions'] = impressions


      def dat_hnd(rec: any) -> any:
          """Collate one entity's event arrays into a single output record.

          Reads entity_id, advertiser_id and the per-type event arrays from ``rec``:
          - conversions, webvisits and page_views are copied through, sorted by
            timestamp;
          - clicks and engagements are folded into impressions.

          Returns a one-element list holding the collated record.
          """
          advertiser_id = rec['advertiser_id']
          collated = {
              'entity_id': rec['entity_id'],
              'advertiser_id': advertiser_id,
              # NOTE(review): hard-coded. The original author asked whether this
              # still applies now that the hourly partitions were removed.
              'event_window_duration': '1 Hour',
              # Geo codes start unset and may be backfilled from event-level codes
              # by update_cc_and_rc while the event arrays are processed below.
              'country_code': None,
              'region_code': None,
              'geo_id': None,
              'attributes': [],
              'site_visits': [],
              'segmentation_events': [],
          }

          for column in ('conversions', 'webvisits', 'page_views'):
              update_events(rec, collated, column, advertiser_id)
          subevent_aggregation_for_imp_clk_eng(rec, collated, advertiser_id)

          collated.update({
              'has_conversion': non_empty(collated['conversions']),
              'has_page_view': non_empty(collated['page_views']),
              'has_webvisit': non_empty(collated['webvisits']),
              'has_site_visits': False,
              # current_ts is module-level, computed once at load time.
              'last_updated_ts': current_ts,
              'metrics': {},
              'others': {},
              'other_entity_ids': [],
          })
          return [collated]


      def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
        out_sch = """
          { "type": "record", "name": "aqfer_data_subject_collated", "fields": [
          { "name": "entity_id", "type": "string" },
          { "name": "advertiser_id", "type": "string" },
          { "name": "event_window_duration", "type": "string" },
          { "name": "country_code", "type": [ "string", "null" ] },
          { "name": "region_code", "type": [ "string", "null" ] },
          { "name": "geo_id", "type": [ "long", "null" ] },
          { "name": "attributes", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_attribute", "fields": [
                  { "name": "name", "type": "string" },
                  { "name": "value", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "source_attribute_name", "type": "string" },
                  { "name": "source_attribute_value", "type": "string" },
                  { "name": "modeled", "type": "boolean" },
                  { "name": "score", "type": "int" },
                  { "name": "magnitude", "type": "int" },
                  { "name": "last_updated_ts", "type": "long" },
                  { "name": "modified_run_id", "type": "long" }
              ]}
              }
          },
          { "name": "impressions", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_impression", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_type", "type": "string" },
                  { "name": "event_id", "type": "string" },                
                  { "name": "synthetic_event", "type": "boolean" },
                  { "name": "source", "type": "string" },
                  { "name": "parent_event_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "page_url", "type": [      { "type": "record", "name": "url_struct", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_1", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_2", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "campaign_id", "type": [ "string", "null" ] },
                  { "name": "ad_group_id", "type": [ "string", "null" ] },
                  { "name": "creative_id", "type": [ "string", "null" ] },
                  { "name": "placement_id", "type": [ "string", "null" ] },
                  { "name": "other_marketing_program_levels", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "media_partner", "type": [ "string", "null" ] },
                  { "name": "inventory_partner", "type": [ "string", "null" ] },
                  { "name": "supply_vendor_publisher_id", "type": [ "string", "null" ] },
                  { "name": "site_id", "type": [ "string", "null" ] },
                  { "name": "other_media_group_levels", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "has_click_conversion", "type": [ "boolean", "null" ] },
                  { "name": "has_click", "type": [ "boolean", "null" ] },
                  { "name": "viewable", "type": [ "boolean", "null" ] },
                  { "name": "cpm_currency", "type": [ "string", "null" ] },
                  { "name": "cpm_paid", "type": [ "int", "null" ] },
                  { "name": "cpm_cost", "type": [ "int", "null" ] },
                  { "name": "keywords", "type": [ "string", "null" ] },
                  { "name": "search_terms", "type": [ "string", "null" ] },
                  { "name": "search_phrase", "type": [ "string", "null" ] },
                  { "name": "other_entity_ids", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}
                  }
                  },
                  { "name": "other_event_ids", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "http_info", "type":       { "type": "record", "name": "http_schema", "fields": [
                          { "name": "status_code", "type": [ "string", "null" ] },
                          { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_3", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                      { "type": "map", "values": "string" }
                                  }
                                  ]}, "null"]},
                          { "name": "cookie", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct", "fields": [
                                  { "name": "ip_type", "type": [ "string", "null" ] },
                                  { "name": "ip_domain", "type": [ "string", "null" ] },
                                  { "name": "ip", "type": [ "string", "null" ] }
                                  ]}, "null"]},
                          { "name": "user_agent_info", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "headers", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}},
                  { "name": "geo", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "others", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "clicks", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "aqfer_click_subevent", "fields": [
                          { "name": "event_timestamp", "type": "long" },
                          { "name": "agency_id", "type": "string" },
                          { "name": "click_id", "type": "string" },
                          { "name": "page_url", "type": [          { "type": "record", "name": "url_struct_4", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                          { "name": "page_referer_url", "type": [          { "type": "record", "name": "url_struct_5", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                          { "name": "destination_url", "type": [          { "type": "record", "name": "url_struct_6", "fields": [
                                      { "name": "scheme", "type": [ "string", "null" ] },
                                      { "name": "host", "type": [ "string", "null" ] },
                                      { "name": "path", "type": [ "string", "null" ] },
                                      { "name": "query", "type":
                                          { "type": "map", "values": "string" }
                                      }
                                      ]}, "null"]},
                          { "name": "cpc_paid", "type": [ "int", "null" ] },
                          { "name": "cpc_cost", "type": [ "int", "null" ] },
                          { "name": "other_event_ids", "type":
                              { "type": "map", "values": "string" }
                          },
                          { "name": "metrics", "type":
                              { "type": "map", "values": "string" }
                          },
                          { "name": "others", "type":
                              { "type": "map", "values": "string" }
                          }
                          ]}
                  }
                  },
                  { "name": "engagements", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "aqfer_engagement_subevent", "fields": [
                          { "name": "engagement_type", "type": "string" },
                          { "name": "engagement_sub_type", "type": [ "string", "null" ] },
                          { "name": "event_timestamp", "type": "long" },
                          { "name": "cpc_paid", "type": [ "int", "null" ] },
                          { "name": "cpc_cost", "type": [ "int", "null" ] },
                          { "name": "cpe_paid", "type": [ "int", "null" ] },
                          { "name": "cpe_cost", "type": [ "int", "null" ] },
                          { "name": "engagement_actions", "type":
                              { "type": "array", "items": "string" }
                          },
                          { "name": "metrics", "type":
                              { "type": "map", "values": "string" }
                          },
                          { "name": "others", "type":
                              { "type": "map", "values": "string" }
                          }
                          ]}
                  }
                  }
              ]}
              }
          },
          { "name": "conversions", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_conversion", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_type", "type": "string" },
                  { "name": "event_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "parent_event_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_7", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_8", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_9", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "conversion_actions", "type":
                  { "type": "array", "items": "string" }
                  },
                  { "name": "conversion_type", "type": [ "string", "null" ] },
                  { "name": "conversion_sub_type", "type": [ "string", "null" ] },
                  { "name": "keywords", "type": [ "string", "null" ] },
                  { "name": "search_terms", "type": [ "string", "null" ] },
                  { "name": "search_phrase", "type": [ "string", "null" ] },
                  { "name": "other_entity_ids", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_1", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}
                  }
                  },
                  { "name": "other_event_ids", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "http_info", "type":       { "type": "record", "name": "http_schema_1", "fields": [
                          { "name": "status_code", "type": [ "string", "null" ] },
                          { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_10", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                      { "type": "map", "values": "string" }
                                  }
                                  ]}, "null"]},
                          { "name": "cookie", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_1", "fields": [
                                  { "name": "ip_type", "type": [ "string", "null" ] },
                                  { "name": "ip_domain", "type": [ "string", "null" ] },
                                  { "name": "ip", "type": [ "string", "null" ] }
                                  ]}, "null"]},
                          { "name": "user_agent_info", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "headers", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}},
                  { "name": "geo", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "others", "type":
                  { "type": "map", "values": "string" }
                  }
              ]}
              }
          },
          { "name": "page_views", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_page_view", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_type", "type": "string" },
                  { "name": "event_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "parent_event_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_11", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_12", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_13", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "keywords", "type": "string" },
                  { "name": "search_terms", "type": "string" },
                  { "name": "search_phrase", "type": "string" },
                  { "name": "other_entity_ids", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_2", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}
                  }
                  },
                  { "name": "other_event_ids", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "http_info", "type":       { "type": "record", "name": "http_schema_2", "fields": [
                          { "name": "status_code", "type": [ "string", "null" ] },
                          { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_14", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                      { "type": "map", "values": "string" }
                                  }
                                  ]}, "null"]},
                          { "name": "cookie", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_2", "fields": [
                                  { "name": "ip_type", "type": [ "string", "null" ] },
                                  { "name": "ip_domain", "type": [ "string", "null" ] },
                                  { "name": "ip", "type": [ "string", "null" ] }
                                  ]}, "null"]},
                          { "name": "user_agent_info", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "headers", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}},
                  { "name": "geo", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "others", "type":
                  { "type": "map", "values": "string" }
                  }
              ]}
              }
          },
          { "name": "webvisits", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_webvisit", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_type", "type": "string" },
                  { "name": "event_id", "type": "string" },
                  { "name": "source", "type": "string" },
                  { "name": "parent_event_id", "type": "string" },
                  { "name": "agency_id", "type": "string" },
                  { "name": "advertiser_id", "type": "string" },
                  { "name": "page_url", "type": [      { "type": "record", "name": "url_struct_15", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "ad_url", "type": [      { "type": "record", "name": "url_struct_16", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "page_referer_url", "type": [      { "type": "record", "name": "url_struct_17", "fields": [
                          { "name": "scheme", "type": [ "string", "null" ] },
                          { "name": "host", "type": [ "string", "null" ] },
                          { "name": "path", "type": [ "string", "null" ] },
                          { "name": "query", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}, "null"]},
                  { "name": "keywords", "type": "string" },
                  { "name": "search_terms", "type": "string" },
                  { "name": "search_phrase", "type": "string" },
                  { "name": "other_entity_ids", "type":
                  { "type": "array", "items":
                          { "type": "record", "name": "entity_struct_3", "fields": [
                          { "name": "entity_type", "type": [ "string", "null" ] },
                          { "name": "entity_domain", "type": [ "string", "null" ] },
                          { "name": "entity_id", "type": [ "string", "null" ] }
                          ]}
                  }
                  },
                  { "name": "other_event_ids", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "http_info", "type":       { "type": "record", "name": "http_schema_3", "fields": [
                          { "name": "status_code", "type": [ "string", "null" ] },
                          { "name": "tag_url", "type": [        { "type": "record", "name": "url_struct_18", "fields": [
                                  { "name": "scheme", "type": [ "string", "null" ] },
                                  { "name": "host", "type": [ "string", "null" ] },
                                  { "name": "path", "type": [ "string", "null" ] },
                                  { "name": "query", "type":
                                      { "type": "map", "values": "string" }
                                  }
                                  ]}, "null"]},
                          { "name": "cookie", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "ip_info", "type": [        { "type": "record", "name": "ip_struct_3", "fields": [
                                  { "name": "ip_type", "type": [ "string", "null" ] },
                                  { "name": "ip_domain", "type": [ "string", "null" ] },
                                  { "name": "ip", "type": [ "string", "null" ] }
                                  ]}, "null"]},
                          { "name": "user_agent_info", "type":
                          { "type": "map", "values": "string" }
                          },
                          { "name": "headers", "type":
                          { "type": "map", "values": "string" }
                          }
                      ]}},
                  { "name": "geo", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "others", "type":
                  { "type": "map", "values": "string" }
                  }
              ]}
              }
          },
          { "name": "site_visits", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_site_visit", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "event_type", "type": "string" },
                  { "name": "domain", "type": "string" },
                  { "name": "metrics", "type":
                  { "type": "map", "values": "string" }
                  },
                  { "name": "others", "type":
                  { "type": "map", "values": "string" }
                  }
              ]}
              }
          },
          { "name": "segmentation_events", "type":
              { "type": "array", "items":
              { "type": "record", "name": "aqfer_segmentation_event", "fields": [
                  { "name": "event_timestamp", "type": "long" },
                  { "name": "partner_name", "type": "string" },
                  { "name": "partner_id", "type": "string" },
                  { "name": "partner_assigned_client_id", "type": "string" },
                  { "name": "partner_user_id", "type": "string" },
                  { "name": "segment", "type": "string" }
              ]}
              }
          },
          { "name": "metrics", "type":
              { "type": "map", "values": "string" }
          },
          { "name": "others", "type":
              { "type": "map", "values": "string" }
          },
          { "name": "other_entity_ids", "type":
              { "type": "array", "items":
              { "type": "record", "name": "entity_struct_4", "fields": [
                  { "name": "entity_type", "type": [ "string", "null" ] },
                  { "name": "entity_domain", "type": [ "string", "null" ] },
                  { "name": "entity_id", "type": [ "string", "null" ] }
              ]}
              }
          },
          { "name": "last_updated_ts", "type": "long" },
          { "name": "has_conversion", "type": "boolean" },
          { "name": "has_site_visits", "type": "boolean" },
          { "name": "has_page_view", "type": "boolean" },
          { "name": "has_webvisit", "type": "boolean" },
          { "name": "has_clicks", "type": "boolean" }
          ]}
        """
        return schema.parse(out_sch), dat_hnd

# Destination prefix for this job's output.
# NOTE(review): work_area (.../collate-sort/work-area/100011) lives under this
# same prefix; if downstream readers list the destination recursively they may
# also pick up work-area files - confirm this layout is intended.
outputs:
  - destination: s3://.default@com.aqfer..development/new-pipeline-test/collate-sort/
# Split-stage sizing. The value is 2^31 bytes = 2 GiB (not 2 * 10^9 bytes).
split:
  bucket_size_bytes: 2147483648  # 2 GiB
sort:
  # NOTE(review): presumably the first `num_partition_keys` entries of
  # key_names (entity_type, entity_domain, batch_id, entity_id_bucket) form
  # the partition path and the remaining entry (advertiser_id) only orders
  # records within a partition - confirm against the collate-sort docs.
  num_partition_keys: 4
  key_names:
    - entity_type
    - entity_domain
    - batch_id
    - entity_id_bucket
    - advertiser_id
  # Fields projected from each input record. The key templates below appear to
  # address them by position (assumes `.` in a template is this projected
  # list - TODO confirm):
  #   0 = entity_type, 1 = entity_domain, 2 = entity_id, 3 = advertiser_id
  project:
    '.':
      field_names:
        - entity_type
        - entity_domain
        - entity_id
        - advertiser_id
  # One template per key_names entry, listed in the same order.
  key:
    - template: '{{index . 0}}'  # entity_type
    - template: '{{index . 1}}'  # entity_domain
    # batch_id is hard-coded for testing; need to add '{{env "BATCH_ID"}}'
    # when productionizing (see SETUP step 1 at the top of the file).
    - template: '5555555'
    # entity_id_bucket: entity_id mapped to one of 10 buckets by the `bucket`
    # template helper. NOTE(review): the SETUP header says the default bucket
    # count is 100 - confirm that 10 is intended here.
    - template: '{{bucket (index . 2) 10}}'
    - template: '{{index . 3}}'  # advertiser_id

# Number of workers allotted to each pipeline stage; every stage runs on the
# in-process executor. transform_stage applies because
# separate_transform_stage is enabled at the top of the file.
capacity:
  split_stage: {in_proc_executor: {num_workers: 12}}
  collate_stage: {in_proc_executor: {num_workers: 32}}
  sort_stage: {in_proc_executor: {num_workers: 3}}
  merge_plan_stage: {in_proc_executor: {num_workers: 20}}
  merge_do_stage: {in_proc_executor: {num_workers: 20}}
  transform_stage: {in_proc_executor: {num_workers: 30}}
  • Table of contents

Was this article helpful?