Source code for axonius_api_client.api.adapters.cnx

# -*- coding: utf-8 -*-
"""API for working with adapter connections."""
import dataclasses
import re
from typing import Callable, List, Optional, Type, Union

import requests

from ...constants.adapters import CNX_SANE_DEFAULTS
from ...exceptions import (
    CnxAddError,
    CnxError,
    CnxGoneError,
    CnxTestError,
    CnxUpdateError,
    ConfigInvalidValue,
    ConfigRequired,
    NotFoundError,
)
from ...http import Http
from ...parsers.config import (
    config_build,
    config_default,
    config_empty,
    config_info,
    config_required,
    config_unchanged,
    config_unknown,
)
from ...parsers.tables import tablize_cnxs, tablize_schemas
from ...tools import (
    combo_dicts,
    is_existing_file,
    json_dump,
    json_load,
    listify,
    pathify,
    pathlib,
    strip_right,
)
from ..api_endpoints import ApiEndpoints
from ..json_api.adapters import CnxCreate, CnxDelete, CnxLabels, Cnxs, CnxUpdate
from ..json_api.instances import Tunnel
from ..mixins import ChildMixins


[docs]class Cnx(ChildMixins): """API model for working with adapter connections. Examples: Create a ``client`` using :obj:`axonius_api_client.connect.Connect` * Add a connection: :meth:`add` * Get all connections for an adapter: :meth:`get_by_adapter` * Get a connection for an adapter by UUID: :meth:`get_by_uuid` * Get a connection for an adapter by connection label: :meth:`get_by_label` * Get a connection for an adapter by ID: :meth:`get_by_id` * Update a connection for an adapter by ID: :meth:`update_by_id` * Delete a connection for an adapter by ID: :meth:`delete_by_id` * Test a connections parameters for an adapter without creating the connection: :meth:`test` * Work with adapters :obj:`axonius_api_client.api.adapters.adapters.Adapters` Notes: All methods use the Core instance by default, but you can work with another instance by passing the name of the instance to ``adapter_node``. Supplying unknown keys/values for configurations will throw an error showing the valid keys/values. """
[docs] def get_by_adapter( self, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, ) -> List[dict]: """Get all connections of an adapter on a node. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Get all connections for an adapter on the Core instance >>> cnxs = client.adapters.cnx.get_by_adapter(adapter_name="active_directory") Args: adapter_name (str): name of adapter adapter_node (Optional[str], optional): name of node running adapter tunnel (Optional[str], optional): name or ID of tunnel Returns: List[dict]: connection metadata for adapter """ def is_match(cnx): if cnx.node_id != adapter["node_meta"]["node_id"]: return False if (tunnel_obj and cnx.tunnel_id) and tunnel_obj.id != cnx.tunnel_id: return False return True adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) tunnel_obj = self.parent.instances.get_tunnel(value=tunnel) cnxs_obj = self._get(adapter_name=adapter["name_raw"]) ret = [x for x in cnxs_obj.cnxs if is_match(x)] return [x.to_dict_old() for x in ret]
[docs] def get_by_uuid( self, cnx_uuid: str, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, **kwargs, ) -> dict: """Get a connection for an adapter on a node by UUID. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Get a single connection by UUID >>> cnx = client.adapters.cnx.get_by_uuid( ... cnx_id='5f76735be4557d5cba94237f', adapter_name='aws' ... ) Notes: UUID of connections change when a connection configuration is updated, the more persistent way to get a connection is :meth:`get_by_id`. Args: cnx_uuid: UUID to search for adapter_name: name of adapter adapter_node: name of node running adapter tunnel (Optional[str], optional): name or ID of tunnel **kwargs: passed to :meth:`get_by_key` Raises: NotFoundError: when no connections found with supplied uuid Returns: dict: connection metadata for uuid on adapter """ def is_match(cnx): if cnx.node_id != adapter["node_meta"]["node_id"]: return False if (tunnel_obj and cnx.tunnel_id) and tunnel_obj.id != cnx.tunnel_id: return False if cnx_uuid in [cnx.client_id, cnx.uuid]: return True return False adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) tunnel_obj = self.parent.instances.get_tunnel(value=tunnel) node_name = adapter["node_meta"]["node_name"] cnxs_obj = self._get(adapter_name=adapter["name_raw"]) for cnx in cnxs_obj.cnxs: if is_match(cnx=cnx): return cnx.to_dict_old() err = ( f"No connection found on adapter {adapter_name!r} node {node_name!r} " f"with client ID or UUID of {cnx_uuid!r}" ) if tunnel_obj: err += f"on tunnel {tunnel_obj}" cnxs = [x.to_dict_old() for x in cnxs_obj.cnxs] raise NotFoundError(tablize_cnxs(cnxs=cnxs, err=err))
[docs] def get_by_label( self, value: str, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, ) -> dict: """Get a connection for an adapter on a node using a specific connection identifier key. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Get a single connection by connection label >>> cnx = client.adapters.cnx.get_by_label( ... value='test label', adapter_name='active_directory' ... ) Args: value (str): value that connection_label must match for a connection adapter_name (str): name of adapter adapter_node (Optional[str], optional): name of node running adapter tunnel (Optional[str], optional): name or ID of tunnel Raises: NotFoundError: when no connections found with supplied connection label Returns: dict: connection metadata for label on adapter """ def is_match(cnx): if cnx.node_id != node_id: return False if (tunnel_obj and cnx.tunnel_id) and tunnel_obj.id != cnx.tunnel_id: return False if value == cnx.connection_label: return True return False adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) node_id = adapter["node_meta"]["node_id"] node_name = adapter["node_meta"]["node_name"] cnxs_obj = self._get(adapter_name=adapter["name_raw"]) tunnel_obj = self.parent.instances.get_tunnel(value=tunnel) for cnx in cnxs_obj.cnxs: if is_match(cnx=cnx): return cnx.to_dict_old() err = ( f"No connection found on adapter {adapter_name!r} node {node_name!r} " f"with a connection label of {value!r}" ) if tunnel_obj: err += f"on tunnel {tunnel_obj}" cnxs = [x.to_dict_old() for x in cnxs_obj.cnxs] raise NotFoundError(tablize_cnxs(cnxs=cnxs, err=err))
[docs] def get_by_id( self, cnx_id: str, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, **kwargs, ) -> dict: """Get a connection for an adapter on a node by ID. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Get a single connection by ID >>> cnx = client.adapters.cnx.get_by_id( ... cnx_id='192.168.1.10', adapter_name='active_directory' ... ) Notes: ID is constructed from some variance of connection keys, usually "domain" (i.e. for active_directory ``TestDomain.test``) Args: cnx_id (str): connection ID to get adapter_name (str): name of adapter adapter_node (Optional[str], optional): name of node running tunnel (Optional[str], optional): name or ID of tunnel **kwargs: passed to :meth:`get_by_key` Raises: NotFoundError: when no connections found with supplied id Returns: dict: connection metadata for id on adapter """ def is_match(cnx): if cnx.node_id != node_id: return False if (tunnel_obj and cnx.tunnel_id) and tunnel_obj.id != cnx.tunnel_id: return False if cnx_id in [cnx.client_id, cnx.uuid]: return True return False adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) node_id = adapter["node_meta"]["node_id"] node_name = adapter["node_meta"]["node_name"] cnxs_obj = self._get(adapter_name=adapter["name_raw"]) tunnel_obj = self.parent.instances.get_tunnel(value=tunnel) for cnx in cnxs_obj.cnxs: if is_match(cnx=cnx): return cnx.to_dict_old() err = ( f"No connection found on adapter {adapter_name!r} node {node_name!r} " f"with client ID or UUID of {cnx_id!r}" ) if tunnel_obj: err += f"on tunnel {tunnel_obj}" cnxs = [x.to_dict_old() for x in cnxs_obj.cnxs] raise NotFoundError(tablize_cnxs(cnxs=cnxs, err=err))
[docs] def update_by_id( self, cnx_id: str, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, **kwargs, ) -> dict: """Update a connection for an adapter on a node by ID. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Change the connection label for a connection >>> cnx = client.adapters.cnx.update_by_id( ... cnx_id='TestDomain.test', ... adapter_name='active_directory', ... connection_label="new label", ... ) Args: cnx_id: connection ID to update adapter_name: name of adapter adapter_node: name of node running adapter tunnel (Optional[str], optional): name or ID of tunnel **kwargs: passed to :meth:`update_cnx` Returns: dict: updated connection metadata """ cnx = self.get_by_id( cnx_id=cnx_id, adapter_name=adapter_name, adapter_node=adapter_node, tunnel=tunnel ) return self.update_cnx(cnx_update=cnx, **kwargs)
[docs] def delete_by_id( self, cnx_id: str, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, delete_entities: bool = False, ) -> str: """Delete a connection for an adapter on a node by connection ID. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Delete a connection by ID >>> cnx = client.adapters.cnx.delete_by_id( ... cnx_id='192.168.1.10', adapter_name='active_directory' ... ) Args: cnx_id: connection ID to delete adapter_name: name of adapter adapter_node: name of node running adapter tunnel (Optional[str], optional): name or ID of tunnel delete_entities: delete all assets fetched by this connection """ cnx = self.get_by_id( cnx_id=cnx_id, adapter_name=adapter_name, adapter_node=adapter_node, tunnel=tunnel ) return self.delete_cnx(cnx_delete=cnx, delete_entities=delete_entities)
[docs] def test_by_id(self, **kwargs) -> dict: """Test a connection for an adapter on a node by ID. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Test the reachability of a connection by ID >>> cnx = client.adapters.cnx.test_by_id( ... cnx_id='192.168.1.10', adapter_name='active_directory' ... ) Args: **kwargs: passed to :meth:`get_by_id` """ cnx = self.get_by_id(**kwargs) return self.test_cnx(cnx_test=cnx)
[docs] def add( self, adapter_name: str, adapter_node: Optional[str] = None, save_and_fetch: bool = True, active: bool = True, tunnel: Optional[Union[Tunnel, str]] = None, connection_label: Optional[str] = None, kwargs_config: Optional[dict] = None, new_config: Optional[dict] = None, parse_config: bool = True, internal_axon_tenant_id: Optional[str] = None, **kwargs, ) -> dict: """Add a connection to an adapter on a node. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Establish a connection dictionary >>> config = dict( ... dc_name="192.168.1.10", ... user="svc_user", ... password="test", ... do_not_fetch_users=False, ... fetch_disabled_users=False, ... fetch_disabled_devices=False, ... is_ad_gc=False, ... ) Add a connection for an adapter to the Core instance >>> cnx = client.adapters.cnx.add(adapter_name="active_directory", **config) Args: adapter_name: name of adapter adapter_node: name of node running adapter save_and_fetch: perform a fetch when saving, or just save without fetching active: set the connection as active after creating connection_label: label to assign to connection tunnel: tunnel ID or name to use for new connection kwargs_config: connection args that conflict with this methods signature new_config: connection args that conflict with this methods signature parse_config: perform api client side parsing of connection args internal_axon_tenant_id: The ID of the Tenant that the connection is associated with **kwargs: configuration of new connection Raises: :exc:`CnxAddError`: when an error happens while adding the connection """ new_config = combo_dicts(kwargs_config, new_config, kwargs) adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) tunnel_id = self.parent.instances.get_tunnel(value=tunnel, return_id=True) schemas = self._get(adapter_name=adapter["name_raw"]).schema_cnx config_label = new_config.pop("connection_label", None) connection_label = connection_label or config_label cnx_to_add = cnx_from_adapter(adapter) cnx_to_add = combo_dicts(cnx_to_add, config=new_config, schemas=schemas) if parse_config: cnx_str = ", ".join(get_cnx_strs(cnx=cnx_to_add)) source = f"adding connection {cnx_str}" new_config = self.build_config( cnx_schemas=schemas, new_config=new_config, source=source, adapter_name=adapter["name"], adapter_node=adapter["node_meta"]["name"], ) config_default( schemas=schemas, new_config=new_config, source=source, sane_defaults=self.get_sane_defaults(adapter_name=adapter["name"]), ) config_empty(schemas=schemas, new_config=new_config, source=source) config_required(schemas=schemas, new_config=new_config, source=source) response_status_hook = self.get_response_status_hook(cnx=cnx_to_add) result = self._add( connection=new_config, adapter_name=adapter["name_raw"], instance_name=adapter["node_meta"]["name"], instance_id=adapter["node_meta"]["id"], is_instances_mode=not adapter["node_meta"]["is_master"], save_and_fetch=save_and_fetch, active=active, connection_label=connection_label, response_status_hook=response_status_hook, tunnel_id=tunnel_id, internal_axon_tenant_id=internal_axon_tenant_id ) cnx_new = self.get_by_uuid( cnx_uuid=result.id, adapter_name=adapter["name"], adapter_node=adapter["node_meta"]["name"], tunnel=tunnel_id, ) if not result.working and not cnx_new["working"]: err = f"Connection was added but had a failure connecting:\n{result}" exc = CnxAddError(err) exc.result = result exc.cnx_new = cnx_new raise exc return cnx_new
[docs] def test( self, adapter_name: str, adapter_node: Optional[str] = None, tunnel: Optional[Union[Tunnel, str]] = None, kwargs_config: Optional[dict] = None, new_config: Optional[dict] = None, parse_config: bool = True, **kwargs, ) -> dict: """Test a connection to an adapter on a node. Examples: First, create a ``client`` using :obj:`axonius_api_client.connect.Connect`. Test the reachability of a connection without creating the connection >>> config = dict(dc_name="192.168.1.10", user="svc_user", password="test") >>> cnx = client.adapters.cnx.test(adapter_name="active_directory", **config) Notes: This can be used to test the configuration of a connection before creating the connection. Usually you need just whatever configuration keys are related to hostname/domain/ip address to test a connection. Args: adapter_name (str): name of adapter adapter_node (Optional[str], optional): name of node running adapter tunnel: tunnel ID or name to use for testing connection kwargs_config (Optional[dict], optional): configuration of connection to test new_config (Optional[dict], optional): configuration of connection to test parse_config (bool, optional): parse configuration for unknowns, requireds, etc **kwargs: configuration of connection to test Raises: CnxTestError: When a connection test fails ConfigRequired: When not enough arguments are supplied to test the connection """ new_config = combo_dicts(kwargs_config, new_config, kwargs) adapter = self.parent.get_by_name(name=adapter_name, node=adapter_node, get_clients=False) schemas = self._get(adapter_name=adapter["name_raw"]).schema_cnx tunnel_id = self.parent.instances.get_tunnel(value=tunnel, return_id=True) cnx_to_test = cnx_from_adapter(adapter) cnx_to_test = combo_dicts(cnx_to_test, config=new_config, schemas=schemas) if parse_config: cnx_str = ", ".join(get_cnx_strs(cnx=cnx_to_test)) source = f"testing connectivity for connection {cnx_str}" new_config = self.build_config( cnx_schemas=schemas, old_config={}, new_config=new_config, source=source, adapter_name=adapter["name"], adapter_node=adapter["node_name"], ) config_empty(schemas=schemas, new_config=new_config, source=source) cnx_to_test = combo_dicts(cnx_to_test, config=new_config) response_status_hook = self.get_response_status_hook(cnx=cnx_to_test) return self._test( adapter_name=adapter["name_raw"], instance=adapter["node_id"], connection=new_config, tunnel_id=tunnel_id, response_status_hook=response_status_hook, )
[docs] def test_cnx(self, cnx_test: dict, **kwargs) -> dict: """Test a connection for an adapter on a node. Args: cnx_test: connection fetched previously """ response_status_hook = self.get_response_status_hook(cnx=cnx_test) return self._test( adapter_name=cnx_test["adapter_name_raw"], instance=cnx_test["node_id"], connection=cnx_test["config"], tunnel_id=cnx_test.get("tunnel_id", None), response_status_hook=response_status_hook, )
[docs] def update_cnx( self, cnx_update: dict, save_and_fetch: bool = True, active: Optional[bool] = None, new_node: Optional[str] = None, new_tunnel: Optional[Union[Tunnel, str]] = None, kwargs_config: Optional[dict] = None, new_config: Optional[dict] = None, parse_config: bool = True, **kwargs, ) -> dict: """Update a connection for an adapter on a node. Args: cnx_update: connection fetched previously save_and_fetch: perform a fetch when saving, or just save without fetching active: set the connection as active or inactive after updating new_node: move connection to a new node new_tunnel: move connection to a new tunnel ID or name kwargs_config: connection args that conflict with this methods signature new_config: connection args that conflict with this methods signature parse_config: perform api client side parsing of connection args **kwargs: configuration of connection to update Raises: CnxUpdateError: When an error occurs while updating the connection Returns: dict: updated connection metadata """ node_name = cnx_update["node_name"] adapter_name = cnx_update["adapter_name"] adapter_old = self.parent.get_by_name(name=adapter_name, node=node_name) tunnel_id_old = cnx_update.get("tunnel_id", None) if new_node: adapter_new = self.parent.get_by_name(name=adapter_name, node=new_node) else: adapter_new = adapter_old if new_tunnel: tunnel_id_new = self.parent.instances.get_tunnel(value=new_tunnel, return_id=True) else: tunnel_id_new = tunnel_id_old new_config = combo_dicts(kwargs_config, new_config, kwargs) active = active if isinstance(active, bool) else cnx_update["active"] cnx_to_update = combo_dicts( cnx_update, node_id=adapter_new["node_meta"]["id"], node_name=adapter_new["node_meta"]["name"], ) if parse_config: cnx_str = ", ".join(get_cnx_strs(cnx=cnx_to_update)) source = f"updating settings for connection {cnx_str}" new_config = self.build_config( cnx_schemas=cnx_update["schemas"], old_config=cnx_update["config"], new_config=new_config, source=source, adapter_name=adapter_new["name"], adapter_node=adapter_new["node_meta"]["name"], ) config_empty(schemas=cnx_update["schemas"], new_config=new_config, source=source) config_unchanged( schemas=cnx_update["schemas"], old_config=cnx_update["config"], new_config=new_config, source=source, ) cnx_to_update = combo_dicts(cnx_update, config=new_config) response_status_hook = self.get_response_status_hook(cnx=cnx_to_update) result = self._update( uuid=cnx_update["uuid"], connection=new_config, adapter_name=cnx_update["adapter_name_raw"], instance_name=adapter_new["node_meta"]["name"], instance_id=adapter_new["node_meta"]["id"], instance_prev=adapter_old["node_meta"]["id"], instance_prev_name=adapter_old["node_meta"]["name"], is_instances_mode=not adapter_new["node_meta"]["is_master"], tunnel_id=tunnel_id_new, save_and_fetch=save_and_fetch, active=active, connection_label=cnx_update["connection_label"], response_status_hook=response_status_hook, ) cnx_new = self.get_by_uuid( cnx_uuid=result.id, adapter_name=cnx_update["adapter_name"], adapter_node=adapter_new["node_meta"]["name"], tunnel=tunnel_id_new, ) if not result.working and not cnx_new["working"]: err = f"Connection configuration was updated but had a failure connecting:\n{result}" exc = CnxUpdateError(err) exc.result = result exc.cnx_old = cnx_update exc.cnx_new = cnx_new raise exc return cnx_new
[docs] def delete_cnx(self, cnx_delete: dict, delete_entities: bool = False) -> CnxDelete: """Delete a connection for an adapter on a node. Args: cnx_delete (dict): connection fetched previously delete_entities (bool, optional): delete all assets fetched by this connection Returns: CnxDelete: dataclass model containing response """ node = self.parent.instances.get_by_name(name=cnx_delete["node_name"]) response_status_hook = self.get_response_status_hook(cnx=cnx_delete) return self._delete( adapter_name=cnx_delete["adapter_name_raw"], uuid=cnx_delete["uuid"], delete_entities=delete_entities, instance_id=cnx_delete["node_id"], instance_name=cnx_delete["node_name"], is_instances_mode=not node["is_master"], response_status_hook=response_status_hook, )
[docs] def set_cnx_active(self, cnx: dict, value: bool, save_and_fetch: bool = False) -> dict: """Set a connection to active. Args: cnx (dict): Previously fetched connection object value (bool): Set cnx as active or inactive save_and_fetch (bool, optional): When saving, perform a fetch as well Returns: dict: updated cnx object """ node = self.parent.instances.get_by_name(name=cnx["node_name"]) response_status_hook = self.get_response_status_hook(cnx=cnx) result = self._update( uuid=cnx["uuid"], connection=cnx["config"], adapter_name=cnx["adapter_name_raw"], instance_name=node["name"], instance_id=node["id"], instance_prev=node["id"], instance_prev_name=node["name"], tunnel_id=cnx.get("tunnel_id", None), is_instances_mode=not node["is_master"], save_and_fetch=save_and_fetch, active=value, connection_label=cnx["connection_label"], response_status_hook=response_status_hook, ) cnx = self.get_by_uuid( cnx_uuid=result.id, adapter_name=cnx["adapter_name"], adapter_node=node["name"], tunnel=cnx.get("tunnel_id", None), ) return cnx
[docs] def set_cnx_label(self, cnx: dict, value: str, save_and_fetch: bool = False) -> dict: """Set a connections label. Args: cnx (dict): Previously fetched connection object value (bool): label to apply to cnx save_and_fetch (bool, optional): When saving, perform a fetch as well Returns: dict: updated cnx object """ node = self.parent.instances.get_by_name(name=cnx["node_name"]) response_status_hook = self.get_response_status_hook(cnx=cnx) result = self._update( uuid=cnx["uuid"], connection=cnx["config"], adapter_name=cnx["adapter_name_raw"], instance_name=node["name"], instance_id=node["id"], instance_prev=node["id"], instance_prev_name=node["name"], is_instances_mode=not node["is_master"], tunnel_id=cnx.get("tunnel_id", None), save_and_fetch=save_and_fetch, active=cnx["active"], connection_label=value, response_status_hook=response_status_hook, ) cnx = self.get_by_uuid( cnx_uuid=result.id, adapter_name=cnx["adapter_name"], adapter_node=node["name"], tunnel=cnx.get("tunnel_id", None), ) return cnx
[docs] def build_config( self, cnx_schemas: List[dict], new_config: dict, source: str, adapter_name: str, adapter_node: str, old_config: Optional[dict] = None, ) -> dict: """Build and parse a configuration for a connection. Args: cnx_schemas: configuration schemas for connection new_config: new configuration for connection source: description of what called this method adapter_name: name of adapter adapter_node: name of node running adapter old_config: configuration that is being updated """ old_config = old_config or {} callbacks = { "cb_file": self.cb_file_upload, "adapter_name": adapter_name, "adapter_node": adapter_node, } config_unknown( schemas=cnx_schemas, new_config=new_config, source=source, callbacks=callbacks, ) new_config = config_build( schemas=cnx_schemas, old_config=old_config, new_config=new_config, source=source, callbacks=callbacks, ) return new_config
[docs] def get_sane_defaults(self, adapter_name: str) -> dict: """Get the API client defined sane defaults for a specific adapter. Args: adapter_name: name of adapter """ return CNX_SANE_DEFAULTS.get(adapter_name, CNX_SANE_DEFAULTS["all"])
[docs] def cb_file_upload( self, value: Union[str, pathlib.Path, dict], schema: dict, callbacks: dict, source: str, ) -> dict: """Config parsing callback to upload a file for a connection. Args: value: file to upload schema: connection configuration schema of type "file" callbacks: callbacks supplied source: description of what called this method Raises: :exc:`ConfigInvalidValue`: When value is a path that does not exist, or a dictionary that does not have 'uuid' and 'filename' keys, or if value is not a file """ def fail(msg): sinfo = config_info(schema=schema, value=str(value), source=source) msgs = [sinfo, "", f"supplied value: {orig!r}", f"parsed value: {value!r}", msg] raise ConfigInvalidValue("\n".join(msgs)) orig = value adapter_name = callbacks["adapter_name"] adapter_node = callbacks["adapter_node"] field_name = schema["name"] value = json_load(obj=value, error=False, load_file=False) file_content = None if isinstance(value, dict): if value.get("uuid") and value.get("filename"): return {"uuid": value["uuid"], "filename": value["filename"]} fail("supplied dictionary must have uuid and filename keys") if isinstance(value, pathlib.Path): if not value.is_file(): fail("supplied pathlib does not exist as a file") file_content = value.read_text() if isinstance(value, str): value = pathify(value) if not is_existing_file(value): fail("supplied string does not exist as a file") file_content = value.read_text() if file_content: return self.parent.file_upload( name=adapter_name, field_name=field_name, file_name=value.name, file_content=file_content, node=adapter_node, ) fail("Unable to load contents from file")
[docs] def _add( self, connection: dict, instance_id: str, instance_name: str, adapter_name: str, connection_discovery: Optional[dict] = None, is_instances_mode: bool = False, save_and_fetch: bool = True, active: bool = True, connection_label: Optional[str] = None, tunnel_id: Optional[str] = None, response_status_hook: Optional[Callable] = None, internal_axon_tenant_id: Optional[str] = None, ) -> CnxCreate: """Pass.""" api_endpoint = ApiEndpoints.adapters.cnx_create request_payload: dict = { "connection": connection, "connection_discovery": connection_discovery, "instance": instance_id, "instance_name": instance_name, "is_instances_mode": is_instances_mode, "active": active, "save_and_fetch": save_and_fetch, "connection_label": connection_label, "tunnel_id": tunnel_id, } if internal_axon_tenant_id: request_payload["internal_axon_tenant_id"] = internal_axon_tenant_id request_obj = api_endpoint.load_request(**request_payload) return api_endpoint.perform_request( http=self.auth.http, request_obj=request_obj, adapter_name=adapter_name, response_status_hook=response_status_hook, )
[docs] def _test( self, adapter_name: str, instance: str, connection: dict, tunnel_id: Optional[str] = None, response_status_hook: Optional[Callable] = None, ) -> dict: """Direct API method to add a connection to an adapter. Args: adapter_name (str): raw name of the adapter i.e. ``aws_adapter`` instance (str): id of node running adapter connection (dict): configuration to test tunnel_id (Optional[str], optional): tunnel ID to use response_status_hook (Optional[Callable], optional): callable to use when checking status codes of response Returns: dict: containing response """ api_endpoint = ApiEndpoints.adapters.cnx_test request_obj = api_endpoint.load_request( connection=connection, instance=instance, tunnel_id=tunnel_id, ) return api_endpoint.perform_request( http=self.auth.http, request_obj=request_obj, adapter_name=adapter_name, response_status_hook=response_status_hook, )
[docs] def _get(self, adapter_name: str) -> Cnxs: """Get all connections for a given adapter. Args: adapter_name (str): name of adapter Returns: Cnxs: dataclass model containing response """ adapter_name += "" if adapter_name.endswith("_adapter") else "_adapter" api_endpoint = ApiEndpoints.adapters.cnx_get ret = api_endpoint.perform_request(http=self.auth.http, adapter_name=adapter_name) ret.adapter_name = strip_right(obj=adapter_name, fix="_adapter") return ret
[docs] def _get_labels(self) -> CnxLabels: """Get all connection labels. Returns: CnxLabels: dataclass model containing response """ return ApiEndpoints.adapters.cnx_get_labels.perform_request(http=self.auth.http)
[docs] def _delete( self, uuid: str, adapter_name: str, instance_id: str, instance_name: str, delete_entities: bool = False, is_instances_mode: bool = False, response_status_hook: Optional[Callable] = None, ) -> CnxDelete: """Direct API method to delete a connection from an adapter. Args: uuid (str): uuid of connection adapter_name (str): name of adapter instance_id (str): UUID of instance to delete connection from instance_name (str): NAME of instance to delete connection from delete_entities (bool, optional): delete all assets fetched by this connection is_instances_mode (bool, optional): instance_id & instance_name is NOT the Core instance response_status_hook (Optional[Callable], optional): callable to use when checking status codes of response Returns: CnxDelete: dataclass model containing response """ api_endpoint = ApiEndpoints.adapters.cnx_delete request_obj = api_endpoint.load_request( instance=instance_id, instance_name=instance_name, delete_entities=delete_entities, is_instances_mode=is_instances_mode, ) return api_endpoint.perform_request( http=self.auth.http, request_obj=request_obj, adapter_name=adapter_name, uuid=uuid, response_status_hook=response_status_hook, )
[docs] def _update( self, uuid: str, connection: dict, instance_id: str, instance_name: str, adapter_name: str, instance_prev: Optional[str] = None, instance_prev_name: Optional[str] = None, tunnel_id: Optional[str] = None, connection_discovery: Optional[dict] = None, is_instances_mode: bool = False, save_and_fetch: bool = True, active: bool = True, connection_label: Optional[str] = None, response_status_hook: Optional[Callable] = None, ) -> CnxUpdate: """Pass. Args: uuid (str): uuid of connection connection (dict): new configuration to apply instance_id (str): UUID of instance to move connection to instance_name (str): NAME of instance to move connection to adapter_name (str): name of adapter tunnel_id (Optional[str], optional): tunnel ID to use instance_prev (Optional[str], optional): UUID of instance to move connection from instance_prev_name (Optional[str], optional): NAME of instance to move connection from connection_discovery (Optional[dict], optional): connection specific discovery settings to update is_instances_mode (bool, optional): instance_id & instance_name is NOT the Core instance save_and_fetch (bool, optional): save and fetch the connection, or just save it active (bool, optional): set connection as active connection_label (Optional[str], optional): label to set on connection response_status_hook (Optional[Callable], optional): callable to use when checking status codes of response Returns: CnxModifyResponse: dataclass model containing response """ api_endpoint = ApiEndpoints.adapters.cnx_update request_obj = api_endpoint.load_request( connection=connection, connection_discovery=connection_discovery, instance=instance_id, instance_name=instance_name, instance_prev=instance_prev, instance_prev_name=instance_prev_name, tunnel_id=tunnel_id, is_instances_mode=is_instances_mode, active=active, save_and_fetch=save_and_fetch, connection_label=connection_label, ) return api_endpoint.perform_request( http=self.auth.http, request_obj=request_obj, adapter_name=adapter_name, uuid=uuid, response_status_hook=response_status_hook, )
[docs] def get_response_status_hook(self, cnx: dict) -> Callable: """Check if the result of updating a connection shows that the connection is gone. Args: cnx (dict): Connection metadata to use in response hook Returns: Callable: response status hook """ def response_status_hook(http: Http, response: requests.Response, **kwargs) -> bool: """Pass.""" ret = False for error_map in ERROR_MAPS: ret = error_map.handle_exc(response=response, cnx=cnx, apiobj=self) return ret return response_status_hook
[docs]@dataclasses.dataclass class ErrorMap: """Mapping container for use in :meth:`Cnx.get_response_status_hook`.""" response_regexes: List[str] """List of regexes that if they match response body, will throw :attr:`exc`""" err: str """Error string to include in exception if :attr:`response_regexes` match.""" exc: Type[Exception] """Exception class to throw if :attr:`response_regexes` match.""" endpoint_regexes: List[str] = dataclasses.field(default_factory=list) """List of regexes that if they match request URL, will check :attr:`response_regexes`.""" with_schemas: bool = True """Include connection configuration schemas in error output.""" with_config: bool = True """Include connection configuration in error output.""" with_cnxs: bool = True """Include all connections for adapter in question in error output.""" skip_status: bool = False """Skip the rest of the built in status checks."""
[docs] def __post_init__(self): """Post init dataclass setup.""" self.response_regexes = [re.compile(x, re.I) for x in listify(self.response_regexes)] self.endpoint_regexes = [re.compile(x, re.I) for x in listify(self.endpoint_regexes)]
[docs] def is_response_error(self, response: requests.Response) -> bool: """Check if response matches :attr:`endpoint_regexes` and :attr:`response_regexes`. Args: response (requests.Response): response object to check Returns: bool: if response matches this error map """ def any_match(regexes, value): return any([x.search(value) for x in listify(regexes)]) if self.endpoint_regexes and not any_match( self.endpoint_regexes, response.url ): # pragma: no cover return False if any_match(self.response_regexes, response.text): return True return False
[docs] def handle_exc(self, response: requests.Response, cnx: dict, apiobj: Cnx) -> bool: """Handle raising an exception if :meth:`is_response_error`. Args: response (requests.Response): response object to check cnx (dict): connection metadata apiobj (Cnx): API model to use to fetch connections Returns: bool: return :attr:`skip_status` Raises: Exception: of type :attr:`exc` """ if self.is_response_error(response=response): cnx_strs = get_cnx_strs( cnx=cnx, with_schemas=self.with_schemas, with_config=self.with_config ) errs = [self.err, "", *cnx_strs] err = "\n".join(errs) if self.with_cnxs: cnxs = apiobj.get_by_adapter( adapter_name=cnx["adapter_name"], adapter_node=cnx["node_name"] ) err = tablize_cnxs(cnxs=cnxs, err=err) exc = self.exc(err) exc.cnx = cnx raise exc return self.skip_status
CNX_STRS: dict = { "adapter_name": "Adapter Name: {adapter_name!r}", "node_name": "Instance name: {node_name!r}", "node_id": "Instance ID: {node_id!r}", "id": "Connection ID: {node_id!r}", "uuid": "Connection UUID: {node_id!r}", "active": "Is active: {active}", "status": "Status: {status}", }
[docs]def get_cnx_strs(cnx: dict, with_config: bool = False, with_schemas: bool = False) -> List[str]: """Build a string with details about a connection. Args: cnx (dict): connection to build string for with_config (bool, optional): include connection config in output with_schemas (bool, optional): include connection config schemas in output Returns: List[str]: connection details """ ret = [v.format(**cnx) for k, v in CNX_STRS.items() if k in cnx] if with_config and "config" in cnx: ret += ["Configuration:", f"{json_dump(cnx['config'])}"] if with_schemas and "schemas" in cnx: ret += ["Configuration Schema:", tablize_schemas(schemas=cnx["schemas"])] return ret
[docs]def cnx_from_adapter(adapter: dict) -> dict: """Build a base dict for a connection with details from an adapter metadata dict. Args: adapter (dict): adapter to get details from Returns: dict: base dict for connection """ ret = {} ret["adapter_name"] = adapter["name"] ret["adapter_name_raw"] = adapter["name_raw"] ret["node_name"] = adapter["node_name"] ret["node_id"] = adapter["node_id"] return ret
# NB: Could be dataclass ERROR_MAPS: List[ErrorMap] = [ ErrorMap( response_regexes="type.*InvalidId", exc=CnxError, err="Invalid instance ID or instance name", ), ErrorMap( response_regexes="type.*JSONDecodeError", exc=ConfigRequired, err="Connection configuration missing required keys", ), ErrorMap( response_regexes="Adapter name and connection data are required", exc=ConfigRequired, err="Connection configuration missing required keys", ), ErrorMap( response_regexes="invalid client", with_cnxs=False, exc=ConfigRequired, err="Connection configuration missing required keys", ), ErrorMap( response_regexes="Adapter.*not found", with_cnxs=False, exc=CnxError, err="Invalid adapter name", ), ErrorMap( response_regexes="Server is already gone", exc=CnxGoneError, err="Connection is gone - updated or deleted by someone else!", ), ErrorMap( response_regexes="Client is not reachable", with_cnxs=False, exc=CnxTestError, err="Connection test failed", ), ]