# config.yaml

##############################################################################################################################
# Entity Id collation for Subevents with no Entity Id
##############################################################################################################################
# Before promoting to prod:
#    Change the patch location to VTMS
#    Read LOOKBACK_DAYS from an env var
#    Read BATCH_ID from an env var

# Future enhancements:
#    Avoid hard-coding the output field names so this config works generically for any subevent

# NOTE(review): work_area is empty (parses as null) in this copy; presumably a
# placeholder to be filled in per environment.
work_area:
separate_transform_stage: true
##############################################################################################################################
# INPUT: ingested subevents missing entity_id, the patch of subevents still missing entity_id, and the entity_id <-> event_id xref
##############################################################################################################################
inputs:
  # Subevents that have no entity_id yet. Two locations are read: presumably the
  # freshly ingested subevents and the patch written by an earlier run of this
  # config. Both folders are empty in this copy and must be filled in.
  - name: subevent
    locations:
      - folder:
      - folder:
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - event_id
      - entity_type
      - entity_domain
    transform:
      - rename_schema:
          top_level: subevent
  # Cross-reference of entity_id <-> event_id, keyed on the same fields as the
  # subevents so the two inputs line up record by record.
  # NOTE(review): dat_hnd reads rec['xref'], i.e. the input name rather than
  # top_level_xref below; presumably the joined record is keyed by input name. Verify.
  - name: xref
    locations:
      - folder:
    capture_partition_keys:
      - entity_type
      - entity_domain
    key:
      - event_id
      - entity_type
      - entity_domain
    transform:
      - rename_schema:
          top_level: top_level_xref
required_parts:
  - 0  # left join on input 0 (subevent): keys present only in xref need no processing
  
##############################################################################################################################
# TRANSFORM
##############################################################################################################################
transform:
# Single embedded Python stage. NOTE(review): the framework presumably calls
# schema_handler_v1 with the joined input schema and applies the handler it
# returns (dat_hnd) to each joined record. Confirm against the tool's docs.
- python:
    code: |
      import datetime
      import hashlib
      import schema

      # Subevents with no entity_id whose event date is older than this many days
      # get a synthetic entity_id instead of staying in the patch (see dat_hnd).
      # TODO(prod): use '{{env "LOOKBACK_DAYS"}}' here. The rendered value is a
      # string and timedelta() needs a number, so convert it with int().
      lookback_days = 3


      def dat_hnd(rec: any) -> any:
          """Give every subevent of one joined record an entity_id (or a patch flag).

          rec is one record of the join configured under `inputs`, which is keyed on
          (event_id, entity_type, entity_domain):
            rec["event_id"]  - join key; copied onto every subevent
            rec["subevent"]  - the subevents collated under that key
            rec["xref"]      - matching xref rows (may be empty or None: left join)

          Each subevent is routed as follows:
            * an xref row carries a non-null entity_id -> use it
              (is_attributed=True, is_patch=False);
            * otherwise, if its event date is before today minus lookback_days
              (both as UTC calendar days) -> deterministic synthetic entity_id
              "SYN-" + sha256(event_id) (is_attributed=True, is_patch=False);
            * otherwise -> entity_id left as-is and the subevent is routed to the
              patch output (is_attributed=False, is_patch=True).

          The subevent objects are updated in place and returned as a list in
          input order.
          """
          # Both the cutoff and the event dates are UTC calendar days. Mixing a UTC
          # "today" with local-time event dates would shift the cutoff by the host's
          # UTC offset. int() also accepts the string a templated env var renders to.
          lookback = (datetime.datetime.now(datetime.timezone.utc).date()
                      - datetime.timedelta(days=int(lookback_days)))
          eventid = rec["event_id"]
          xref = rec['xref'] or []

          # First xref row that actually carries an entity_id; None when there is none.
          entityid = next(
              (row.get('entity_id') for row in xref if row.get('entity_id') is not None),
              None,
          )

          res = []
          # iterate through all subevents that were collated together by event_id
          for se_out in rec['subevent']:
              # subevents are modified in place rather than copied
              se_out['event_id'] = eventid
              if entityid is not None:
                  # standard output: an entity_id was found and the record moves on
                  # to the next stage in the pipeline
                  se_out['entity_id'] = entityid
                  se_out['is_attributed'] = True
                  se_out['is_patch'] = False
              elif _utc_event_date(se_out['event_timestamp']) < lookback:
                  # standard output with a synthetic entity_id: still no entity_id
                  # and the subevent is older than lookback_days
                  se_out['entity_id'] = "SYN-" + hashlib.sha256(eventid.encode()).hexdigest()
                  se_out['is_attributed'] = True
                  se_out['is_patch'] = False
              else:
                  # patch: no entity_id yet, save to the patch for a later run
                  se_out['is_attributed'] = False
                  se_out['is_patch'] = True
              res.append(se_out)
          return res

      def _utc_event_date(event_timestamp):
          """UTC calendar date of an epoch timestamp given in s, ms or us.

          Only the first 10 digits are used as epoch seconds, which assumes a
          10-digit seconds value (i.e. dates between 2001-09-09 and 2286-11-20).
          """
          epoch_secs = int(str(event_timestamp)[:10])
          return datetime.datetime.fromtimestamp(epoch_secs, tz=datetime.timezone.utc).date()
      
      def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
          """Return the output item schema together with the per-record handler.

          The schema is the `.items` of the first field schema named "subevent" in
          the input schema (presumably the element schema of the subevent list;
          verify). It is extended with event_id (string), is_attributed and
          is_patch (boolean), which dat_hnd sets on each subevent.
          NOTE(review): dat_hnd also sets entity_id, which is not added here; it is
          presumably already part of the subevent schema.

          Returns (output item schema, dat_hnd).
          """
          item_schema = schema.find_field_schemas(sch, "subevent")[0].items
          extra_fields = (
              ("event_id", schema.StringSchema),
              ("is_attributed", schema.BooleanSchema),
              ("is_patch", schema.BooleanSchema),
          )
          for field_name, field_type in extra_fields:
              schema.add_fields(item_schema, schema.Field(name=field_name, schema=field_type()))
          return item_schema, dat_hnd

##############################################################################################################################
# OUTPUT
##############################################################################################################################    
outputs:
  # Attributed subevents (real or synthetic entity_id): these move on to the next
  # stage in the pipeline.
  - destination:
    project:
      '.':
        omit_unless: is_attributed
        # Shared with the patch output below via the YAML alias so the two
        # projections cannot drift apart.
        field_names: &subevent_output_fields
          - entity_id
          - event_timestamp
          - event_type
          - event_id
          - source
          - country_code
          - region_code
          - parent_event_id
          - agency_id
          - advertiser_id
          - page_url
          - ad_url
          - page_referer_url
          - campaign_id
          - ad_group_id
          - creative_id
          - placement_id
          - other_marketing_program_levels
          - media_partner
          - inventory_partner
          - supply_vendor_publisher_id
          - site_id
          - other_media_group_levels
          - cpm_currency
          - cpm_paid
          - cpm_cost
          - cpc_paid
          - cpc_cost
          - cpe_paid
          - cpe_cost
          - engagement_actions
          - engagement_type
          - engagement_sub_type
          - keywords
          - search_terms
          - search_phrase
          - other_entity_ids
          - other_event_ids
          - http_info
          - geo
          - metrics
          - others
    drop_partitions:
      - event_type
  # Subevents still missing an entity_id: saved to the patch. Once older than
  # lookback_days, dat_hnd gives them a synthetic ("orphan") entity_id instead.
  - destination:
    project:
      '.':
        omit_unless: is_patch
        field_names: *subevent_output_fields
    drop_partitions:
      - batch_id

##############################################################################################################################
# SORT
##############################################################################################################################    
split:
  bucket_size_bytes: 2147483648  # 2 GiB (2 * 1024^3 bytes)
sort:
  # The first num_partition_keys entries of key_names are taken as partition keys
  # (entity_type, entity_domain, event_type, batch_id); event_id is the last sort key.
  num_partition_keys: 4
  key_names: [entity_type, entity_domain, event_type, batch_id, event_id]
  # Fields projected from each input record. The key templates below presumably
  # index into this list (0 = entity_type, 1 = entity_domain, 2 = event_id).
  project:
    '.':
      field_names:
        - entity_type
        - entity_domain
        - event_id
  # One template per key_names entry, in the same order.
  key:
    - template: '{{index . 0}}'  # entity_type
    - template: '{{index . 1}}'  # entity_domain
    - template: 'video'          # event_type (constant)
    - template: '1111111'        # batch_id; need to add '{{env "BATCH_ID"}}' when productionizing
    - template: '{{index . 2}}'  # event_id