# -*- coding: utf-8 -*-
"""Base callbacks."""
import logging
import pathlib
import re
import sys
import typing as t
from typing import IO, Generator, List, Optional, Tuple, Union
from ... import DEFAULT_PATH
from ...constants.api import FIELD_JOINER, FIELD_TRIM_LEN, FIELD_TRIM_STR
from ...constants.fields import (
AGG_ADAPTER_NAME,
FIELDS_DETAILS,
FIELDS_DETAILS_EXCLUDE,
FIELDS_ENTITY_PASSTHRU,
SCHEMAS_CUSTOM,
)
from ...exceptions import ApiError
from ...tools import (
PathLike,
calc_percent,
check_path_is_not_dir,
coerce_int,
dt_now,
echo_debug,
echo_error,
echo_ok,
echo_warn,
get_path,
get_paths_format,
is_subclass_safe,
join_kv,
listify,
longest_str,
path_backup_file,
strip_right,
)
# noinspection SpellCheckingInspection
def crjoin(value):
    """Render an iterable of strings as a newline-separated, dash-bulleted list.

    Args:
        value: iterable of strings to join

    Returns:
        str: a string starting with ``"\\n - "`` with one bullet per item
    """
    bullet = "\n - "
    return "{}{}".format(bullet, bullet.join(value))
# noinspection PyProtectedMember,PyAttributeOutsideInit
class Base:
    """Callbacks for formatting asset data.

    Examples:
        * :meth:`args_map` for callback generic arguments to format assets.
        * :meth:`args_map_custom` for callback specific arguments to format and export data.
    """
[docs] @classmethod
def args_map(cls) -> dict:
"""Get all the argument names and their defaults for this callbacks object.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect` and assume
``apiobj`` is either ``client.devices`` or ``client.users``
>>> import axonius_api_client as axonapi
>>> connect_args: dict = axonapi.get_env_connect()
>>> client: axonapi.Connect = axonapi.Connect(**connect_args)
>>> apiobj: axonapi.api.assets.AssetMixin = client.devices
>>> # or client.users or client.vulnerabilities
Flatten complex fields - Will take all sub-fields of complex fields and put them
on the root level with their values index correlated to each other.
>>> assets = apiobj.get(fields=["network_interfaces"], field_flatten=True)
Explode a single field - will take that field and create new rows for list item.
>>> assets = apiobj.get(field_explode="hostname")
Exclude fields - Will remove fields from the final output.
>>> assets = apiobj.get(field_excludes=["internal_axon_id", "adapter_list_length"])
Use field titles - Will change internal field names to their titles.
>>> assets = apiobj.get(field_titles=True)
Join fields - Will join multi value fields with carriage returns.
>>> assets = apiobj.get(field_join=True)
Join fields with no trim and custom join value - Will join multi value fields
with ``;;`` and do not trim the joined value to excels maximum cell length.
>>> assets = apiobj.get(field_join=True, field_join_value=";;", field_join_trim=0)
Add fields as empty values for fields that did not return.
>>> assets = apiobj.get(field_null=True)
Add fields as empty values for fields that did not return with a custom null value.
>>> assets = apiobj.get(
... field_null=True,
... field_null_value="EMPTY",
... field_null_value_complex="EMPTY LIST",
... )
Add and remove tags to all assets returned.
>>> assets = apiobj.get(tags_add=["tag1", "tag2"], tags_remove=["tag3", "tag4"])
Generate a report of adapters that are missing from each asset.
>>> assets = apiobj.get(report_adapters_missing=True)
Generate a report of installed software that does not match a list of regex for each
asset.
>>> assets = apiobj.get(report_software_whitelist=["chrome", "^adobe.*acrobat"])
Echo to STDERR progress messages.
>>> assets = apiobj.get(do_echo=True)
Change the amount of assets that echo page progress when do_echo is true.
>>> assets = apiobj.get(do_echo=True, page_progress=100)
Supply a set of custom callbacks to process each row before all builtin callbacks
are run. Custom callbacks receive two arguments: ``self`` (the current callback object)
and ``rows`` (the current rows being processed). Custom callbacks must return a list of
rows.
>>> def custom_cb1(self, rows):
... for row in rows:
... row["internal_axon_id"] = row["internal_axon_id"].upper()
... return rows
...
>>> assets = apiobj.get(custom_cbs=[custom_cb1])
See Also:
* :meth:`args_map_custom` for callback specific arguments to format and export data.
Notes:
These arguments can be supplied as extra kwargs passed to
:meth:`axonius_api_client.api.assets.users.Users.get` or
:meth:`axonius_api_client.api.assets.devices.Devices.get`
"""
args = {}
args.update(cls.args_map_base())
args.update(cls.args_map_custom())
return args
[docs] @classmethod
def args_map_custom(cls) -> dict:
"""Get the custom argument names and their defaults for this callbacks object.
See Also:
:meth:`args_map` for the arguments for all callback objects.
Notes:
This callback object has no custom arguments.
"""
return {}
[docs] @classmethod
def args_map_base(cls) -> dict:
"""Get the map of arguments that can be supplied to GETARGS."""
return {
"field_excludes": [],
"field_flatten": False,
"field_explode": None,
"field_titles": False,
"field_compress": False,
"field_replace": [],
"field_join": False,
"field_join_value": FIELD_JOINER,
"field_join_trim": FIELD_TRIM_LEN,
"field_null": False,
"field_null_value": None,
"field_null_value_complex": [],
"tags_add": [],
"expirable_tags": [],
"tags_add_invert_selection": False,
"tags_remove": [],
"tags_remove_invert_selection": False,
"report_adapters_missing": False,
"report_software_whitelist": [],
"page_progress": 10000,
"do_echo": False,
"custom_cbs": [],
"debug_timing": False,
"explode_entities": False,
"include_dates": False,
"csv_field_flatten": True,
"csv_field_join": True,
"csv_field_null": True,
}
[docs] def get_arg_value(self, arg: str) -> t.Any:
"""Get an argument value.
Args:
arg: key to get from :attr:`GETARGS` with a default value from :meth:`args_map`
"""
return self.GETARGS.get(arg, self.args_map()[arg])
[docs] def set_arg_value(self, arg: str, value: t.Any):
"""Set an argument value.
Args:
arg: key to set in :attr:`GETARGS`
value: value to set for key
"""
self.GETARGS[arg] = value
    def __init__(
        self,
        apiobj,
        store: dict,
        state: Optional[dict] = None,
        getargs: dict = None,
    ):
        """Callbacks base class for assets.

        Args:
            apiobj (:obj:`axonius_api_client.api.assets.asset_mixin.AssetMixin`): Asset object
                that created this callback
            store: store tracker of get method that created this callback
            state: state tracker of get method that created this callback
            getargs: kwargs passed to assets get method that created this callback
        """
        # child logger namespaced under the API object's logger
        self.LOG: logging.Logger = apiobj.LOG.getChild(self.__class__.__name__)
        """logger for this object."""
        self.APIOBJ = apiobj
        # map of adapter -> field schemas, fetched once up front
        self.ALL_SCHEMAS: dict = apiobj.fields.get()
        self.STATE: dict = state or {}
        self.STORE: dict = store or {}
        self.CURRENT_ROWS: List[dict] = []
        self.GETARGS: dict = getargs or {}
        # assets queued for tag add/remove; applied by do_tagging() at stop()
        self.TAG_ROWS_ADD: List[dict] = []
        self.TAG_ROWS_REMOVE: List[dict] = []
        # records of custom callbacks that raised in do_custom_cbs()
        self.CUSTOM_CB_EXC: List[dict] = []
        # subclass hook for extra setup
        self._init()
    def _init(self):
        """Post init setup hook.

        Subclasses override this to perform extra setup after
        :meth:`__init__` has populated the instance attributes.
        """
        pass
    def start(self, **kwargs):
        """Start this callbacks object.

        Forces ``include_details`` on when ``explode_entities`` is enabled,
        adds the detail bookkeeping fields to ``field_excludes`` when details
        are included, then echoes the configuration and get arguments.
        """
        self.echo(msg=f"Starting {self}")
        excludes = listify(self.get_arg_value("field_excludes"))
        explode_entities = self.get_arg_value("explode_entities")
        include_details = self.STORE.get("include_details", False)
        if explode_entities:
            # exploding entities needs the *_details columns — presumably to
            # correlate values to entities; force include_details on
            if not include_details:
                self.echo(
                    msg=f"Enabling 'include_details' due to 'explode_entities'={explode_entities}",
                    debug=True,
                )
                self.STORE["include_details"] = include_details = True
        if include_details:
            # exclude the detail bookkeeping fields that are not useful output
            missing = [x for x in FIELDS_DETAILS_EXCLUDE if x not in excludes]
            self.echo(msg=f"Adding fields {missing} to field_excludes: {excludes}", debug=True)
            self.set_arg_value("field_excludes", value=excludes + missing)
        cb_args = crjoin(join_kv(obj=self.GETARGS))
        self.LOG.debug(f"Get Extra Arguments: {cb_args}")
        config = crjoin(self.args_strs)
        self.echo(msg=f"Configuration: {config}")
        store = crjoin(join_kv(obj=self.STORE))
        self.echo(msg=f"Get Arguments: {store}")
    # noinspection PyUnusedLocal
    def echo_columns(self, **kwargs):
        """Echo the columns of the fields selected.

        Only runs once per callbacks object; sets ``ECHO_DONE`` after the
        first call so subsequent rows do not re-echo.
        """
        if getattr(self, "ECHO_DONE", False):
            return
        schemas_pretty = self.APIOBJ.fields._prettify_schemas(schemas=self.schemas_selected)
        schemas_pretty = crjoin(schemas_pretty)
        self.echo(msg=f"Selected Columns: {schemas_pretty}")
        if self.excluded_schemas:
            schemas_pretty = self.APIOBJ.fields._prettify_schemas(schemas=self.excluded_schemas)
            schemas_pretty = crjoin(schemas_pretty)
            self.echo(msg=f"Excluded Columns: {schemas_pretty}")
        final_columns = crjoin(self.final_columns)
        self.echo(msg=f"Final Columns: {final_columns}")
        # mark done so this only fires for the first row
        self.ECHO_DONE = True
    def stop(self, **kwargs):
        """Stop this callbacks object.

        Applies any queued tag additions/removals, then echoes shutdown.
        """
        self.do_tagging()
        self.echo(msg=f"Stopping {self}")
[docs] def echo_page_progress(self):
"""Echo progress per N rows using an echo method."""
page_progress = self.get_arg_value("page_progress")
if not page_progress or not isinstance(page_progress, int):
return
proc = self.STATE.get("rows_processed_total", 0) or 0
total = self.STATE.get("rows_to_fetch_total", 0) or 0
taken = self.STATE.get("fetch_seconds_total", 0) or 0
page_total = self.STATE.get("pages_to_fetch_total", 0) or 0
page_num = self.STATE.get("page_number", 0) or 0
if not ((proc % page_progress == 0) or (proc >= total) or (proc <= 1)):
return
percent = calc_percent(part=proc, whole=total)
percent = f"{percent:.2f}%"
percent = f"{percent:>7}"
total_len = len(str(total))
rows = f"[ROWS: {proc:>{total_len}} / {total}]"
page_total_len = len(str(page_total))
pages = f"[PAGES: {page_num:>{page_total_len}} / {page_total}]"
taken = f"{taken:.2f} seconds so far"
self.echo(msg=f"PROGRESS: {percent} {rows} {pages} in {taken}")
[docs] def do_pre_row(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Pre-processing callbacks for current row.
Args:
rows: rows to process
"""
rows = listify(rows)
self.CURRENT_ROWS = rows
self.STATE.setdefault("rows_processed_total", 0)
self.STATE["rows_processed_total"] += 1
self.echo_columns()
self.echo_page_progress()
return rows
[docs] def process_row(self, row: Union[List[dict], dict]) -> List[dict]:
"""Process the callbacks for current row.
Args:
row: row to process
"""
rows = listify(row)
rows = self.do_pre_row(rows=rows)
rows = self.do_row(rows=rows)
return rows
    @property
    def callbacks(self) -> list:
        """Get order of callbacks to run.

        The order matters: custom callbacks see the raw rows first; tag and
        report callbacks run before any field shaping; exclusion and null
        filling happen before flatten/explode; the field renames
        (titles/compress/replace) run last against the final field names.
        """
        return [
            self.do_custom_cbs,
            self.process_tags_to_add,
            self.process_tags_to_remove,
            self.add_report_adapters_missing,
            self.add_report_software_whitelist,
            self.add_include_dates,
            self.do_excludes,
            self.do_add_null_values,
            self.do_explode_entities,
            self.do_flatten_fields,
            self.do_explode_field,
            self.do_join_values,
            self.do_change_field_titles,
            self.do_change_field_compress,
            self.do_change_field_replace,
        ]
    def do_row(self, rows: Union[List[dict], dict]) -> List[dict]:
        """Execute the callbacks for current row.

        Args:
            rows: rows to process

        Notes:
            When ``debug_timing`` is enabled, the duration of each callback
            and of the whole pipeline is logged at debug level.
        """
        debug_timing = self.get_arg_value("debug_timing")
        p_start = None
        cb_start = None
        if debug_timing:  # pragma: no cover
            p_start = dt_now()
        for cb in self.callbacks:
            if debug_timing:  # pragma: no cover
                cb_start = dt_now()
            # each callback receives the current rows and returns the new rows
            rows = cb(rows=rows)
            # print(f"{cb} {json_dump(rows)}")
            if debug_timing and cb_start:  # pragma: no cover
                cb_delta = dt_now() - cb_start
                self.LOG.debug(f"CALLBACK {cb} took {cb_delta} for {len(rows)} rows")
        if debug_timing and p_start:  # pragma: no cover
            p_delta = dt_now() - p_start
            self.LOG.debug(f"CALLBACKS TOOK {p_delta} for {len(rows)} rows")
        return rows
    def do_custom_cbs(self, rows: Union[List[dict], dict]) -> List[dict]:
        """Execute any custom callbacks for current row.

        Args:
            rows: rows to process

        Notes:
            A failing custom callback is recorded in :attr:`CUSTOM_CB_EXC`
            and echoed as an error without aborting processing.
        """
        rows = listify(rows)
        custom_cbs = listify(self.get_arg_value("custom_cbs"))
        for custom_cb in custom_cbs:
            try:
                rows = custom_cb(self=self, rows=rows)
                # a callback may return a single row; normalize back to a list
                rows = listify(rows)
            except Exception as exc:
                msg = f"Custom callback {custom_cb} failed: {exc}"
                self.CUSTOM_CB_EXC.append({"cb": custom_cb, "exc": exc, "msg": msg})
                self.echo(msg=msg, error="exception", abort=False)
        return rows
[docs] def do_add_null_values(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Null out missing fields.
Args:
rows: rows to process
"""
rows = listify(rows)
field_null = self.get_arg_value("field_null")
if not field_null:
return rows
for row in rows:
for schema in self.schemas_selected:
self._do_add_null_values(row=row, schema=schema)
return rows
    def _do_add_null_values(self, row: dict, schema: dict, key: str = "name_qual"):
        """Null out missing fields.

        Args:
            row: row being processed
            schema: field schema to add null values for
            key: key of field schema to add null value for in row
        """
        # skip excluded schemas and detail bookkeeping schemas
        if self.is_excluded(schema=schema) or schema.get("is_details", False):
            return row
        null_value = self.get_arg_value("field_null_value")
        complex_null_value = self.get_arg_value("field_null_value_complex")
        field = schema[key]
        if schema["is_complex"]:
            if field not in row:
                row[field] = complex_null_value
            # recurse into each item of the complex field using the short
            # sub-field names (sub-fields are keyed by "name", not "name_qual")
            for item in row[field]:
                for sub_schema in self.get_sub_schemas(schema=schema):
                    self._do_add_null_values(schema=sub_schema, row=item, key="name")
        else:
            if field not in row:
                row[field] = null_value
        return row
[docs] def do_excludes(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Asset callback to remove fields from row.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.get_arg_value("field_excludes"):
return rows
for row in rows:
self._do_excludes(row=row)
return rows
    def _do_excludes(self, row: dict):
        """Asset callback to remove fields from row.

        Args:
            row: row being processed
        """
        for schema in self.schemas_selected:
            field = schema["name_qual"]
            # whole field excluded: drop it from the row entirely
            if self.is_excluded(schema=schema) and field in row:
                row.pop(field)
                continue
            if schema["is_complex"]:
                # remove excluded sub-fields from each complex item in place
                items = listify(row.get(field, []))
                for sub_schema in schema["sub_fields"]:
                    if self.is_excluded(schema=sub_schema):
                        sub_field = sub_schema["name"]
                        for item in items:
                            if sub_field in item:
                                item.pop(sub_field)
[docs] def do_join_values(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Join values.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.get_arg_value("field_join"):
return rows
for row in rows:
self._do_join_values(row=row)
return rows
    def _do_join_values(self, row: dict):
        """Join values.

        Args:
            row: row being processed
        """
        joiner = str(self.get_arg_value("field_join_value"))
        trim_len = coerce_int(self.get_arg_value("field_join_trim"))
        trim_str = FIELD_TRIM_STR
        for field in row:
            value = row[field]
            # join list values into a single string
            if isinstance(value, list):
                row[field] = value = joiner.join([str(x) for x in value])
            # trim long joined strings (e.g. to fit spreadsheet cell limits),
            # appending a marker that records the original length
            if trim_len and isinstance(value, str) and len(value) >= trim_len:
                field_len = len(value)
                msg = trim_str.format(field_len=field_len, trim_len=trim_len)
                value = [value[:trim_len], msg]
                row[field] = joiner.join(value)
[docs] def do_change_field_replace(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Asset callback to replace characters.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.field_replacements:
return rows
rows = [{self._field_replace(key=k): v for k, v in row.items()} for row in rows]
return rows
[docs] def _field_replace(self, key: str) -> str:
"""Parse fields into required format."""
if self.field_replacements:
for x, y in self.field_replacements:
key = key.replace(x, y)
return key
@property
def field_replacements(self) -> List[Tuple[str, str]]:
"""Parse the supplied list of field name replacements."""
def parse_replace(replace):
"""Parse the supplied list of field name replacements."""
if isinstance(replace, str):
replace = replace.split("=", maxsplit=1)
if not isinstance(replace, (tuple, list)) or not replace[0]:
replace = []
if len(replace) == 1:
replace = [replace[0], ""]
return replace
if not hasattr(self, "_field_replacements"):
replaces = listify(self.get_arg_value("field_replace"))
replaces = [parse_replace(x) for x in replaces]
self._field_replacements = [x for x in replaces if x]
return self._field_replacements
[docs] def do_change_field_compress(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Asset callback to shorten field names.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.get_arg_value("field_compress"):
return rows
rows = [{self._field_compress(key=k): v for k, v in row.items()} for row in rows]
return rows
[docs] def _field_compress(self, key: str) -> str:
"""Parse fields into required format."""
if not self.get_arg_value("field_compress"):
return key
splits = key.split(".")
if splits[0] == "specific_data":
prefix = AGG_ADAPTER_NAME
# remove 'specific_data.data'
splits = splits[2:]
elif splits[0] == "adapters_data":
prefix = strip_right(obj=splits[1], fix="_adapter")
# remove 'adapters_data.aws_adapter'
splits = splits[2:]
else:
return key
new_key = ".".join(splits)
return ":".join([x for x in [prefix, new_key] if x])
[docs] def do_change_field_titles(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Asset callback to change qual name to title.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.get_arg_value("field_titles"):
return rows
for row in rows:
self._do_change_field_titles(row=row)
return rows
[docs] def _do_change_field_titles(self, row: dict):
"""Asset callback to change qual name to title.
Args:
row: row being processed
"""
null_value = self.get_arg_value("field_null_value")
complex_null_value = self.get_arg_value("field_null_value_complex")
for schema in self.final_schemas:
title = schema["column_title"]
name = schema["name_qual"]
is_complex = schema["is_complex"]
default = complex_null_value if is_complex else null_value
row[title] = row.pop(name, default)
[docs] def do_flatten_fields(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Asset callback to flatten complex fields.
Args:
rows: rows to process
"""
rows = listify(rows)
if not self.get_arg_value("field_flatten"):
return rows
for row in rows:
for schema in self.schemas_selected:
if self.schema_to_explode != schema and not schema.get("is_details", False):
self._do_flatten_fields(row=row, schema=schema)
return rows
    def _do_flatten_fields(self, row: dict, schema: dict):
        """Asset callback to flatten complex fields.

        Args:
            row: row being processed
            schema: schema to use when flattening
        """
        if self.is_excluded(schema=schema):
            return
        if not schema["is_complex"]:
            return
        null_value = self.get_arg_value("field_null_value")
        field = schema["name_qual"]
        # remove the complex field, i.e. specific_data.data.network_interfaces
        # force it into a list of items
        items = listify(row.pop(field, []))
        for sub_schema in self.get_sub_schemas(schema=schema):
            sub_field = sub_schema["name_qual"]
            sub_short = sub_schema["name"]
            # for each sub-field, ensure there is an empty list to store values
            row[sub_field] = []
            # for each complex item, remove the sub-field, force it into a list,
            # and append it to the sub-fields fully qualified name at the root row level
            for item in items:
                value = item.pop(sub_short, null_value)
                value = value if isinstance(value, list) else [value]
                row[sub_field] += value
[docs] def do_explode_entities(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Explode a row into a row for each asset entity.
Args:
rows: rows being processed
"""
rows = listify(rows)
explode = self.get_arg_value("explode_entities")
if not explode:
return rows
new_rows = []
for row in rows:
new_rows += self._do_explode_entities(row=row)
return new_rows
    def _do_explode_entities(self, row: dict) -> List[dict]:
        """Explode a row into multiple rows for each asset entity.

        Args:
            row: row being processed

        Returns:
            One new row per entry in the row's ``adapters`` list.
        """

        def explode(idx: int, adapter: str) -> dict:
            """Build the row for the entity at index ``idx``."""
            new_row = {"adapters": adapter}
            for k, v in row.items():
                # passthru fields and *_preferred fields are copied as-is
                if k in FIELDS_ENTITY_PASSTHRU or k.endswith("_preferred"):
                    new_row[k] = v
                    continue
                # *_details fields are index-correlated to the adapters list;
                # pick out the value belonging to this entity's index
                if (k in FIELDS_DETAILS or k.endswith("_details")) and not k.endswith(
                    "_preferred_details"
                ):
                    new_k = strip_right(obj=k, fix="_details") if k not in FIELDS_DETAILS else k
                    try:
                        new_row.setdefault(new_k, v[idx])
                    except Exception:
                        # details list does not line up with the adapters list;
                        # warn and leave the value out of this entity's row
                        msg = (
                            f"Adapters length {adapters_cnt} != details length {len(v)} on {k}: {v}"
                        )
                        self.echo(msg=msg, warning=True)
                    continue
            return new_row

        adapters = row.get("adapters") or []
        adapters_cnt = len(adapters)
        new_rows = [explode(idx=idx, adapter=adapter) for idx, adapter in enumerate(adapters)]
        return new_rows
[docs] def do_explode_field(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Explode a field into multiple rows.
Args:
rows: rows being processed
"""
rows = listify(rows)
explode = self.get_arg_value("field_explode")
if not explode or self.is_excluded(schema=self.schema_to_explode):
return rows
new_rows = []
for row in rows:
new_rows += self._do_explode_field(row=row)
return new_rows
    def _do_explode_field(self, row: dict) -> List[dict]:
        """Explode a field into multiple rows.

        Args:
            row: row being processed

        Returns:
            One copy of the row per value of the exploded field.
        """
        null_value = self.get_arg_value("field_null_value")
        schema = self.schema_to_explode
        field = schema["name_qual"]
        # nothing to explode with 0 or 1 values: just flatten (if complex)
        if len(listify(row.get(field, []))) <= 1:  # pragma: no cover
            self._do_flatten_fields(row=row, schema=schema)
            return [row]
        items = listify(row.pop(field, []))
        new_rows_map = {}
        for idx, item in enumerate(items):
            # shallow copy of the row per exploded value
            new_rows_map[idx] = dict(row)
            if schema["is_complex"]:
                # complex item: spread its sub-fields to qualified names
                for sub_schema in self.get_sub_schemas(schema=schema):
                    value = item.pop(sub_schema["name"], null_value)
                    new_rows_map[idx][sub_schema["name_qual"]] = value
            else:
                new_rows_map[idx][schema["name_qual"]] = item
        return [new_rows_map[idx] for idx in new_rows_map]
    def do_tagging(self):
        """Add or remove tags to assets queued during processing."""
        self.do_tag_add()
        self.do_tag_remove()
    def do_tag_add(self):
        """Add tags to assets tracked in :attr:`TAG_ROWS_ADD`.

        Performs the API call only when ``tags_add`` was supplied.
        """
        tags = listify(self.get_arg_value("tags_add"))
        expirable_tags = self.get_arg_value("expirable_tags")
        rows = self.TAG_ROWS_ADD
        invert_selection = self.get_arg_value("tags_add_invert_selection")
        count_tags = len(tags)
        count_supplied = len(rows)
        msgs = [
            f" Tags supplied ({count_tags}): {tags}",
            f" Asset IDs supplied ({count_supplied})",
            f" Invert selection: {invert_selection}",
        ]
        if tags:
            self.echo(["Performing API call to add tags to assets", *msgs])
            count_modified = self.APIOBJ.labels.add(
                rows=rows,
                labels=tags,
                invert_selection=invert_selection,
                expirable_tags=expirable_tags,
            )
            self.echo(msg=[f"API added tags to {count_modified} assets", *msgs])
[docs] def do_tag_remove(self):
"""Remove tags from assets."""
tags = listify(self.get_arg_value("tags_remove"))
rows = self.TAG_ROWS_REMOVE
invert_selection = self.get_arg_value("tags_remove_invert_selection")
count_tags = len(tags)
count_supplied = len(rows)
msgs = [
f" Asset IDs supplied ({count_supplied})",
f" Tags supplied ({count_tags}): {tags}",
f" Invert selection: {invert_selection}",
]
if tags:
self.echo(["Performing API call to remove tags from assets", *msgs])
count_modified = self.APIOBJ.labels.remove(
rows=rows, labels=tags, invert_selection=invert_selection
)
msgs = [
f"API finished removing tags from assets",
f" Asset IDs modified: " f"{count_modified}",
*msgs,
]
self.echo(msg=msgs)
[docs] def add_report_software_whitelist(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Process report: Software whitelist.
Args:
rows: rows to process
"""
rows = listify(rows)
whitelists = listify(self.get_arg_value("report_software_whitelist"))
if not whitelists:
return rows
for row in rows:
self._add_report_software_whitelist(row=row)
return rows
[docs] def _add_report_software_whitelist(self, row: dict):
"""Process report: Software whitelist.
Args:
row: row being processed
"""
whitelists = listify(self.get_arg_value("report_software_whitelist"))
sw_field = "specific_data.data.installed_software"
if sw_field not in self.fields_selected:
msg = f"Must include field (column) {sw_field!r}"
self.echo(msg=msg, error=ApiError, level="error")
sws = listify(row.get(sw_field, []))
names = [x.get("name") for x in sws if x.get("name") and isinstance(x.get("name"), str)]
extras = [n for n in names if any([re.search(x, n, re.I)] for x in whitelists)]
missing = [x for x in whitelists if any([re.search(x, n, re.I) for n in names])]
schemas = SCHEMAS_CUSTOM["report_software_whitelist"]
row[schemas["software_missing"]["name_qual"]] = sorted(list(set(missing)))
row[schemas["software_whitelist"]["name_qual"]] = whitelists
row[schemas["software_extra"]["name_qual"]] = sorted(list(set(extras)))
[docs] def add_include_dates(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Process report: Add dates (history and current)
Args:
rows: rows to process
"""
def _add_date(row):
row.update(updater)
return row
rows = listify(rows)
enabled = self.get_arg_value("include_dates")
if not enabled:
return rows
history_date = self.STORE.get("history_date_parsed")
current_date = str(dt_now())
schemas = SCHEMAS_CUSTOM["include_dates"]
updater = {
schemas["history_date"]["name_qual"]: history_date,
schemas["current_date"]["name_qual"]: current_date,
}
rows = [_add_date(row) for row in rows]
return rows
[docs] def add_report_adapters_missing(self, rows: Union[List[dict], dict]) -> List[dict]:
"""Process report: Missing adapters.
Args:
rows: rows to process
"""
rows = listify(rows)
missing = self.get_arg_value("report_adapters_missing")
if not missing:
return rows
for row in rows:
self._add_report_adapters_missing(row=row)
return rows
[docs] def _add_report_adapters_missing(self, row: dict):
"""Process report: Missing adapters.
Args:
row: row being processed
"""
schemas = SCHEMAS_CUSTOM["report_adapters_missing"]
schema = schemas["adapters_missing"]
field_name = schema["name_qual"]
adapters_row = row.get("adapters", [])
adapter_map = self.adapter_map
missing = []
for adapter in adapter_map["all"]:
if adapter in adapters_row:
continue
if adapter not in adapter_map["all_fields"]:
continue
if adapter not in missing:
missing.append(adapter)
row[field_name] = missing
# TBD: make this support normal field selection concepts
[docs] def is_excluded(self, schema: dict) -> bool:
"""Check if a name supplied to field_excludes matches one of FIND_KEYS.
Args:
schema: field schema
"""
for excluded_schema in self.excluded_schemas:
for key in self.FIND_KEYS:
schema_key = schema.get(key, None)
excluded_key = excluded_schema.get(key, None)
if (schema_key and excluded_schema) and (schema_key == excluded_key):
return True
return False
    @property
    def excluded_schemas(self) -> List[dict]:
        """List of all schemas that should be excluded.

        Resolved from the ``field_excludes`` argument on first access and
        cached on the instance.
        """
        if not hasattr(self, "_excluded_schemas"):
            excludes = listify(self.get_arg_value("field_excludes"))
            self._excluded_schemas = self.APIOBJ.fields.get_field_names_eq(
                value=excludes, key=None, selectable_only=False
            )
        return self._excluded_schemas
    def echo(
        self,
        msg: t.Union[str, t.List[str]],
        debug: bool = False,
        error: Union[bool, str, t.Type[Exception]] = False,
        warning: bool = False,
        level: str = "info",
        level_debug: str = "debug",
        level_error: str = "error",
        level_warning: str = "warning",
        abort: bool = True,
    ):
        """Echo a message to console or log it.

        Args:
            msg: message to echo; lists are joined with newlines
            error: message is an error; may be an exception class to raise
                in log mode when ``abort`` is True
            warning: message is a warning
            level: logging level for non error/non warning messages
            level_debug: logging level for debug messages
            level_error: logging level for error messages
            level_warning: logging level for warning messages
            abort: sys.exit(1) if error is true
            debug: message is a debug message
        """
        do_echo = self.get_arg_value("do_echo")
        msg = "\n".join(listify(msg))
        if do_echo:
            # console mode: delegate to the echo_* helpers
            if error:
                echo_error(msg=msg, abort=abort)
            elif warning:
                echo_warn(msg=msg)
            elif debug:
                echo_debug(msg=msg)
            else:
                echo_ok(msg=msg)
        else:
            # log mode: log at the configured level; errors raise when abort
            if error:
                getattr(self.LOG, level_error)(msg)
                if abort:
                    # fall back to ApiError when `error` is not an exception class
                    if not is_subclass_safe(error, Exception):
                        error = ApiError
                    raise error(msg)
            elif warning:
                getattr(self.LOG, level_warning)(msg)
            elif debug:
                getattr(self.LOG, level_debug)(msg)
            else:
                getattr(self.LOG, level)(msg)
[docs] def get_sub_schemas(self, schema: dict) -> Generator[dict, None, None]:
"""Get all the schemas of sub-fields for a complex field.
Args:
schema: schema of complex field
"""
sub_schemas = listify(schema.get("sub_fields"))
for sub_schema in sub_schemas:
if self.is_excluded(schema=sub_schema) or not sub_schema["is_root"]:
continue
yield sub_schema
@property
def custom_schemas(self) -> List[dict]:
"""Get the custom schemas based on GETARGS."""
schemas = []
if self.get_arg_value("report_adapters_missing"):
schemas += list(SCHEMAS_CUSTOM["report_adapters_missing"].values())
if self.get_arg_value("report_software_whitelist"):
schemas += list(SCHEMAS_CUSTOM["report_software_whitelist"].values())
if self.get_arg_value("include_dates"):
schemas += list(SCHEMAS_CUSTOM["include_dates"].values())
return schemas
    @property
    def final_schemas(self) -> List[dict]:
        """Get the schemas that will be returned.

        Excluded schemas are dropped; complex fields that will be flattened
        or exploded are replaced by their sub-field schemas; when exploding
        entities, ``*_details`` schemas are dropped. Cached on first access.
        """
        if hasattr(self, "_final_schemas"):
            return self._final_schemas
        flat = self.get_arg_value("field_flatten")
        explode_field = self.schema_to_explode.get("name_qual", "")
        explode_entities = self.get_arg_value("explode_entities")
        final = {}
        for schema in self.schemas_selected:
            if self.is_excluded(schema=schema):
                continue
            name = schema["name_qual"]
            # exploding entities folds *_details columns into their base columns
            if explode_entities and name.endswith("_details") and name not in FIELDS_DETAILS:
                continue
            is_explode_field = name == explode_field
            if schema["is_complex"] and (is_explode_field or flat):
                # flattened/exploded complex field: expose its sub-fields instead
                for sub_schema in self.get_sub_schemas(schema=schema):
                    final[sub_schema["name_qual"]] = sub_schema
            else:
                final.setdefault(name, schema)
        self._final_schemas = list(final.values())
        return self._final_schemas
@property
def final_columns(self) -> List[str]:
"""Get the columns that will be returned."""
def get_key(s):
"""Get the key for a schema."""
return self._field_replace(self._field_compress(s[key]))
if hasattr(self, "_final_columns"):
return self._final_columns
use_titles = self.get_arg_value("field_titles")
key = "column_title" if use_titles else "name_qual"
self._final_columns = [get_key(s) for s in self.final_schemas]
return self._final_columns
    @property
    def fields_selected(self) -> List[str]:
        """Get the names of the fields that were selected.

        Combines the API object's always-returned fields, the parsed fields
        from the get request, their ``*_details`` variants when
        ``include_details`` is on, and any extra keys seen in the rows
        currently being processed. Cached on first access.
        """
        if hasattr(self, "_fields_selected"):
            return self._fields_selected
        include_details = self.STORE.get("include_details", False)
        fields = listify(self.STORE.get("fields_parsed", []))
        # API fields not already requested explicitly
        api_fields = [x for x in self.APIOBJ.FIELDS_API if x not in fields]
        if include_details:
            api_fields += FIELDS_DETAILS
        self._fields_selected = []
        for field in api_fields + fields:
            self._fields_selected.append(field)
            # each field gains a *_details companion when details are included
            if include_details and field not in FIELDS_DETAILS and not field.endswith("_details"):
                field_details = f"{field}_details"
                self._fields_selected.append(field_details)
        for row in self.CURRENT_ROWS:
            # pick up any keys present in the data but not explicitly selected
            self._fields_selected += [x for x in row if x not in self._fields_selected]
        return self._fields_selected
    # TBD move _details fields from selected to final?!
    @property
    def schemas_selected(self) -> List[dict]:
        """Get the schemas of the fields that were selected.

        Custom report schemas come first, followed by the resolved schemas
        for :attr:`fields_selected`. Cached on first access.
        """
        if hasattr(self, "_schemas_selected"):
            return self._schemas_selected
        self._schemas_selected = self.custom_schemas + self.APIOBJ.fields.get_field_names_eq(
            value=self.fields_selected, key=None, fields_error=False, selectable_only=False
        )
        return self._schemas_selected
    @property
    def schema_to_explode(self) -> dict:
        """Get the schema of the field that should be exploded.

        Returns an empty dict when ``field_explode`` was not supplied.
        Errors (via :meth:`echo` with ``error=ApiError``) when the supplied
        name matches no selected schema. Cached on first access.
        """
        if hasattr(self, "_schema_to_explode"):
            return self._schema_to_explode
        explode = self.get_arg_value("field_explode")
        self._schema_to_explode = {}
        if not explode:
            return self._schema_to_explode
        valids = []
        for schema in self.schemas_selected:
            for key in self.FIND_KEYS:
                name = schema.get(key)
                if name:
                    valids.append(name)
                    if name == explode:
                        self._schema_to_explode = schema
                        return self._schema_to_explode
        # no match: report the valid names and error out
        valids = sorted(list(set(valids)))
        self.echo(msg=f"Explode field {explode!r} not found, valid fields:{valids}", error=ApiError)
@property
def adapter_map(self) -> dict:
"""Build a map of adapters that have connections."""
if getattr(self, "_adapter_map", None):
return getattr(self, "_adapter_map", None)
self._adapters_meta = getattr(
self, "_adapters_meta", self.APIOBJ.adapters.get(get_clients=False)
)
self._adapter_map = {}
self._adapter_map["has_cnx"] = has_cnx = []
self._adapter_map["all"] = all_adapters = []
self._adapter_map["all_fields"] = [f"{x}_adapter" for x in self.ALL_SCHEMAS]
for adapter in self._adapters_meta:
name_raw = adapter["name_raw"]
cnt = adapter["cnx_count_total"]
if name_raw not in all_adapters:
all_adapters.append(name_raw)
if cnt and name_raw not in has_cnx:
has_cnx.append(name_raw)
return self._adapter_map
@property
def args_strs(self) -> List[str]:
    """Get a list of strings that describe each arg in :meth:`args_map`.

    Each line is ``"<description>:<padding><value>"`` with all values
    aligned to the same column.
    """
    lines = []
    arg_desc = {k: v for k, v in ARG_DESCRIPTIONS.items() if k in self.args_map()}
    # FIX: widen by 2 (the appended ":" plus one space) so the longest
    # description does not butt directly against its value
    longest = longest_str(list(arg_desc.values())) + 2
    for arg in self.args_map():
        desc = f"{arg_desc[arg]}:"
        value = self.get_arg_value(arg)
        if isinstance(value, str):
            value = repr(value)
        lines.append(f"{desc:{longest}}{value}")
    return lines
[docs] def __str__(self) -> str:
"""Show info for this object."""
return f"{self.CB_NAME.upper()} processor"
[docs] def __repr__(self) -> str:
"""Show info for this object."""
return self.__str__()
# Class-level defaults; presumably overwritten per-instance at runtime by the
# callback setup code outside this view — verify against callers.
CB_NAME: str = "base"
"""name for this callback"""

FIND_KEYS: List[str] = ["name", "name_qual", "column_title"]
"""field schema keys to use when finding a fields schema"""

APIOBJ = None
""":obj:`axonius_api_client.api.assets.asset_mixin.AssetMixin`: assets object."""

ALL_SCHEMAS: t.Optional[dict] = None
"""Map of adapter -> field schemas."""

STATE: t.Optional[dict] = None
"""state dict used by get assets method to track paging."""

STORE: t.Optional[dict] = None
"""store dict used by get assets method to track arguments."""

CURRENT_ROWS: t.Optional[t.List[dict]] = None
"""current rows being processed"""

GETARGS: t.Optional[dict] = None
"""original kwargs supplied to get assets method."""

TAG_ROWS_ADD: t.Optional[List[dict]] = None
"""tracker of assets to add tags to in :meth:`do_tagging`."""

TAG_ROWS_REMOVE: t.Optional[List[dict]] = None
"""tracker of assets to remove tags from in :meth:`do_tagging`."""

CUSTOM_CB_EXC: t.Optional[List[dict]] = None
"""tracker of custom callbacks that have been executed by :meth:`do_custom_cbs`"""
# noinspection PyAttributeOutsideInit
class ExportMixins(Base):
    """Export mixins for callbacks."""

    @classmethod
    def args_map_export(cls) -> dict:
        """Get the export argument names and their defaults for this callbacks object.

        See Also:
            :meth:`args_map_custom` for the arguments specific to this callback object.

            :meth:`args_map` for the arguments for all callback objects.
        """
        defaults = {
            "export_file": None,
            "export_path": DEFAULT_PATH,
            "export_overwrite": False,
            "export_backup": False,
            "export_schema": False,
            "export_fd": None,
            "export_fd_close": True,
        }
        return defaults
def open_fd(self) -> IO:
    """Open a file descriptor: a supplied fd wins, then a file path, then stdout."""
    if self.arg_export_fd:
        return self.open_fd_arg()
    if self.arg_export_file:
        return self.open_fd_path()
    return self.open_fd_stdout()
def open_fd_arg(self) -> IO:
    """Export to the caller-supplied file descriptor from GETARGS."""
    self._fd = self.arg_export_fd
    self._fd_close = self.arg_export_fd_close
    self._fd_info = f"{self._fd}"
    self.echo(msg=f"Exporting to {self._fd_info}")
    return self._fd
@property
def export_full_path(self) -> pathlib.Path:
    """Resolve export_path/export_file into a full path, applying templates."""
    directory = self.arg_export_path
    filename = self.arg_export_file
    return get_paths_format(directory, filename, mapping=self.export_templates)
def open_fd_path(self) -> IO:
    """Open a file descriptor for a path.

    Resolves :attr:`export_full_path`, applies backup/overwrite semantics,
    creates the parent directory and the file if needed, then opens the file
    for text writing (utf-8).

    Raises:
        ApiError: via :meth:`echo` when the file exists and neither
            ``export_backup`` nor ``export_overwrite`` was supplied.
    """
    export_fd_close = self.arg_export_fd_close
    export_backup = self.arg_export_backup
    export_overwrite = self.arg_export_overwrite
    self._file_path: pathlib.Path = self.export_full_path
    self._file_path_backup: Optional[pathlib.Path] = None
    self._fd_close: bool = export_fd_close
    # refuse to proceed if the resolved path is a directory
    check_path_is_not_dir(path=self._file_path)
    if self._file_path.exists():
        if export_backup:
            # move the existing file aside; a fresh file is created below
            self._file_path_backup: pathlib.Path = path_backup_file(path=self._file_path)
            self._file_mode: str = "Renamed existing file and created new file"
            self.echo(
                msg=f"Renamed existing file to {str(self._file_path_backup)!r}",
                debug=True,
            )
        elif not export_overwrite:
            # neither backup nor overwrite allowed -> abort via echo(error=...)
            msg = f"Export file {str(self._file_path)!r} already exists and overwrite is False!"
            self.echo(msg=msg, error=ApiError, level="error")
        else:
            self._file_mode: str = "Overwrote existing file"
    else:
        self._file_mode: str = "Created new file"
    if not self._file_path.parent.is_dir():
        # 0o700: directory readable only by the current user
        self._file_path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        self.echo(msg=f"Created directory {str(self._file_path.parent)!r}", debug=True)
    if not self._file_path.exists():
        # 0o600: file readable/writable only by the current user
        self._file_path.touch(mode=0o600)
        self.echo(msg=f"Created new file {str(self._file_path)!r}", debug=True)
    self._fd_info: str = f"file {str(self._file_path)!r} ({self._file_mode})"
    self.echo(msg=f"Exporting to {self._fd_info}")
    self._fd: IO = self._file_path.open(mode="w", encoding="utf-8")
    return self._fd
def open_fd_stdout(self) -> IO:
    """Export to STDOUT; marked as not ours to close in :meth:`close_fd`."""
    self._fd_close = False
    self._fd = sys.stdout
    self._fd_info = "stdout"
    self.echo(msg=f"Exporting to {self._fd_info}")
    return self._fd
def close_fd(self):
    """Write a trailing newline, then close the descriptor if we own it."""
    self._fd.write("\n")
    if getattr(self, "_fd_close", False):
        closer = getattr(self._fd, "close", None)
        if callable(closer):
            closer()
    self.echo(msg=f"Finished exporting to {self._fd_info}")
@property
def export_templates(self) -> dict:
    """Template mapping from STORE used to format export paths."""
    templates = self.STORE.get("export_templates")
    return templates if templates else {}
@property
def arg_export_fd(self) -> Optional[IO]:
    """Value of the ``export_fd`` argument."""
    value = self.get_arg_value("export_fd")
    return value
@property
def arg_export_file(self) -> Optional[PathLike]:
    """Value of the ``export_file`` argument when it is a non-empty str/Path."""
    value = self.get_arg_value("export_file")
    if not value:
        return None
    if not isinstance(value, (str, pathlib.Path)):
        return None
    return value
@property
def arg_export_path(self) -> pathlib.Path:
    """Value of the ``export_path`` argument resolved via :func:`get_path`."""
    return get_path(obj=self.get_arg_value("export_path"))
@property
def arg_export_backup(self) -> bool:
    """Value of the ``export_backup`` argument."""
    value = self.get_arg_value("export_backup")
    return value


@property
def arg_export_overwrite(self) -> bool:
    """Value of the ``export_overwrite`` argument."""
    value = self.get_arg_value("export_overwrite")
    return value


@property
def arg_export_fd_close(self) -> bool:
    """Value of the ``export_fd_close`` argument."""
    value = self.get_arg_value("export_fd_close")
    return value
# Human readable descriptions keyed by argument name; consumed by
# Base.args_strs to render "description: value" lines.
ARG_DESCRIPTIONS: t.Dict[str, str] = {
    "field_excludes": "Fields to exclude from output",
    "field_flatten": "Flatten complex fields",
    "field_explode": "Field to explode",
    "field_titles": "Rename fields to titles",
    "field_compress": "Shorten field names in output to 'adapter:field'",
    "field_replace": "Field name character replacements",
    "field_join": "Join list field values",
    "field_join_value": "Join list field values using",
    "field_join_trim": "Join list field character limit (0 = None)",
    "field_null": "Add null values for missing fields",
    "field_null_value": "Null value to use for missing simple fields",
    "field_null_value_complex": "Null value to use for missing complex fields",
    "tags_add": "Tags to add to assets",
    "expirable_tags": "Expiration dates for tags",
    "tags_add_invert_selection": "Invert selection for tags to add",
    "tags_remove": "Tags to remove from assets",
    "tags_remove_invert_selection": "Invert selection for tags to remove",
    "report_adapters_missing": "Add Missing Adapters calculation",
    "report_software_whitelist": "Missing Software to calculate",
    "page_progress": "Echo page progress every N assets",
    "do_echo": "Echo messages to console",
    "custom_cbs": "Custom callbacks to perform on assets",
    "json_flat": "For JSON Export: Use JSONL format",
    "csv_key_miss": "For CSV Export: Value to use when keys are missing",
    "csv_key_extras": "For CSV Export: What to do with extra CSV columns",
    "csv_dialect": "For CSV Export: CSV Dialect to use",
    "csv_quoting": "For CSV Export: CSV quoting style",
    "csv_field_flatten": "For CSV/XLSX Export: Enable flattening of complex fields",
    "csv_field_join": "For CSV/XLSX Export: Enable joining of list fields",
    "csv_field_null": "For CSV/XLSX Export: Enable null values for missing fields",
    "export_file": "File to export data to",
    "export_path": "Directory to export data to",
    "export_overwrite": "Overwrite export_file if it exists",
    "export_schema": "Export schema of fields",
    "export_fd": "Export to a file descriptor",
    "export_fd_close": "Close the file descriptor when done",
    "export_backup": "If export_file exists, rename it with the datetime",
    "table_format": "For Table export: Table format to use",
    "table_max_rows": "For Table export: Maximum rows to output",
    "table_api_fields": "For Table export: Include API fields in output",
    "xlsx_column_length": "For XLSX export: Length to use for every column",
    "xlsx_cell_format": "For XLSX Export: Formatting to apply to every cell",
    "debug_timing": "Enable logging of time taken for each callback",
    "explode_entities": "Split rows into one row for each asset entity",
    "include_dates": "Include history date and current date as a columns in the output",
}
"""Descriptions of all arguments for all callbacks"""