transform.py

import aq_utils
import schema
import time
import hashlib
import json


def get_data_type_value(type):
    """Return the zero/empty default value for a schema type name.

    Numeric types ("int", "float", "double") map to 0, "string" to "",
    "map" to a fresh {}, "list" to a fresh [], and "null" (or any
    unrecognised name) to None. A new table is built per call, so the
    returned containers are never shared between callers. An unhashable
    argument raises TypeError from the dict lookup.
    """
    defaults = dict.fromkeys(("int", "float", "double"), 0)
    defaults.update(string="", map={}, list=[], null=None)
    return defaults.get(type)


def get_formatted_schema(raw_schema):
    """Map every field name in ``raw_schema['fields']`` to its declared type.

    Each entry of ``raw_schema['fields']`` must provide 'name' and 'type'.
    If a name appears more than once, the last occurrence wins.
    """
    return {field['name']: field['type'] for field in raw_schema['fields']}


def format_data(data, formatted_schema):
    """Back-fill *data* in place with defaults for schema fields it lacks.

    Keys already present in *data* are left untouched. For a union (list)
    type the default is None when "null" is a member, otherwise the default
    of the first member type (an empty union raises IndexError). Non-union
    types are passed straight to ``get_data_type_value``. Returns None.
    """
    for field_name, field_type in formatted_schema.items():
        if field_name in data:
            continue
        if not isinstance(field_type, list):
            data[field_name] = get_data_type_value(field_type)
        elif "null" in field_type:
            data[field_name] = get_data_type_value("null")
        else:
            data[field_name] = get_data_type_value(field_type[0])


# Field-name -> type maps for the click and engagement sub-event schemas;
# dat_hnd passes them to format_data to back-fill missing sub-event fields.
raw_clicks_schema = json.loads(str(aq_utils.load_schema("amdp_schemas-v6.0.yaml", 'click_collated')))
formatted_clicks_schema = get_formatted_schema(raw_clicks_schema)
raw_engagement_schema = json.loads(str(aq_utils.load_schema("amdp_schemas-v6.0.yaml", 'engagement_collated')))
formatted_engagement_schema = get_formatted_schema(raw_engagement_schema)

# Parent-event ids that cannot be joined to an impression; sub-events
# pointing at them are kept aside as "invalid" synthetic impressions.
invalid_ids = {'NA'}
# Epoch milliseconds captured once, at module import time.
current_ts = int(time.time() * 1000)


def isValid(cc):
    """Return True unless *cc* is None, the empty string, or the 'NA' placeholder."""
    return cc not in (None, '', 'NA')


def non_empty(list):
    """Return True when the sized container *list* holds at least one item."""
    return len(list) > 0


def sortByTimestamp(evt):
    """Sort the list *evt* in place (stable) by each item's 'event_timestamp'; returns None."""
    def _timestamp_of(event):
        return event['event_timestamp']

    evt.sort(key=_timestamp_of)


def set_event_group(evt, result, retain_event_id: bool):
    """Build an event-group record from *evt* and register it in result['event_groups'].

    The group copies device, network and geo attributes from *evt*. They are
    read with ``evt[...]``, so a missing key raises KeyError; the exception is
    'global_region', which is read with ``.get``.

    The group's 'event_id' is chosen as follows:
      - when *retain_event_id* is truthy, it is *evt*'s own 'event_id';
      - otherwise, it is the SHA-256 hex digest of ``str(event_group.items())``.

    The chosen id is also written back to ``evt['event_group_id']``, which
    mutates *evt*. The group is appended to ``result['event_groups']`` only
    if an equal dict is not already in that list.

    Callers in this file pass 0/1 rather than a bool for *retain_event_id*.

    NOTE(review): the generated id depends on the exact key insertion order
    and values below, including the ``'event_id': ''`` placeholder.
    Reordering, adding or changing any field changes every generated
    event_group_id.
    """
    event_groups = result['event_groups']
    event_group = {}
    # Placeholder; it is part of the hashed content when the id is generated.
    event_group['event_id'] = ''

    # these fields will be calculated at the end of collate-merge
    event_group['event_timestamp'] = 0
    event_group['first_event_id'] = ''
    event_group['last_timestamp'] = 0
    event_group['events_count'] = 0

    event_group['source'] = evt['source']  # if evt['source'] else ""
    event_group['browser'] = evt['browser']
    event_group['browser_version'] = evt['browser_version']
    event_group['device_type'] = evt['device_type']

    # event_group['hashed_ip'] = evt.get('hashed_ip', '')

    event_group['network'] = evt['network']
    event_group['network_type'] = evt['network_type']
    event_group['os'] = evt['os']
    event_group['os_version'] = evt['os_version']
    event_group['throughput'] = evt['throughput']
    event_group['user_agent'] = evt['user_agent']
    event_group['area_code'] = evt['area_code']
    event_group['city'] = evt['city']
    event_group['country_code'] = evt['country_code']
    event_group['dma'] = evt['dma']
    event_group['fips'] = evt['fips']
    # Optional in evt: a missing or falsy value becomes "".
    event_group['global_region'] = evt.get('global_region') if evt.get('global_region') else ""
    # NOTE(review): a coordinate of exactly 0 / 0.0 is falsy and is stored as ""
    # (same as missing) -- confirm that is intended.
    event_group['latitude'] = str(evt['latitude']) if evt['latitude'] else ""
    event_group['longitude'] = str(evt['longitude']) if evt['longitude'] else ""
    event_group['msa'] = evt['msa']
    event_group['region_code'] = evt['region_code']
    event_group['timezone'] = evt['timezone']
    event_group['zip'] = evt['zip']
    # event_policy = evt['policy']
    # NOTE(review): this condition looks inverted. A truthy policy is replaced
    # by [] while a falsy one (e.g. None) is kept, so a non-empty policy is
    # never retained. The likely intent is `evt['policy'] if evt['policy'] else []`,
    # but changing it alters group hashes -- confirm before fixing.
    event_group['policy'] = evt['policy'] if not evt['policy'] else []
    # Keys sorted so the hashed repr does not depend on other_geo's insertion order;
    # a non-dict other_geo becomes {}.
    event_group['other_geo'] = dict(sorted(evt['other_geo'].items())) if isinstance(evt['other_geo'], dict) else {}
    if retain_event_id:
        event_group['event_id'] = evt['event_id']
    else:
        # Content hash; 'event_id' is still the '' placeholder at this point.
        event_group['event_id'] = hashlib.sha256(str(event_group.items()).encode()).hexdigest()
    evt['event_group_id'] = event_group['event_id']
    # Linear dict-equality scan over the groups registered so far.
    if event_group not in event_groups:
        result['event_groups'].append(event_group)


def update_events(rec, result, key, advertiser_id):
    """Copy the event list ``rec[key]`` into ``result[key]``, sorted by timestamp.

    ``result[key]`` is always (re)set to a new list, even if *rec* has no
    such key. For each input event dict (mutated in place and re-used):
      - its event group is registered via ``set_event_group``;
      - 'advertiser_id' is stamped on it;
      - 'policy' is reset to [].
    """
    collected = result[key] = []
    for event in rec.get(key, []):
        set_event_group(event, result, 0)
        event.update(advertiser_id=advertiser_id, policy=[])
        collected.append(event)
    sortByTimestamp(collected)


def get_clicks_dict(rec, result, subevents_dict, advertiser_id):
    """Turn each click in rec['clicks'] into a synthetic impression holding one click sub-event.

    Each click dict is mutated in place and re-used as its synthetic
    ("orphan") impression:
      - its 'event_id' becomes the parent impression id ("" if falsy);
      - click-level fields are copied into a nested 'clicks' entry, and the
        click-only keys are then removed from the outer dict;
      - the impression flags are set.

    Clicks whose parent id is in ``invalid_ids`` are collected separately.
    All others are stored in *subevents_dict* under the parent id. If an
    entry already exists for that id, the click sub-event is appended to it
    and its click flags are updated instead.

    Side effects: registers each click's event group via ``set_event_group``
    and sets ``result['has_clicks'] = True`` when any click is seen.

    Returns:
        (subevents_dict, invalid_clicks): the dict passed in (mutated) and
        the list of synthetic impressions whose parent id was invalid.
    """
    invalid_clicks = []
    for click in rec.get('clicks', []):
        set_event_group(click, result, 0)
        click['advertiser_id'] = advertiser_id
        parent_id = click['parent_event_id']
        click_id = click['event_id']

        # The click dict itself becomes the synthetic impression.
        click['event_id'] = parent_id if parent_id else ""
        click['has_click'] = True
        result['has_clicks'] = True
        click['has_engagement'] = None
        click['viewable'] = None
        click['synthetic_event'] = True

        # Snapshot click-level fields before they are stripped below.
        subevent = {
            'event_timestamp': click['event_timestamp'],
            'agency_id': click['agency_id'],
            'event_id': click_id,
            'page_url': click['page_url'],
            'page_referer_url': click['page_referer_url'],
            'destination_url': click['destination_url'],
        }
        for cost_field in ('cpc_paid', 'cpc_cost'):
            if cost_field in click:
                subevent[cost_field] = click[cost_field]
        subevent['other_event_ids'] = click['other_event_ids']
        subevent['metrics'] = click['metrics']
        subevent['others'] = click['others']

        click['clicks'] = [subevent]
        click['engagements'] = []

        del click['destination_url']
        click.pop('cpc_paid', None)
        click.pop('cpc_cost', None)

        if parent_id in invalid_ids:
            invalid_clicks.append(click)
            continue

        existing = subevents_dict.get(parent_id)
        if existing is None:
            subevents_dict[parent_id] = click
            continue
        existing['clicks'].append(subevent)
        existing['has_click'] = True
        existing['has_click_conversion'] = existing.get('has_click_conversion') or click.get(
            'has_click_conversion')

    return subevents_dict, invalid_clicks


def get_engagement_dict(rec, result, subevents_dict, advertiser_id):
    """Turn each engagement in rec['engagements'] into a synthetic impression.

    Each engagement dict is mutated in place and re-used as a synthetic
    impression:
      - its 'event_id' becomes the parent impression id ("" if falsy, as in
        get_clicks_dict);
      - engagement-level fields are copied into a single nested
        'engagements' entry, and the engagement-only keys are then removed
        from the outer dict;
      - the impression flags are set.

    Engagements whose parent id is in ``invalid_ids`` are collected
    separately. All others are stored in *subevents_dict* under the parent
    id. If an entry already exists for that id, the engagement sub-event is
    appended to it and its 'has_engagement' flag is set.

    Side effects: registers each engagement's event group via
    ``set_event_group`` and sets ``result['has_engagements'] = True`` when
    any engagement is seen.

    Returns:
        (subevents_dict, invalid_engagements): the dict passed in (mutated)
        and the list of synthetic impressions whose parent id was invalid.
    """
    invalid_engagements = []
    for engagement in rec.get('engagements', []):
        set_event_group(engagement, result, 0)
        engagement['advertiser_id'] = advertiser_id
        imp_id = engagement['parent_event_id']

        # The engagement dict itself becomes the synthetic impression. A falsy
        # parent id is normalised to "" exactly as get_clicks_dict does.
        engagement['event_id'] = imp_id if imp_id else ""
        engagement['has_click_conversion'] = None
        engagement['has_click'] = None
        engagement['has_engagement'] = True
        result['has_engagements'] = True
        engagement['viewable'] = True
        engagement['synthetic_event'] = True

        # Snapshot engagement-level fields before they are stripped below.
        eng_subevent = {
            'event_timestamp': engagement['event_timestamp'],
            'event_subtype_1': engagement['event_subtype_1'],
        }
        for optional_field in ('cpc_paid', 'cpc_cost', 'cpe_paid', 'cpe_cost', 'engagement_actions'):
            if optional_field in engagement:
                eng_subevent[optional_field] = engagement[optional_field]
        eng_subevent['metrics'] = engagement['metrics']
        eng_subevent['others'] = engagement['others']

        engagement['clicks'] = []
        engagement['engagements'] = [eng_subevent]

        del engagement['event_subtype_1']
        for optional_field in ('engagement_actions', 'cpc_paid', 'cpc_cost', 'cpe_paid', 'cpe_cost'):
            engagement.pop(optional_field, None)

        if imp_id in invalid_ids:
            invalid_engagements.append(engagement)
            continue

        existing = subevents_dict.get(imp_id)
        if existing is None:
            subevents_dict[imp_id] = engagement
        else:
            existing['engagements'].append(eng_subevent)
            # Mirror get_clicks_dict's has_click update so the flag stays
            # correct even when the existing entry came from another sub-event type.
            existing['has_engagement'] = True

    return subevents_dict, invalid_engagements


def get_impressions_dict(rec, result, advertiser_id):
    """Index the impressions in rec['impressions'] by their 'event_id'.

    Each impression dict is stamped with *advertiser_id* and has its event
    group registered via ``set_event_group``.

    The first impression seen for an id is kept (mutated in place):
      - it gets empty 'clicks' and 'engagements' lists;
      - its 'viewable' is reset to None.

    Later duplicates only contribute their 'others' mapping, which is merged
    into the kept impression's 'others'. Returns the id -> impression dict.
    """
    by_id = {}
    for impression in rec.get('impressions', []):
        impression['advertiser_id'] = advertiser_id
        set_event_group(impression, result, 0)
        event_id = impression['event_id']

        kept = by_id.get(event_id)
        if kept is not None:
            kept['others'].update(impression['others'])
            continue

        impression['clicks'] = []
        impression['engagements'] = []
        impression['viewable'] = None
        by_id[event_id] = impression
    return by_id


def subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id):
    """Merge click/engagement sub-events into their parent impressions.

    Builds the impression index and the synthetic sub-event entries, then,
    for each real impression:
      - appends the clicks/engagements of the entry sharing its id;
      - recomputes its 'has_click' / 'has_engagement' flags;
      - fills 'has_click_conversion' and 'viewable' from that entry when they
        are falsy.

    ``result['impressions']`` is set to a timestamp-sorted list containing,
    in this order before the (stable) sort:
      - real impressions;
      - sub-events with an invalid parent id;
      - synthetic impressions whose parent impression was not in the input.

    ``result['has_clicks']`` / ``result['has_engagements']`` are reset to
    False here; the helpers set them True for any click/engagement seen, and
    matched impressions with a non-empty list set them True as well.
    """
    result['has_clicks'] = False
    result['has_engagements'] = False
    impressions_dict = get_impressions_dict(rec, result, advertiser_id)
    # Engagements first, then clicks: a click whose parent only has
    # engagements is appended to the engagement-built entry.
    subevents_dict, invalid_subevents = get_engagement_dict(rec, result, {}, advertiser_id)
    subevents_dict, invalid_clicks = get_clicks_dict(rec, result, subevents_dict, advertiser_id)
    invalid_subevents.extend(invalid_clicks)

    # insert subevents into parent impression
    impressions = []
    for imp_id, impression in impressions_dict.items():
        impression['synthetic_event'] = False
        se = subevents_dict.pop(imp_id, None)
        if se is not None:
            impression['clicks'].extend(se['clicks'])
            impression['engagements'].extend(se['engagements'])
            impression['has_click'] = non_empty(impression['clicks'])
            result['has_clicks'] = result['has_clicks'] or impression['has_click']
            impression['has_engagement'] = non_empty(impression['engagements'])
            result['has_engagements'] = result['has_engagements'] or impression['has_engagement']
            # get_clicks_dict never sets 'has_click_conversion' (it reads it
            # with .get), so a click-only entry may lack the key entirely.
            if not impression.get('has_click_conversion'):
                impression['has_click_conversion'] = se.get('has_click_conversion')
            # 'viewable' is always present: get_impressions_dict sets it to None.
            if not impression['viewable']:
                impression['viewable'] = se['viewable']
        impressions.append(impression)
    impressions.extend(invalid_subevents)
    impressions.extend(subevents_dict.values())
    sortByTimestamp(impressions)
    result['impressions'] = impressions


def dat_hnd(rec: any) -> any:
    """Transform one input record into a single collated data-subject record.

    The transform, in order:
      - registers event groups;
      - copies the per-type event lists (conversions, views, geo_events,
        poi_visits, attributes, other_event and, when present, interactions)
        into the output, stamping each event with the record's advertiser_id;
      - merges click/engagement sub-events into their parent impressions;
      - sets the summary ``has_*`` flags;
      - normalises the nullable per-event booleans to False.

    Input event dicts are mutated in place and re-used in the output.

    Args:
        rec: input record; 'entity_id' is required, every other key is
            read with ``.get`` and may be absent.

    Returns:
        A one-element list holding the output record dict.
    """
    result = {}
    advertiser_id = rec.get('advertiser_id', "")
    result['entity_id'] = rec['entity_id']
    result['event_window_start'] = None  # handled in collate-merge
    result['event_window_duration'] = '1 Hour'
    result['event_groups'] = []

    # Event groups supplied in the input keep their own event_id
    # (retain_event_id=1); groups derived from individual events below get a
    # content-hash id instead.
    for input_event_group in rec.get('event_groups', []):
        set_event_group(input_event_group, result, 1)

    for event_key in ('conversions', 'views', 'geo_events', 'poi_visits', 'attributes', 'other_event'):
        update_events(rec, result, event_key, advertiser_id)

    # Interactions are only emitted when the input actually contains some.
    if rec.get("interactions"):
        update_events(rec, result, 'interactions', advertiser_id)

    subevent_aggregation_for_imp_clk_eng(rec, result, advertiser_id)
    result['has_impressions'] = non_empty(result['impressions'])
    result['has_conversions'] = non_empty(result['conversions'])
    result['has_views'] = non_empty(result['views'])
    result['has_geo_events'] = non_empty(result['geo_events'])
    result['has_poi_visits'] = non_empty(result['poi_visits'])
    result['has_interactions'] = False if not rec.get("interactions") else non_empty(result['interactions'])
    result['has_other_events'] = non_empty(result['other_event'])

    for element in result['impressions']:
        # Nullable flags (None on synthetic events) are normalised to False.
        element['has_click_conversion'] = element.get("has_click_conversion") or False
        element['has_click'] = element.get("has_click") or False
        # Each flag is read from its own key: engagement-only impressions
        # have has_click falsy but has_engagement True.
        element['has_engagement'] = element.get("has_engagement") or False
        element['event_subtype_1'] = element.get("event_subtype_1")

        for click in element['clicks']:
            format_data(click, formatted_clicks_schema)

        for engagement in element['engagements']:
            format_data(engagement, formatted_engagement_schema)

    # Remove after confirmation
    result['event_window_start'] = result.get('event_window_start', 0) or 0
    for conv in result['conversions']:
        conv['synthetic_event'] = conv.get('synthetic_event') or False
    for view in result['views']:
        view['synthetic_event'] = view.get('synthetic_event') or False
        view['has_interaction'] = view.get('has_interaction') or False
        view['interaction'] = view.get('interaction') or []

    for other in result['other_event']:
        other['synthetic_event'] = other.get('synthetic_event') or False

    result['text_click'] = []
    result['id_sync'] = []
    return [result]


def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Return the collated data-subject output schema together with its record handler.

    *sch* is accepted for interface compatibility but is not used; the
    output schema is always 'datasubject_collated' from
    amdp_schemas-v6.0.yaml, and the handler is ``dat_hnd``.
    """
    return (
        aq_utils.load_schema("amdp_schemas-v6.0.yaml", 'datasubject_collated'),
        dat_hnd,
    )

  • Table of contents

Was this article helpful?