# -*- coding: utf-8 -*-
"""API for working with adapters."""
import datetime
import pathlib
import typing as t
from cachetools import TTLCache, cached
from ...constants.ctypes import PatternLikeListy
from ...exceptions import ApiError, NotFoundError # , StopFetch
from ...parsers.config import config_build, config_unchanged, config_unknown
from ...parsers.tables import tablize_adapters
from ...tools import path_read
from ..api_endpoints import ApiEndpoints
from ..json_api.adapters import (
Adapter,
AdapterFetchHistoryRequest,
AdapterSettings,
AdaptersList,
CnxLabels,
AdapterFetchHistory,
AdapterFetchHistoryFilters,
)
from ..json_api.paging_state import PagingState
from ..json_api.system_settings import SystemSettings
from ..json_api.time_range import UnitTypes
from ..mixins import ModelMixins
HIST_MOD = AdapterFetchHistory
HIST_GEN = t.Generator[HIST_MOD, None, None]
HIST_LIST = t.List[HIST_MOD]
CACHE_HISTORY_FILTERS: TTLCache = TTLCache(maxsize=4096, ttl=60)
CACHE_GET_BASIC: TTLCache = TTLCache(maxsize=1024, ttl=60)
[docs]class Adapters(ModelMixins):
"""API model for working with adapters.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`
* Get metadata of all adapters: :meth:`get`
* Get an adapter by name: :meth:`get_by_name`
* Get the advanced settings for an adapter: :meth:`config_get`
* Update the advanced settings for an adapter: :meth:`config_update`
* Upload a file to an adapter: :meth:`file_upload`
* Work with adapter connections :obj:`axonius_api_client.api.adapters.cnx.Cnx`
Notes:
All methods use the Core instance by default, but you can work with another instance by
passing the name of the instance to ``node``.
Supplying unknown keys/values for configurations will throw an error showing the
valid keys/values.
"""
[docs] def get(self, get_clients: bool = False) -> t.List[dict]:
"""Get all adapters on all nodes.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Get all adapters
>>> adapters = client.adapters.get()
Get details of each adapter
>>> for adapter in adapters:
... print(adapter["name"]) # name of adapter
... print(adapter["node_name"]) # name of node adapter is running on
Args:
get_clients (bool, optional): Include the connections and schemas in the response
Returns:
t.List[dict]: list of adapter metadata
"""
return [
adapter_node.to_dict_old()
for adapter in self._get(get_clients=get_clients)
for adapter_node in adapter.adapter_nodes
]
[docs] def get_by_name_basic(self, value: str) -> dict:
"""Get an adapters basic metadata (including display title) by name.
Args:
value (str): short name of adapter (i.e. ``aws``)
Returns:
dict: adapter basic metadata
"""
data = self.get_basic()
return data.find_by_name(value=value)
[docs] def get_by_name(
self, name: str, node: t.Optional[str] = None, get_clients: bool = False
) -> dict:
"""Get an adapter by name on a single node.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Get an adapter by name
>>> adapter = client.adapters.get_by_name(name="aws")
Get details of adapter
>>> adapter['status'] # overall adapter status
'success'
>>> adapter['cnx_count_total'] # total connection count
1
>>> adapter['cnx_count_broken'] # broken connection count
0
>>> adapter['cnx_count_working'] # working connection count
1
Get details of each connection of the adapter
>>> for cnx in adapter["cnx"]:
... print(cnx["working"]) # bool if connection is working or not
... print(cnx["error"]) # error from last fetch attempt
... print(cnx["config"]) # configuration of connection
... print(cnx["id"]) # ID of connection
... print(cnx["uuid"]) # UUID of connection
Args:
name (str): name of adapter to get
node (Optional[str], optional): name of node to get adapter from
get_clients (bool, optional): Include the connections and schemas in the response
Raises:
NotFoundError: when no node found or when no adapter found on node
Returns:
dict: adapter metadata
"""
node_meta = self.instances.get_by_name_id_core(value=node)
adapters = self.get(get_clients=get_clients)
node_name = node_meta["name"]
adapters = [adapter for adapter in adapters if adapter["node_name"] == node_name]
keys = ["name", "name_raw", "name_plugin"]
for adapter in adapters:
if any([adapter[k].lower() == name.lower() for k in keys]):
adapter["node_meta"] = node_meta
return adapter
err = f"No adapter named {name!r} found on instance {node_name!r}"
raise NotFoundError(tablize_adapters(adapters=adapters, err=err))
[docs] @cached(cache=CACHE_GET_BASIC)
def get_basic_cached(self) -> AdaptersList:
"""Get basic adapter data cached."""
return self.get_basic()
[docs] def get_basic(self) -> AdaptersList:
"""Get basic adapter data."""
return self._get_basic()
[docs] @cached(cache=CACHE_HISTORY_FILTERS)
def get_fetch_history_filters(self) -> AdapterFetchHistoryFilters:
"""Get filter values for use in adapters history."""
return self._get_fetch_history_filters()
[docs] def get_fetch_history(self, generator: bool = False, **kwargs) -> t.Union[HIST_GEN, HIST_LIST]:
"""Get adapter fetch history.
Args:
generator (bool, optional): Return a generator or a list
**kwargs: passed to :meth:`get_fetch_history_generator`
Returns:
t.Union[HIST_GEN, HIST_LIST]: t.Generator or list of history event models
"""
gen = self.get_fetch_history_generator(**kwargs)
return gen if generator else list(gen)
[docs] def get_fetch_history_generator(
self,
adapters: t.Optional[PatternLikeListy] = None,
connection_labels: t.Optional[PatternLikeListy] = None,
clients: t.Optional[PatternLikeListy] = None,
instances: t.Optional[PatternLikeListy] = None,
statuses: t.Optional[PatternLikeListy] = None,
discoveries: t.Optional[PatternLikeListy] = None,
exclude_realtime: bool = False,
relative_unit_type: UnitTypes = UnitTypes.get_default(),
relative_unit_count: t.Optional[int] = None,
absolute_date_start: t.Optional[datetime.datetime] = None,
absolute_date_end: t.Optional[datetime.datetime] = None,
sort_attribute: t.Optional[str] = None,
sort_descending: bool = False,
search: t.Optional[str] = None,
filter: t.Optional[str] = None,
page_sleep: int = PagingState.page_sleep,
page_size: int = PagingState.page_size,
row_start: int = PagingState.row_start,
row_stop: t.Optional[int] = PagingState.row_stop,
log_level: t.Union[int, str] = PagingState.log_level,
history_filters: t.Optional[AdapterFetchHistoryFilters] = None,
request_obj: t.Optional[AdapterFetchHistoryRequest] = None,
) -> HIST_GEN:
"""Get adapter fetch history.
Notes:
Use ~ prefix for regex in adapters, connection_labels, clients, instances, statuses
Args:
adapters (t.Optional[PatternLikeListy], optional): Filter for records with matching
adapters
connection_labels (t.Optional[PatternLikeListy], optional): Filter for records with
connection labels
clients (t.Optional[PatternLikeListy], optional): Filter for records with matching
client ids
instances (t.Optional[PatternLikeListy], optional): Filter for records with matching
instances
statuses (t.Optional[PatternLikeListy], optional): Filter for records with matching
statuses
discoveries (t.Optional[PatternLikeListy], optional): Filter for records with matching
discovery IDs
exclude_realtime (bool, optional): Exclude records for realtime adapters
relative_unit_type (UnitTypes, optional): Type of unit to use when supplying
relative_unit_count
relative_unit_count (Optional[int], optional): Filter records for the past N units of
relative_unit_type
absolute_date_start (Optional[datetime.datetime], optional): Filter records that are
after this date. (overrides relative values)
absolute_date_end (Optional[datetime.datetime], optional): Filter records that are
before this date. (defaults to now if start but no end)
sort_attribute (Optional[str], optional): Sort records based on this attribute
sort_descending (bool, optional): Sort sort_attribute descending or ascending
page_sleep (int, optional): Sleep N seconds between pages
page_size (int, optional): Get N records per page
row_start (int, optional): Start at row N
row_stop (Optional[int], optional): Stop at row N
log_level (t.Union[int, str], optional): log level to use for paging
history_filters (Optional[AdapterFetchHistoryFilters], optional): response
from :meth:`get_fetch_history_filters` (will be fetched if not supplied)
request_obj (Optional[AdapterFetchHistoryRequest], optional): Request object to use
for options
"""
if not isinstance(history_filters, AdapterFetchHistoryFilters):
history_filters = self.get_fetch_history_filters()
if not isinstance(request_obj, AdapterFetchHistoryRequest):
request_obj = AdapterFetchHistoryRequest()
request_obj.set_filters(
history_filters=history_filters,
value_type="adapters",
value=adapters,
)
request_obj.set_filters(
history_filters=history_filters,
value_type="connection_labels",
value=connection_labels,
)
request_obj.set_filters(
history_filters=history_filters,
value_type="clients",
value=clients,
)
request_obj.set_filters(
history_filters=history_filters,
value_type="instances",
value=instances,
)
request_obj.set_filters(
history_filters=history_filters,
value_type="statuses",
value=statuses,
)
request_obj.set_filters(
history_filters=history_filters,
value_type="discoveries",
value=discoveries,
)
request_obj.set_sort(
value=sort_attribute,
descending=sort_descending,
)
request_obj.set_exclude_realtime(
value=exclude_realtime,
)
request_obj.set_time_range(
relative_unit_type=relative_unit_type,
relative_unit_count=relative_unit_count,
absolute_date_start=absolute_date_start,
absolute_date_end=absolute_date_end,
)
request_obj.set_search_filter(
search=search,
filter=filter,
)
with PagingState(
purpose="Get Adapter Fetch History Events",
page_sleep=page_sleep,
page_size=page_size,
row_start=row_start,
row_stop=row_stop,
log_level=log_level,
) as state:
while not state.stop_paging:
page = state.page(method=self._get_fetch_history, request_obj=request_obj)
yield from page.rows
[docs] def config_get(
self,
name: str,
node: t.Optional[str] = None,
config_type: str = "generic",
) -> dict:
"""Get the advanced settings for an adapter.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Get the generic advanced settings for an adapter
>>> config = client.adapters.config_get(name="aws")
Get the adapter specific advanced settings for an adapter
>>> config = client.adapters.config_get(name="aws", config_type="specific")
Get the discovery advanced settings for an adapter
>>> config = client.adapters.config_get(name="aws", config_type="discovery")
See the current values of a configuration
>>> import pprint
>>> pprint.pprint(config['config'])
{'connect_client_timeout': 300,
'fetching_timeout': 43200,
'last_fetched_threshold_hours': 48,
'last_seen_prioritized': False,
'last_seen_threshold_hours': 24,
'minimum_time_until_next_fetch': None,
'realtime_adapter': False,
'user_last_fetched_threshold_hours': 48,
'user_last_seen_threshold_hours': None}
Investigate the schema and current values of a configuration
>>> for setting, info in config['schema'].items():
... current_value = config['config'][setting]
... title = info['title']
... description = info.get('description')
... print(f"name of setting: {setting}")
... print(f" title of setting in GUI: {title}")
... print(f" description of setting: {description}")
... print(f" current value of setting: {current_value}")
Args:
name (str): name of adapter to get advanced settings of
node (Optional[str], optional): name of node to get adapter from [NO LONGER USED]
config_type (str, optional): One of generic, specific, or discovery
Returns:
dict: configuration for ``config_type``
"""
adapter = self.get_by_name(name=name, node=node, get_clients=False)
adapters = self._config_get(adapter_name=adapter["name_raw"])
type_map = adapters.type_map
if config_type not in type_map:
valid = ", ".join(list(type_map))
raise ApiError(f"Adapter {name} has no config type {config_type!r}, valids: {valid}!")
adapter_config = type_map[config_type]
adapter_config["adapter"] = adapter
return adapter_config
[docs] def config_update(
self, name: str, node: t.Optional[str] = None, config_type: str = "generic", **kwargs
) -> dict:
"""Update the advanced settings for an adapter.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Update the generic advanced settings for the adapter
>>> updated_config = client.adapters.config_update(
... name="aws", last_seen_threshold_hours=24
... )
Update the adapter specific advanced settings
>>> updated_config = client.adapters.config_update(
... name="aws", config_type="specific", fetch_s3=True
... )
Args:
name (str): name of adapter to update advanced settings of
node (Optional[str], optional): name of node to get adapter from
config_type (str, optional): One of generic, specific, or discovery
**kwargs: configuration to update advanced settings of config_type
Returns:
dict: updated configuration for ``config_type``
"""
kwargs_config = kwargs.pop("kwargs_config", {})
kwargs.update(kwargs_config)
config_map = self.config_get(name=name, config_type=config_type)
adapter_meta = config_map["adapter"]
old_config = config_map["config"]
schemas = config_map["schema"]
source = f"adapter {name!r} {config_type} advanced settings"
config_unknown(schemas=schemas, new_config=kwargs, source=source)
new_config = config_build(
schemas=schemas, old_config=old_config, new_config=kwargs, source=source
)
config_unchanged(
schemas=schemas, old_config=old_config, new_config=new_config, source=source
)
self._config_update(
adapter_name=adapter_meta["name_raw"],
config_name=config_map["config_name"],
config=new_config,
)
return self.config_get(name=name, config_type=config_type)
[docs] def file_upload(
self,
name: str,
field_name: str,
file_name: str,
file_content: t.Union[str, bytes],
file_content_type: t.Optional[str] = None,
node: t.Optional[str] = None,
) -> dict:
"""Upload a file to a specific adapter on a specific node.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Upload content as a file for use in a connection later
>>> content = "content of file to upload"
>>> file_uuid = client.adapters.file_upload(
... name="aws",
... file_name="name_of_file",
... file_content=content,
... field_name="name_of_field",
... )
>>> file_uuid
{'uuid': '5f78b7dee33f0a113700a6fc', 'filename': 'name_of_file'}
Args:
name: name of adapter to upload file to
node: name of node to upload file to
field_name: name of field (should match configuration schema key name)
file_name: name of file to upload
file_content: content of file to upload
file_content_type: mime type of file to upload
Returns:
dict: with keys 'filename' and 'uuid'
"""
adapter = self.get_by_name(name=name, node=node, get_clients=False)
return self._file_upload(
adapter_name=adapter["name_raw"],
node_id=adapter["node_id"],
file_name=file_name,
field_name=field_name,
file_content=file_content,
file_content_type=file_content_type,
)
[docs] def file_upload_path(self, path: t.Union[str, pathlib.Path], **kwargs) -> dict:
"""Upload the contents of a file to a specific adapter on a specific node.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect`.
Upload a file for use in a connection later
>>> file_uuid = client.adapters.file_upload_path(name="aws", path="test.csv")
>>> file_uuid
{'uuid': '5f78b674e33f0a113700a6fa', 'filename': 'test.csv'}
Args:
path (t.Union[str, pathlib.Path]): path to file containing contents to upload
**kwargs: passed to :meth:`file_upload`
Returns:
dict: with keys 'filename' and 'uuid'
"""
path, file_content = path_read(obj=path, binary=True, is_json=False)
if path.suffix == ".csv":
kwargs.setdefault("file_content_type", "text/csv")
kwargs.setdefault("field_name", path.name)
kwargs.setdefault("file_name", path.name)
kwargs["file_content"] = file_content
return self.file_upload(**kwargs)
[docs] def _init(self, **kwargs):
"""Post init method for subclasses to use for extra setup."""
from ..system.instances import Instances
from .cnx import Cnx
self.cnx: Cnx = Cnx(parent=self)
"""Work with adapter connections"""
self.instances: Instances = Instances(auth=self.auth)
"""Work with instances"""
[docs] def _get(self, get_clients: bool = False, filter: t.Optional[str] = None) -> t.List[Adapter]:
"""Private API method to get all adapters.
Args:
get_clients (bool, optional): Include the connections and schemas in the response
filter (Optional[str], optional): unk
Returns:
t.List[Adapter]: t.List of Adapter dataclass models
"""
api_endpoint = ApiEndpoints.adapters.get
request_obj = api_endpoint.load_request(get_clients=get_clients, filter=filter)
return api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
[docs] def _config_update(self, adapter_name: str, config_name: str, config: dict) -> SystemSettings:
"""Private API method to set advanced settings for an adapter.
Args:
adapter_name (str): raw name of the adapter i.e. ``aws_adapter``
config_name (str): name of advanced settings to set
* ``AdapterBase`` for generic advanced settings
* ``AwsSettings`` for adapter specific advanced settings (name changes per adapter)
* ``DiscoverySchema`` for discovery advanced settings
config (dict): the advanced configuration key value pairs to set
Returns:
SystemSettings: dataclass model containing response
"""
api_endpoint = ApiEndpoints.adapters.settings_update
request_obj = api_endpoint.load_request(
pluginId=adapter_name, configName=config_name, config=config
)
return api_endpoint.perform_request(
http=self.auth.http,
request_obj=request_obj,
adapter_name=adapter_name,
config_name=config_name,
)
[docs] def _get_basic(self) -> AdaptersList:
"""Get the basic metadata for all adapters.
Returns:
AdaptersList: dataclass model containing response
"""
api_endpoint = ApiEndpoints.adapters.get_basic
return api_endpoint.perform_request(http=self.auth.http)
[docs] def _config_get(self, adapter_name: str) -> AdapterSettings:
"""Private API method to set advanced settings for an adapter.
Args:
adapter_name (str): raw name of the adapter, i.e. 'aws_adapter'
Returns:
AdapterSettings: dataclass model containing response
"""
api_endpoint = ApiEndpoints.adapters.settings_get
return api_endpoint.perform_request(http=self.auth.http, adapter_name=adapter_name)
[docs] def _file_upload(
self,
adapter_name: str,
node_id: str,
field_name: str,
file_name: str,
file_content: t.Union[bytes, str],
file_content_type: t.Optional[str] = None,
file_headers: t.Optional[dict] = None,
) -> dict:
"""Private API method to upload a file to a specific adapter on a specifc node.
Args:
adapter_name (str): raw name of the adapter i.e. ``aws_adapter``
node_id (str): ID of node running adapter
field_name (str): name of field (should match configuration schema key name)
file_name (str): name of file to upload
file_content (t.Union[bytes, str]): content of file to upload
file_content_type (Optional[str], optional): mime type of file to upload
file_headers (Optional[dict], optional): headers to use for file
Returns:
dict: containing filename and uuid keys
"""
api_endpoint = ApiEndpoints.adapters.file_upload
data = {"field_name": field_name}
files = {"userfile": (file_name, file_content, file_content_type, file_headers)}
http_args = {"files": files, "data": data}
response = api_endpoint.perform_request(
http=self.auth.http, http_args=http_args, adapter_name=adapter_name, node_id=node_id
)
parsed = {"filename": file_name, "uuid": response["data"]["id"]}
return parsed
[docs] def _get_labels(self) -> CnxLabels:
"""Get labels metadata for all connections.
Returns:
CnxLabels: dataclass model containing response
"""
api_endpoint = ApiEndpoints.adapters.cnx_get_labels
response = api_endpoint.perform_request(http=self.http)
return response
[docs] def _get_fetch_history_filters(self) -> AdapterFetchHistoryFilters:
"""Get filter values for use in adapters history."""
api_endpoint = ApiEndpoints.adapters.get_fetch_history_filters
response = api_endpoint.perform_request(http=self.http)
return response
[docs] def _get_fetch_history(
self, request_obj: t.Optional[AdapterFetchHistoryRequest] = None
) -> HIST_LIST:
"""Get adapter fetch history."""
api_endpoint = ApiEndpoints.adapters.get_fetch_history
if not request_obj:
request_obj = AdapterFetchHistoryRequest()
response = api_endpoint.perform_request(http=self.http, request_obj=request_obj)
return response