import schema, random
import aq_utils
import hashlib
import os
# Value of the BATCH_ID environment variable (None when it is unset);
# dat_hnd copies it into every output record under the "run_id" key.
batchid = os.getenv("BATCH_ID")
def setup() -> any:
    """Read ``options.yaml`` and publish its settings as module globals.

    The file is loaded with ``aq_utils.load`` from the directory named by the
    ``USER_DIR`` environment variable, and only the first element of the
    loaded sequence is consulted. The function has no return statement; it
    communicates purely through the globals listed below.

    Globals written:
        event_types:   set built from ``filter_event_types`` (empty if absent).
        gen_event_id:  ``gen_event_id`` option, default ``False``.
        entity_tuple:  ``infer_entity_tuple`` mapping, default ``{}``.
        ck_name:       ``cookie_name`` inside ``entity_tuple`` (may be None).
        statid_algo:   ``statid_algo`` inside ``entity_tuple``, default 'algo1'.
        invalid_uus / invalid_suus / invalid_cks:
                       ``invalid_uus`` / ``invalid_suus`` / ``invalid_cookies``
                       inside ``entity_tuple``, each defaulting to ``['NA']``.
    """
    global entity_tuple, gen_event_id, event_types, statid_algo, invalid_uus, invalid_suus, invalid_cks, ck_name

    # The set is created before the file is read, exactly as before.
    event_types = set()
    config_path = os.getenv("USER_DIR") + '/options.yaml'
    primary = aq_utils.load(config_path)[0]

    event_types.update(primary.get('filter_event_types', []))
    gen_event_id = primary.get('gen_event_id', False)
    entity_tuple = primary.get('infer_entity_tuple', {})

    ck_name = entity_tuple.get('cookie_name')
    statid_algo = entity_tuple.get('statid_algo', 'algo1')
    # Each lookup gets its own fresh ['NA'] default list.
    invalid_uus, invalid_suus, invalid_cks = (
        entity_tuple.get(option, ['NA'])
        for option in ('invalid_uus', 'invalid_suus', 'invalid_cookies')
    )
def get_long(value):
    """Convert ``value`` to an int, falling back to 0 when that is not possible.

    Args:
        value: Any object. Only ``int`` and ``str`` instances are converted.

    Returns:
        ``int(value)`` for ints and for strings that parse as integers;
        0 for ``None``, for any other type (floats included), and for strings
        that are not integer literals.
    """
    if isinstance(value, (int, str)):
        try:
            return int(value)
        except ValueError:
            # Placeholder strings such as "" or "NA" are used for missing
            # values elsewhere in this module; treat them like None instead
            # of letting the conversion error abort the whole record.
            return 0
    return 0
def process_partner_url(partner_url):
    """Project one partner-URL mapping onto the four fields kept downstream.

    Args:
        partner_url: Mapping exposing ``.get``; missing keys yield None.

    Returns:
        Dict with keys ``status``, ``latency``, ``beacon_id`` and ``domain``.
        ``latency`` is the ``latency_ms`` entry passed through ``get_long``;
        the other three values are copied as-is.
    """
    fetch = partner_url.get
    slim = {"status": fetch("status")}
    slim["latency"] = get_long(fetch("latency_ms"))
    slim["beacon_id"] = fetch("beacon_id")
    slim["domain"] = fetch("domain")
    return slim
def statid_algo1(res):
    """Derive the "algo1" statistical id for a record.

    ALGO1 = IP address + user agent + accept_encoding_header
            + accept_language_header, concatenated without separators and
            hashed with SHA-256.

    Args:
        res: Mapping exposing ``.get``. Keys read: ``raw_ip``, ``hashed_ip``,
            ``user_agent``, ``user_agent_unwrapped_components``,
            ``accept_encoding_header`` and ``accept_language_header``.

    Returns:
        Upper-case hex digest string, or None when any of the four
        ingredients is None.
    """
    raw_ip = res.get("raw_ip")
    hashed_ip = res.get("hashed_ip")

    # Use the hashed address only when the raw one is a placeholder and a
    # hashed one is actually present; otherwise keep the raw value as-is.
    raw_ip_is_placeholder = raw_ip in (None, "", "NA")
    ip = hashed_ip if (raw_ip_is_placeholder and hashed_ip is not None) else raw_ip

    # A falsy top-level user_agent defers to the one nested in the
    # unwrapped components mapping.
    ua_components = res.get("user_agent_unwrapped_components", {})
    user_agent = res.get("user_agent") or ua_components.get("user_agent")

    ingredients = (
        ip,
        user_agent,
        res.get("accept_encoding_header"),
        res.get("accept_language_header"),
    )
    if None in ingredients:
        return None

    fingerprint = "{}{}{}{}".format(*ingredients)
    return hashlib.sha256(fingerprint.encode('utf-8')).hexdigest().upper()
def compute_event_id(res):
    """Pick an existing event id for the record, or synthesize a random one.

    Sources are tried in this order; a value of None, "" or "NA" counts as
    missing for every one of them:

    1. ``res["event_id"]``
    2. ``res["query"]["z_evid"]``
    3. the value of the first comma-separated ``name=value`` pair in
       ``res["aqfer_var"]`` whose text contains ``z_evid``
    4. a generated id: the SHA-256 hex digest of
       ``<ip>.<event_timestamp>.<rand>``, where ``ip`` is ``hashed_ip`` if
       usable, else ``raw_ip``; without a usable ip the seed is
       ``<event_timestamp>.<rand>``, and without a timestamp (None or 0) it is
       just ``<rand>``.

    Args:
        res: Mapping with keys ``event_id``, ``query``, ``aqfer_var``,
            ``hashed_ip``, ``raw_ip`` and ``event_timestamp``.

    Returns:
        Tuple ``(event_id, is_rand, rand)``. ``is_rand`` is True only when the
        id was generated; ``rand`` is then the ``str(random.random())`` value
        that was mixed in, and "" otherwise.
    """
    missing = (None, "", "NA")
    rand = ""
    is_rand = False

    # aqfer_var may be None; treat that like an empty string rather than
    # failing on .split() before the other id sources are even considered.
    aqfer_var = res["aqfer_var"] or ""
    aq_evt_id = next(
        (
            pair.split('=')[1].strip()
            for pair in aqfer_var.split(',')
            if '=' in pair and 'z_evid' in pair
        ),
        "",
    )

    if res["event_id"] not in missing:
        event_id = res["event_id"]
    elif res["query"].get("z_evid") not in missing:
        # None is rejected here as well, so a null z_evid no longer becomes
        # the event id and the remaining fallbacks still get their turn.
        event_id = res["query"]["z_evid"]
    elif aq_evt_id not in missing:
        event_id = aq_evt_id
    else:
        rand = str(random.random())
        is_rand = True
        timestamp = res["event_timestamp"]
        if res["hashed_ip"] not in missing:
            seed = f"{res['hashed_ip']}.{timestamp}.{rand}"
        elif res["raw_ip"] not in missing:
            seed = f"{res['raw_ip']}.{timestamp}.{rand}"
        elif timestamp not in (None, 0):
            seed = f"{timestamp}.{rand}"
        else:
            seed = rand
        event_id = hashlib.sha256(seed.encode('utf-8')).hexdigest()

    return event_id, is_rand, rand
def infer_entity_tup(rec):
    """Choose the entity (type, domain, id) that identifies a record.

    Identifier fields are tried in a fixed priority order and the first usable
    one wins:

        prsn, uu, hh, org, rdid, dev, ck, suu, statid

    ``uu`` is reported with entity type "prsn" and ``suu`` with entity type
    "ck"; every other field uses its own name as the type. The domain comes
    from ``entity_tuple["entity_domain_config"][<field>]``. When nothing is
    usable the result falls back to a computed statid: type "statid", domain
    ``_z__z_<host>_z_algo1_z_`` and the id returned by ``statid_algo1``
    (which may be None).

    A field is usable when it is not None. In addition ``uu``, ``suu`` and
    ``ck`` must not appear in the configured ``invalid_uus`` / ``invalid_suus``
    / ``invalid_cks`` lists, and ``uu`` / ``suu`` must not be "NA".

    Relies on module globals populated by ``setup()``: ``entity_tuple``,
    ``invalid_uus``, ``invalid_suus`` and ``invalid_cks``.

    Args:
        rec: Mapping that must contain ``uu`` and ``suu``; the remaining
            identifier fields and ``host`` are optional.

    Returns:
        Tuple ``(entity_type, entity_domain, entity_id)``.
    """
    uu = rec["uu"]
    suu = rec["suu"]
    ck = rec.get("ck")

    # None is rejected explicitly so a null uu/suu can no longer be selected
    # and yield an entity id of None.
    is_valid_uu = uu is not None and uu != "NA" and uu not in invalid_uus
    is_valid_suu = suu is not None and suu != "NA" and suu not in invalid_suus
    # The cookie is checked against invalid_cks too; previously this flag was
    # computed but never used, so blacklisted cookies were still accepted.
    is_valid_ck = ck is not None and ck not in invalid_cks

    # (record field / domain-config key, reported entity type, usable?)
    candidates = (
        ("prsn", "prsn", rec.get("prsn") is not None),
        ("uu", "prsn", is_valid_uu),
        ("hh", "hh", rec.get("hh") is not None),
        ("org", "org", rec.get("org") is not None),
        ("rdid", "rdid", rec.get("rdid") is not None),
        ("dev", "dev", rec.get("dev") is not None),
        ("ck", "ck", is_valid_ck),
        ("suu", "ck", is_valid_suu),
        ("statid", "statid", rec.get("statid") is not None),
    )
    for field, entity_type, usable in candidates:
        if usable:
            # The domain config is only consulted for the winning field, so a
            # missing config does not break the computed-statid fallback.
            entity_domain = entity_tuple["entity_domain_config"][field]
            return entity_type, entity_domain, rec[field]

    return "statid", f"_z__z_{rec.get('host', '')}_z_algo1_z_", statid_algo1(rec)
def generate_other_entity_keys(rec):
    """Build the list of additional statid entity keys for a record.

    One entry is produced per known statid algorithm (currently only
    "algo1"). When the record's own ``entity_type`` is "statid", the
    algorithm named by the module-level ``statid_algo`` setting is left out;
    for any other entity type every algorithm is included.

    Args:
        rec: Mapping exposing ``.get``; ``entity_type`` and ``host`` are read
            here, and the whole mapping is handed to the algorithm function.

    Returns:
        List of dicts with keys ``entity_type`` (always "statid"),
        ``entity_domain`` (``_z__z_<host>_z_<algo>_z_``) and ``entity_id``
        (result of the module-level ``statid_<algo>`` function).
    """
    chosen_algo = statid_algo
    algo_names = ["algo1"]

    def describe(algo):
        # The algorithm function is looked up by name among module globals.
        domain = f"_z__z_{rec.get('host', '')}_z_{algo}_z_"
        return {
            "entity_type": "statid",
            "entity_domain": domain,
            "entity_id": globals()[f"statid_{algo}"](rec),
        }

    primary_is_statid = rec.get("entity_type") == "statid"
    return [
        describe(algo)
        for algo in algo_names
        if not (primary_is_statid and algo == chosen_algo)
    ]
def dat_hnd(input_rec: any) -> any:
    """Transform one input record into the output record layout.

    Steps, in order:
      1. Copy the fixed set of fields from ``input_rec`` (``class`` is stored
         as ``cls``; ``event_timestamp`` goes through ``get_long``;
         ``status_code`` is converted with ``int`` unless it is None; each
         ``partner_urls`` entry goes through ``process_partner_url``).
      2. Lift every entry of ``user_agent_unwrapped_components`` to the top
         level, storing ``device_type`` under the name ``device``.
      3. Ensure ``query["adv"]`` exists and is truthy, else set it to "".
      4. Fill ``entity_type`` / ``entity_domain`` / ``entity_id`` from
         ``infer_entity_tup(input_rec)``.
      5. If the ``gen_event_id`` setting is on, replace ``event_id`` with the
         result of ``compute_event_id`` and, when it was randomly generated,
         record the random component in ``query["aqfer_rand"]``.
      6. Set ``event_type`` from ``query["aqet"]`` (default "NA"); values not
         in a non-empty ``event_types`` filter become "othev".
      7. Add ``other_entity_keys``, an empty ``others`` dict and ``run_id``.

    Note that the output's ``query`` is the same object as
    ``input_rec["query"]``, so steps 3 and 5 also modify the input's map.

    Args:
        input_rec: Mapping containing every field copied in step 1 plus
            ``user_agent_unwrapped_components``; a missing key raises KeyError.

    Returns:
        A one-element list holding the output record dict.
    """
    output_rec = {}

    def carry(*names):
        # Straight copies, inserted in the order given.
        for name in names:
            output_rec[name] = input_rec[name]

    carry("hashed_ip", "raw_ip")
    output_rec["event_timestamp"] = get_long(input_rec["event_timestamp"])
    carry("event_date", "event_hour", "host", "path", "query", "referer", "cookie")
    status = input_rec["status_code"]
    output_rec["status_code"] = None if status is None else int(status)
    carry(
        "aqfer_var",
        "dnt",
        "accept_header",
        "accept_language_header",
        "accept_encoding_header",
        "country_code",
        "region_code",
        "latitude",
        "longitude",
        "dma",
        "msa",
        "timezone",
        "area_code",
        "fips",
        "city",
        "zip",
        "network",
        "network_type",
        "throughput",
        "tag_type",
    )
    output_rec["cls"] = input_rec["class"]
    carry("event_id", "uu", "suu", "puu", "domain", "redirect_domain")
    output_rec["partner_urls"] = [process_partner_url(entry) for entry in input_rec["partner_urls"]]
    output_rec["version"] = input_rec["version"]

    # Unwrap the user-agent components onto the top-level record. Only the
    # key "device_type" is renamed (to "device"); values are copied unchanged,
    # so a None value stays None.
    for name, val in input_rec["user_agent_unwrapped_components"].items():
        output_rec["device" if name == "device_type" else name] = val

    # Make sure the advertiser id entry exists; a missing or falsy "adv"
    # becomes an empty string.
    query = output_rec["query"]
    if not query.get("adv"):
        query["adv"] = ""

    # Entity tuple is inferred from the *input* record.
    entity_type, entity_domain, entity_id = infer_entity_tup(input_rec)
    output_rec["entity_type"] = entity_type
    output_rec["entity_domain"] = entity_domain
    output_rec["entity_id"] = entity_id

    # Optional event id generation, driven by the gen_event_id setting.
    if gen_event_id:
        new_event_id, is_rand, rand_num = compute_event_id(output_rec)
        output_rec["event_id"] = new_event_id
        if is_rand:
            query["aqfer_rand"] = rand_num

    # Event type: pass through unless a filter is configured and excludes it.
    aqet = query.get("aqet", "NA")
    if event_types and aqet not in event_types:
        output_rec["event_type"] = "othev"
    else:
        output_rec["event_type"] = aqet

    output_rec["other_entity_keys"] = generate_other_entity_keys(output_rec)
    output_rec["others"] = {}
    output_rec["run_id"] = batchid
    return [output_rec]
def schema_handler_v1(sch: schema.Schema) -> (schema.Schema, any):
    """Initialise module settings and return the output schema with its handler.

    Runs ``setup()`` first, then loads the ``parse_receipt`` schema from
    ``internal_schemas-v1.0.yaml`` via ``aq_utils.load_schema``.

    Args:
        sch: Incoming schema; not used by this function.

    Returns:
        Tuple of the loaded output schema and the ``dat_hnd`` record handler.
    """
    setup()
    return aq_utils.load_schema("internal_schemas-v1.0.yaml", 'parse_receipt'), dat_hnd