Source code for axonius_api_client.connect

"""Easy all-in-one connection handler."""
import logging
import logging.handlers
import platform
import re
import types
import typing as t

import requests

from . import api, logs, tools, version
from .auth import AuthApiKey, AuthCredentials, AuthModel, AuthNull
from .constants.ctypes import PathLike
from .constants.logs import (
    LOG_FILE_MAX_FILES,
    LOG_FILE_MAX_MB,
    LOG_FILE_NAME,
    LOG_FILE_PATH,
    LOG_FMT_BRIEF,
    LOG_FMT_VERBOSE,
    LOG_LEVEL_API,
    LOG_LEVEL_AUTH,
    LOG_LEVEL_CONSOLE,
    LOG_LEVEL_ENDPOINTS,
    LOG_LEVEL_FILE,
    LOG_LEVEL_PACKAGE,
)
from .exceptions import ConnectError, InvalidCredentials
from .http import Http, T_Cookies, T_Headers
from .projects import cert_human
from .projects.cf_token import constants as cf_constants
from .setup_env import get_env_ax


[docs]class Connect: """Easy all-in-one connection handler for using the API client. Examples: >>> #!/usr/bin/env python >>> # -*- coding: utf-8 -*- >>> '''Base example for setting up the API client.''' >>> import axonius_api_client as axonapi >>> >>> # get the URL, API key, API secret, & certwarn from the default ".env" file >>> client_args = axonapi.get_env_connect() >>> >>> # OR override OS env vars with the values from a custom .env file >>> # client_args = axonapi.get_env_connect(ax_env="/path/to/envfile", override=True) >>> >>> # create a client using the url, key, and secret from OS env >>> client = axonapi.Connect(**client_args) >>> >>> j = client.jdump # json dump helper >>> >>> client.start() # connect to axonius >>> >>> # client.activity_logs # get audit logs >>> # client.adapters # get adapters and update adapter settings >>> # client.adapters.cnx # CRUD for adapter connections >>> # client.dashboard # get/start/stop discovery cycles >>> # client.dashboard_spaces # CRUD for dashboard spaces >>> # client.data_scopes # CRUD for data scopes >>> # client.devices # get device assets >>> # client.devices.fields # get field schemas for device assets >>> # client.devices.labels # add/remove/get tags for device assets >>> # client.devices.saved_queries # CRUD for saved queries for device assets >>> # client.enforcements # CRUD for enforcements >>> # client.folders # CRUD for folders >>> # client.folders.enforcements # CRUD for enforcements folders >>> # client.folders.queries # CRUD for queries folders >>> # client.instances # get instances and instance meta data >>> # client.openapi # get openapi spec >>> # client.meta # get product meta data >>> # client.remote_support # include_output/disable remote support settings >>> # client.settings_global # get/update global system settings >>> # client.settings_gui # get/update gui system settings >>> # client.settings_ip # get/update identity provider system settings >>> # client.settings_lifecycle # get/update lifecycle system settings >>> # client.signup # initial signup, password resets >>> # client.system_roles # CRUD for system roles >>> # client.system_users # CRUD for system users >>> # client.users # get user assets >>> # client.users.fields # get field schemas for user assets >>> # client.users.labels # add/remove/get tags for user assets >>> # client.users.saved_queries # CRUD for saved queries for user assets >>> # client.vulnerabilities # get vulnerability assets >>> # client.vulnerabilities.fields # get field schemas for vulnerability assets >>> # client.vulnerabilities.labels # add/remove/get tags for vulnerability assets >>> # client.vulnerabilities.saved_queries # CRUD for saved queries for vulnerability assets """ TOOLS: types.ModuleType = tools """Tools module.""" LOG_LOGGER: logging.Logger = logs.LOG """Logger for the entire package, where console and file output handlers will be attached to.""" LOG: t.Optional[logging.Logger] = None """Logger for this class.""" LOG_HTTP_MAX: bool = False """Shortcut to include_output ALL http logging *warning: very heavy log output*.""" STARTED: bool = False """Flag to indicate if client has been started.""" WRAPERROR: bool = True """Flag to indicate if client should wrap exceptions.""" _url: str = "" """Initially supplied URL of the Axonius instance.""" ARGS_HANDLER_CON: dict = {} """Arguments to use when setting up console logging.""" ARGS_HANDLER_FILE: dict = {} """Arguments to use when setting up file logging.""" ARGS_API: dict = {} """Arguments to use when setting up models.""" ARGS_ORIG: dict = {} """Original arguments supplied to the constructor.""" HANDLER_CON: t.Optional[logging.StreamHandler] = None """Console logging handler.""" HANDLER_FILE: t.Optional[logging.handlers.RotatingFileHandler] = None """File logging handler.""" http: Http = None HTTP: Http = None """HTTP client.""" auth: AuthModel = None AUTH: AuthModel = None """Authentication handler.""" AUTH_NULL: AuthModel = None """Auth model for authenticating with no auth.""" CREDENTIALS: bool = False """Flag to indicate if key & secret are actually username & password.""" API_CACHE: t.Dict[t.Type[api.ModelMixins], api.ModelMixins] = None """Cache for API Models.""" API_ATTRS: t.List[str] = [ "activity_logs", "adapters", "dashboard", "dashboard_spaces", "data_scopes", "devices", "enforcements", "folders", "instances", "meta", "openapi", "remote_support", "settings_global", "settings_gui", "settings_ip", "settings_lifecycle", "signup", "system_roles", "system_users", "users", "vulnerabilities", ] """Attributes that are API Models.""" API_LOG_LEVEL: t.Union[int, str] = LOG_LEVEL_API """Log level for API Models.""" REASON_RES: t.List[t.Pattern] = [ re.compile(r".*?object at.*?>: ([a-zA-Z0-9\]\[: ]+)"), re.compile(r".*?] (.*) "), ] """Patterns to look for in exceptions that we can pretty up for user display.""" PKG_VERSION: str = version.__version__ """Version of this package.""" PY_VERSION: str = platform.python_version() """Version of Python that this package is running on.""" ABOUT_CACHE: t.Optional[dict] = None """Cached data from the /about endpoint.""" HTTP_MAX: str = """log_request_body = True log_response_body = True log_level_http = "debug" log_level_package = "debug" log_level_console = "debug" log_level_file = "debug" log_request_attrs = "all" log_response_attrs = "all" log_body_lines = 10000 """ """Override values used when log_http_max is True.""" HTTP_MAX_CLI: str = ", ".join(HTTP_MAX.splitlines()) """CLI Help string for log_http_max."""
[docs] def __init__( # noqa: PLR0913 self, url: str, key: str, secret: str, log_console: bool = False, log_file: bool = False, log_file_rotate: bool = False, certpath: t.Optional[PathLike] = None, certverify: bool = False, certwarn: bool = True, proxy: t.Optional[str] = None, headers: t.Optional[T_Headers] = None, cookies: t.Optional[T_Cookies] = None, credentials: bool = False, timeout_connect: t.Optional[t.Union[int, float]] = Http.CONNECT_TIMEOUT, timeout_response: t.Optional[t.Union[int, float]] = Http.RESPONSE_TIMEOUT, cert_client_key: t.Optional[PathLike] = None, cert_client_cert: t.Optional[PathLike] = None, cert_client_both: t.Optional[PathLike] = None, save_history: bool = False, log_level: t.Union[str, int] = "debug", log_request_attrs: t.Optional[t.Union[str, t.Iterable[str]]] = None, log_response_attrs: t.Optional[t.Union[str, t.Iterable[str]]] = None, log_request_body: bool = False, log_response_body: bool = False, log_logger: logging.Logger = LOG_LOGGER, log_level_package: t.Union[str, int] = LOG_LEVEL_PACKAGE, log_level_endpoints: t.Union[str, int] = LOG_LEVEL_ENDPOINTS, log_level_http: t.Union[str, int] = Http.LOG_LEVEL, log_level_auth: t.Union[str, int] = LOG_LEVEL_AUTH, log_level_api: t.Union[str, int] = LOG_LEVEL_API, log_level_console: t.Union[str, int] = LOG_LEVEL_CONSOLE, log_level_file: t.Union[str, int] = LOG_LEVEL_FILE, log_console_fmt: str = LOG_FMT_BRIEF, log_http_max: bool = LOG_HTTP_MAX, log_file_fmt: str = LOG_FMT_VERBOSE, log_file_name: t.Optional[PathLike] = LOG_FILE_NAME, log_file_path: t.Optional[PathLike] = LOG_FILE_PATH, log_file_max_mb: int = LOG_FILE_MAX_MB, log_file_max_files: int = LOG_FILE_MAX_FILES, log_hide_secrets: bool = True, log_body_lines: int = Http.LOG_BODY_LINES, wraperror: bool = True, cf_token: t.Optional[str] = None, cf_url: t.Optional[str] = None, cf_path: t.Optional[PathLike] = cf_constants.CF_PATH, cf_run: bool = cf_constants.CLIENT_RUN, cf_run_login: bool = cf_constants.FLOW_RUN_LOGIN, cf_run_access: bool = cf_constants.FLOW_RUN_ACCESS, cf_env: bool = cf_constants.FLOW_ENV, cf_echo: bool = cf_constants.FLOW_ECHO, cf_echo_verbose: bool = cf_constants.FLOW_ECHO_VERBOSE, cf_error: bool = cf_constants.CLIENT_ERROR, cf_error_login: bool = cf_constants.FLOW_ERROR, cf_error_access: bool = cf_constants.FLOW_ERROR, cf_timeout_access: t.Optional[int] = cf_constants.TIMEOUT_ACCESS, cf_timeout_login: t.Optional[int] = cf_constants.TIMEOUT_LOGIN, http: t.Optional[Http] = None, auth: t.Optional[AuthModel] = None, auth_null: t.Optional[AuthModel] = None, max_retries: t.Optional[int] = Http.MAX_RETRIES, retry_backoff: t.Optional[int] = Http.RETRY_BACKOFF, **kwargs: t.Dict[str, t.Any], ) -> None: """Easy all-in-one connection handler. Args: url: URL, hostname, or IP address of Axonius instance key: API Key from account page in Axonius instance secret: API Secret from account page in Axonius instance log_console: include_output logging to console log_file: include_output logging to file certpath: path to CA bundle file to use when verifying certs offered by url certverify: raise exception if cert is self-signed or only if cert is invalid certwarn: show insecure warning once or never show insecure warning proxy: proxy to use when making https requests to url headers: additional headers to supply with every request cookies: additional cookies to supply with every request credentials: treat key as username as secret as password timeout_connect: seconds to wait for connections to open to url timeout_response: seconds to wait for responses from url cert_client_key: file with private key to offer to url cert_client_cert: file with client cert to offer to url cert_client_both: file with client cert and private key to offer to url save_history: save history of responses to Http.HISTORY log_level: log level to use for this object log_request_attrs: list of request attributes to log log_response_attrs: list of response attributes to log log_request_body: log request body log_response_body: log response body log_logger: Logger for the entire package, where console and file output handlers will be attached to log_level_package: log level to use for package root logger log_level_endpoints: log level to use for endpoint loggers log_level_http: log level to use for http loggers log_level_auth: log level to use for auth loggers log_level_api: log level to use for api loggers log_level_console: log level to use for console loggers log_level_file: log level to use for file loggers log_console_fmt: format string to use for console logging log_file_fmt: format string to use for file logging log_file_name: name of file to log to log_file_path: path to directory to log to log_file_max_mb: max size of log file in MB log_file_max_files: max number of log files to keep log_file_rotate: rotate log file on startup log_body_lines: max length of request/response body to log log_hide_secrets: hide secrets in logs log_http_max: Shortcut to include_output ALL http logging *warning: heavy log output* wraperror: wrap certain errors in a more user friendly format cf_url: URL to use in `access token` and `access login` commands, will fallback to url if not supplied cf_token: access token supplied by user, will be checked for validity if not empty cf_env: if no token supplied, try to get token from OS env var CF_TOKEN cf_run: if no token supplied or in OS env vars, try to get token from `access token` and `access login` commands cf_run_access: if run is True, try to get token from `access token`, cf_run_login: if run is True and no token returned from `access token` command, try to get token from `access login` command cf_path: path to cloudflared binary to run, can be full path or path in OS env var $PATH cf_timeout_access: timeout for `access token` command in seconds cf_timeout_login: timeout for `access login` command in seconds cf_error: raise error if an invalid token is found or no token can be found cf_error_access: raise exc if `access token` command fails and login is False cf_error_login: raise exc if `access login` command fails cf_echo: echo commands and results to stderr cf_echo_verbose: echo more to stderr http: http object to use for this connection auth: auth model to use for this connection auth_null: null auth model to use for this connection max_retries: number of times to retry a failed connection retry_backoff: number of seconds to wait between retries, will be multiplied against the current retry attempt **kwargs: unused """ self._url: str = url self.__key: str = key self.__secret: str = secret self.CLIENT = self self.API_CACHE: dict = {} self.ARGS_ORIG: dict = kwargs self.LOG_LOGGER: logging.Logger = log_logger self.LOG: logging.Logger = logs.get_obj_log(obj=self, log_level=log_level) self.LOG_HTTP_MAX: bool = tools.coerce_bool(log_http_max) self.CREDENTIALS: bool = tools.coerce_bool(credentials) if self.LOG_HTTP_MAX: log_request_body = True log_response_body = True log_level_http = "debug" log_level_package = "debug" log_level_console = "debug" log_level_file = "debug" log_request_attrs = "all" log_response_attrs = "all" if not isinstance(log_body_lines, int) or ( isinstance(log_body_lines, int) and log_body_lines < 10000 # noqa: PLR2004 ): log_body_lines = 10000 self.ARGS_HANDLER_CON: dict = { "obj": log_logger, "level": log_level_console, "fmt": log_console_fmt, } self.ARGS_HANDLER_FILE: dict = { "obj": log_logger, "level": log_level_file, "file_path": log_file_path, "file_name": log_file_name, "max_mb": log_file_max_mb, "max_files": log_file_max_files, "fmt": log_file_fmt, } self.ARGS_HTTP: dict = { "url": url, "https_proxy": proxy, "certpath": certpath, "certwarn": certwarn, "certverify": certverify, "cert_client_both": cert_client_both, "cert_client_cert": cert_client_cert, "cert_client_key": cert_client_key, "log_level": log_level_http, "log_body_lines": log_body_lines, "log_request_attrs": log_request_attrs, "log_response_attrs": log_response_attrs, "log_request_body": log_request_body, "log_response_body": log_response_body, "save_history": save_history, "connect_timeout": timeout_connect, "response_timeout": timeout_response, "headers": headers, "cookies": cookies, "cf_url": cf_url, "cf_token": cf_token, "cf_env": cf_env, "cf_run": cf_run, "cf_run_login": cf_run_login, "cf_run_access": cf_run_access, "cf_path": cf_path, "cf_timeout_access": cf_timeout_access, "cf_timeout_login": cf_timeout_login, "cf_error": cf_error, "cf_error_access": cf_error_access, "cf_error_login": cf_error_login, "cf_echo": cf_echo, "cf_echo_verbose": cf_echo_verbose, "max_retries": max_retries, "retry_backoff": retry_backoff, } self.set_wraperror(wraperror) self.set_log_hide_secrets(value=log_hide_secrets) self.set_log_level_api(value=log_level_api) self.set_log_level_package(value=log_level_package) self.set_log_level_endpoints(value=log_level_endpoints) self.control_log_file(enable=log_file, rotate=log_file_rotate) self.control_log_console(enable=log_console) self.HTTP = self.http = self._init_http(http=http) self.AUTH = self.auth = self._init_auth(auth=auth, log_level=log_level_auth) self.AUTH_NULL: AuthModel = self._init_auth_null( auth_null=auth_null, log_level=log_level_auth, ) self._init()
[docs] def start(self) -> None: """Connect to and authenticate with Axonius.""" if not self.STARTED: sysinfo_dump: dict = tools.sysinfo() self.LOG.debug(f"SYSTEM INFO: {tools.json_dump(sysinfo_dump)}") try: self.AUTH.login() except Exception as exc: # noqa: BLE001 if not self.WRAPERROR: raise pre = f"Unable to connect to {self.url!r}" connect_exc = ConnectError(f"{pre}: {exc}") if isinstance(exc, requests.ConnectTimeout): timeout = self.HTTP.CONNECT_TIMEOUT msg = f"{pre}: connection timed out after {timeout} seconds" connect_exc = ConnectError(msg) elif isinstance(exc, requests.ConnectionError): reason = self._get_exc_reason(exc=exc) connect_exc = ConnectError(f"{pre}: {reason}") elif isinstance(exc, InvalidCredentials): connect_exc = ConnectError(f"{pre}: Invalid Credentials supplied") connect_exc.exc = exc raise connect_exc from exc self.STARTED = True self.LOG.info(str(self))
# --> MODELS @property def activity_logs(self) -> api.ActivityLogs: """Work with activity logs.""" return self._get_model(model=api.ActivityLogs) @property def adapters(self) -> api.Adapters: """Work with adapters and adapter connections.""" return self._get_model(model=api.Adapters) @property def dashboard(self) -> api.Dashboard: """Work with discovery cycles.""" return self._get_model(model=api.Dashboard) @property def dashboard_spaces(self) -> api.DashboardSpaces: """Work with dashboard spaces.""" return self._get_model(model=api.DashboardSpaces) @property def data_scopes(self) -> api.DataScopes: """Work with data scopes.""" return self._get_model(model=api.DataScopes) @property def devices(self) -> api.Devices: """Work with device assets.""" return self._get_model(model=api.Devices) @property def enforcements(self) -> api.Enforcements: """Work with Enforcement Center.""" return self._get_model(model=api.Enforcements) @property def folders(self) -> api.Folders: """Work with folders for enforcements and queries.""" return self._get_model(model=api.Folders) @property def instances(self) -> api.Instances: """Work with instances.""" return self._get_model(model=api.Instances) @property def openapi(self) -> api.OpenAPISpec: """Work with the OpenAPI specification file.""" return self._get_model(model=api.OpenAPISpec) @property def meta(self) -> api.Meta: """Work with instance metadata.""" return self._get_model(model=api.Meta) @property def remote_support(self) -> api.RemoteSupport: """Work with configuring remote support.""" return self._get_model(model=api.RemoteSupport) @property def settings_global(self) -> api.SettingsGlobal: """Work with core system settings.""" return self._get_model(model=api.SettingsGlobal) @property def settings_gui(self) -> api.SettingsGui: """Work with gui system settings.""" return self._get_model(model=api.SettingsGui) @property def settings_ip(self) -> api.SettingsIdentityProviders: """Work with identity providers settings.""" return self._get_model(model=api.SettingsIdentityProviders) @property def settings_lifecycle(self) -> api.SettingsLifecycle: """Work with lifecycle system settings.""" return self._get_model(model=api.SettingsLifecycle) @property def signup(self) -> api.Signup: """Perform initial signup, password reset, and other unauthenticated endpoints.""" return self._get_model(model=api.Signup, start=False, auth=self.AUTH_NULL) @property def system_users(self) -> api.SystemUsers: """Work with system users.""" return self._get_model(model=api.SystemUsers) @property def system_roles(self) -> api.SystemRoles: """Work with system roles.""" return self._get_model(model=api.SystemRoles) @property def users(self) -> api.Users: """Work with user assets.""" return self._get_model(model=api.Users) @property def vulnerabilities(self) -> api.Vulnerabilities: """Work with vulnerability assets.""" return self._get_model(model=api.Vulnerabilities)
[docs] def set_wraperror(self, value: bool = True) -> None: """Set whether to wrap errors in a more user-friendly format.""" self.WRAPERROR = tools.coerce_bool(value)
# <-- METHODS
[docs] @staticmethod def set_log_hide_secrets(value: bool = True) -> None: """Set whether to hide secrets in logs.""" logs.HideFormatter.HIDE_ENABLED = tools.coerce_bool(value)
[docs] def set_log_level_console( self, value: t.Union[str, int] = LOG_LEVEL_CONSOLE, ) -> None: """Set the log level for this client's console output.""" if isinstance(self.ARGS_HANDLER_CON, dict): self.ARGS_HANDLER_CON["level"] = logs.str_level(value) if self.HANDLER_CON: logs.set_log_level(obj=self.HANDLER_CON, level=value)
[docs] def set_log_level_file(self, value: t.Union[str, int] = LOG_LEVEL_FILE) -> None: """Set the log level for this client's file output.""" if isinstance(self.ARGS_HANDLER_FILE, dict): self.ARGS_HANDLER_FILE["level"] = logs.str_level(value) if self.HANDLER_FILE: logs.set_log_level(obj=self.HANDLER_FILE, level=value)
[docs] def set_log_level_api(self, value: t.Union[str, int] = LOG_LEVEL_API) -> None: """Set the log level for this client's api objects.""" self.API_LOG_LEVEL: str = logs.str_level(value) for obj in self.API_CACHE.values(): if isinstance(obj, api.ModelMixins): logs.set_log_level(obj=obj.LOG, level=self.API_LOG_LEVEL)
[docs] def set_log_level_connect(self, value: t.Union[str, int] = "debug") -> None: """Set the log level for this client.""" if self.LOG: logs.set_log_level(obj=self.LOG, level=value)
[docs] def set_log_level_http(self, value: t.Union[str, int] = Http.LOG_LEVEL) -> None: """Set the log level for this client's http object.""" if isinstance(self.HTTP, Http): logs.set_log_level(obj=self.HTTP.LOG, level=value)
[docs] def set_log_level_auth(self, value: t.Union[str, int] = LOG_LEVEL_AUTH) -> None: """Set the log level for this client's auth objects.""" for obj in self.AUTH, self.AUTH_NULL: if isinstance(obj, AuthModel): logs.set_log_level(obj=obj.LOG, level=value)
[docs] def set_log_level_package( self, value: t.Union[str, int] = LOG_LEVEL_PACKAGE, ) -> None: """Set the log level for this client's package.""" logs.set_log_level(obj=self.LOG_LOGGER, level=value)
[docs] @staticmethod def set_log_level_endpoints(value: t.Union[str, int] = LOG_LEVEL_ENDPOINTS) -> None: """Set the log level for this client's endpoints.""" from axonius_api_client.api.api_endpoint import ( LOGGER as LOGGER_ENDPOINT, ) logs.set_log_level(obj=LOGGER_ENDPOINT, level=value)
[docs] def control_log_console(self, enable: bool = False) -> bool: """Add logging to console for this client.""" enable = tools.coerce_bool(enable) if enable and not self.HANDLER_CON: self.HANDLER_CON = logs.add_stderr(**self.ARGS_HANDLER_CON) self.LOG.debug("Logging to console enabled.") return True if not enable and self.HANDLER_CON: self.LOG.debug("Logging to console disabled.") self.HANDLER_CON.close() logs.del_stderr(obj=self.LOG_LOGGER) self.HANDLER_CON = None return True return False
[docs] def control_log_file(self, enable: bool = False, rotate: bool = False) -> bool: """Add logging to file for this client.""" enable = tools.coerce_bool(enable) if enable and not self.HANDLER_FILE: self.HANDLER_FILE = logs.add_file(**self.ARGS_HANDLER_FILE) self.LOG.debug("Logging to file enabled.") return True self.rotate_log_files(value=rotate) if not enable and self.HANDLER_FILE: self.LOG.debug("Logging to file disabled.") self.HANDLER_FILE.close() logs.del_file(obj=self.LOG_LOGGER) self.HANDLER_FILE = None return True return False
[docs] def rotate_log_files(self, value: bool = False) -> None: """Rollover log file.""" value = tools.coerce_bool(value) if value and self.HANDLER_FILE: self.LOG.debug("Forcing file logs to rotate") self.HANDLER_FILE.flush() try: self.HANDLER_FILE.doRollover() except Exception as exc: # pragma: no cover # noqa: BLE001 self.LOG.exception("Failed to force file logs to rotate: %s", exc) else: self.LOG.debug("Forced file logs to rotate")
@property def url(self) -> str: """Get the URL of the current instance.""" return self.HTTP.url if self.HTTP else self._url @property def api_keys(self) -> dict: """Get the API keys for the current user.""" return self.AUTH.get_api_keys() @property def current_user(self) -> t.Optional[api.json_api.account.CurrentUser]: """Get the current user (returns 404 for service accounts).""" try: return self.AUTH.get_current_user() except Exception: # noqa: BLE001 return None @property def about(self) -> dict: """Cached data from the /about endpoint.""" if self.ABOUT_CACHE: return self.ABOUT_CACHE value = self.meta.about(error=False) if value: self.ABOUT_CACHE = value return value @property def version(self) -> str: """Get the Axonius instance version.""" data = "none yet" if self.STARTED: data = ( self.about.get("Version") or self.about.get("Installed Version") or "DEMO" ) data = data.replace("_", ".") return data @property def build_date(self) -> str: """Get the Axonius instance build date.""" data = "none yet" if self.STARTED: data = self.about.get("Build Date", "UNKNOWN") return data @property def str_ax_version(self) -> str: """Get the Axonius instance version & build date for use in str.""" days = f"({tools.dt_days_ago(self.build_date)} days ago)" return ( f"Axonius Version {self.version!r}, Build Date: {self.build_date!r} {days}" ) @property def str_ax_user(self) -> str: """Get the Axonius instance user for use in str.""" value = "User: ??" if self.STARTED and self.current_user: value = self.current_user.str_connect return value @property def ssl_days_left(self) -> t.Optional[int]: """Get the number of days left until the SSL certificate expires.""" value = None if isinstance(self.HTTP, Http): cert: t.Optional[cert_human.Cert] = self.HTTP.get_cert() if isinstance(cert, cert_human.Cert): value = tools.dt_days_ago(cert.not_valid_after, from_now=False) return value @property def str_ax_cert(self) -> str: """Get the Axonius instance SSL certificate for use in str.""" value = "SSL: ??" if isinstance(self.HTTP, Http): cert: t.Optional[cert_human.Cert] = self.HTTP.get_cert() if isinstance(cert, cert_human.Cert): not_valid_after = str(cert.not_valid_after) value = f"SSL Issued To: {cert.subject_short!r}, Expires On: {not_valid_after!r}" return value @property def str_state(self) -> str: """Get the connection state for use in str.""" value = "Not connected" if self.STARTED: value = "Connected" value = f"{value} to {self.url!r}, CLIENT v{self.PKG_VERSION}, PYTHON v{self.PY_VERSION}" banner = get_env_ax().get("AX_BANNER") if banner: value = f"{value} [{banner}]" return value
[docs] @staticmethod def jdump(obj: t.Any, **kwargs) -> None: """Print object as JSON.""" tools.jdump(obj=obj, **kwargs)
[docs] def __str__(self) -> str: """Show object info.""" items: t.List[str] = [ self.str_state, self.str_ax_version, self.str_ax_cert, self.str_ax_user, ] return "\n".join(x for x in items if x)
[docs] def __repr__(self) -> str: """Show object info.""" return self.__str__()
[docs] @classmethod def _get_exc_reason(cls, exc: Exception) -> str: """Trim exceptions down to a more user-friendly display. Uses :attr:`REASON_RES` to do regex substitutions. """ reason = str(exc) for reason_re in cls.REASON_RES: if reason_re.search(reason): return reason_re.sub(r"\1", reason).rstrip("')") return reason
[docs] def _check_binding(self, value: t.Any) -> t.Any: """Check if an object is already bound to a different client.""" client = getattr(value, "CLIENT", value) if isinstance(client, self.__class__) and client is not self: err = f"{value} is already set to {client!r} and cannot be set to {self!r}" raise ConnectError(err) value.CLIENT = self return value
[docs] def _init_http(self, http: t.Optional[Http] = None) -> Http: """Initialize the HTTP object.""" if not isinstance(http, Http): http: Http = Http(**self.ARGS_HTTP) return self._check_binding(http)
[docs] def _init_auth( self, auth: t.Optional[AuthModel] = None, log_level: t.Union[str, int] = LOG_LEVEL_AUTH, ) -> AuthModel: """Initialize the Auth object.""" if not isinstance(auth, AuthModel): if self.CREDENTIALS: auth: AuthCredentials = AuthCredentials( username=self.__key, password=self.__secret, http=self.http, log_level=log_level, ) else: auth: AuthApiKey = AuthApiKey( key=self.__key, secret=self.__secret, http=self.http, log_level=log_level, ) return self._check_binding(auth)
[docs] def _init_auth_null( self, auth_null: t.Optional[AuthModel] = None, log_level: t.Union[str, int] = LOG_LEVEL_AUTH, ) -> AuthModel: """Initialize the null Auth object.""" if not isinstance(auth_null, AuthModel): auth_null: AuthNull = AuthNull(http=self.http, log_level=log_level) return self._check_binding(auth_null)
[docs] def _init(self): """Custom init for this class."""
[docs] def _get_model( self, model: t.Type[api.ModelMixins], start: bool = True, auth: t.Optional[AuthModel] = None, ) -> t.Any: """Create or get an API model. Args: model: model to create or get start: start :attr:`AUTH` if not already started auth: auth to use for this model, if not supplied default to :attr:`AUTH` Returns: model instance """ if start: self.start() if model in self.API_CACHE: return self.API_CACHE[model] if not isinstance(auth, AuthModel): auth = self.AUTH self.API_CACHE[model] = model(auth=auth, log_level=self.API_LOG_LEVEL) return self.API_CACHE[model]