# -*- coding: utf-8 -*-
"""Utilities and tools."""
import calendar
import codecs
import csv
import inspect
import io
import ipaddress
import json
import logging
import pathlib
import platform
import re
import sys
from datetime import datetime, timedelta, timezone
from itertools import zip_longest
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Pattern,
Tuple,
Type,
TypeVar,
Union,
)
from urllib.parse import urljoin
import click
import dateutil.parser
import dateutil.relativedelta
import dateutil.tz
from . import INIT_DOTENV, PACKAGE_FILE, PACKAGE_ROOT, VERSION
from .constants.api import GUI_PAGE_SIZES
from .constants.general import (
DEBUG_ARGS,
DEBUG_TMPL,
ERROR_ARGS,
ERROR_TMPL,
FILE_DATE_FMT,
NO,
OK_ARGS,
OK_TMPL,
TRIM_MSG,
URL_STARTS,
WARN_ARGS,
WARN_TMPL,
YES,
)
from .constants.logs import MAX_BODY_LEN
from .exceptions import ToolsError
from .setup_env import find_dotenv, get_env_ax
LOG: logging.Logger = logging.getLogger(PACKAGE_ROOT).getChild("tools")
EMAIL_RE_STR: str = (
r"([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\"([]!#-[^-~ \t]|(\\[\t -~]))+\")"
r"@([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\[[\t -Z^-~]*])"
)
EMAIL_RE: Pattern = re.compile(EMAIL_RE_STR, re.I)
PathLike: TypeVar = TypeVar("PathLike", pathlib.Path, str, bytes)
DAYS_MAP: dict = dict(zip(range(7), calendar.day_name))
[docs]def listify(obj: Any, dictkeys: bool = False) -> list:
"""Force an object into a list.
Notes:
* :obj:`list`: returns as is
* :obj:`tuple`: convert to list
* :obj:`None`: returns as an empty list
* any of :data:`axonius_api_client.constants.general.SIMPLE`: return as a list of obj
* :obj:`dict`: if dictkeys is True, return as list of keys of obj,
otherwise return as a list of obj
Args:
obj: object to coerce to list
dictkeys: if obj is dict, return list of keys of obj
"""
if isinstance(obj, list):
return obj
if isinstance(obj, tuple):
return list(obj)
if obj is None:
return []
if isinstance(obj, dict) and dictkeys:
return list(obj)
return [obj]
[docs]def grouper(iterable: Iterable, n: int, fillvalue: Optional[Any] = None) -> Iterator:
"""Split an iterable into chunks.
Args:
iterable: iterable to split into chunks of size n
n: length to split iterable into
fillvalue: value to use as filler for last chunk
"""
return zip_longest(*([iter(iterable)] * n), fillvalue=fillvalue)
[docs]def coerce_int(
obj: Any,
max_value: Optional[int] = None,
min_value: Optional[int] = None,
allow_none: bool = False,
valid_values: Optional[List[int]] = None,
errmsg: Optional[str] = None,
) -> int:
"""Convert an object into int.
Args:
obj: object to convert to int
Raises:
:exc:`ToolsError`: if obj is not able to be converted to int
"""
if allow_none and (obj is None or str(obj).lower().strip() in ["none", "null"]):
return None
pre = f"{errmsg}\n" if errmsg else ""
vtype = type(obj).__name__
try:
value = int(obj)
except Exception:
raise ToolsError(f"{pre}Supplied value {obj!r} of type {vtype} is not an integer.")
if max_value is not None and value > max_value:
raise ToolsError(f"{pre}Supplied value {obj!r} is greater than max value of {max_value}.")
if min_value is not None and value < min_value:
raise ToolsError(f"{pre}Supplied value {obj!r} is less than min value of {min_value}.")
if valid_values and value not in valid_values:
raise ToolsError(f"{pre}Supplied value {obj!r} is not one of {valid_values}.")
return value
[docs]def coerce_int_float(value: Union[int, float, str]) -> Union[int, float]:
"""Convert an object into int or float.
Args:
obj: object to convert to int or float
Raises:
:exc:`ToolsError`: if obj is not able to be converted to int or float
"""
if isinstance(value, float):
return value
if isinstance(value, int):
return value
if isinstance(value, str):
value = value.strip()
if value.isdigit():
return int(value)
if value.replace(".", "").isdigit():
return float(value)
vtype = type(value).__name__
raise ToolsError(f"Supplied value {value!r} of type {vtype} is not an integer or float.")
[docs]def coerce_bool(obj: Any, errmsg: Optional[str] = None) -> bool:
"""Convert an object into bool.
Args:
obj: object to coerce to bool, will check against
:data:`axonius_api_client.constants.general.YES` and
:data:`axonius_api_client.constants.general.NO`
Raises:
:exc:`ToolsError`: obj is not able to be converted to bool
"""
def combine(obj):
return ", ".join([f"{x!r}" for x in obj])
coerce_obj = obj
if isinstance(obj, str):
coerce_obj = coerce_obj.lower().strip()
if coerce_obj in YES:
return True
if coerce_obj in NO:
return False
vtype = type(obj).__name__
msg = listify(errmsg)
msg += [
f"Supplied value {coerce_obj!r} of type {vtype} must be one of:",
f" For True: {combine(YES)}",
f" For False: {combine(NO)}",
]
raise ToolsError("\n".join(msg))
[docs]def is_str(value: Any, not_empty: bool = True) -> bool:
"""Check if value is non empty string."""
return isinstance(value, str) and (
isinstance(value, str) and bool(value.strip()) if not_empty else True
)
[docs]def is_email(value: Any) -> bool:
"""Check if a value is a valid email."""
return is_str(value=value, not_empty=True) and bool(EMAIL_RE.fullmatch(value))
[docs]def is_int(obj: Any, digit: bool = False) -> bool:
"""Check if obj is int typeable.
Args:
obj: object to check
digit: allow checking str/bytes
"""
if digit:
if (isinstance(obj, str) or isinstance(obj, bytes)) and obj.isdigit():
return True
return not isinstance(obj, bool) and isinstance(obj, int)
[docs]def join_url(url: str, *parts) -> str:
"""Join a URL to any number of parts.
Args:
url: str to add parts to
*parts: str(s) to append to url
"""
url = url.rstrip("/") + "/"
for part in parts:
if not part:
continue
url = url.rstrip("/") + "/"
part = part.lstrip("/")
url = urljoin(url, part)
return url
[docs]def strip_right(obj: Union[List[str], str], fix: str) -> Union[List[str], str]:
"""Strip text from the right side of obj.
Args:
obj: str(s) to strip fix from
fix: str to remove from obj(s)
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [strip_right(obj=x, fix=fix) for x in obj]
if isinstance(obj, str):
plen = len(fix)
if obj.endswith(fix):
return obj[:-plen]
return obj
[docs]def strip_left(obj: Union[List[str], str], fix: str) -> Union[List[str], str]:
"""Strip text from the left side of obj.
Args:
obj: str(s) to strip fix from
fix: str to remove from obj(s)
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [strip_left(obj=x, fix=fix) for x in obj]
if isinstance(obj, str):
plen = len(fix)
if obj.startswith(fix):
return obj[plen:]
return obj
[docs]class AxJSONEncoder(json.JSONEncoder):
"""Pass."""
[docs] def __init__(self, *args, **kwargs):
"""Pass."""
self.fallback = kwargs.pop("fallback", None)
super().__init__(*args, **kwargs)
[docs] def default(self, obj):
"""Pass."""
if isinstance(obj, datetime):
return obj.isoformat()
if has_to_dict(obj):
return obj.to_dict()
if callable(getattr(self, "fallback", None)):
return self.fallback(obj)
return super().default(obj) # pragma: no cover
[docs]def has_to_dict(obj: Any) -> bool:
"""Pass."""
return hasattr(obj, "to_dict") and callable(obj.to_dict)
[docs]def json_dump(
obj: Any,
indent: int = 2,
sort_keys: bool = False,
error: bool = True,
fallback: Any = str,
to_dict: bool = True,
cls: Type = AxJSONEncoder,
**kwargs,
) -> Any:
"""Serialize an object into json str.
Args:
obj: object to serialize into json str
indent: json str indent level
sort_keys: sort dict keys
error: if json error happens, raise it
**kwargs: passed to :func:`json.dumps`
"""
obj = bytes_to_str(value=obj)
if to_dict and has_to_dict(obj):
obj = obj.to_dict()
try:
return json.dumps(
obj, indent=indent, sort_keys=sort_keys, cls=cls, fallback=fallback, **kwargs
)
except Exception: # pragma: no cover
if error:
raise
return obj
[docs]def json_load(obj: str, error: bool = True, **kwargs) -> Any:
"""Deserialize a json str into an object.
Args:
obj: str to deserialize into obj
error: if json error happens, raise it
**kwargs: passed to :func:`json.loads`
"""
try:
return json.loads(obj, **kwargs)
except Exception:
if error:
raise
return obj
[docs]def json_log(
obj: Any,
error: bool = False,
trim: Optional[int] = MAX_BODY_LEN,
trim_lines: bool = True,
trim_msg: str = TRIM_MSG,
**kwargs,
) -> str: # pragma: no cover
"""Pass."""
return json_reload(
obj=obj, error=error, trim=trim, trim_lines=trim_lines, trim_msg=trim_msg, **kwargs
)
[docs]def json_reload(
obj: Any,
error: bool = False,
trim: Optional[int] = None,
trim_lines: bool = False,
trim_msg: str = TRIM_MSG,
**kwargs,
) -> str:
"""Re-serialize a json str into a pretty json str.
Args:
obj: str to deserialize into obj and serialize back to str
error: If json error happens, raise it
**kwargs: passed to :func:`json_dump`
"""
obj = json_load(obj=obj, error=error)
if not isinstance(obj, str):
obj = json_dump(obj=obj, error=error, **kwargs)
obj = coerce_str(value=obj, trim=trim, trim_msg=trim_msg, trim_lines=trim_lines)
return obj
[docs]def dt_parse(obj: Union[str, timedelta, datetime], default_tz_utc: bool = False) -> datetime:
"""Parse a str, datetime, or timedelta into a datetime object.
Notes:
* :obj:`str`: will be parsed into datetime obj
* :obj:`datetime.timedelta`: will be parsed into datetime obj as now - timedelta
* :obj:`datetime.datetime`: will be re-parsed into datetime obj
Args:
obj: object or list of objects to parse into datetime
"""
if isinstance(obj, list) and all([isinstance(x, str) for x in obj]):
return [dt_parse(obj=x) for x in obj]
if isinstance(obj, datetime):
obj = str(obj)
if isinstance(obj, timedelta):
obj = str(dt_now() - obj)
value = dateutil.parser.parse(obj)
if default_tz_utc and not value.tzinfo:
value = value.replace(tzinfo=dateutil.tz.tzutc())
return value
[docs]def dt_parse_tmpl(obj: Union[str, timedelta, datetime], tmpl: str = "%Y-%m-%d") -> str:
"""Parse a string into the format used by the REST API.
Args:
obj: date time to parse using :meth:`dt_parse`
tmpl: strftime template to convert obj into
"""
valid_fmts = [
"YYYY-MM-DD",
"YYYYMMDD",
]
try:
dt = dt_parse(obj=obj)
return dt.strftime(tmpl)
except Exception:
vtype = type(obj).__name__
valid = "\n - " + "\n - ".join(valid_fmts)
raise ToolsError(
(
f"Could not parse date {obj!r} of type {vtype}"
f", try a string in the format of:{valid}"
)
)
[docs]def dt_now(
delta: Optional[timedelta] = None,
tz: timezone = dateutil.tz.tzutc(),
) -> datetime:
"""Get the current datetime in for a specific tz.
Args:
delta: convert delta into datetime str instead of returning now
tz: timezone to return datetime in
"""
if isinstance(delta, timedelta):
return dt_parse(obj=delta)
return datetime.now(tz)
[docs]def dt_now_file(fmt: str = FILE_DATE_FMT, **kwargs):
"""Pass."""
return dt_now(**kwargs).strftime(fmt)
[docs]def dt_sec_ago(obj: Union[str, timedelta, datetime], exact: bool = False) -> int:
"""Get number of seconds ago a given datetime was.
Args:
obj: parsed by :meth:`dt_parse` into a datetime obj
"""
obj = dt_parse(obj=obj)
now = dt_now(tz=obj.tzinfo)
value = (now - obj).total_seconds()
return value if exact else round(value)
[docs]def dt_min_ago(obj: Union[str, timedelta, datetime]) -> int:
"""Get number of minutes ago a given datetime was.
Args:
obj: parsed by :meth:`dt_sec_ago` into seconds ago
"""
return round(dt_sec_ago(obj=obj) / 60)
[docs]def dt_days_left(obj: Optional[Union[str, timedelta, datetime]]) -> Optional[int]:
"""Get number of days left until a given datetime.
Args:
obj: parsed by :meth:`dt_sec_ago` into days left
"""
ret = None
if obj:
obj = dt_parse(obj=obj)
now = dt_now(tz=obj.tzinfo)
seconds = (obj - now).total_seconds()
ret = round(seconds / 60 / 60 / 24)
return ret
[docs]def dt_within_min(
obj: Union[str, timedelta, datetime],
n: Optional[Union[str, int]] = None,
) -> bool:
"""Check if given datetime is within the past n minutes.
Args:
obj: parsed by :meth:`dt_min_ago` into minutes ago
n: int of :meth:`dt_min_ago` should be greater than or equal to
"""
if not is_int(obj=n, digit=True):
return False
return dt_min_ago(obj=obj) >= int(n)
[docs]def get_path(obj: PathLike) -> pathlib.Path:
"""Convert a str into a fully resolved & expanded Path object.
Args:
obj: obj to convert into expanded and resolved absolute Path obj
"""
return pathlib.Path(obj).expanduser().resolve()
[docs]def path_read(
obj: PathLike, binary: bool = False, is_json: bool = False, **kwargs
) -> Union[bytes, str]:
"""Read data from a file.
Notes:
* if path filename ends with ".json", data will be deserialized using
:meth:`json_load`
Args:
obj: path to read data form, parsed by :meth:`get_path`
binary: read the data as binary instead of str
is_json: deserialize data using :meth:`json_load`
**kwargs: passed to :meth:`json_load`
Raises:
:exc:`ToolsError`: path does not exist as file
"""
robj = get_path(obj=obj)
if not robj.is_file():
raise ToolsError(f"Supplied path='{obj}' (resolved='{robj}') does not exist!")
if binary:
data = robj.read_bytes()
else:
data = robj.read_text()
if is_json:
data = json_load(obj=data, **kwargs)
if robj.suffix == ".json" and isinstance(data, str):
kwargs.setdefault("error", False)
data = json_load(obj=data, **kwargs)
return robj, data
[docs]def get_backup_filename(path: PathLike) -> str:
"""Pass."""
path = get_path(obj=path)
return f"{path.stem}_{dt_now_file()}{path.suffix}"
[docs]def get_backup_path(path: PathLike) -> pathlib.Path:
"""Pass."""
path = get_path(obj=path)
return path.parent / get_backup_filename(path=path)
[docs]def check_path_is_not_dir(path: PathLike) -> pathlib.Path:
"""Pass."""
path = get_path(obj=path)
if path.is_dir():
raise ToolsError(f"'{path}' is a directory, not a file")
return path
[docs]def path_create_parent_dir(
path: PathLike, make_parent: bool = True, protect_parent=0o700
) -> pathlib.Path:
"""Pass."""
path = get_path(obj=path)
if not path.parent.is_dir():
if make_parent:
path.parent.mkdir(mode=protect_parent, parents=True, exist_ok=True)
else:
raise ToolsError(
f"Parent directory '{path.parent}' does not exist and make_parent is False"
)
return path
[docs]def path_backup_file(
path: PathLike,
backup_path: Optional[PathLike] = None,
make_parent: bool = True,
protect_parent=0o700,
**kwargs,
) -> pathlib.Path:
"""Pass."""
path = get_path(obj=path)
if not path.is_file():
raise ToolsError(f"'{path}' does not exist as a file, can not backup")
if backup_path:
backup_path = get_path(obj=backup_path)
else:
backup_path = get_backup_path(path=path)
check_path_is_not_dir(path=backup_path)
if backup_path.is_file():
backup_path = get_backup_path(path=backup_path)
path_create_parent_dir(path=backup_path, make_parent=make_parent, protect_parent=protect_parent)
path.rename(backup_path)
return backup_path
[docs]def auto_suffix(
path: PathLike,
data: Union[bytes, str],
error: bool = False,
**kwargs,
) -> Union[bytes, str]:
"""Pass."""
path = get_path(obj=path)
if path.suffix == ".json" and not (isinstance(data, str) or isinstance(data, bytes)):
data = json_dump(obj=data, error=error, **kwargs)
return data
[docs]def path_write(
obj: PathLike,
data: Union[bytes, str],
overwrite: bool = False,
backup: bool = False,
backup_path: Optional[PathLike] = None,
binary: bool = False,
binary_encoding: str = "utf-8",
is_json: bool = False,
make_parent: bool = True,
protect_file=0o600,
protect_parent=0o700,
suffix_auto: bool = True,
**kwargs,
) -> Tuple[pathlib.Path, Tuple[int, Optional[pathlib.Path]]]:
"""Write data to a file.
Notes:
* if obj filename ends with ".json", serializes data using :meth:`json_dump`.
Args:
obj: path to write data to, parsed by :meth:`get_path`
data: data to write to obj
binary: write the data as binary instead of str
binary_encoding: encoding to use when switching from str/bytes
is_json: serialize data using :meth:`json_load`
overwrite: overwrite obj if exists
make_parent: If the parent directory does not exist, create it
protect_file: octal mode of permissions to set on file
protect_dir: octal mode of permissions to set on parent directory when creating
**kwargs: passed to :meth:`json_dump`
Raises:
:exc:`ToolsError`: path exists as file and overwrite is False
:exc:`ToolsError`: if parent path does not exist and make_parent is False
"""
obj = get_path(obj=obj)
if is_json:
data = json_dump(**combo_dicts(kwargs, obj=data))
if suffix_auto:
data = auto_suffix(**combo_dicts(kwargs, path=obj, data=data))
if binary:
if isinstance(data, str):
data = data.encode(binary_encoding)
method = obj.write_bytes
else:
if isinstance(data, bytes):
data = data.decode(binary_encoding)
method = obj.write_text
check_path_is_not_dir(path=obj)
if obj.exists():
if backup:
backup_path = path_backup_file(
path=obj,
backup_path=backup_path,
make_parent=make_parent,
protect_parent=protect_parent,
)
elif overwrite is False:
raise ToolsError(f"File '{obj}' already exists and overwrite is False")
else:
path_create_parent_dir(path=obj, make_parent=make_parent, protect_parent=protect_parent)
obj.touch()
if protect_file:
obj.chmod(protect_file)
bytes_written = method(data)
return obj, (bytes_written, backup_path)
[docs]def longest_str(obj: List[str]) -> int:
"""Determine the length of the longest string in a list of strings.
Args:
obj: list of strings to calculate length of
"""
return round(max([len(x) + 5 for x in obj]), -1)
[docs]def split_str(
obj: Union[List[str], str],
split: str = ",",
strip: Optional[str] = None,
do_strip: bool = True,
lower: bool = True,
empty: bool = False,
) -> List[str]:
"""Split a string or list of strings into a list of strings.
Args:
obj: string or list of strings to split
split: character to split on
strip: characters to strip
do_strip: strip each item from the split
lower: lowercase each item from the split
empty: remove empty items post split
"""
if obj is None:
return []
if isinstance(obj, list):
return [
y
for x in obj
for y in split_str(
obj=x,
split=split,
strip=strip,
do_strip=do_strip,
lower=lower,
empty=empty,
)
]
if not isinstance(obj, str):
raise ToolsError(f"Unable to split non-str value {obj}")
ret = []
for x in obj.split(split):
if lower:
x = x.lower()
if do_strip:
x = x.strip(strip)
if not empty and not x:
continue
ret.append(x)
return ret
[docs]def echo_debug(msg: str, tmpl: bool = True, **kwargs):
"""Echo a message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
"""
echoargs = {}
echoargs.update(DEBUG_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = DEBUG_TMPL.format(msg=msg)
LOG.debug(msg)
click.secho(msg, **echoargs)
[docs]def echo_ok(msg: str, tmpl: bool = True, **kwargs):
"""Echo a message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
"""
echoargs = {}
echoargs.update(OK_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = OK_TMPL.format(msg=msg)
LOG.info(msg)
click.secho(msg, **echoargs)
[docs]def echo_warn(msg: str, tmpl: bool = True, **kwargs):
"""Echo a warning message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
"""
echoargs = {}
echoargs.update(WARN_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = WARN_TMPL.format(msg=msg)
LOG.warning(msg)
click.secho(msg, **echoargs)
[docs]def echo_error(msg: str, abort: bool = True, tmpl: bool = True, **kwargs):
"""Echo an error message to console.
Args:
msg: message to echo
tmpl: template to using for echo
kwargs: passed to ``click.secho``
abort: call sys.exit(1) after echoing message
"""
echoargs = {}
echoargs.update(ERROR_ARGS)
echoargs.update(kwargs)
if tmpl:
msg = ERROR_TMPL.format(msg=msg)
LOG.error(msg)
click.secho(msg, **echoargs)
if abort:
sys.exit(1)
[docs]def sysinfo() -> dict:
"""Gather system information."""
try:
cli_args = sys.argv
except Exception: # pragma: no cover
cli_args = "No sys.argv!"
info = {}
info["API Client Version"] = VERSION
info["API Client Package"] = PACKAGE_FILE
info["Init loaded .env file"] = INIT_DOTENV
info["Path to .env file"] = find_dotenv()
info["OS envs"] = get_env_ax()
info["Date"] = str(dt_now())
info["Python System Version"] = ", ".join(sys.version.splitlines())
info["Command Line Args"] = cli_args
platform_attrs = [
"machine",
"node",
"platform",
"processor",
"python_branch",
"python_compiler",
"python_implementation",
"python_revision",
"python_version",
"release",
"system",
"version",
"win32_edition",
]
for attr in platform_attrs:
method = getattr(platform, attr, None)
value = "unavailable"
if method:
value = method()
attr = attr.replace("_", " ").title()
info[attr] = value
return info
[docs]def calc_percent(part: Union[int, float], whole: Union[int, float], places: int = 2) -> float:
"""Calculate the percentage of part out of whole.
Args:
part: number to get percent of whole
whole: number to caclulate against part
places: number of decimal places to return
"""
if 0 in [part, whole]:
value = 0.00
elif part > whole:
value = 100.00
else:
value = 100 * (part / whole)
value = trim_float(value=value, places=places)
return value
[docs]def trim_float(value: float, places: int = 2) -> float:
"""Trim a float to N places.
Args:
value: float to trim
places: decimal places to trim value to
"""
if isinstance(places, int):
value = float(f"{value:.{places}f}")
return value
[docs]def join_kv(
obj: Union[List[dict], dict], listjoin: str = ", ", tmpl: str = "{k}: {v!r}"
) -> List[str]:
"""Join a dictionary into key value strings.
Args:
obj: dict or list of dicts to stringify
listjoin: string to use for joining
tmpl: template to format key value pairs of dict
"""
if isinstance(obj, list):
return [join_kv(obj=x, listjoin=listjoin, tmpl=tmpl) for x in obj]
if not isinstance(obj, dict):
raise ToolsError(f"Object must be a dict, supplied {type(obj)}")
items = []
for k, v in obj.items():
if isinstance(v, (list, tuple)):
v = listjoin.join([str(i) for i in v])
items.append(tmpl.format(k=k, v=v))
continue
if isinstance(v, dict):
items.append(f"{k}:")
items += join_kv(obj=v, listjoin=listjoin, tmpl=" " + tmpl)
continue
items.append(tmpl.format(k=k, v=v))
return items
[docs]def get_type_str(obj: Any):
"""Get the type name of a class.
Args:
obj: class or tuple of classes to get type name(s) of
"""
if isinstance(obj, tuple):
return " or ".join([x.__name__ for x in obj])
else:
return obj.__name__
[docs]def check_type(value: Any, exp: Any, name: str = "", exp_items: Optional[Any] = None):
"""Check that a value is the appropriate type.
Args:
value: value to check type of
exp: type(s) that value should be
name: identifier of what value is for
exp_items: if value is a list, type(s) that list items should be
"""
name = f" for {name!r}" if name else ""
if not isinstance(value, exp):
vtype = get_type_str(obj=type(value))
etype = get_type_str(obj=exp)
err = f"Required type {etype}{name} but received type {vtype}: {value!r}"
raise ToolsError(err)
if exp_items and isinstance(value, list):
for idx, item in enumerate(value):
if isinstance(item, exp_items):
continue
vtype = get_type_str(obj=type(item))
etype = get_type_str(obj=exp_items)
err = (
f"Required type {etype}{name} in list item {idx} but received "
f"type {vtype}: {value!r}"
)
raise ToolsError(err)
[docs]def check_empty(value: Any, name: str = ""):
"""Check if a value is empty.
Args:
value: value to check type of
name: identifier of what value is for
"""
if not value:
vtype = type(value).__name__
name = f" for {name!r}" if name else ""
err = f"Required value{name} but received an empty {vtype}: {value!r}"
raise ToolsError(err)
[docs]def get_raw_version(value: str) -> str:
"""Caclulate the raw bits of a version str.
Args:
value: version to calculate
"""
check_type(value=value, exp=str)
converted = "0"
version = value
if ":" in value:
if "." in value and value.index(":") > value.index("."):
raise ToolsError(f"Invalid version with ':' after '.' in {value!r}")
converted, version = value.split(":", 1)
octects = version.split(".")
for octect in octects:
if not octect.isdigit():
raise ToolsError(f"Invalid version with non-digit {octect!r} in {value!r}")
if len(octect) > 8:
octect = octect[:8]
converted += "".join(["0" for _ in range(8 - len(octect))]) + octect
return converted
[docs]def coerce_str_to_csv(
value: str,
coerce_list: bool = False,
errmsg: Optional[str] = None,
) -> List[str]:
"""Coerce a string into a list of strings.
Args:
value: string to seperate using comma
"""
pre = f"{errmsg}\n" if errmsg else ""
new_value = value
if isinstance(value, str):
new_value = [x.strip() for x in value.split(",") if x.strip()]
if not new_value:
raise ToolsError(f"{pre}Empty value after parsing CSV: {value!r}")
if not isinstance(new_value, (list, tuple)):
if coerce_list:
new_value = listify(obj=new_value)
else:
vtype = type(new_value).__name__
raise ToolsError(f"{pre}Invalid type {vtype} supplied, must be a list")
if not new_value:
raise ToolsError(f"{pre}Empty list supplied {value}")
return new_value
[docs]def parse_ip_address(value: str) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Parse a string into an IP address.
Args:
value: ip address
"""
try:
return ipaddress.ip_address(value)
except Exception as exc:
raise ToolsError(str(exc))
[docs]def parse_ip_network(value: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]:
"""Parse a string into an IP network.
Args:
value: ip network
"""
if "/" not in str(value):
vtype = type(value).__name__
raise ToolsError(
(
f"Supplied value {value!r} of type {vtype} is not a valid subnet "
"- format must be <address>/<CIDR>."
)
)
try:
return ipaddress.ip_network(value)
except Exception as exc:
raise ToolsError(str(exc))
[docs]def kv_dump(obj: dict) -> str:
"""Get a string representation of a dictionaries key value pairs.
Args:
obj: dictionary to get string of
"""
return "\n " + "\n ".join([f"{k}: {v}" for k, v in obj.items()])
[docs]def bom_strip(content: Union[str, bytes], strip=True, bom: bytes = codecs.BOM_UTF8) -> str:
"""Remove the UTF-8 BOM marker from the beginning of a string.
Args:
content: string to remove BOM marker from if found
strip: remove whitespace before & after removing BOM marker
"""
content = content.strip() if strip else content
if isinstance(bom, bytes) and isinstance(content, str):
bom = bom.decode()
elif isinstance(bom, str) and isinstance(content, bytes):
bom = bom.encode()
bom_len = len(bom)
if content.startswith(bom):
content = content[bom_len:]
content = content.strip() if strip else content
return content
[docs]def read_stream(stream) -> str:
"""Try to read input from a stream.
Args:
stream: stdin or a file descriptor to read input from
"""
stream_name = format(getattr(stream, "name", stream))
if stream.isatty():
raise ToolsError(f"No input provided on {stream_name!r}")
# its STDIN with input or a file
content = stream.read().strip()
if not content:
raise ToolsError(f"Empty content supplied to {stream_name!r}")
return content
[docs]def check_gui_page_size(size: Optional[int] = None) -> int:
"""Check page size to see if it one of the valid GUI page sizes.
Args:
size: page size to check
Raises:
:exc:`ApiError`: if size is not one of
:data:`axonius_api_client.constants.api.GUI_PAGE_SIZES`
"""
size = size or GUI_PAGE_SIZES[0]
size = coerce_int(size)
if size not in GUI_PAGE_SIZES:
raise ToolsError(f"gui_page_size of {size} is invalid, must be one of {GUI_PAGE_SIZES}")
return size
[docs]def calc_gb(value: Union[str, int], places: int = 2, is_kb: bool = True) -> float:
"""Convert bytes into GB.
Args:
value: bytes
places: decimal places to trim value to
is_kb: values are in kb or bytes
"""
value = coerce_int_float(value=value)
value = value / 1024 / 1024
value = (value / 1024) if not is_kb else value
value = trim_float(value=value, places=places)
return value
[docs]def calc_perc_gb(
obj: dict,
whole_key: str,
part_key: str,
perc_key: Optional[str] = None,
places: int = 2,
update: bool = True,
is_kb: bool = True,
) -> dict:
"""Calculate the GB and percent from a dict.
Args:
obj: dict to get whole_key and part_key from
whole_key: key to get whole value from and convert to GB and set as whole_key_gb
part_key: key to get part value from and convert to GB and set as part_key_gb
perc_key: key to set percent in
is_kb: values are in kb or bytes
"""
perc_key = perc_key or f"{part_key}_percent"
whole_value = obj[whole_key] or 0
part_value = obj[part_key] or 0
whole_gb = calc_gb(value=whole_value, places=places, is_kb=is_kb)
part_gb = calc_gb(value=part_value, places=places, is_kb=is_kb)
perc = calc_percent(part=part_gb, whole=whole_gb, places=places)
ret = obj if update else {}
ret[f"{part_key}_gb"] = part_gb
ret[f"{whole_key}_gb"] = whole_gb
ret[perc_key] = perc
return ret
[docs]def get_subcls(cls: type, excludes: Optional[List[type]] = None) -> list:
"""Get all subclasses of a class."""
excludes = excludes or []
subs = [s for c in cls.__subclasses__() for s in get_subcls(c)]
return [x for x in list(set(cls.__subclasses__()).union(subs)) if x not in excludes]
[docs]def prettify_obj(obj: Union[dict, list], indent: int = 0) -> List[str]:
"""Pass."""
spaces = " " * indent
sub_indent = indent + 2
if isinstance(obj, dict):
lines = ["", f"{spaces}-----"] if not indent else []
for k, v in obj.items():
lines += [f"{spaces}- {k}:", *prettify_obj(v, sub_indent)]
return lines
elif isinstance(obj, list):
return [y for x in obj for y in prettify_obj(x, indent)]
return [f"{spaces} {obj}"]
[docs]def token_parse(obj: str) -> str:
"""Pass."""
url_check = "token="
if isinstance(obj, str) and url_check in obj:
idx = obj.index(url_check) + len(url_check)
obj = obj[idx:]
return obj
[docs]def combo_dicts(*args, **kwargs) -> dict:
"""Pass."""
# TBD make this descend
ret = {}
for x in args:
if isinstance(x, dict):
ret.update(x)
ret.update(kwargs)
return ret
[docs]def is_url(value: str) -> bool:
"""Pass."""
return isinstance(value, str) and any([value.startswith(x) for x in URL_STARTS])
[docs]def bytes_to_str(value: Any) -> Union[str, Any]:
"""Convert obj to str if it is bytes."""
return value.decode() if isinstance(value, bytes) else value
[docs]def strip_str(value: Any) -> Union[str, Any]:
"""Strip a value if it is a string."""
return value.strip() if isinstance(value, str) else value
[docs]def coerce_str(
value: Any,
strip: bool = True,
none: Any = "",
trim: Optional[int] = None,
trim_lines: bool = False,
trim_msg: str = TRIM_MSG,
) -> Union[str, Any]:
"""Coerce a value to a string."""
value = bytes_to_str(value=value)
if value is None:
value = none
if not isinstance(value, str):
value = str(value)
if strip:
value = strip_str(value=value)
value = str_trim(value=value, trim=trim, trim_lines=trim_lines, trim_msg=trim_msg)
return value
[docs]def str_trim(
value: str,
trim: Optional[int] = None,
trim_lines: bool = False,
trim_msg: str = TRIM_MSG,
) -> str:
"""Pass."""
trim_type = "lines" if trim_lines else "characters"
if isinstance(trim, int) and trim > 0:
trim_done = False
if trim_lines:
value = value.splitlines()
value_len = len(value)
if value_len >= trim:
value = value[:trim]
trim_done = True
value = "\n".join(value)
else:
value_len = len(value)
if value_len >= trim:
value = value[:trim]
trim_done = True
if trim_done:
value += trim_msg.format(trim_type=trim_type, trim=trim, value_len=value_len)
return value
[docs]def get_cls_path(value: Any) -> str:
"""Pass."""
if inspect.isclass(value):
cls = value
elif hasattr(value, "__class__"):
cls = value.__class__
else:
cls = value
if hasattr(cls, "__module__") and hasattr(cls, "__name__"):
return f"{cls.__module__}.{cls.__name__}"
return str(value)
[docs]def csv_writer(
rows: List[dict],
columns: Optional[List[str]] = None,
quotes: str = "nonnumeric",
dialect: str = "excel",
line_ending: str = "\n",
key_extra_error: bool = False,
key_missing_value: Optional[Any] = None,
) -> str: # pragma: no cover
"""Pass."""
quotes = getattr(csv, f"QUOTE_{quotes.upper()}")
if not columns:
columns = []
for row in rows:
columns += [x for x in row if x not in columns]
stream = io.StringIO()
writer = csv.DictWriter(
stream,
fieldnames=columns,
quoting=quotes,
lineterminator=line_ending,
dialect=dialect,
restval=key_missing_value,
extrasaction="raise" if key_extra_error else "ignore",
)
writer.writerow(dict(zip(columns, columns)))
writer.writerows(rows)
content = stream.getvalue()
stream.close()
return content
[docs]def parse_int_min_max(value, default=0, min_value=None, max_value=None):
"""Pass."""
if isinstance(value, str) and value.isdigit():
value = int(value)
if not isinstance(value, int):
value = default
if min_value is not None and value < min_value:
value = default
if max_value is not None and value > max_value:
value = default
return value
[docs]def safe_replace(obj: dict, value: str) -> str:
"""Pass."""
for search, replace in obj.items():
if isinstance(search, str) and isinstance(replace, str) and search and search in value:
value = value.replace(search, replace)
return value
[docs]def int_days_map(value: Union[str, List[Union[str, int]]], names: bool = False) -> List[str]:
"""Pass."""
ret = []
value = coerce_str_to_csv(value=value, coerce_list=True)
valid = ", ".join([f"{v} ({k})" for k, v in DAYS_MAP.items()])
for item in value:
found = False
for number, name in DAYS_MAP.items():
if isinstance(item, str) and item.lower() == name.lower():
ret.append(number)
found = True
if (isinstance(item, str) and item.isdigit()) or isinstance(item, int):
item = coerce_int(
obj=item,
min_value=0,
max_value=6,
errmsg=f"Invalid day {item!r} supplied, valid: {valid}",
)
if item == number:
ret.append(number)
found = True
if not found:
item = str(item)
raise ToolsError(f"Invalid day {item!r} supplied, valid: {valid}")
if names:
ret = [v for k, v in DAYS_MAP.items() if k in ret]
else:
ret = [str(k) for k, v in DAYS_MAP.items() if k in ret]
return ret
[docs]def lowish(value: Any) -> Any:
"""Pass."""
if isinstance(value, (list, tuple)):
return [lowish(x) for x in value]
return value.lower() if isinstance(value, str) else value