1
0
Fork 0
mirror of https://github.com/morpheus65535/bazarr synced 2024-12-21 23:32:31 +00:00

Replaced deprecated Google Universal Analytics by GA4

This commit is contained in:
morpheus65535 2023-03-17 09:01:15 -04:00 committed by GitHub
parent 52507854e8
commit abc48b4ed0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1028 additions and 1748 deletions

View file

@ -32,7 +32,6 @@ if postgresql:
(OperationalError, 'server closed the connection unexpectedly'),
)
logger.debug(
f"Connecting to PostgreSQL database: {settings.postgresql.host}:{settings.postgresql.port}/{settings.postgresql.database}")
database = ReconnectPostgresqlDatabase(settings.postgresql.database,

View file

@ -125,6 +125,7 @@ def configure_logging(debug=False):
logging.getLogger("srt").setLevel(logging.ERROR)
logging.getLogger("SignalRCoreClient").setLevel(logging.CRITICAL)
logging.getLogger("websocket").setLevel(logging.CRITICAL)
logging.getLogger("ga4mp.ga4mp").setLevel(logging.ERROR)
logging.getLogger("waitress").setLevel(logging.ERROR)
logging.getLogger("knowit").setLevel(logging.CRITICAL)

View file

@ -57,6 +57,9 @@ os.environ["SZ_HI_EXTENSION"] = settings.general.hi_extension
# set anti-captcha provider and key
configure_captcha_func()
# import analytics module to make sure logging is properly configured afterwards
from utilities.analytics import event_tracker # noqa E402
# configure logging
configure_logging(settings.general.getboolean('debug') or args.debug)
import logging # noqa E402

View file

@ -8,7 +8,7 @@ from utilities.path_mappings import path_mappings
from utilities.post_processing import pp_replace, set_chmod
from languages.get_languages import alpha2_from_alpha3, alpha2_from_language, alpha3_from_language, language_from_alpha3
from app.database import TableEpisodes, TableMovies
from utilities.analytics import track_event
from utilities.analytics import event_tracker
from radarr.notify import notify_radarr
from sonarr.notify import notify_sonarr
from app.event_handler import event_stream
@ -135,7 +135,7 @@ def process_subtitle(subtitle, media_type, audio_language, path, max_score, is_u
notify_radarr(movie_metadata['radarrId'])
event_stream(type='movie-wanted', action='delete', payload=movie_metadata['radarrId'])
track_event(category=downloaded_provider, action=action, label=downloaded_language)
event_tracker.track(provider=downloaded_provider, action=action, language=downloaded_language)
return ProcessSubtitlesResult(message=message,
reversed_path=reversed_path,

View file

@ -1,70 +1,59 @@
# coding=utf-8
import pickle
import random
import platform
import os
import logging
import codecs
from pyga.requests import Event, Tracker, Session, Visitor, Config
from pyga.entities import CustomVariable
from ga4mp import GtagMP
from app.get_args import args
from app.config import settings
from radarr.info import get_radarr_info
from sonarr.info import get_sonarr_info
bazarr_version = os.environ["BAZARR_VERSION"].lstrip('v')
os_version = platform.python_version()
sonarr_version = get_sonarr_info.version()
radarr_version = get_radarr_info.version()
python_version = platform.platform()
def track_event(category=None, action=None, label=None):
if not settings.analytics.getboolean('enabled'):
return
class EventTracker:
def __init__(self):
self.tracker = GtagMP(api_secret="qHRaseheRsic6-h2I_rIAA", measurement_id="G-3820T18GE3", client_id="temp")
anonymousConfig = Config()
anonymousConfig.anonimize_ip_address = True
tracker = Tracker('UA-138214134-3', 'none', conf=anonymousConfig)
try:
if os.path.isfile(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics.dat'))):
with open(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics.dat')), 'r') as handle:
visitor_text = handle.read()
visitor = pickle.loads(codecs.decode(visitor_text.encode(), "base64"))
if visitor.user_agent is None:
visitor.user_agent = os.environ.get("SZ_USER_AGENT")
if visitor.unique_id > int(0x7fffffff):
visitor.unique_id = random.randint(0, 0x7fffffff)
if not os.path.isfile(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics_visitor_id.txt'))):
visitor_id = self.tracker.random_client_id()
with open(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics_visitor_id.txt')), 'w+') \
as handle:
handle.write(str(visitor_id))
else:
visitor = Visitor()
visitor.unique_id = random.randint(0, 0x7fffffff)
except Exception:
visitor = Visitor()
visitor.unique_id = random.randint(0, 0x7fffffff)
with open(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics_visitor_id.txt')), 'r') as \
handle:
visitor_id = handle.read()
session = Session()
event = Event(category=category, action=action, label=label, value=1)
self.tracker.client_id = visitor_id
tracker.add_custom_variable(CustomVariable(index=1, name='BazarrVersion',
value=os.environ["BAZARR_VERSION"].lstrip('v'), scope=1))
tracker.add_custom_variable(CustomVariable(index=2, name='PythonVersion', value=platform.python_version(), scope=1))
if settings.general.getboolean('use_sonarr'):
tracker.add_custom_variable(CustomVariable(index=3, name='SonarrVersion', value=sonarr_version, scope=1))
else:
tracker.add_custom_variable(CustomVariable(index=3, name='SonarrVersion', value='unused', scope=1))
if settings.general.getboolean('use_radarr'):
tracker.add_custom_variable(CustomVariable(index=4, name='RadarrVersion', value=radarr_version, scope=1))
else:
tracker.add_custom_variable(CustomVariable(index=4, name='RadarrVersion', value='unused', scope=1))
tracker.add_custom_variable(CustomVariable(index=5, name='OSVersion', value=platform.platform(), scope=1))
self.tracker.store.set_user_property(name="BazarrVersion", value=bazarr_version)
self.tracker.store.set_user_property(name="PythonVersion", value=os_version)
self.tracker.store.set_user_property(name="SonarrVersion", value=sonarr_version)
self.tracker.store.set_user_property(name="RadarrVersion", value=radarr_version)
self.tracker.store.set_user_property(name="OSVersion", value=python_version)
try:
tracker.track_event(event, session, visitor)
except Exception:
logging.debug("BAZARR unable to track event.")
pass
else:
with open(os.path.normpath(os.path.join(args.config_dir, 'config', 'analytics.dat')), 'w+') as handle:
handle.write(codecs.encode(pickle.dumps(visitor), "base64").decode())
self.tracker.store.save()
def track(self, provider, action, language):
subtitles_event = self.tracker.create_new_event(name="subtitles")
subtitles_event.set_event_param(name="subtitles_provider", value=provider)
subtitles_event.set_event_param(name="subtitles_action", value=action)
subtitles_event.set_event_param(name="subtitles_language", value=language)
try:
self.tracker.send(events=[subtitles_event])
except Exception:
logging.debug("BAZARR unable to track event.")
else:
self.tracker.store.save()
event_tracker = EventTracker()

3
libs/ga4mp/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from ga4mp.ga4mp import GtagMP, FirebaseMP
__all__ = ['GtagMP','FirebaseMP']

44
libs/ga4mp/event.py Normal file
View file

@ -0,0 +1,44 @@
from ga4mp.item import Item
class Event(dict):
def __init__(self, name):
self.set_event_name(name)
def set_event_name(self, name):
if len(name) > 40:
raise ValueError("Event name cannot exceed 40 characters.")
self["name"] = name
def get_event_name(self):
return self.get("name")
def set_event_param(self, name, value):
# Series of checks to comply with GA4 event collection limits: https://support.google.com/analytics/answer/9267744
if len(name) > 40:
raise ValueError("Event parameter name cannot exceed 40 characters.")
if name in ["page_location", "page_referrer", "page_title"] and len(str(value)) > 300:
raise ValueError("Event parameter value for page info cannot exceed 300 characters.")
if name not in ["page_location", "page_referrer", "page_title"] and len(str(value)) > 100:
raise ValueError("Event parameter value cannot exceed 100 characters.")
if "params" not in self.keys():
self["params"] = {}
if len(self["params"]) >= 100:
raise RuntimeError("Event cannot contain more than 100 parameters.")
self["params"][name] = value
def get_event_params(self):
return self.get("params")
def delete_event_param(self, name):
# Since only 25 event parameters are allowed, this will allow the user to delete a parameter if necessary.
self["params"].pop(name, None)
def create_new_item(self, item_id=None, item_name=None):
return Item(item_id=item_id, item_name=item_name)
def add_item_to_event(self, item):
if not isinstance(item, dict):
raise ValueError("'item' must be an instance of a dictionary.")
if "items" not in self["params"].keys():
self.set_event_param("items", [])
self["params"]["items"].append(item)

416
libs/ga4mp/ga4mp.py Normal file
View file

@ -0,0 +1,416 @@
###############################################################################
# Google Analytics 4 Measurement Protocol for Python
# Copyright (c) 2022, Adswerve
#
# This project is free software, distributed under the BSD license.
# Adswerve offers consulting and integration services if your firm needs
# assistance in strategy, implementation, or auditing existing work.
###############################################################################
import json
import logging
import urllib.request
import time
import datetime
import random
from ga4mp.utils import params_dict
from ga4mp.event import Event
from ga4mp.store import BaseStore, DictStore
import os, sys
sys.path.append(
os.path.normpath(os.path.join(os.path.dirname(__file__), ".."))
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class BaseGa4mp(object):
"""
Parent class that provides an interface for sending data to Google Analytics, supporting the GA4 Measurement Protocol.
Parameters
----------
api_secret : string
Generated through the Google Analytics UI. To create a new secret, navigate in the Google Analytics UI to: Admin > Data Streams >
[choose your stream] > Measurement Protocol API Secrets > Create
See Also
--------
* Measurement Protocol (Google Analytics 4): https://developers.google.com/analytics/devguides/collection/protocol/ga4
Examples
--------
# Initialize tracking object for gtag usage
>>> ga = gtagMP(api_secret = "API_SECRET", measurement_id = "MEASUREMENT_ID", client_id="CLIENT_ID")
# Initialize tracking object for Firebase usage
>>> ga = firebaseMP(api_secret = "API_SECRET", firebase_app_id = "FIREBASE_APP_ID", app_instance_id="APP_INSTANCE_ID")
# Build an event
>>> event_type = 'new_custom_event'
>>> event_parameters = {'parameter_key_1': 'parameter_1', 'parameter_key_2': 'parameter_2'}
>>> event = {'name': event_type, 'params': event_parameters }
>>> events = [event]
# Send a custom event to GA4 immediately
>>> ga.send(events)
# Postponed send of a custom event to GA4
>>> ga.send(events, postpone=True)
>>> ga.postponed_send()
"""
def __init__(self, api_secret, store: BaseStore = None):
self._initialization_time = time.time() # used for both session_id and calculating engagement time
self.api_secret = api_secret
self._event_list = []
assert store is None or isinstance(store, BaseStore), "if supplied, store must be an instance of BaseStore"
self.store = store or DictStore()
self._check_store_requirements()
self._base_domain = "https://www.google-analytics.com/mp/collect"
self._validation_domain = "https://www.google-analytics.com/debug/mp/collect"
def _check_store_requirements(self):
# Store must contain "session_id" and "last_interaction_time_msec" in order for tracking to work properly.
if self.store.get_session_parameter("session_id") is None:
self.store.set_session_parameter(name="session_id", value=int(self._initialization_time))
# Note: "last_interaction_time_msec" factors into the required "engagement_time_msec" event parameter.
self.store.set_session_parameter(name="last_interaction_time_msec", value=int(self._initialization_time * 1000))
def create_new_event(self, name):
return Event(name=name)
def send(self, events, validation_hit=False, postpone=False, date=None):
"""
Method to send an http post request to google analytics with the specified events.
Parameters
----------
events : List[Dict]
A list of dictionaries of the events to be sent to Google Analytics. The list of dictionaries should adhere
to the following format:
[{'name': 'level_end',
'params' : {'level_name': 'First',
'success': 'True'}
},
{'name': 'level_up',
'params': {'character': 'John Madden',
'level': 'First'}
}]
validation_hit : bool, optional
Boolean to depict if events should be tested against the Measurement Protocol Validation Server, by default False
postpone : bool, optional
Boolean to depict if provided event list should be postponed, by default False
date : datetime
Python datetime object for sending a historical event at the given date. Date cannot be in the future.
"""
# check for any missing or invalid parameters among automatically collected and recommended event types
self._check_params(events)
self._check_date_not_in_future(date)
self._add_session_id_and_engagement_time(events)
if postpone is True:
# build event list to send later
for event in events:
event["_timestamp_micros"] = self._get_timestamp(time.time())
self._event_list.append(event)
else:
# batch events into sets of 25 events, the maximum allowed.
batched_event_list = [
events[event : event + 25] for event in range(0, len(events), 25)
]
# send http post request
self._http_post(
batched_event_list, validation_hit=validation_hit, date=date
)
def postponed_send(self):
"""
Method to send the events provided to Ga4mp.send(events,postpone=True)
"""
for event in self._event_list:
self._http_post([event], postpone=True)
# clear event_list for future use
self._event_list = []
def append_event_to_params_dict(self, new_name_and_parameters):
"""
Method to append event name and parameters key-value pairing(s) to parameters dictionary.
Parameters
----------
new_name_and_parameters : Dict
A dictionary with one key-value pair representing a new type of event to be sent to Google Analytics.
The dictionary should adhere to the following format:
{'new_name': ['new_param_1', 'new_param_2', 'new_param_3']}
"""
params_dict.update(new_name_and_parameters)
def _http_post(self, batched_event_list, validation_hit=False, postpone=False, date=None):
"""
Method to send http POST request to google-analytics.
Parameters
----------
batched_event_list : List[List[Dict]]
List of List of events. Places initial event payload into a list to send http POST in batches.
validation_hit : bool, optional
Boolean to depict if events should be tested against the Measurement Protocol Validation Server, by default False
postpone : bool, optional
Boolean to depict if provided event list should be postponed, by default False
date : datetime
Python datetime object for sending a historical event at the given date. Date cannot be in the future.
Timestamp micros supports up to 48 hours of backdating.
If date is specified, postpone must be False or an assertion will be thrown.
"""
self._check_date_not_in_future(date)
status_code = None # Default set to know if batch loop does not work and to bound status_code
# set domain
domain = self._base_domain
if validation_hit is True:
domain = self._validation_domain
logger.info(f"Sending POST to: {domain}")
# loop through events in batches of 25
batch_number = 1
for batch in batched_event_list:
# url and request slightly differ by subclass
url = self._build_url(domain=domain)
request = self._build_request(batch=batch)
self._add_user_props_to_hit(request)
# make adjustments for postponed hit
request["events"] = (
{"name": batch["name"], "params": batch["params"]}
if (postpone)
else batch
)
if date is not None:
logger.info(f"Setting event timestamp to: {date}")
assert (
postpone is False
), "Cannot send postponed historical hit, ensure postpone=False"
ts = self._datetime_to_timestamp(date)
ts_micro = self._get_timestamp(ts)
request["timestamp_micros"] = int(ts_micro)
logger.info(f"Timestamp of request is: {request['timestamp_micros']}")
if postpone:
# add timestamp to hit
request["timestamp_micros"] = batch["_timestamp_micros"]
req = urllib.request.Request(url)
req.add_header("Content-Type", "application/json; charset=utf-8")
jsondata = json.dumps(request)
json_data_as_bytes = jsondata.encode("utf-8") # needs to be bytes
req.add_header("Content-Length", len(json_data_as_bytes))
result = urllib.request.urlopen(req, json_data_as_bytes)
status_code = result.status
logger.info(f"Batch Number: {batch_number}")
logger.info(f"Status code: {status_code}")
batch_number += 1
return status_code
def _check_params(self, events):
"""
Method to check whether the provided event payload parameters align with supported parameters.
Parameters
----------
events : List[Dict]
A list of dictionaries of the events to be sent to Google Analytics. The list of dictionaries should adhere
to the following format:
[{'name': 'level_end',
'params' : {'level_name': 'First',
'success': 'True'}
},
{'name': 'level_up',
'params': {'character': 'John Madden',
'level': 'First'}
}]
"""
# check to make sure it's a list of dictionaries with the right keys
assert type(events) == list, "events should be a list"
for event in events:
assert isinstance(event, dict), "each event should be an instance of a dictionary"
assert "name" in event, 'each event should have a "name" key'
assert "params" in event, 'each event should have a "params" key'
# check for any missing or invalid parameters
for e in events:
event_name = e["name"]
event_params = e["params"]
if event_name in params_dict.keys():
for parameter in params_dict[event_name]:
if parameter not in event_params.keys():
logger.warning(
f"WARNING: Event parameters do not match event type.\nFor {event_name} event type, the correct parameter(s) are {params_dict[event_name]}.\nThe parameter '{parameter}' triggered this warning.\nFor a breakdown of currently supported event types and their parameters go here: https://support.google.com/analytics/answer/9267735\n"
)
def _add_session_id_and_engagement_time(self, events):
"""
Method to add the session_id and engagement_time_msec parameter to all events.
"""
for event in events:
current_time_in_milliseconds = int(time.time() * 1000)
event_params = event["params"]
if "session_id" not in event_params.keys():
event_params["session_id"] = self.store.get_session_parameter("session_id")
if "engagement_time_msec" not in event_params.keys():
last_interaction_time = self.store.get_session_parameter("last_interaction_time_msec")
event_params["engagement_time_msec"] = current_time_in_milliseconds - last_interaction_time if current_time_in_milliseconds > last_interaction_time else 0
self.store.set_session_parameter(name="last_interaction_time_msec", value=current_time_in_milliseconds)
def _add_user_props_to_hit(self, hit):
"""
Method is a helper function to add user properties to outgoing hits.
Parameters
----------
hit : dict
"""
for key in self.store.get_all_user_properties():
try:
if key in ["user_id", "non_personalized_ads"]:
hit.update({key: self.store.get_user_property(key)})
else:
if "user_properties" not in hit.keys():
hit.update({"user_properties": {}})
hit["user_properties"].update(
{key: {"value": self.store.get_user_property(key)}}
)
except:
logger.info(f"Failed to add user property to outgoing hit: {key}")
def _get_timestamp(self, timestamp):
"""
Method returns UNIX timestamp in microseconds for postponed hits.
Parameters
----------
None
"""
return int(timestamp * 1e6)
def _datetime_to_timestamp(self, dt):
"""
Private method to convert a datetime object into a timestamp
Parameters
----------
dt : datetime
A datetime object in any format
Returns
-------
timestamp
A UNIX timestamp in milliseconds
"""
return time.mktime(dt.timetuple())
def _check_date_not_in_future(self, date):
"""
Method to check that provided date is not in the future.
Parameters
----------
date : datetime
Python datetime object
"""
if date is None:
pass
else:
assert (
date <= datetime.datetime.now()
), "Provided date cannot be in the future"
def _build_url(self, domain):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
def _build_request(self, batch):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
class GtagMP(BaseGa4mp):
"""
Subclass for users of gtag. See `Ga4mp` parent class for examples.
Parameters
----------
measurement_id : string
The identifier for a Data Stream. Found in the Google Analytics UI under: Admin > Data Streams > [choose your stream] > Measurement ID (top-right)
client_id : string
A unique identifier for a client, representing a specific browser/device.
"""
def __init__(self, api_secret, measurement_id, client_id,):
super().__init__(api_secret)
self.measurement_id = measurement_id
self.client_id = client_id
def _build_url(self, domain):
return f"{domain}?measurement_id={self.measurement_id}&api_secret={self.api_secret}"
def _build_request(self, batch):
return {"client_id": self.client_id, "events": batch}
def random_client_id(self):
"""
Utility function for generating a new client ID matching the typical format of 10 random digits and the UNIX timestamp in seconds, joined by a period.
"""
return "%0.10d" % random.randint(0,9999999999) + "." + str(int(time.time()))
class FirebaseMP(BaseGa4mp):
"""
Subclass for users of Firebase. See `Ga4mp` parent class for examples.
Parameters
----------
firebase_app_id : string
The identifier for a Firebase app. Found in the Firebase console under: Project Settings > General > Your Apps > App ID.
app_instance_id : string
A unique identifier for a Firebase app instance.
* Android - getAppInstanceId() - https://firebase.google.com/docs/reference/android/com/google/firebase/analytics/FirebaseAnalytics#public-taskstring-getappinstanceid
* Kotlin - getAppInstanceId() - https://firebase.google.com/docs/reference/kotlin/com/google/firebase/analytics/FirebaseAnalytics#getappinstanceid
* Swift - appInstanceID() - https://firebase.google.com/docs/reference/swift/firebaseanalytics/api/reference/Classes/Analytics#appinstanceid
* Objective-C - appInstanceID - https://firebase.google.com/docs/reference/ios/firebaseanalytics/api/reference/Classes/FIRAnalytics#+appinstanceid
* C++ - GetAnalyticsInstanceId() - https://firebase.google.com/docs/reference/cpp/namespace/firebase/analytics#getanalyticsinstanceid
* Unity - GetAnalyticsInstanceIdAsync() - https://firebase.google.com/docs/reference/unity/class/firebase/analytics/firebase-analytics#getanalyticsinstanceidasync
"""
def __init__(self, api_secret, firebase_app_id, app_instance_id):
super().__init__(api_secret)
self.firebase_app_id = firebase_app_id
self.app_instance_id = app_instance_id
def _build_url(self, domain):
return f"{domain}?firebase_app_id={self.firebase_app_id}&api_secret={self.api_secret}"
def _build_request(self, batch):
return {"app_instance_id": self.app_instance_id, "events": batch}

11
libs/ga4mp/item.py Normal file
View file

@ -0,0 +1,11 @@
class Item(dict):
def __init__(self, item_id=None, item_name=None):
if item_id is None and item_name is None:
raise ValueError("At least one of 'item_id' and 'item_name' is required.")
if item_id is not None:
self.set_parameter("item_id", str(item_id))
if item_name is not None:
self.set_parameter("item_name", item_name)
def set_parameter(self, name, value):
self[name] = value

116
libs/ga4mp/store.py Normal file
View file

@ -0,0 +1,116 @@
import json
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class BaseStore(dict):
def __init__(self):
self.update([("user_properties", {}),("session_parameters", {})])
def save(self):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
def _check_exists(self, key):
# Helper function to make sure a key exists before trying to work with values within it.
if key not in self.keys():
self[key] = {}
def _set(self, param_type, name, value):
# Helper function to set a single parameter (user or session or other).
self._check_exists(key=param_type)
self[param_type][name] = value
def _get_one(self, param_type, name):
# Helper function to get a single parameter value (user or session).
self._check_exists(key=param_type)
return self[param_type].get(name, None)
def _get_all(self, param_type=None):
# Helper function to get all user or session parameters - or the entire dictionary if not specified.
if param_type is not None:
return self[param_type]
else:
return self
# While redundant, the following make sure the distinction between session and user items is easier for the end user.
def set_user_property(self, name, value):
self._set(param_type="user_properties", name=name, value=value)
def get_user_property(self, name):
return self._get_one(param_type="user_properties", name=name)
def get_all_user_properties(self):
return self._get_all(param_type="user_properties")
def clear_user_properties(self):
self["user_properties"] = {}
def set_session_parameter(self, name, value):
self._set(param_type="session_parameters", name=name, value=value)
def get_session_parameter(self, name):
return self._get_one(param_type="session_parameters", name=name)
def get_all_session_parameters(self):
return self._get_all(param_type="session_parameters")
def clear_session_parameters(self):
self["session_parameters"] = {}
# Similar functions for other items the user wants to store that don't fit the other two categories.
def set_other_parameter(self, name, value):
self._set(param_type="other", name=name, value=value)
def get_other_parameter(self, name):
return self._get_one(param_type="other", name=name)
def get_all_other_parameters(self):
return self._get_all(param_type="other")
def clear_other_parameters(self):
self["other"] = {}
class DictStore(BaseStore):
# Class for working with dictionaries that persist for the life of the class.
def __init__(self, data: dict = None):
super().__init__()
if data:
self.update(data)
def save(self):
# Give the user back what's in the dictionary so they can decide how to save it.
self._get_all()
class FileStore(BaseStore):
# Class for working with dictionaries that get saved to a JSON file.
def __init__(self, data_location: str = None):
super().__init__()
self.data_location = data_location
try:
self._load_file(data_location)
except:
logger.info(f"Failed to find file at location: {data_location}")
def _load_file(self):
# Function to get data from the object's initialized location.
# If the provided or stored data_location exists, read the file and overwrite the object's contents.
if Path(self.data_location).exists():
with open(self.data_location, "r") as json_file:
self = json.load(json_file)
# If the data_location doesn't exist, try to create a new starter JSON file at the location given.
else:
starter_dict = '{"user_properties":{}, "session_parameters":{}}'
starter_json = json.loads(starter_dict)
Path(self.data_location).touch()
with open(self.data_location, "w") as json_file:
json.dumps(starter_json, json_file)
def save(self):
# Function to save the current dictionary to a JSON file at the object's initialized location.
try:
with open(self.data_location, "w") as outfile:
json.dump(self, outfile)
except:
logger.info(f"Failed to save file at location: {self.data_location}")

392
libs/ga4mp/utils.py Normal file
View file

@ -0,0 +1,392 @@
# all automatically collected and recommended event types
params_dict = {
"ad_click": [
"ad_event_id"
],
"ad_exposure": [
"firebase_screen",
"firebase_screen_id",
"firebase_screen_class",
"exposure_time",
],
"ad_impression": [
"ad_event_id"
],
"ad_query": [
"ad_event_id"
],
"ad_reward": [
"ad_unit_id",
"reward_type",
"reward_value"
],
"add_payment_info": [
"coupon",
"currency",
"items",
"payment_type",
"value"
],
"add_shipping_info": [
"coupon",
"currency",
"items",
"shipping_tier",
"value"
],
"add_to_cart": [
"currency",
"items",
"value"
],
"add_to_wishlist": [
"currency",
"items",
"value"
],
"adunit_exposure": [
"firebase_screen",
"firebase_screen_id",
"firebase_screen_class",
"exposure_time",
],
"app_clear_data": [],
"app_exception": [
"fatal",
"timestamp",
"engagement_time_msec"
],
"app_remove": [],
"app_store_refund": [
"product_id",
"value",
"currency",
"quantity"
],
"app_store_subscription_cancel": [
"product_id",
"price",
"value",
"currency",
"cancellation_reason",
],
"app_store_subscription_convert": [
"product_id",
"price",
"value",
"currency",
"quantity",
],
"app_store_subscription_renew": [
"product_id",
"price",
"value",
"currency",
"quantity",
"renewal_count",
],
"app_update": [
"previous_app_version"
],
"begin_checkout": [
"coupon",
"currency",
"items",
"value"
],
"click": [],
"dynamic_link_app_open": [
"source",
"medium",
"campaign",
"link_id",
"accept_time"
],
"dynamic_link_app_update": [
"source",
"medium",
"campaign",
"link_id",
"accept_time",
],
"dynamic_link_first_open": [
"source",
"medium",
"campaign",
"link_id",
"accept_time",
],
"earn_virtual_currency": [
"virtual_currency_name",
"value"
],
"error": [
"firebase_error",
"firebase_error_value"
],
"file_download": [
"file_extension",
"file_name",
"link_classes",
"link_domain",
"link_id",
"link_text",
"link_url",
],
"firebase_campaign": [
"source",
"medium",
"campaign",
"term",
"content",
"gclid",
"aclid",
"cp1",
"anid",
"click_timestamp",
"campaign_info_source",
],
"firebase_in_app_message_action": [
"message_name",
"message_device_time",
"message_id",
],
"firebase_in_app_message_dismiss": [
"message_name",
"message_device_time",
"message_id",
],
"firebase_in_app_message_impression": [
"message_name",
"message_device_time",
"message_id",
],
"first_open": [
"previous_gmp_app_id",
"updated_with_analytics",
"previous_first_open_count",
"system_app",
"system_app_update",
"deferred_analytics_collection",
"reset_analytics_cause",
"engagement_time_msec",
],
"first_visit": [],
"generate_lead": [
"value",
"currency"
],
"in_app_purchase": [
"product_id",
"price",
"value",
"currency",
"quantity",
"subscription",
"free_trial",
"introductory_price",
],
"join_group": [
"group_id"
],
"level_end": [
"level_name",
"success"
],
"level_start": [
"level_name"
],
"level_up": [
"character",
"level"
],
"login": [
"method"
],
"notification_dismiss": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"notification_foreground": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
"message_type",
],
"notification_open": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"notification_receive": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
"message_type",
],
"notification_send": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"os_update": [
"previous_os_version"
],
"page_view": [
"page_location",
"page_referrer"
],
"post_score": [
"level",
"character",
"score"
],
"purchase": [
"affiliation",
"coupon",
"currency",
"items",
"transaction_id",
"shipping",
"tax",
"value",
],
"refund": [
"transaction_id",
"value",
"currency",
"tax",
"shipping",
"items"
],
"remove_from_cart": [
"currency",
"items",
"value"
],
"screen_view": [
"firebase_screen",
"firebase_screen_class",
"firebase_screen_id",
"firebase_previous_screen",
"firebase_previous_class",
"firebase_previous_id",
"engagement_time_msec",
],
"scroll": [],
"search": [
"search_term"
],
"select_content": [
"content_type",
"item_id"
],
"select_item": [
"items",
"item_list_name",
"item_list_id"
],
"select_promotion": [
"items",
"promotion_id",
"promotion_name",
"creative_name",
"creative_slot",
"location_id",
],
"session_start": [],
"share": [
"content_type",
"item_id"
],
"sign_up": [
"method"
],
"view_search_results": [
"search_term"
],
"spend_virtual_currency": [
"item_name",
"virtual_currency_name",
"value"
],
"tutorial_begin": [],
"tutorial_complete": [],
"unlock_achievement": [
"achievement_id"
],
"user_engagement": [
"engagement_time_msec"
],
"video_start": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"video_progress": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"video_complete": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"view_cart": [
"currency",
"items",
"value"
],
"view_item": [
"currency",
"items",
"value"
],
"view_item_list": [
"items",
"item_list_name",
"item_list_id"
],
"view_promotion": [
"items",
"promotion_id",
"promotion_name",
"creative_name",
"creative_slot",
"location_id",
],
}

View file

@ -1,8 +0,0 @@
from pyga.requests import Q
def shutdown():
'''
Fire all stored GIF requests One by One.
You should call this if you set Config.queue_requests = True
'''
map(lambda func: func(), Q.REQ_ARRAY)

View file

@ -1,512 +0,0 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from operator import itemgetter
from pyga import utils
from pyga import exceptions
try:
from urlparse import urlparse
from urllib import unquote_plus
except ImportError as e:
from urllib.parse import urlparse
from urllib.parse import unquote_plus
__author__ = "Arun KR (kra3) <the1.arun@gmail.com>"
__license__ = "Simplified BSD"
class Campaign(object):
'''
A representation of Campaign
Properties:
_type -- See TYPE_* constants, will be mapped to "__utmz" parameter.
creation_time -- Time of the creation of this campaign, will be mapped to "__utmz" parameter.
response_count -- Response Count, will be mapped to "__utmz" parameter.
Is also used to determine whether the campaign is new or repeated,
which will be mapped to "utmcn" and "utmcr" parameters.
id -- Campaign ID, a.k.a. "utm_id" query parameter for ga.js
Will be mapped to "__utmz" parameter.
source -- Source, a.k.a. "utm_source" query parameter for ga.js.
Will be mapped to "utmcsr" key in "__utmz" parameter.
g_click_id -- Google AdWords Click ID, a.k.a. "gclid" query parameter for ga.js.
Will be mapped to "utmgclid" key in "__utmz" parameter.
d_click_id -- DoubleClick (?) Click ID. Will be mapped to "utmdclid" key in "__utmz" parameter.
name -- Name, a.k.a. "utm_campaign" query parameter for ga.js.
Will be mapped to "utmccn" key in "__utmz" parameter.
medium -- Medium, a.k.a. "utm_medium" query parameter for ga.js.
Will be mapped to "utmcmd" key in "__utmz" parameter.
term -- Terms/Keywords, a.k.a. "utm_term" query parameter for ga.js.
Will be mapped to "utmctr" key in "__utmz" parameter.
content -- Ad Content Description, a.k.a. "utm_content" query parameter for ga.js.
Will be mapped to "utmcct" key in "__utmz" parameter.
'''
TYPE_DIRECT = 'direct'
TYPE_ORGANIC = 'organic'
TYPE_REFERRAL = 'referral'
CAMPAIGN_DELIMITER = '|'
UTMZ_PARAM_MAP = {
'utmcid': 'id',
'utmcsr': 'source',
'utmgclid': 'g_click_id',
'utmdclid': 'd_click_id',
'utmccn': 'name',
'utmcmd': 'medium',
'utmctr': 'term',
'utmcct': 'content',
}
def __init__(self, typ):
self._type = None
self.creation_time = None
self.response_count = 0
self.id = None
self.source = None
self.g_click_id = None
self.d_click_id = None
self.name = None
self.medium = None
self.term = None
self.content = None
if typ:
if typ not in ('direct', 'organic', 'referral'):
raise ValueError('Campaign type has to be one of the Campaign::TYPE_* constant values.')
self._type = typ
if typ == Campaign.TYPE_DIRECT:
self.name = '(direct)'
self.source = '(direct)'
self.medium = '(none)'
elif typ == Campaign.TYPE_REFERRAL:
self.name = '(referral)'
self.medium = 'referral'
elif typ == Campaign.TYPE_ORGANIC:
self.name = '(organic)'
self.medium = 'organic'
else:
self._type = None
self.creation_time = datetime.utcnow()
def validate(self):
if not self.source:
raise exceptions.ValidationError('Campaigns need to have at least the "source" attribute defined.')
@staticmethod
def create_from_referrer(url):
obj = Campaign(Campaign.TYPE_REFERRAL)
parse_rslt = urlparse(url)
obj.source = parse_rslt.netloc
obj.content = parse_rslt.path
return obj
def extract_from_utmz(self, utmz):
parts = utmz.split('.', 4)
if len(parts) != 5:
raise ValueError('The given "__utmz" cookie value is invalid.')
self.creation_time = utils.convert_ga_timestamp(parts[1])
self.response_count = int(parts[3])
params = parts[4].split(Campaign.CAMPAIGN_DELIMITER)
for param in params:
key, val = param.split('=')
try:
setattr(self, self.UTMZ_PARAM_MAP[key], unquote_plus(val))
except KeyError:
continue
return self
class CustomVariable(object):
'''
Represent a Custom Variable
Properties:
index -- Is the slot, you have 5 slots
name -- Name given to custom variable
value -- Value for the variable
scope -- Scope can be any one of 1, 2 or 3.
WATCH OUT: It's a known issue that GA will not decode URL-encoded
characters in custom variable names and values properly, so spaces
will show up as "%20" in the interface etc. (applicable to name & value)
http://www.google.com/support/forum/p/Google%20Analytics/thread?tid=2cdb3ec0be32e078
'''
SCOPE_VISITOR = 1
SCOPE_SESSION = 2
SCOPE_PAGE = 3
def __init__(self, index=None, name=None, value=None, scope=3):
self.index = index
self.name = name
self.value = value
self.scope = CustomVariable.SCOPE_PAGE
if scope:
self.scope = scope
def __setattr__(self, name, value):
if name == 'scope':
if value and value not in range(1, 4):
raise ValueError('Custom Variable scope has to be one of the 1,2 or 3')
if name == 'index':
# Custom Variables are limited to five slots officially, but there seems to be a
# trick to allow for more of them which we could investigate at a later time (see
# http://analyticsimpact.com/2010/05/24/get-more-than-5-custom-variables-in-google-analytics/
if value and (value < 0 or value > 5):
raise ValueError('Custom Variable index has to be between 1 and 5.')
object.__setattr__(self, name, value)
def validate(self):
'''
According to the GA documentation, there is a limit to the combined size of
name and value of 64 bytes after URL encoding,
see http://code.google.com/apis/analytics/docs/tracking/gaTrackingCustomVariables.html#varTypes
and http://xahlee.org/js/google_analytics_tracker_2010-07-01_expanded.js line 563
This limit was increased to 128 bytes BEFORE encoding with the 2012-01 release of ga.js however,
see http://code.google.com/apis/analytics/community/gajs_changelog.html
'''
if len('%s%s' % (self.name, self.value)) > 128:
raise exceptions.ValidationError('Custom Variable combined name and value length must not be larger than 128 bytes.')
class Event(object):
'''
Represents an Event
https://developers.google.com/analytics/devguides/collection/gajs/eventTrackerGuide
Properties:
category -- The general event category
action -- The action for the event
label -- An optional descriptor for the event
value -- An optional value associated with the event. You can see your
event values in the Overview, Categories, and Actions reports,
where they are listed by event or aggregated across events,
depending upon your report view.
noninteraction -- By default, event hits will impact a visitor's bounce rate.
By setting this parameter to true, this event hit
will not be used in bounce rate calculations.
(default False)
'''
def __init__(self, category=None, action=None, label=None, value=None, noninteraction=False):
self.category = category
self.action = action
self.label = label
self.value = value
self.noninteraction = bool(noninteraction)
if self.noninteraction and not self.value:
self.value = 0
def validate(self):
if not(self.category and self.action):
raise exceptions.ValidationError('Events, at least need to have a category and action defined.')
class Item(object):
'''
Represents an Item in Transaction
Properties:
order_id -- Order ID, will be mapped to "utmtid" parameter
sku -- Product Code. This is the sku code for a given product, will be mapped to "utmipc" parameter
name -- Product Name, will be mapped to "utmipn" parameter
variation -- Variations on an item, will be mapped to "utmiva" parameter
price -- Unit Price. Value is set to numbers only, will be mapped to "utmipr" parameter
quantity -- Unit Quantity, will be mapped to "utmiqt" parameter
'''
def __init__(self):
self.order_id = None
self.sku = None
self.name = None
self.variation = None
self.price = None
self.quantity = 1
def validate(self):
if not self.sku:
raise exceptions.ValidationError('sku/product is a required parameter')
class Page(object):
'''
Contains all parameters needed for tracking a page
Properties:
path -- Page request URI, will be mapped to "utmp" parameter
title -- Page title, will be mapped to "utmdt" parameter
charset -- Charset encoding, will be mapped to "utmcs" parameter
referrer -- Referer URL, will be mapped to "utmr" parameter
load_time -- Page load time in milliseconds, will be encoded into "utme" parameter.
'''
REFERRER_INTERNAL = '0'
def __init__(self, path):
self.path = None
self.title = None
self.charset = None
self.referrer = None
self.load_time = None
if path:
self.path = path
def __setattr__(self, name, value):
if name == 'path':
if value and value != '':
if value[0] != '/':
raise ValueError('The page path should always start with a slash ("/").')
elif name == 'load_time':
if value and not isinstance(value, int):
raise ValueError('Page load time must be specified in integer milliseconds.')
object.__setattr__(self, name, value)
class Session(object):
'''
You should serialize this object and store it in the user session to keep it
persistent between requests (similar to the "__umtb" cookie of the GA Javascript client).
Properties:
session_id -- A unique per-session ID, will be mapped to "utmhid" parameter
track_count -- The amount of pageviews that were tracked within this session so far,
will be part of the "__utmb" cookie parameter.
Will get incremented automatically upon each request
start_time -- Timestamp of the start of this new session, will be part of the "__utmb" cookie parameter
'''
def __init__(self):
self.session_id = utils.get_32bit_random_num()
self.track_count = 0
self.start_time = datetime.utcnow()
@staticmethod
def generate_session_id():
return utils.get_32bit_random_num()
def extract_from_utmb(self, utmb):
'''
Will extract information for the "trackCount" and "startTime"
properties from the given "__utmb" cookie value.
'''
parts = utmb.split('.')
if len(parts) != 4:
raise ValueError('The given "__utmb" cookie value is invalid.')
self.track_count = int(parts[1])
self.start_time = utils.convert_ga_timestamp(parts[3])
return self
class SocialInteraction(object):
'''
Properties:
action -- Required. A string representing the social action being tracked,
will be mapped to "utmsa" parameter
network -- Required. A string representing the social network being tracked,
will be mapped to "utmsn" parameter
target -- Optional. A string representing the URL (or resource) which receives the action.
'''
def __init__(self, action=None, network=None, target=None):
self.action = action
self.network = network
self.target = target
def validate(self):
if not(self.action and self.network):
raise exceptions.ValidationError('Social interactions need to have at least the "network" and "action" attributes defined.')
class Transaction(object):
'''
Represents parameters for a Transaction call
Properties:
order_id -- Order ID, will be mapped to "utmtid" parameter
affiliation -- Affiliation, Will be mapped to "utmtst" parameter
total -- Total Cost, will be mapped to "utmtto" parameter
tax -- Tax Cost, will be mapped to "utmttx" parameter
shipping -- Shipping Cost, values as for unit and price, will be mapped to "utmtsp" parameter
city -- Billing City, will be mapped to "utmtci" parameter
state -- Billing Region, will be mapped to "utmtrg" parameter
country -- Billing Country, will be mapped to "utmtco" parameter
items -- @entity.Items in a transaction
'''
def __init__(self):
self.items = []
self.order_id = None
self.affiliation = None
self.total = None
self.tax = None
self.shipping = None
self.city = None
self.state = None
self.country = None
def __setattr__(self, name, value):
if name == 'order_id':
for itm in self.items:
itm.order_id = value
object.__setattr__(self, name, value)
def validate(self):
if len(self.items) == 0:
raise exceptions.ValidationError('Transaction need to consist of at least one item')
def add_item(self, item):
''' item of type entities.Item '''
if isinstance(item, Item):
item.order_id = self.order_id
self.items.append(item)
class Visitor(object):
'''
You should serialize this object and store it in the user database to keep it
persistent for the same user permanently (similar to the "__umta" cookie of
the GA Javascript client).
Properties:
unique_id -- Unique user ID, will be part of the "__utma" cookie parameter
first_visit_time -- Time of the very first visit of this user, will be part of the "__utma" cookie parameter
previous_visit_time -- Time of the previous visit of this user, will be part of the "__utma" cookie parameter
current_visit_time -- Time of the current visit of this user, will be part of the "__utma" cookie parameter
visit_count -- Amount of total visits by this user, will be part of the "__utma" cookie parameter
ip_address -- IP Address of the end user, will be mapped to "utmip" parameter and "X-Forwarded-For" request header
user_agent -- User agent string of the end user, will be mapped to "User-Agent" request header
locale -- Locale string (country part optional) will be mapped to "utmul" parameter
flash_version -- Visitor's Flash version, will be maped to "utmfl" parameter
java_enabled -- Visitor's Java support, will be mapped to "utmje" parameter
screen_colour_depth -- Visitor's screen color depth, will be mapped to "utmsc" parameter
screen_resolution -- Visitor's screen resolution, will be mapped to "utmsr" parameter
'''
def __init__(self):
now = datetime.utcnow()
self.unique_id = None
self.first_visit_time = now
self.previous_visit_time = now
self.current_visit_time = now
self.visit_count = 1
self.ip_address = None
self.user_agent = None
self.locale = None
self.flash_version = None
self.java_enabled = None
self.screen_colour_depth = None
self.screen_resolution = None
def __setattr__(self, name, value):
if name == 'unique_id':
if value and (value < 0 or value > 0x7fffffff):
raise ValueError('Visitor unique ID has to be a 32-bit integer between 0 and 0x7fffffff')
object.__setattr__(self, name, value)
def __getattribute__(self, name):
if name == 'unique_id':
tmp = object.__getattribute__(self, name)
if tmp is None:
self.unique_id = self.generate_unique_id()
return object.__getattribute__(self, name)
def __getstate__(self):
state = self.__dict__
if state.get('user_agent') is None:
state['unique_id'] = self.generate_unique_id()
return state
def extract_from_utma(self, utma):
'''
Will extract information for the "unique_id", "first_visit_time", "previous_visit_time",
"current_visit_time" and "visit_count" properties from the given "__utma" cookie value.
'''
parts = utma.split('.')
if len(parts) != 6:
raise ValueError('The given "__utma" cookie value is invalid.')
self.unique_id = int(parts[1])
self.first_visit_time = utils.convert_ga_timestamp(parts[2])
self.previous_visit_time = utils.convert_ga_timestamp(parts[3])
self.current_visit_time = utils.convert_ga_timestamp(parts[4])
self.visit_count = int(parts[5])
return self
def extract_from_server_meta(self, meta):
'''
Will extract information for the "ip_address", "user_agent" and "locale"
properties from the given WSGI REQUEST META variable or equivalent.
'''
if 'REMOTE_ADDR' in meta and meta['REMOTE_ADDR']:
ip = None
for key in ('HTTP_X_FORWARDED_FOR', 'REMOTE_ADDR'):
if key in meta and not ip:
ips = meta.get(key, '').split(',')
ip = ips[-1].strip()
if not utils.is_valid_ip(ip):
ip = ''
if utils.is_private_ip(ip):
ip = ''
if ip:
self.ip_address = ip
if 'HTTP_USER_AGENT' in meta and meta['HTTP_USER_AGENT']:
self.user_agent = meta['HTTP_USER_AGENT']
if 'HTTP_ACCEPT_LANGUAGE' in meta and meta['HTTP_ACCEPT_LANGUAGE']:
user_locals = []
matched_locales = utils.validate_locale(meta['HTTP_ACCEPT_LANGUAGE'])
if matched_locales:
lang_lst = map((lambda x: x.replace('-', '_')), (i[1] for i in matched_locales))
quality_lst = map((lambda x: x and x or 1), (float(i[4] and i[4] or '0') for i in matched_locales))
lang_quality_map = map((lambda x, y: (x, y)), lang_lst, quality_lst)
user_locals = [x[0] for x in sorted(lang_quality_map, key=itemgetter(1), reverse=True)]
if user_locals:
self.locale = user_locals[0]
return self
def generate_hash(self):
'''Generates a hashed value from user-specific properties.'''
tmpstr = "%s%s%s" % (self.user_agent, self.screen_resolution, self.screen_colour_depth)
return utils.generate_hash(tmpstr)
def generate_unique_id(self):
'''Generates a unique user ID from the current user-specific properties.'''
return ((utils.get_32bit_random_num() ^ self.generate_hash()) & 0x7fffffff)
def add_session(self, session):
'''
Updates the "previousVisitTime", "currentVisitTime" and "visitCount"
fields based on the given session object.
'''
start_time = session.start_time
if start_time != self.current_visit_time:
self.previous_visit_time = self.current_visit_time
self.current_visit_time = start_time
self.visit_count = self.visit_count + 1

View file

@ -1,2 +0,0 @@
class ValidationError(Exception):
pass

File diff suppressed because it is too large Load diff

View file

@ -1,125 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from random import randint
import re
import sys
from datetime import datetime
try:
from urllib import quote
except ImportError as e:
from urllib.parse import quote
if sys.version_info < (3,):
text_type = unicode
else:
text_type = str
__author__ = "Arun KR (kra3) <the1.arun@gmail.com>"
__license__ = "Simplified BSD"
RE_IP = re.compile(r'^[\d+]{1,3}\.[\d+]{1,3}\.[\d+]{1,3}\.[\d+]{1,3}$', re.I)
RE_PRIV_IP = re.compile(r'^(?:127\.0\.0\.1|10\.|192\.168\.|172\.(?:1[6-9]|2[0-9]|3[0-1])\.)')
RE_LOCALE = re.compile(r'(^|\s*,\s*)([a-zA-Z]{1,8}(-[a-zA-Z]{1,8})*)\s*(;\s*q\s*=\s*(1(\.0{0,3})?|0(\.[0-9]{0,3})))?', re.I)
RE_GA_ACCOUNT_ID = re.compile(r'^(UA|MO)-[0-9]*-[0-9]*$')
RE_FIRST_THREE_OCTETS_OF_IP = re.compile(r'^((\d{1,3}\.){3})\d{1,3}$')
def convert_ga_timestamp(timestamp_string):
timestamp = float(timestamp_string)
if timestamp > ((2 ** 31) - 1):
timestamp /= 1000
return datetime.utcfromtimestamp(timestamp)
def get_32bit_random_num():
return randint(0, 0x7fffffff)
def is_valid_ip(ip):
return True if RE_IP.match(str(ip)) else False
def is_private_ip(ip):
return True if RE_PRIV_IP.match(str(ip)) else False
def validate_locale(locale):
return RE_LOCALE.findall(str(locale))
def is_valid_google_account(account):
return True if RE_GA_ACCOUNT_ID.match(str(account)) else False
def generate_hash(tmpstr):
hash_val = 1
if tmpstr:
hash_val = 0
for ordinal in map(ord, tmpstr[::-1]):
hash_val = ((hash_val << 6) & 0xfffffff) + ordinal + (ordinal << 14)
left_most_7 = hash_val & 0xfe00000
if left_most_7 != 0:
hash_val ^= left_most_7 >> 21
return hash_val
def anonymize_ip(ip):
if ip:
match = RE_FIRST_THREE_OCTETS_OF_IP.findall(str(ip))
if match:
return '%s%s' % (match[0][0], '0')
return ''
def encode_uri_components(value):
'''Mimics Javascript's encodeURIComponent() function for consistency with the GA Javascript client.'''
return convert_to_uri_component_encoding(quote(value))
def convert_to_uri_component_encoding(value):
return value.replace('%21', '!').replace('%2A', '*').replace('%27', "'").replace('%28', '(').replace('%29', ')')
# Taken from expicient.com BJs repo.
def stringify(s, stype=None, fn=None):
''' Converts elements of a complex data structure to strings
The data structure can be a multi-tiered one - with tuples and lists etc
This method will loop through each and convert everything to string.
For example - it can be -
[[{'a1': {'a2': {'a3': ('a4', timedelta(0, 563)), 'a5': {'a6': datetime()}}}}]]
which will be converted to -
[[{'a1': {'a2': {'a3': ('a4', '0:09:23'), 'a5': {'a6': '2009-05-27 16:19:52.401500' }}}}]]
@param stype: If only one type of data element needs to be converted to
string without affecting others, stype can be used.
In the earlier example, if it is called with stringify(s, stype=datetime.timedelta)
the result would be
[[{'a1': {'a2': {'a3': ('a4', '0:09:23'), 'a5': {'a6': datetime() }}}}]]
Also, even though the name is stringify, any function can be run on it, based on
parameter fn. If fn is None, it will be stringified.
'''
if type(s) in [list, set, dict, tuple]:
if isinstance(s, dict):
for k in s:
s[k] = stringify(s[k], stype, fn)
elif type(s) in [list, set]:
for i, k in enumerate(s):
s[i] = stringify(k, stype, fn)
else: #tuple
tmp = []
for k in s:
tmp.append(stringify(k, stype, fn))
s = tuple(tmp)
else:
if fn:
if not stype or (stype == type(s)):
return fn(s)
else:
# To do str(s). But, str() can fail on unicode. So, use .encode instead
if not stype or (stype == type(s)):
try:
return text_type(s)
#return s.encode('ascii', 'replace')
except AttributeError:
return str(s)
except UnicodeDecodeError:
return s.decode('ascii', 'replace')
return s

View file

@ -13,6 +13,7 @@ flask-cors==3.0.10
flask-restx==1.0.3
Flask-SocketIO==5.3.1
Flask==2.2.2
ga4mp==2.0.4
guess_language-spirit==0.5.3
guessit==3.5.0
jsonschema==4.17.0
@ -20,7 +21,6 @@ knowit==0.4.0
peewee==3.15.3
py-pretty==1
pycountry==22.3.5
pyga==2.6.2
pyrsistent==0.19.2
pysubs2==1.4.4
python-engineio==4.3.4