transform.py

import schema
import json
import aq_utils
import os
import traceback
from urllib.parse import urlparse
import sys

sys.path.append(os.environ.get("USER_DIR"))
from py_config_vars import regions_dict, entity_domain, id_format, partner, owner, puu_entity_type, puu_entity_domain

# Run identifier that dat_hnd writes into each record's 'runid' field.
# Read once at import time from the BATCH_ID environment variable; "1234"
# is used when the variable is absent.
# NOTE(review): because of that fallback, the guard below can only fire when
# BATCH_ID is explicitly set to an empty string -- confirm the default is
# intentional.
run_id = os.getenv('BATCH_ID', "1234")
if run_id == "":
    raise Exception("BATCH_ID NOT SET")

def extract_event_id(string, key, delimeter):
    """Return the value paired with ``key`` in a comma-separated string.

    ``string`` is split on commas. The first token that contains ``key``
    (substring match) and also contains ``delimeter`` is selected, and the
    piece immediately following the first ``delimeter`` in that token is
    returned.

    Args:
        string: Comma-separated tokens, e.g. ``"a=1,z_evid=abc"``. ``None``
            or an empty string yields ``""``.
        key: Substring identifying the wanted token.
        delimeter: Separator between a token's key and its value.

    Returns:
        The extracted value, or ``""`` when no usable token is found.
    """
    if not string:
        return ""
    for token in string.split(","):
        if key not in token:
            continue
        pieces = token.split(delimeter)
        # A token that mentions the key but lacks the delimiter carries no
        # value; keep scanning instead of failing with IndexError.
        if len(pieces) > 1:
            return pieces[1]
    return ""

def _entity_domain_for(prop, id_type, fmt):
    """Render the configured ``entity_domain`` template for one identifier."""
    return entity_domain.format(owner=owner, partner=partner, property=prop,
                                id_type=id_type, id_format=fmt)


def dat_hnd(rec: any) -> any:
    """Enrich one record in place and return it wrapped in a list.

    Fields written on ``rec``:
        runid: the module-level ``run_id``.
        other_entity_keys: list of ``{entity_type, entity_domain, entity_id}``
            dicts, in this order: the record's own entity, the ``suu`` and
            ``puu`` query values (when truthy), one entry per cookie whose
            name is not ``"none"``, then ``hashed_ip`` and ``raw_ip`` (when
            truthy).
        global_region: first region in ``regions_dict`` whose countries
            contain ``rec['country_code']``, else ``"Unknown"``.
        entity_domain: replaced by the formatted domain string.
        query['pevid']: kept when truthy, otherwise taken from the
            ``z_evid`` entry of ``aqfer_var`` (``""`` if unavailable).
        query['src']: kept when truthy, otherwise ``"aUT"``.

    Args:
        rec: Mapping for a single record. Must contain ``query`` and
            ``country_code``; the other fields are read with ``.get``.

    Returns:
        A one-element list holding the same ``rec`` object.
    """
    query = rec['query']
    rec['runid'] = run_id

    # Read the incoming entity_domain once: every formatted domain below is
    # built from this raw value, and rec['entity_domain'] is overwritten
    # further down.
    prop = rec.get('entity_domain', '')
    primary_domain = _entity_domain_for(prop, rec.get('entity_type', ''), id_format)

    entity_keys = [{
        "entity_type": rec.get('entity_type'),
        "entity_domain": primary_domain,
        "entity_id": rec.get('entity_id'),
    }]

    suu = query.get('suu')
    if suu:
        entity_keys.append({
            "entity_type": "statid",
            "entity_domain": _entity_domain_for(prop, "statid", id_format),
            "entity_id": suu,
        })

    puu = query.get('puu')
    if puu:
        entity_keys.append({
            "entity_type": puu_entity_type,
            "entity_domain": puu_entity_domain,
            "entity_id": puu,
        })

    # 'cookie' may be missing or None; both are treated as "no cookies".
    cookies = rec.get('cookie') or {}
    entity_keys.extend(
        {
            "entity_type": "ck",
            "entity_domain": _entity_domain_for(prop, name, ""),
            "entity_id": value,
        }
        for name, value in cookies.items()
        if name != "none"
    )

    # Both IP variants share entity_type "ip" and id_type "v4"; only the
    # id_format ("md5" vs "raw") tells them apart.
    for field, fmt in (("hashed_ip", "md5"), ("raw_ip", "raw")):
        ip_value = rec.get(field)
        if ip_value:
            entity_keys.append({
                "entity_type": "ip",
                "entity_domain": _entity_domain_for(prop, "v4", fmt),
                "entity_id": ip_value,
            })

    rec['other_entity_keys'] = entity_keys

    country_code = rec['country_code']
    rec['global_region'] = next(
        (region for region, countries in regions_dict.items() if country_code in countries),
        "Unknown",
    )

    # Same string as the primary entry's entity_domain above.
    rec['entity_domain'] = primary_domain

    # Fallbacks for query fields that may be absent.
    # REMOVE IF CERTAIN THAT FIELDS WILL ALWAYS BE PRESENT.
    # A missing or None 'aqfer_var' is treated as an empty string so the
    # fallback yields "" rather than raising.
    query['pevid'] = query.get("pevid") or extract_event_id(rec.get('aqfer_var') or "", "z_evid", "=")
    query['src'] = query.get('src') or "aUT"

    return [rec]
  
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Register the fields that dat_hnd writes and return the handler.

    Calls ``schema.add_fields`` on ``sch`` for the string fields
    ``global_region`` and ``runid``, and for ``other_entity_keys``: an
    array of records with the string members ``entity_type``,
    ``entity_domain`` and ``entity_id``.

    Returns:
        A tuple of the ``sch`` object passed in and the ``dat_hnd`` function.
    """
    for plain_name in ("global_region", "runid"):
        schema.add_fields(sch, schema.Field(name=plain_name, schema=schema.StringSchema()))

    member_fields = [
        schema.Field(name=member, schema=schema.StringSchema())
        for member in ("entity_type", "entity_domain", "entity_id")
    ]
    entity_key_record = schema.RecordSchema(name='other_entity_keys', fields=member_fields)
    schema.add_fields(
        sch,
        schema.Field(name="other_entity_keys", schema=schema.ArraySchema(items=entity_key_record)),
    )
    return sch, dat_hnd
  • Table of contents

Was this article helpful?