##############################################################################################################################
# Entity Id collation for Subevents with no Entity Id
##############################################################################################################################
# Before promoting to prod:
#   - Change patch to VTMS
#   - Read LOOKBACK_DAYS from an env var (currently hard-coded in the transform)
#   - Read BATCH_ID from an env var (currently hard-coded in the sort key)
# Future enhancements:
#   - Avoid listing the output field names explicitly, so this config works generically for any subevent type
# Working area used by this job (presumably for intermediate files — confirm).
work_area: 's3://.default@com.aqfer..development/entity_id_subevent_collation/work_area/101'
# Run the transform as a separate stage of the job.
separate_transform_stage: true
##############################################################################################################################
# INPUT (newly ingested subevents missing entity_id, patched subevents still missing entity_id, and the event_id -> entity_id xref)
##############################################################################################################################
inputs:
# Input 0: subevents that do not have an entity_id yet.
- name: subevent
locations:
# Newly ingested engagement subevents (not filtered by event_type).
- folder: s3://.default@com.aqfer..development/new-pipeline-test/ingest/aqfer_engagement/valid_data=true/entity_type=ck/entity_domain=TTD/
# Subevents previously parked in patch because no entity_id was found.
# NOTE(review): the patch output of this job is written to
# new-pipeline-test/entity-id-collation/output/patch/ (see outputs), not to
# new-pipeline-test/output/patch/ read here — confirm which folder is intended,
# otherwise patched subevents may never be re-processed by this job.
- folder: s3://.default@com.aqfer..development/new-pipeline-test/output/patch/entity_type=ck/entity_domain=TTD/event_type=video/
capture_partition_keys:
- entity_type
- entity_domain
# Both inputs share this key; the transform receives all subevents and xref
# rows that share one event_id together.
key:
- event_id
- entity_type
- entity_domain
transform:
- rename_schema:
top_level: subevent
# Input 1: xref of event_id -> entity_id (read from impression events only).
- name: xref
locations:
- folder: s3://.default@com.aqfer..development/new-pipeline-test/ingest/xref/entity_type=ck/entity_domain=TTD/event_type=impr/
capture_partition_keys:
- entity_type
- entity_domain
key:
- event_id
- entity_type
- entity_domain
transform:
- rename_schema:
top_level: top_level_xref
# Only input 0 (subevent) is required, i.e. a left join: an xref row with no
# matching subevent has nothing to process.
required_parts: [0]
##############################################################################################################################
# TRANSFORM
##############################################################################################################################
transform:
  - python:
      code: |
        import schema
        import hashlib
        import datetime

        # Unmatched subevents whose event date (UTC) is earlier than
        # today (UTC) minus this many days get a synthetic entity_id instead
        # of being sent to patch again.
        # TODO: use '{{env "LOOKBACK_DAYS"}}' when productionizing.
        lookback_days = 20


        def _event_date_utc(event_timestamp):
            """Return the UTC calendar date of an epoch timestamp.

            Only the first 10 digits are used, so epoch seconds, milliseconds
            and microseconds all reduce to whole seconds (assumes dates after
            2001-09-09, i.e. at least 10 digits).
            """
            seconds = int(str(event_timestamp)[0:10])
            # Use UTC explicitly so the date is comparable with the UTC cutoff
            # in dat_hnd regardless of the host's local timezone.
            return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc).date()


        def dat_hnd(rec: any) -> any:
            """Assign an entity_id to every subevent collated under one event_id.

            rec["subevent"] holds the subevents sharing rec["event_id"];
            rec["xref"] holds the matching xref rows (empty or missing when the
            left join found none). Each subevent is updated in place:
              * xref match                      -> entity_id from the first xref row,
                                                   is_attributed=True, is_patch=False
              * no match, event date < cutoff   -> entity_id "SYN-<sha256(event_id)>",
                                                   is_attributed=True, is_patch=False
              * no match, event date >= cutoff  -> is_attributed=False, is_patch=True
            Returns the list of updated subevents.
            """
            cutoff = (datetime.datetime.now(datetime.timezone.utc).date()
                      - datetime.timedelta(days=lookback_days))
            event_id = rec["event_id"]
            # Left join: the xref part may be absent (None) for this event_id.
            xref = rec["xref"] or []
            # Same for every subevent of this event_id, so look it up once.
            xref_entity_id = xref[0].get("entity_id") if xref else None
            res = []
            for se in rec["subevent"]:
                se["event_id"] = event_id
                if xref:
                    # Standard output: an entity_id was found via xref.
                    se["entity_id"] = xref_entity_id
                    se["is_attributed"] = True
                    se["is_patch"] = False
                elif _event_date_utc(se["event_timestamp"]) < cutoff:
                    # Standard output with a synthetic entity_id: no xref match and
                    # the event is older than the lookback window.
                    se["entity_id"] = "SYN-" + hashlib.sha256(event_id.encode()).hexdigest()
                    se["is_attributed"] = True
                    se["is_patch"] = False
                else:
                    # Patch: no entity_id yet; keep for a later run.
                    se["is_attributed"] = False
                    se["is_patch"] = True
                res.append(se)
            return res


        def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
            """Build the output schema and return it with the record handler.

            Output records are the subevent items themselves, extended with
            event_id, is_attributed and is_patch.
            """
            # NOTE(review): entity_id is not added here; this assumes the
            # subevent schema already has it (the output projections list it).
            out_sch = schema.find_field_schemas(sch, "subevent")[0].items
            schema.add_fields(out_sch, schema.Field(name="event_id", schema=schema.StringSchema()))
            schema.add_fields(out_sch, schema.Field(name="is_attributed", schema=schema.BooleanSchema()))
            schema.add_fields(out_sch, schema.Field(name="is_patch", schema=schema.BooleanSchema()))
            return out_sch, dat_hnd
##############################################################################################################################
# OUTPUT
##############################################################################################################################
outputs:
# Subevents flagged is_attributed: entity_id came from xref or is synthetic.
- destination: s3://.default@com.aqfer..development/new-pipeline-test/entity-id-collation/output/with_event_id/
project:
'.':
omit_unless: is_attributed
field_names: &subevent_output_fields [entity_id, event_timestamp, event_type, event_id, source, country_code, region_code, parent_event_id, agency_id, advertiser_id, page_url, ad_url, page_referer_url, campaign_id, ad_group_id, creative_id, placement_id, other_marketing_program_levels, media_partner, inventory_partner, supply_vendor_publisher_id, site_id, other_media_group_levels, cpm_currency, cpm_paid, cpm_cost, cpc_paid, cpc_cost, cpe_paid, cpe_cost, engagement_actions, engagement_type, engagement_sub_type, keywords, search_terms, search_phrase, other_entity_ids, other_event_ids, http_info, geo, metrics, others]
drop_partitions:
- event_type
# Subevents flagged is_patch: still no entity_id, so save them to patch. Once the
# event is older than lookback_days it gets a synthetic entity_id instead.
- destination: s3://.default@com.aqfer..development/new-pipeline-test/entity-id-collation/output/patch/
project:
'.':
omit_unless: is_patch
# Same field list as the output above (YAML alias of the anchor defined there).
field_names: *subevent_output_fields
drop_partitions:
- batch_id
##############################################################################################################################
# SORT
##############################################################################################################################
split:
bucket_size_bytes: 2147483648 # 2 GiB (2^31 bytes)
sort:
num_partition_keys: 4 # the first 4 key_names below are used as partition keys; event_id is not
key_names:
- entity_type
- entity_domain
- event_type
- batch_id
- event_id
project: # fields read from each record; the '{{index . N}}' templates below presumably index into this list
'.':
field_names: [ entity_type, entity_domain, event_id]
# One template per key_names entry, in the same order.
key:
- template: '{{index . 0}}'
- template: '{{index . 1}}'
# NOTE(review): event_type is hard-coded to 'video' for every record, although the
# first subevent input location is not filtered by event_type — confirm only
# video subevents reach this job.
- template: 'video'
# Hard-coded batch_id, quoted so it stays a string.
- template: '1111111' # need to add '{{env "BATCH_ID"}}' when productionizing
- template: '{{index . 2}}'