# -*- coding: utf-8 -*-
"""Utilities and tools."""
import codecs
import ipaddress
import json
import logging
import pathlib
import platform
import sys
from datetime import datetime, timedelta, timezone
from itertools import zip_longest
from typing import (Any, Callable, Iterable, Iterator, List, Optional, Tuple,
Union)
from urllib.parse import urljoin
import click
import dateutil.parser
import dateutil.relativedelta
import dateutil.tz
from . import INIT_DOTENV, PACKAGE_FILE, PACKAGE_ROOT, VERSION
from .constants.api import GUI_PAGE_SIZES
from .constants.general import (ERROR_ARGS, ERROR_TMPL, NO, OK_ARGS, OK_TMPL,
WARN_ARGS, WARN_TMPL, YES)
from .exceptions import ToolsError
from .setup_env import find_dotenv, get_env_ax
LOG: logging.Logger = logging.getLogger(PACKAGE_ROOT).getChild("tools")
[docs]def listify(obj: Any, dictkeys: bool = False) -> list:
"""Force an object into a list.
Notes:
* :obj:`list`: returns as is
* :obj:`tuple`: convert to list
* :obj:`None`: returns as an empty list
* any of :data:`axonius_api_client.constants.general.SIMPLE`: return as a list of obj
* :obj:`dict`: if dictkeys is True, return as list of keys of obj,
otherwise return as a list of obj
Args:
obj: object to coerce to list
dictkeys: if obj is dict, return list of keys of obj
"""
if isinstance(obj, list):
return obj
if isinstance(obj, tuple):
return list(obj)
if obj is None:
return []
if isinstance(obj, dict) and dictkeys:
return list(obj)
return [obj]
[docs]def grouper(iterable: Iterable, n: int, fillvalue: Optional[Any] = None) -> Iterator:
"""Split an iterable into chunks.
Args:
iterable: iterable to split into chunks of size n
n: length to split iterable into
fillvalue: value to use as filler for last chunk
"""
return zip_longest(*([iter(iterable)] * n), fillvalue=fillvalue)
[docs]def coerce_int(obj: Any, max_value: Optional[int] = None, min_value: Optional[int] = None) -> int:
"""Convert an object into int.
Args:
obj: object to convert to int
Raises:
:exc:`ToolsError`: if obj is not able to be converted to int
"""
try:
value = int(obj)
except Exception:
vtype = type(obj).__name__
raise ToolsError(f"Supplied value {obj!r} of type {vtype} is not an integer.")
if max_value is not None and value > max_value:
raise ToolsError(f"Supplied value {obj!r} is greater than max value of {max_value}.")
if min_value is not None and value < min_value:
raise ToolsError(f"Supplied value {obj!r} is less than min value of {min_value}.")
return value
[docs]def coerce_int_float(value: Union[int, float, str]) -> Union[int, float]:
"""Convert an object into int or float.
Args:
obj: object to convert to int or float
Raises:
:exc:`ToolsError`: if obj is not able to be converted to int or float
"""
if isinstance(value, float):
return value
if isinstance(value, int):
return value
if isinstance(value, str):
value = value.strip()
if value.isdigit():
return int(value)
if value.replace(".", "").isdigit():
return float(value)
vtype = type(value).__name__
raise ToolsError(f"Supplied value {value!r} of type {vtype} is not an integer or float.")
[docs]def coerce_bool(obj: Any, errmsg: Optional[str] = None) -> bool:
"""Convert an object into bool.
Args:
obj: object to coerce to bool, will check against
:data:`axonius_api_client.constants.general.YES` and
:data:`axonius_api_client.constants.general.NO`
Raises:
:exc:`ToolsError`: obj is not able to be converted to bool
"""
coerce_obj = obj
if isinstance(obj, str):
coerce_obj = coerce_obj.lower().strip()
if coerce_obj in YES:
return True
if coerce_obj in NO:
return False
vtype = type(obj).__name__
msg = listify(errmsg)
msg += [
f"Supplied value {coerce_obj!r} of type {vtype} must be one of:",
f" For true: {YES}",
f" For false: {NO}",
]
raise ToolsError("\n".join(msg))
[docs]def is_int(obj: Any, digit: bool = False) -> bool:
"""Check if obj is int typeable.
Args:
obj: object to check
digit: allow checking str/bytes
"""
if digit:
if (isinstance(obj, str) or isinstance(obj, bytes)) and obj.isdigit():
return True
return not isinstance(obj, bool) and isinstance(obj, int)
[docs]def join_url(url: str, *parts) -> str:
"""Join a URL to any number of parts.
Args:
url: str to add parts to
*parts: str(s) to append to url
"""
url = url.rstrip("/") + "/"
for part in parts:
if not part:
continue
url = url.rstrip("/") + "/"
part = part.lstrip("/")
url = urljoin(url, part)
return url
[docs]def strip_right(obj: Union[List[str], str], fix: str) -> Union[List[str], str]:
"""Strip text from the right side of obj.
Args:
obj: str(s) to strip fix from
fix: str to remove from obj(s)
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [strip_right(obj=x, fix=fix) for x in obj]
if isinstance(obj, str):
plen = len(fix)
if obj.endswith(fix):
return obj[:-plen]
return obj
[docs]def strip_left(obj: Union[List[str], str], fix: str) -> Union[List[str], str]:
"""Strip text from the left side of obj.
Args:
obj: str(s) to strip fix from
fix: str to remove from obj(s)
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [strip_left(obj=x, fix=fix) for x in obj]
if isinstance(obj, str):
plen = len(fix)
if obj.startswith(fix):
return obj[plen:]
return obj
[docs]class AxJSONEncoder(json.JSONEncoder):
"""Pass."""
[docs] def default(self, obj):
"""Pass."""
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
[docs]def json_dump(
obj: Any, indent: int = 2, sort_keys: bool = False, error: bool = True, **kwargs
) -> Any:
"""Serialize an object into json str.
Args:
obj: object to serialize into json str
indent: json str indent level
sort_keys: sort dict keys
error: if json error happens, raise it
**kwargs: passed to :func:`json.dumps`
"""
if isinstance(obj, bytes):
obj = obj.decode("utf-8")
kwargs.setdefault("cls", AxJSONEncoder)
kwargs.setdefault("default", str)
try:
return json.dumps(obj, indent=indent, sort_keys=sort_keys, **kwargs)
except Exception:
if error:
raise
return obj
[docs]def json_load(obj: str, error: bool = True, **kwargs) -> Any:
"""Deserialize a json str into an object.
Args:
obj: str to deserialize into obj
error: if json error happens, raise it
**kwargs: passed to :func:`json.loads`
"""
try:
return json.loads(obj, **kwargs)
except Exception:
if error:
raise
return obj
[docs]def json_reload(obj: Any, error: bool = False, trim: int = None, **kwargs) -> str:
"""Re-serialize a json str into a pretty json str.
Args:
obj: str to deserialize into obj and serialize back to str
error: If json error happens, raise it
**kwargs: passed to :func:`json_dump`
"""
obj = json_load(obj=obj, error=error)
if not isinstance(obj, str):
obj = json_dump(obj=obj, error=error, **kwargs)
obj = obj or ""
if isinstance(obj, str):
obj = obj.strip()
if trim and len(obj) >= trim:
obj = obj[:trim] + f"\nTrimmed over {trim} characters"
return obj
[docs]def dt_parse(obj: Union[str, timedelta, datetime], default_tz_utc: bool = False) -> datetime:
"""Parse a str, datetime, or timedelta into a datetime object.
Notes:
* :obj:`str`: will be parsed into datetime obj
* :obj:`datetime.timedelta`: will be parsed into datetime obj as now - timedelta
* :obj:`datetime.datetime`: will be re-parsed into datetime obj
Args:
obj: object or list of objects to parse into datetime
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [dt_parse(obj=x) for x in obj]
if isinstance(obj, datetime):
obj = str(obj)
if isinstance(obj, timedelta):
obj = str(dt_now() - obj)
value = dateutil.parser.parse(obj)
if default_tz_utc and not value.tzinfo:
value = value.replace(tzinfo=dateutil.tz.tzutc())
return value
[docs]def dt_parse_tmpl(obj: Union[str, timedelta, datetime], tmpl: str = "%Y-%m-%d") -> str:
"""Parse a string into the format used by the REST API.
Args:
obj: date time to parse using :meth:`dt_parse`
tmpl: strftime template to convert obj into
"""
valid_fmts = [
"YYYY-MM-DD",
"YYYYMMDD",
]
try:
dt = dt_parse(obj=obj)
return dt.strftime(tmpl)
except Exception:
vtype = type(obj).__name__
valid = "\n - " + "\n - ".join(valid_fmts)
raise ToolsError(
(
f"Could not parse date {obj!r} of type {vtype}"
f", try a string in the format of:{valid}"
)
)
[docs]def dt_now(
delta: Optional[timedelta] = None,
tz: timezone = dateutil.tz.tzutc(),
) -> datetime:
"""Get the current datetime in for a specific tz.
Args:
delta: convert delta into datetime str instead of returning now
tz: timezone to return datetime in
"""
if isinstance(delta, timedelta):
return dt_parse(obj=delta)
return datetime.now(tz)
[docs]def dt_sec_ago(obj: Union[str, timedelta, datetime], exact: bool = False) -> int:
"""Get number of seconds ago a given datetime was.
Args:
obj: parsed by :meth:`dt_parse` into a datetime obj
"""
obj = dt_parse(obj=obj)
now = dt_now(tz=obj.tzinfo)
value = (now - obj).total_seconds()
return value if exact else round(value)
[docs]def dt_min_ago(obj: Union[str, timedelta, datetime]) -> int:
"""Get number of minutes ago a given datetime was.
Args:
obj: parsed by :meth:`dt_sec_ago` into seconds ago
"""
return round(dt_sec_ago(obj=obj) / 60)
[docs]def dt_days_left(obj: Optional[Union[str, timedelta, datetime]]) -> Optional[int]:
"""Get number of days left until a given datetime.
Args:
obj: parsed by :meth:`dt_sec_ago` into days left
"""
if obj:
obj = dt_parse(obj=obj)
now = dt_now(tz=obj.tzinfo)
seconds = (obj - now).total_seconds()
return round(seconds / 60 / 60 / 24)
return None
[docs]def dt_within_min(
obj: Union[str, timedelta, datetime],
n: Optional[Union[str, int]] = None,
) -> bool:
"""Check if given datetime is within the past n minutes.
Args:
obj: parsed by :meth:`dt_min_ago` into minutes ago
n: int of :meth:`dt_min_ago` should be greater than or equal to
"""
if not is_int(obj=n, digit=True):
return False
return dt_min_ago(obj=obj) >= int(n)
[docs]def get_path(obj: Union[str, pathlib.Path]) -> pathlib.Path:
"""Convert a str into a fully resolved & expanded Path object.
Args:
obj: obj to convert into expanded and resolved absolute Path obj
"""
return pathlib.Path(obj).expanduser().resolve()
[docs]def path_read(
obj: Union[str, pathlib.Path], binary: bool = False, is_json: bool = False, **kwargs
) -> Union[bytes, str]:
"""Read data from a file.
Notes:
* if path filename ends with ".json", data will be deserialized using
:meth:`json_load`
Args:
obj: path to read data form, parsed by :meth:`get_path`
binary: read the data as binary instead of str
is_json: deserialize data using :meth:`json_load`
**kwargs: passed to :meth:`json_load`
Raises:
:exc:`ToolsError`: path does not exist as file
"""
robj = get_path(obj=obj)
if not robj.is_file():
raise ToolsError(f"Supplied path='{obj}' (resolved='{robj}') does not exist!")
if binary:
data = robj.read_bytes()
else:
data = robj.read_text()
if is_json:
data = json_load(obj=data, **kwargs)
if robj.suffix == ".json" and isinstance(data, str):
kwargs.setdefault("error", False)
data = json_load(obj=data, **kwargs)
return robj, data
[docs]def path_write(
obj: Union[str, pathlib.Path],
data: Union[bytes, str],
overwrite: bool = False,
binary: bool = False,
binary_encoding: str = "utf-8",
is_json: bool = False,
make_parent: bool = True,
protect_file=0o600,
protect_parent=0o700,
**kwargs,
) -> Tuple[pathlib.Path, Callable]:
"""Write data to a file.
Notes:
* if obj filename ends with ".json", serializes data using :meth:`json_dump`.
Args:
obj: path to write data to, parsed by :meth:`get_path`
data: data to write to obj
binary: write the data as binary instead of str
binary_encoding: encoding to use when switching from str/bytes
is_json: serialize data using :meth:`json_load`
overwrite: overwrite obj if exists
make_parent: If the parent directory does not exist, create it
protect_file: octal mode of permissions to set on file
protect_dir: octal mode of permissions to set on parent directory when creating
**kwargs: passed to :meth:`json_dump`
Raises:
:exc:`ToolsError`: path exists as file and overwrite is False
:exc:`ToolsError`: if parent path does not exist and make_parent is False
"""
obj = get_path(obj=obj)
if is_json:
data = json_dump(obj=data, **kwargs)
if obj.suffix == ".json" and not (isinstance(data, str) or isinstance(data, bytes)):
kwargs.setdefault("error", False)
data = json_dump(obj=data, **kwargs)
if binary:
if isinstance(data, str):
data = data.encode(binary_encoding)
method = obj.write_bytes
else:
if isinstance(data, bytes):
data = data.decode(binary_encoding)
method = obj.write_text
if obj.is_file() and overwrite is False:
raise ToolsError(f"File '{obj}' already exists and overwrite is False")
if not obj.parent.is_dir():
if make_parent:
obj.parent.mkdir(mode=protect_parent, parents=True, exist_ok=True)
else:
error = f"Directory '{obj.parent}' does not exist and make_parent is False"
raise ToolsError(error)
obj.touch()
if protect_file:
obj.chmod(protect_file)
return obj, method(data)
[docs]def longest_str(obj: List[str]) -> int:
"""Determine the length of the longest string in a list of strings.
Args:
obj: list of strings to calculate length of
"""
return round(max([len(x) + 5 for x in obj]), -1)
[docs]def split_str(
obj: Union[List[str], str],
split: str = ",",
strip: Optional[str] = None,
do_strip: bool = True,
lower: bool = True,
empty: bool = False,
) -> List[str]:
"""Split a string or list of strings into a list of strings.
Args:
obj: string or list of strings to split
split: character to split on
strip: characters to strip
do_strip: strip each item from the split
lower: lowercase each item from the split
empty: remove empty items post split
"""
if obj is None:
return []
if isinstance(obj, list):
return [
y
for x in obj
for y in split_str(
obj=x,
split=split,
strip=strip,
do_strip=do_strip,
lower=lower,
empty=empty,
)
]
if not isinstance(obj, str):
raise ToolsError(f"Unable to split non-str value {obj}")
ret = []
for x in obj.split(split):
if lower:
x = x.lower()
if do_strip:
x = x.strip(strip)
if not empty and not x:
continue
ret.append(x)
return ret
[docs]def echo_ok(msg: str, tmpl: bool = True, **kwargs):
"""Echo a message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
"""
echoargs = {}
echoargs.update(OK_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = OK_TMPL.format(msg=msg)
LOG.info(msg)
click.secho(msg, **echoargs)
[docs]def echo_warn(msg: str, tmpl: bool = True, **kwargs):
"""Echo a warning message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
"""
echoargs = {}
echoargs.update(WARN_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = WARN_TMPL.format(msg=msg)
LOG.warning(msg)
click.secho(msg, **echoargs)
[docs]def echo_error(msg: str, abort: bool = True, tmpl: bool = True, **kwargs):
"""Echo an error message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
abort: call sys.exit(1) after echoing message
"""
echoargs = {}
echoargs.update(ERROR_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = ERROR_TMPL.format(msg=msg)
LOG.error(msg)
click.secho(msg, **echoargs)
if abort:
sys.exit(1)
[docs]def sysinfo() -> dict:
"""Gather system information."""
info = {}
info["API Client Version"] = VERSION
info["API Client Package"] = PACKAGE_FILE
info["Init loaded .env file"] = INIT_DOTENV
info["Path to .env file"] = find_dotenv()
info["OS envs"] = get_env_ax()
info["Date"] = str(dt_now())
info["Python System Version"] = ", ".join(sys.version.splitlines())
info["Command Line Args"] = sys.argv
platform_attrs = [
"machine",
"node",
"platform",
"processor",
"python_branch",
"python_compiler",
"python_implementation",
"python_revision",
"python_version",
"release",
"system",
"version",
"win32_edition",
]
for attr in platform_attrs:
method = getattr(platform, attr, None)
value = "unavailable"
if method:
value = method()
attr = attr.replace("_", " ").title()
info[attr] = value
return info
[docs]def calc_percent(part: Union[int, float], whole: Union[int, float], places: int = 2) -> float:
"""Calculate the percentage of part out of whole.
Args:
part: number to get percent of whole
whole: number to caclulate against part
places: number of decimal places to return
"""
if 0 in [part, whole]:
value = 0.00
elif part > whole:
value = 100.00
else:
value = 100 * (part / whole)
value = trim_float(value=value, places=places)
return value
[docs]def trim_float(value: float, places: int = 2) -> float:
"""Trim a float to N places.
Args:
value: float to trim
places: decimal places to trim value to
"""
if isinstance(places, int):
value = float(f"{value:.{places}f}")
return value
[docs]def join_kv(
obj: Union[List[dict], dict], listjoin: str = ", ", tmpl: str = "{k}: {v!r}"
) -> List[str]:
"""Join a dictionary into key value strings.
Args:
obj: dict or list of dicts to stringify
listjoin: string to use for joining
tmpl: template to format key value pairs of dict
"""
if isinstance(obj, list):
return [join_kv(obj=x, listjoin=listjoin, tmpl=tmpl) for x in obj]
if not isinstance(obj, dict):
raise ToolsError(f"Object must be a dict, supplied {type(obj)}")
items = []
for k, v in obj.items():
if isinstance(v, list):
v = listjoin.join([str(i) for i in v])
items.append(tmpl.format(k=k, v=v))
return items
[docs]def get_type_str(obj: Any):
"""Get the type name of a class.
Args:
obj: class or tuple of classes to get type name(s) of
"""
if isinstance(obj, tuple):
return " or ".join([x.__name__ for x in obj])
else:
return obj.__name__
[docs]def check_type(value: Any, exp: Any, name: str = "", exp_items: Optional[Any] = None):
"""Check that a value is the appropriate type.
Args:
value: value to check type of
exp: type(s) that value should be
name: identifier of what value is for
exp_items: if value is a list, type(s) that list items should be
"""
name = f" for {name!r}" if name else ""
if not isinstance(value, exp):
vtype = get_type_str(obj=type(value))
etype = get_type_str(obj=exp)
err = f"Required type {etype}{name} but received type {vtype}: {value!r}"
raise ToolsError(err)
if exp_items and isinstance(value, list):
for idx, item in enumerate(value):
if isinstance(item, exp_items):
continue
vtype = get_type_str(obj=type(item))
etype = get_type_str(obj=exp_items)
err = (
f"Required type {etype}{name} in list item {idx} but received "
f"type {vtype}: {value!r}"
)
raise ToolsError(err)
[docs]def check_empty(value: Any, name: str = ""):
"""Check if a value is empty.
Args:
value: value to check type of
name: identifier of what value is for
"""
if not value:
vtype = type(value).__name__
name = f" for {name!r}" if name else ""
err = f"Required value{name} but received an empty {vtype}: {value!r}"
raise ToolsError(err)
[docs]def get_raw_version(value: str) -> str:
"""Caclulate the raw bits of a version str.
Args:
value: version to calculate
"""
check_type(value=value, exp=str)
converted = "0"
version = value
if ":" in value:
if "." in value and value.index(":") > value.index("."):
raise ToolsError(f"Invalid version with ':' after '.' in {value!r}")
converted, version = value.split(":", 1)
octects = version.split(".")
for octect in octects:
if not octect.isdigit():
raise ToolsError(f"Invalid version with non-digit {octect!r} in {value!r}")
if len(octect) > 8:
octect = octect[:8]
converted += "".join(["0" for _ in range(8 - len(octect))]) + octect
return converted
[docs]def coerce_str_to_csv(value: str) -> List[str]:
"""Coerce a string into a list of strings.
Args:
value: string to seperate using comma
"""
new_value = value
if isinstance(value, str):
new_value = [x.strip() for x in value.split(",") if x.strip()]
if not new_value:
raise ToolsError(f"Empty value after parsing CSV: {value!r}")
if not isinstance(new_value, (list, tuple)):
vtype = type(new_value).__name__
raise ToolsError(f"Invalid type {vtype} supplied, must be a list")
if not new_value:
raise ToolsError(f"Empty list supplied {value}")
return new_value
[docs]def parse_ip_address(value: str) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Parse a string into an IP address.
Args:
value: ip address
"""
try:
return ipaddress.ip_address(value)
except Exception as exc:
raise ToolsError(str(exc))
[docs]def parse_ip_network(value: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]:
"""Parse a string into an IP network.
Args:
value: ip network
"""
if "/" not in str(value):
vtype = type(value).__name__
raise ToolsError(
(
f"Supplied value {value!r} of type {vtype} is not a valid subnet "
"- format must be <address>/<CIDR>."
)
)
try:
return ipaddress.ip_network(value)
except Exception as exc:
raise ToolsError(str(exc))
[docs]def kv_dump(obj: dict) -> str:
"""Get a string representation of a dictionaries key value pairs.
Args:
obj: dictionary to get string of
"""
return "\n " + "\n ".join([f"{k}: {v}" for k, v in obj.items()])
[docs]def bom_strip(content: str, strip=True) -> str:
"""Remove the UTF-8 BOM marker from the beginning of a string.
Args:
content: string to remove BOM marker from if found
strip: remove whitespace before & after removing BOM marker
"""
if strip:
content = content.strip()
if content.startswith(codecs.BOM_UTF8.decode()):
content = content[1:]
if strip:
content = content.strip()
return content
[docs]def read_stream(stream) -> str:
"""Try to read input from a stream.
Args:
stream: stdin or a file descriptor to read input from
"""
stream_name = format(getattr(stream, "name", stream))
if stream.isatty():
raise ToolsError(f"No input provided on {stream_name!r}")
# its STDIN with input or a file
content = stream.read().strip()
if not content:
raise ToolsError(f"Empty content supplied to {stream_name!r}")
return content
[docs]def check_gui_page_size(size: Optional[int] = None) -> int:
"""Check page size to see if it one of the valid GUI page sizes.
Args:
size: page size to check
Raises:
:exc:`ApiError`: if size is not one of
:data:`axonius_api_client.constants.api.GUI_PAGE_SIZES`
"""
size = size or GUI_PAGE_SIZES[0]
if size not in GUI_PAGE_SIZES:
raise ToolsError(f"gui_page_size of {size} is invalid, must be one of {GUI_PAGE_SIZES}")
return size
[docs]def calc_gb(value: Union[str, int], places: int = 2, is_kb: bool = True) -> float:
"""Convert bytes into GB.
Args:
value: bytes
places: decimal places to trim value to
is_kb: values are in kb or bytes
"""
value = coerce_int_float(value=value)
value = value / 1024 / 1024
value = (value / 1024) if not is_kb else value
value = trim_float(value=value, places=places)
return value
[docs]def calc_perc_gb(
obj: dict,
whole_key: str,
part_key: str,
perc_key: Optional[str] = None,
places: int = 2,
update: bool = True,
is_kb: bool = True,
) -> dict:
"""Calculate the GB and percent from a dict.
Args:
obj: dict to get whole_key and part_key from
whole_key: key to get whole value from and convert to GB and set as whole_key_gb
part_key: key to get part value from and convert to GB and set as part_key_gb
perc_key: key to set percent in
is_kb: values are in kb or bytes
"""
perc_key = perc_key or f"{part_key}_percent"
whole_value = obj[whole_key] or 0
part_value = obj[part_key] or 0
whole_gb = calc_gb(value=whole_value, places=places, is_kb=is_kb)
part_gb = calc_gb(value=part_value, places=places, is_kb=is_kb)
perc = calc_percent(part=part_gb, whole=whole_gb, places=places)
ret = obj if update else {}
ret[f"{part_key}_gb"] = part_gb
ret[f"{whole_key}_gb"] = whole_gb
ret[perc_key] = perc
return ret
[docs]def get_subcls(cls) -> list:
"""Get all subclasses of a class."""
subs = [s for c in cls.__subclasses__() for s in get_subcls(c)]
return list(set(cls.__subclasses__()).union(subs))
[docs]def prettify_obj(obj, indent=0):
"""Pass."""
spaces = " " * indent
sub_indent = indent + 2
if isinstance(obj, dict):
lines = ["", f"{spaces}-----"] if not indent else []
for k, v in obj.items():
lines += [f"{spaces}- {k}:", *prettify_obj(v, sub_indent)]
return lines
elif isinstance(obj, list):
return [y for x in obj for y in prettify_obj(x, indent)]
return [f"{spaces} {obj}"]
[docs]def token_parse(obj: str) -> str:
"""Pass."""
url_check = "token="
if url_check in obj:
idx = obj.index(url_check) + len(url_check)
obj = obj[idx:]
return obj
[docs]def combo_dicts(*args):
"""Pass."""
ret = {}
for x in args:
if isinstance(x, dict):
ret.update(x)
return ret