# transform.py

import aq_utils
import schema
import time
import copy
import os

# Impression ``event_id`` values that are treated as invalid by update_events.
invalid_ids = {'NA'}

# Wall-clock time at module import, in whole milliseconds since the epoch.
current_ts = int(1000 * time.time())

# Batch identifier taken from the BATCH_ID environment variable; "test_1" is
# used when the variable is not set.
run_id = os.getenv('BATCH_ID', "test_1")
if run_id is None:
    # NOTE(review): with a default supplied above this branch cannot fire;
    # confirm whether a missing BATCH_ID is meant to be fatal.
    raise Exception("batch_id not found")


def isValid(e):
    """Return True unless ``e`` is None, the empty string, or the marker 'NA'."""
    if e is None:
        return False
    return e != '' and e != 'NA'


def non_empty(list):
    """Return True when ``list`` contains at least one element.

    The parameter name shadows the builtin ``list``; it is kept because it is
    part of the function's signature.
    """
    size = len(list)
    return size > 0


def sortByTimestamp(evt):
    """Sort the list of events ``evt`` in place by ``event_timestamp``.

    Events that have no ``event_timestamp`` key are ordered after all events
    that do. Nothing is returned; the list itself is reordered.
    """
    missing = float('inf')

    def timestamp_of(event):
        return event.get('event_timestamp', missing)

    evt.sort(key=timestamp_of)


def update_key_if_valid(result, key, value):
    """Store ``value`` under ``key`` unless ``result`` already holds a valid value.

    The existing value counts as valid when ``isValid`` accepts it; a missing
    key is read as None and is therefore overwritten.
    """
    current = result.get(key)
    if isValid(current):
        return
    result[key] = value


def update_events(rec, result, event_groups):
    """Collate the events of ``rec`` into ``result`` and ``event_groups``.

    Every entry of ``rec['events']`` contributes three things:

    * its non-impression event lists, concatenated per type into ``result``;
    * its impressions: impressions sharing an ``event_id`` are merged into a
      single record, while impressions whose id is listed in ``invalid_ids``
      are kept individually;
    * its event groups, registered in ``event_groups`` under their
      ``event_id`` (the first occurrence wins).

    Once everything is collected, ``result['impressions']`` and every
    non-impression list are sorted by ``event_timestamp``.

    Args:
        rec: input record; ``rec['events']`` is required, ``advertiser_id``
            defaults to the empty string.
        result: output dict, filled in place.
        event_groups: dict of event groups keyed by ``event_id``, filled in
            place.
    """
    event_types = ('attributes', 'conversions', 'views', 'geo_events', 'poi_visits',
                   'other_event', 'id_syncs', 'text_clicks')
    result['advertiser_id'] = rec.get('advertiser_id', "")
    impressions_dict = {}
    invalid_impressions = []

    for e in event_types:
        result[e] = []

    for evt in rec['events']:
        for e in event_types:
            result[e].extend(evt[e])

        for imp in evt['impressions']:
            imp_id = imp['event_id']
            if imp_id in invalid_ids:
                # There is no usable id to merge on, so the impression is kept
                # on its own. It must not be looked up in impressions_dict
                # (it was never stored there, which used to raise KeyError);
                # the flags below are computed on the impression itself.
                invalid_impressions.append(imp)
                merged = imp
            elif imp_id not in impressions_dict:
                impressions_dict[imp_id] = imp
                merged = imp
            else:
                known = impressions_dict[imp_id]
                # The explicit == comparisons are deliberate: a None flag
                # matches neither True nor False and takes the else branch.
                if known['synthetic_event'] == True and imp['synthetic_event'] == False:
                    # A real impression replaces a synthetic one and inherits
                    # its clicks and engagements.
                    # NOTE(review): 'others' is not carried over in this
                    # branch, unlike the branch below - confirm intended.
                    imp['clicks'].extend(known['clicks'])
                    imp['engagements'].extend(known['engagements'])
                    impressions_dict[imp_id] = imp
                    merged = imp
                else:
                    known['clicks'].extend(imp['clicks'])
                    known['engagements'].extend(imp['engagements'])
                    known['others'].update(imp['others'])
                    merged = known

            merged['has_click'] = non_empty(merged['clicks'])
            merged['has_engagement'] = non_empty(merged['engagements'])
            # A falsy (or None) flag on the merged record is replaced by the
            # value carried by the impression being processed.
            if not merged['has_click_conversion']:
                merged['has_click_conversion'] = imp['has_click_conversion']
            if not merged['viewable']:
                merged['viewable'] = imp['viewable']
            # NOTE(review): only clicks are sorted here; sorting engagements
            # was disabled in the original code.
            sortByTimestamp(merged['clicks'])

        for event_group in evt['event_groups']:
            event_id = event_group['event_id']
            if event_id not in event_groups:
                event_groups[event_id] = event_group

    result['impressions'] = list(impressions_dict.values())
    result['impressions'].extend(invalid_impressions)
    sortByTimestamp(result['impressions'])
    for e in event_types:
        sortByTimestamp(result[e])


def split(rec, event_groups):  # , date):
    """Build the list of output records for one collated record.

    A deep copy of ``rec`` is made with its event lists emptied; the events
    are then re-attached to the copy, the event group of each event is added
    to the copy's ``event_groups`` list, and the ``has_*`` summary flags are
    computed. The copy is passed through ``update_event_groups`` and returned
    as a single-element list.

    Args:
        rec: collated record holding one list per event type and an
            ``event_groups`` list.
        event_groups: dict of event groups, looked up by the
            ``event_group_id`` of each event.

    Returns:
        A list containing the one resulting record.
    """
    event_types = {'impressions', 'conversions', 'views', 'geo_events', 'poi_visits', 'other_event', 'id_syncs', 'text_clicks'}

    # Parent record: same keys as rec, event lists emptied, everything else
    # deep-copied so that the input record is not shared with the output.
    collated = copy.deepcopy(
        {key: [] if key in event_types else rec[key] for key in rec}
    )

    for event_type in event_types:
        for event in rec[event_type]:
            # Register the event's group the first time it is seen.
            # NOTE(review): a group id missing from event_groups yields None,
            # which is appended as well - confirm upstream guarantees.
            group = event_groups.get(event['event_group_id'])
            if group not in collated['event_groups']:
                collated['event_groups'].append(group)
            collated[event_type].append(event)

    # (flag name, event list it summarises), in output order.
    presence_flags = (
        ('has_impressions', 'impressions'),
        ('has_conversions', 'conversions'),
        ('has_views', 'views'),
        ('has_geo_events', 'geo_events'),
        ('has_id_syncs', 'id_syncs'),
        ('has_text_clicks', 'text_clicks'),
        ('has_poi_visits', 'poi_visits'),
        ('has_other_events', 'other_event'),
    )
    for flag, event_type in presence_flags:
        collated[flag] = non_empty(collated[event_type])

    # The record-level click/engagement flags are True as soon as any single
    # impression carries the corresponding per-impression flag.
    collated['has_clicks'] = False
    collated['has_engagements'] = False
    collated['has_interactions'] = False  ## TODO calculate this later as part of interaction subevent
    for impression in collated['impressions']:
        if impression['has_click'] == True:
            collated['has_clicks'] = True
        if impression['has_engagement'] == True:
            collated['has_engagements'] = True

    results = [collated]
    update_event_groups(results)
    return results


def update_event_groups(results):
    """Fill in the per-group statistics and the window start of each result.

    For every result, the events of all event types are bucketed by their
    ``event_group_id``. Each entry of ``result['event_groups']`` then receives:

    * ``event_timestamp``: timestamp of its earliest event,
    * ``first_event_id``: ``event_id`` of that earliest event,
    * ``last_timestamp``: timestamp of its latest event,
    * ``events_count``: number of events in the group.

    ``result['event_window_start']`` is lowered to the earliest group
    timestamp when it is None or later than that timestamp. A result without
    event groups is left untouched.

    Args:
        results: list of result dicts, updated in place.

    Raises:
        KeyError: if an event group has no event referring to it.
    """
    # A tuple keeps the bucket order (and so tie-breaking between events with
    # equal timestamps) the same from run to run.
    event_types = ('impressions', 'conversions', 'views', 'geo_events', 'poi_visits',
                   'other_event', 'id_syncs', 'text_clicks')

    def timestamp_of(event):
        # Same ordering as sortByTimestamp: events without a timestamp go last.
        return event.get('event_timestamp', float('inf'))

    for result in results:
        events_by_group = {}
        for event_type in event_types:
            for evt in result[event_type]:
                events_by_group.setdefault(evt['event_group_id'], []).append(evt)

        for event_group in result['event_groups']:
            events = events_by_group[event_group['event_id']]
            # Sorts the bucket only; the lists held by result keep their order.
            events.sort(key=timestamp_of)
            event_group['event_timestamp'] = events[0]['event_timestamp']
            event_group['first_event_id'] = events[0]['event_id']
            event_group['last_timestamp'] = events[-1]['event_timestamp']
            event_group['events_count'] = len(events)

            # If this group starts earlier than the window start recorded on
            # the result, move the window start back. This has to run for
            # every group: checking only after the loop would consider the
            # last group alone and fail when there are no groups at all.
            window_start = result['event_window_start']
            if window_start is None or event_group['event_timestamp'] < window_start:
                result['event_window_start'] = event_group['event_timestamp']


def dat_hnd(rec: any) -> any:
    """Transform one input record into its list of collated output records.

    A fresh result dict is seeded with the entity id, the batch id and an
    empty event window, filled by ``update_events`` and finally handed to
    ``split``, whose return value is returned unchanged.
    """
    event_groups = {}
    result = {
        'entity_id': rec['entity_id'],
        'batch_id': run_id,
        'event_window_start': None,  # calculated with event groups
        'event_window_duration': '1 Day',
        'event_groups': [],
    }
    update_events(rec, result, event_groups)
    return split(result, event_groups)


def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Return the output schema together with the record handler.

    The output schema is the 'datasubject_collated' schema loaded from
    "amdp_schemas-v6.1.1.yaml", extended with a string field "batch_id".
    The incoming schema ``sch`` is not used.
    """
    output_schema = aq_utils.load_schema("amdp_schemas-v6.1.1.yaml", 'datasubject_collated')
    batch_id_field = schema.Field(name="batch_id", schema=schema.StringSchema())
    schema.add_fields(output_schema, batch_id_field)
    return output_schema, dat_hnd
# • Table of contents
#
# Was this article helpful?