# Source code for axonius_api_client.api.assets.fields

# -*- coding: utf-8 -*-
"""API for working with fields for assets."""
import re
from typing import List, Optional, Tuple, Union

from cachetools import TTLCache, cached
from fuzzyfinder import fuzzyfinder

from ...constants.fields import (
    AGG_ADAPTER_ALTS,
    AGG_ADAPTER_NAME,
    FUZZY_SCHEMAS_KEYS,
    GET_SCHEMA_KEYS,
    GET_SCHEMAS_KEYS,
    PRETTY_SCHEMA_TMPL,
)
from ...exceptions import ApiError, NotFoundError
from ...parsers.fields import parse_fields
from ...tools import listify, split_str, strip_right
from .. import json_api
from ..api_endpoints import ApiEndpoints
from ..mixins import ChildMixins


class Fields(ChildMixins):
    """API for working with fields for the parent asset type.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect` and assume
        ``apiobj`` is either ``client.devices`` or ``client.users``

        >>> apiobj = client.devices  # or client.users

        * Get schemas of all adapters and their fields: :meth:`get`
        * Validate field names supplied: :meth:`validate`

    See Also:
        * Device assets :obj:`axonius_api_client.api.assets.devices.Devices`
        * User assets :obj:`axonius_api_client.api.assets.users.Users`

    """

    # NOTE(review): ``self.split_search`` is called by :meth:`get_field_name` and
    # :meth:`split_searches` but is not defined in this listing; presumably it is
    # inherited or was dropped from the listing -- confirm before relying on it.

    @cached(cache=TTLCache(maxsize=1024, ttl=300))
    def get(self) -> dict:
        """Get the schema of all adapters and their fields.

        Examples:
            Get all fields for all adapters

            >>> fields = apiobj.fields.get()

            See the adapter names

            >>> print(list(fields))

            See the field fully qualified name, short name, and title for all fields of an adapter

            >>> schemas = fields['agg']
            >>> for schema in schemas:
            ...     qual = schema['name_qual']
            ...     name = schema['name_base']
            ...     title = schema['title']
            ...     print(f"title {title!r}, qualified name {qual!r}, base name {name!r}")

        Notes:
            The result is memoized in a ``TTLCache`` (``maxsize=1024``, ``ttl=300``), so
            repeated calls hand back the same object; treat it as read-only.

        Returns:
            the parsed field schemas, keyed by adapter name
        """
        return parse_fields(raw=self._get().document_meta)

    def validate(
        self,
        fields: Optional[Union[List[str], str]] = None,
        fields_regex: Optional[Union[List[str], str]] = None,
        fields_manual: Optional[Union[List[str], str]] = None,
        fields_fuzzy: Optional[Union[List[str], str]] = None,
        fields_default: bool = True,
        fields_root: Optional[str] = None,
    ) -> List[str]:
        """Get the fully qualified field names for getting asset data.

        Examples:
            * ``fields`` gets parsed by :meth:`get_field_names_eq`
            * ``fields_regex`` gets parsed by :meth:`get_field_names_re`
            * ``fields_fuzzy`` gets parsed by :meth:`get_field_names_fuzzy`
            * ``fields_root`` gets parsed by :meth:`get_field_names_root`
            * ``fields_default`` will add
              :attr:`axonius_api_client.api.assets.users.Users.fields_default` or
              :attr:`axonius_api_client.api.assets.devices.Devices.fields_default`
              based on the parent asset type

        Notes:
            This is used in a number of ways to allow the user to supply fields that are
            easier to refer to than the fully qualified name or title of a field.

        Args:
            fields: list of fields that must equal their base name, qual name, or title
            fields_regex: list of fields to regex match against their base name, qual name,
                or title
            fields_manual: list of already fully qualified field names
            fields_fuzzy: list of fields to fuzzy match against their base name or title
            fields_default: include the default fields defined in the parent API object
                (ignored when ``fields_root`` is supplied)
            fields_root: include all root fields from a single adapter

        Returns:
            the selected field names, de-duplicated, in the order they were added

        Raises:
            :exc:`ApiError`: if no fields selected after all processing is done
        """
        selected: List[str] = []

        def add(items):
            # preserve first-seen order while dropping duplicates
            for item in items:
                if item not in selected:
                    selected.append(item)

        fields = listify(obj=fields)
        fields_manual = listify(obj=fields_manual)
        fields_fuzzy = listify(obj=fields_fuzzy)

        if fields_default and not fields_root:
            add(self.parent.fields_default)

        if fields_root:
            add(self.get_field_names_root(adapter=fields_root))

        add(fields_manual)
        add(self.get_field_names_eq(value=fields))
        add(self.get_field_names_re(value=fields_regex))
        add(self.get_field_names_fuzzy(value=fields_fuzzy))

        if not selected:
            raise ApiError("No fields supplied, must supply at least one field")
        return selected

    def get_field_name(
        self,
        value: str,
        field_manual: bool = False,
        fields_custom: Optional[dict] = None,
        key: str = "name_qual",
    ) -> Union[str, dict]:
        """Get the fully qualified name of a field.

        Examples:
            First, create a ``client`` using :obj:`axonius_api_client.connect.Connect` and
            assume ``apiobj`` is either ``client.devices`` or ``client.users``

            >>> apiobj = client.devices

            Get the FQDN of a field on the aggregated adapter

            >>> apiobj.fields.get_field_name(value='hostname')
            'specific_data.data.hostname'
            >>> apiobj.fields.get_field_name(value='agg:hostname')
            'specific_data.data.hostname'

            Get the title of a field on the aggregated adapter

            >>> apiobj.fields.get_field_name(value='hostname', key="title")
            'Host Name'

            Get the FQDN of a field on the AWS adapter

            >>> apiobj.fields.get_field_name(value='aws:aws_device_type')
            'adapters_data.aws_adapter.aws_device_type'

        Args:
            value: field to find in format of ``adapter_name:field_name``
            field_manual: treat the field name as fully qualified
            fields_custom: custom schemas to search thru in addition to API schemas
            key: key of schema to return, or if empty return the schema itself

        Returns:
            ``value`` unchanged if ``field_manual`` and ``key`` are both truthy, otherwise
            the ``key`` entry of the matching schema (or the whole schema if ``key`` is empty)

        Raises:
            :exc:`ApiError`: if splitting ``value`` does not yield exactly one field
        """
        if field_manual and key:
            return value

        adapter, fields = self.split_search(value=value)

        if len(fields) != 1:
            raise ApiError("More than one field supplied to {}".format(value))

        field = fields[0]

        fields = self.get()
        adapter = self.get_adapter_name(value=adapter)
        schemas = fields[adapter]

        if fields_custom and adapter in fields_custom:
            # Build a new list: ``schemas`` is the list stored inside the memoized result
            # of get(), so extending it in place would leak the custom schemas into every
            # later lookup.
            schemas = [*schemas, *fields_custom[adapter]]

        schema = self.get_field_schema(value=field, schemas=schemas)
        return schema[key] if key else schema

    def get_field_names_re(self, value: Union[str, List[str]], key: str = "name_qual") -> List[str]:
        """Get field names using regex.

        Examples:
            First, create a ``client`` using :obj:`axonius_api_client.connect.Connect` and
            assume ``apiobj`` is either ``client.devices`` or ``client.users``

            >>> apiobj = client.devices

            Get all aggregated field FQDNs that start with host

            >>> apiobj.fields.get_field_names_re('^host')
            ['specific_data.data.hostname', 'specific_data.data.hostname_preferred']

            Get fields names for AWS adapter that start with id and AGG adapter that start
            with host

            >>> apiobj.fields.get_field_names_re(['aws:^id', '^host'])
            [
                'adapters_data.aws_adapter.id',
                'specific_data.data.hostname',
                'specific_data.data.hostname_preferred'
            ]

            Get all aggregated field titles that start with host

            >>> apiobj.fields.get_field_names_re('^host', key="title")
            ['Host Name', 'Preferred Host Name']

            Get all aggregated field FQDNs that have a title of 'Host Name'

            >>> apiobj.fields.get_field_names_re('Host Name')
            ['specific_data.data.hostname', 'specific_data.data.hostname_preferred']

            Get all field FQDNs that start with id for all adapters that match regex 'ac'

            >>> apiobj.fields.get_field_names_re('ac:^id')
            [
                'adapters_data.active_directory_adapter.id',
                'adapters_data.carbonblack_defense_adapter.id',
                'adapters_data.limacharlie_adapter.id'
            ]

        Args:
            value: regex to search for fields
            key: key of schema to return

        Returns:
            the ``key`` value of every matching schema, de-duplicated, in match order
        """
        splits = self.split_searches(value=value)
        fields = self.get()

        matches = []
        for adapter_re, fields_re in splits:
            for adapter in self.get_adapter_names(value=adapter_re):
                for field_re in fields_re:
                    fschemas = self.get_field_schemas(value=field_re, schemas=fields[adapter])
                    # append one at a time so duplicates inside a single batch are dropped too
                    for fschema in fschemas:
                        name = fschema[key]
                        if name not in matches:
                            matches.append(name)
        return matches

    def get_field_names_eq(self, value: Union[str, List[str]], key: str = "name_qual") -> List[str]:
        """Get field names that equal a value.

        Examples:
            First, create a ``client`` using :obj:`axonius_api_client.connect.Connect` and
            assume ``apiobj`` is either ``client.devices`` or ``client.users``

            >>> apiobj = client.devices

            Get field names that equal aggregated hostname or id and AWS device type

            >>> apiobj.fields.get_field_names_eq(['hostname,id', 'aws:aws_device_type'])
            [
                'specific_data.data.hostname',
                'specific_data.data.id',
                'adapters_data.aws_adapter.aws_device_type'
            ]

            Get field names that equal aggregated hostname or id

            >>> apiobj.fields.get_field_names_eq('hostname,id')
            ['specific_data.data.hostname', 'specific_data.data.id']

        Args:
            value: value to search for fields
            key: key of schema to return, or if empty return the schemas themselves

        Returns:
            the ``key`` value of every matching schema, de-duplicated, in match order
        """
        splits = self.split_searches(value=value)
        fields = self.get()

        matches = []
        for adapter_name, names in splits:
            adapter = self.get_adapter_name(value=adapter_name)
            schemas = fields[adapter]
            for name in names:
                schema = self.get_field_schema(value=name, schemas=schemas)
                match = schema[key] if key else schema
                if match not in matches:
                    matches.append(match)
        return matches

    def get_field_names_fuzzy(
        self, value: Union[str, List[str]], key: str = "name_qual"
    ) -> List[str]:
        """Get field names that fuzzy match a value.

        Examples:
            First, create a ``client`` using :obj:`axonius_api_client.connect.Connect` and
            assume ``apiobj`` is either ``client.devices`` or ``client.users``

            >>> apiobj = client.devices

            Get field names that fuzzy match a misspelt version of hostname

            >>> apiobj.fields.get_field_names_fuzzy('hostnme')
            ['specific_data.data.hostname']

        Args:
            value: value to search for fields
            key: key of schema to return

        Returns:
            the fuzzy matches from :meth:`fuzzy_filter` (root fields only), de-duplicated
        """
        splits = self.split_searches(value=value)
        fields = self.get()

        matches = []
        for adapter_name, names in splits:
            adapter = self.get_adapter_name(value=adapter_name)
            schemas = fields[adapter]
            for name in names:
                amatches = self.fuzzy_filter(search=name, schemas=schemas, key=key, root_only=True)
                # append one at a time so duplicates inside a single batch are dropped too
                for amatch in amatches:
                    if amatch not in matches:
                        matches.append(amatch)
        return matches

    def get_field_schemas_root(self, adapter: str) -> List[dict]:
        """Get schemas of all root fields for a given adapter.

        Args:
            adapter: name of adapter to get all root fields for

        See Also:
            :meth:`get_field_names_root`

        Notes:
            Root fields are fields that are not sub-fields of complex fields.

            For instance 'specific_data.data.network_interfaces.ips' is NOT a root field,
            since 'ips' is a sub field of 'specific_data.data.network_interfaces'.

        Returns:
            schemas of the adapter whose ``selectable`` and ``is_root`` entries are truthy
        """
        fields = self.get()
        adapter = self.get_adapter_name(value=adapter)
        schemas = fields[adapter]
        return [x for x in schemas if x.get("selectable") and x.get("is_root")]

    def get_field_names_root(self, adapter: str, key: str = "name_qual") -> List[str]:
        """Get names of all root fields for a given adapter.

        Args:
            adapter: name of adapter to get all root fields for
            key: key of schema to return

        See Also:
            :meth:`get_field_schemas_root`

        Returns:
            the ``key`` value of every root field schema of the adapter
        """
        schemas = self.get_field_schemas_root(adapter=adapter)
        return [x[key] for x in schemas]

    @staticmethod
    def fuzzy_filter(
        search: str,
        schemas: List[dict],
        root_only: bool = False,
        key: str = "name_qual",
        fuzzy_keys: List[str] = FUZZY_SCHEMAS_KEYS,
        **kwargs,
    ) -> List[Union[dict, str]]:
        """Perform a fuzzy search against a set of field schemas.

        A case-insensitive substring pass is tried first; only if it finds nothing is
        ``fuzzyfinder`` used as a fallback.

        Args:
            search: string to search for against the keys in fuzzy_keys
            schemas: field schemas to search through
            root_only: only search against schemas of root fields
            key: return the schema key value instead of the field schemas
            fuzzy_keys: list of keys to check search against in each field schema
            **kwargs: accepted and ignored

        Returns:
            the ``key`` value of each matching schema, or the schemas themselves if ``key``
            is empty
        """
        matches = []

        def do_skip(schema):
            # skip duplicates, "*_details" and "all" fields, unselectable fields, and
            # (when root_only) anything that is not a root field
            name = schema["name"]
            return any(
                (
                    schema in matches,
                    name.endswith("_details"),
                    name == "all",
                    not schema.get("selectable", True),
                    root_only and not schema.get("is_root"),
                )
            )

        def collect(is_match):
            for schema in schemas:
                if do_skip(schema):
                    continue
                if is_match([schema[x] for x in fuzzy_keys]):
                    matches.append(schema)

        # lowercase both sides so mixed-case values (e.g. titles) can match
        needle = search.strip().lower()
        collect(lambda values: any(needle in x.lower() for x in values))

        if not matches:
            collect(lambda values: bool(list(fuzzyfinder(search, values))))

        return [x[key] for x in matches] if key else matches

    def get_adapter_names(self, value: str) -> List[str]:
        """Find adapter names that regex match a value.

        Args:
            value: regex of adapter to match

        Returns:
            every adapter name in :meth:`get` that the regex matches

        Raises:
            :exc:`NotFoundError`: when no adapter name matches supplied value
        """
        fields = self.get()

        # NOTE(review): the pattern is lowercased before compiling, which would alter
        # case-sensitive regex escapes such as ``\S``; kept as-is because the alias check
        # and suffix strip below rely on the lowercased form.
        search = strip_right(obj=value.lower().strip(), fix="_adapter")

        if search in AGG_ADAPTER_ALTS:
            search = AGG_ADAPTER_NAME

        search = re.compile(search, re.I)
        matches = [x for x in fields if search.search(x)]

        if not matches:
            msg = ("No adapter found where name regex matches {!r}, valid adapters:\n {}").format(
                value, "\n ".join(list(fields))
            )
            raise NotFoundError(msg)
        return matches

    def get_adapter_name(self, value: str) -> str:
        """Find an adapter name that equals a value.

        Args:
            value: name of adapter

        Returns:
            the adapter name as it appears in :meth:`get`

        Raises:
            :exc:`NotFoundError`: when no adapter name equals supplied value
        """
        fields = self.get()

        search = strip_right(obj=value.lower().strip(), fix="_adapter")

        if search in AGG_ADAPTER_ALTS:
            search = AGG_ADAPTER_NAME

        if search in fields:
            return search

        msg = "No adapter found where name equals {!r}, valid adapters:\n {}"
        msg = msg.format(value, "\n ".join(list(fields)))
        raise NotFoundError(msg)

    def get_field_schemas(
        self, value: str, schemas: List[dict], keys: List[str] = GET_SCHEMAS_KEYS
    ) -> List[dict]:
        """Find field schemas that regex match a value.

        Args:
            value: regex of name to match
            schemas: list of field schemas to search through
            keys: list of keys to check regex value against

        Returns:
            selectable schemas where any of ``keys`` matches the regex, without duplicates
        """
        # re.I already makes the match case-insensitive; lowercasing the pattern itself
        # would turn escapes such as ``\S`` or ``\D`` into their opposites.
        search = re.compile(value.strip(), re.I)

        matches = []
        for schema in schemas:
            if not schema.get("selectable"):
                continue
            if schema not in matches and any(search.search(schema[key]) for key in keys):
                matches.append(schema)
        return matches

    def get_field_schema(
        self,
        value: str,
        schemas: List[dict],
        keys: List[str] = GET_SCHEMA_KEYS,
        **kwargs,
    ) -> dict:
        """Find a field name that equals a value.

        Args:
            value: name of field
            schemas: list of field schemas to search through
            keys: list of keys to check if value equals
            **kwargs: passed to :meth:`fuzzy_filter` to print fuzzy matches in error if no
                matches found

        Returns:
            the first selectable schema where any of ``keys`` equals ``value``
            (case-insensitive)

        Raises:
            :exc:`NotFoundError`: when no field name equals supplied value
        """
        search = value.lower().strip()
        schemas = [x for x in schemas if x.get("selectable", True)]

        for schema in schemas:
            for key in keys:
                if search == schema[key].lower():
                    return schema

        # nothing matched exactly: build an error that suggests fuzzy matches if there
        # are any, otherwise lists every valid field
        kwargs["search"] = value
        kwargs["schemas"] = schemas
        kwargs["key"] = ""
        fuzzy = self.fuzzy_filter(**kwargs)

        err = "No fuzzy matches, all valid fields:"
        if fuzzy:
            # ``fuzzy_keys`` is the name fuzzy_filter actually honors; ``keys_fuzzy`` is
            # still read so existing callers keep their error text.
            keys = kwargs.get("fuzzy_keys", kwargs.get("keys_fuzzy", FUZZY_SCHEMAS_KEYS))
            err = "Maybe you meant one of these fuzzy matches:"

        ktxt = " or ".join(keys)
        pre = f"No field found where {ktxt} equals {value!r}"
        errs = [pre, err, "", *self._prettify_schemas(schemas=fuzzy or schemas)]
        raise NotFoundError("\n".join(errs))

    def split_searches(self, value: Union[List[str], str]) -> List[Tuple[str, List[str]]]:
        """Split a list of strings into adapter:field(s) format.

        Args:
            value: format of ``'adapter:field1,field2'`` or
                ``['adapter1:field1', 'adapter2:field2']``

        Returns:
            one ``split_search`` result per item of ``value``
        """
        return [self.split_search(value=x) for x in listify(obj=value)]

    def _prettify_schemas(
        self, schemas: List[dict], tmpl: str = PRETTY_SCHEMA_TMPL, len_key: str = "name_base"
    ) -> List[str]:
        """Prettify a set of schemas for output in human friendly format.

        Args:
            schemas: field schemas to prettify
            tmpl: template to use to prettify schemas
            len_key: schema key to get max length of and pass into tmpl as "len_max"

        Returns:
            one formatted string per schema; an empty list if ``schemas`` is empty
        """
        # default=0 keeps an empty schema list from raising ValueError
        len_max = max((len(x[len_key]) for x in schemas), default=0)
        return [tmpl.format(len_max=len_max, **x) for x in schemas]

    def _get(self) -> json_api.generic.Metadata:
        """Private API method to get the schema of all fields."""
        api_endpoint = ApiEndpoints.assets.fields
        return api_endpoint.perform_request(http=self.auth.http, asset_type=self.parent.ASSET_TYPE)