# Source code for axonius_api_client.api.system.dashboard

# -*- coding: utf-8 -*-
"""API for working with dashboards and discovery lifecycle."""
import dataclasses
import datetime
import time
import typing as t

from ...data import PropsData
from ...tools import coerce_int, dt_now, dt_parse, trim_float
from .. import json_api
from ..api_endpoints import ApiEndpoints
from ..mixins import ModelMixins

# Attribute names returned by DiscoverPhase._properties.
# NOTE(review): presumably consumed by PropsData when serializing - confirm in PropsData.
PROPERTIES_PHASE: t.List[str] = [
    "name",
    "human_name",
    "is_done",
    "progress",
]

# Attribute names returned by DiscoverData._properties.
PROPERTIES: t.List[str] = [
    # state of the current run
    "is_running",
    "is_correlation_finished",
    "status",
    "current_run_duration_in_minutes",
    # previous run
    "last_run_finish_date",
    "last_run_start_date",
    "last_run_duration_in_minutes",
    "last_run_minutes_ago",
    # upcoming run
    "next_run_start_date",
    "next_run_starts_in_minutes",
]


@dataclasses.dataclass
class DiscoverPhase(PropsData):
    """Wrapper around the raw metadata of one sub-phase of a discover lifecycle.

    Attributes:
        raw: phase metadata; this class reads its ``name``, ``status`` and
            ``additional_data`` keys
    """

    raw: dict

    def to_str_properties(self) -> t.List[str]:
        """Get the human friendly name and the done flag as display strings."""
        return [f"Name: {self.human_name}", f"Is Done: {self.is_done}"]

    def to_str_progress(self) -> t.List[str]:
        """Get one ``status: name, name, ...`` display string per status in :attr:`progress`."""
        return [f"{k}: {', '.join(v)}" for k, v in self.progress.items()]

    @property
    def _properties(self) -> t.List[str]:
        """Get the attribute names that describe this object (``PROPERTIES_PHASE``)."""
        return PROPERTIES_PHASE

    @property
    def name(self) -> str:
        """Get the phase name exactly as found in ``raw["name"]``."""
        return self.raw["name"]

    @property
    def human_name(self) -> str:
        """Get the display name for this phase.

        Uses :attr:`name_map` when the phase name is known, otherwise falls back to
        ``self._human_key(self.name)``.
        """
        return self.name_map.get(self.name, self._human_key(self.name))

    @property
    def is_done(self) -> bool:
        """Check if this phase is done, which is when ``raw["status"]`` equals 1."""
        return self.raw["status"] == 1

    @property
    def progress(self) -> t.Dict[str, t.List[str]]:
        """Group the keys of ``raw["additional_data"]`` by their value.

        Returns:
            dict mapping each status value to the list of keys having that status;
            statuses and keys keep the order they appear in ``additional_data``
        """
        # single pass grouping; setdefault keeps first-seen key order in a plain dict
        grouped: t.Dict[str, t.List[str]] = {}
        for item_name, status in self.raw["additional_data"].items():
            grouped.setdefault(status, []).append(item_name)
        return grouped

    @property
    def name_map(self) -> dict:
        """Get the mapping of raw phase names to human friendly phase names."""
        return {
            "Fetch_Devices": "Fetch Stage 1",
            "Fetch_Scanners": "Fetch Stage 2",
            "Clean_Devices": "Clean Assets",
            "Pre_Correlation": "Correlation Pre",
            "Run_Correlations": "Correlation Run",
            "Post_Correlation": "Correlation Post",
            "Run_Queries": "Calculate Queries",
            "Save_Historical": "Save History Snapshot",
        }
@dataclasses.dataclass
class DiscoverData(PropsData):
    """Wrapper around the raw metadata of a discover lifecycle.

    Attributes:
        raw: lifecycle metadata; this class reads its ``status``, ``sub_phases``,
            ``last_start_time``, ``last_finished_time`` and ``next_run_time`` keys
        adapters: adapter metadata dicts used by :attr:`progress` to resolve plugin
            names (read keys: ``name_plugin``, ``node_name``, ``name``)
    """

    raw: dict
    adapters: t.List[dict] = dataclasses.field(default_factory=list, repr=False)

    @property
    def _properties(self) -> t.List[str]:
        """Get the attribute names that describe this object (``PROPERTIES``)."""
        return PROPERTIES

    def to_str_progress(self) -> t.List[str]:
        """Get the pre-built ``str`` value of every entry in :attr:`progress`."""
        return [x["str"] for x in self.progress]

    def to_str_phases(self) -> t.List[str]:
        """Get one ``human name: status`` display string per phase."""
        return [f"{x.human_name}: {x.status}" for x in self.phases]

    @property
    def phases_dict(self) -> dict:
        """Get the phases keyed by their raw phase name."""
        return {x.name: x for x in self.phases}

    def to_dict(self, dt_obj: bool = False) -> dict:
        """Get the parent class dict form with ``phases`` and ``progress`` added.

        Args:
            dt_obj: passed through unchanged to the parent ``to_dict``
        """
        ret = super().to_dict(dt_obj=dt_obj)
        ret["phases"] = [x.to_dict() for x in self.phases]
        ret["progress"] = self.progress
        return ret

    @property
    def last_run_finish_date(self) -> t.Optional[datetime.datetime]:
        """Get ``raw["last_finished_time"]`` parsed by ``dt_parse``, or None if it is empty."""
        dt = self.raw["last_finished_time"]
        return dt_parse(obj=dt) if dt else None

    @property
    def last_run_start_date(self) -> t.Optional[datetime.datetime]:
        """Get ``raw["last_start_time"]`` parsed by ``dt_parse``, or None if it is empty."""
        dt = self.raw["last_start_time"]
        return dt_parse(obj=dt) if dt else None

    @property
    def current_run_duration_in_minutes(self) -> t.Optional[float]:
        """Get the minutes elapsed since the current run started.

        Returns:
            None when discover is not running or when no start time is known
        """
        start = self.last_run_start_date
        # a run can be reported as running without a start time; without this guard
        # the subtraction below would raise TypeError on None
        if not self.is_running or start is None:
            return None
        return trim_float(value=(dt_now() - start).total_seconds() / 60)

    @property
    def last_run_duration_in_minutes(self) -> t.Optional[float]:
        """Get the minutes between the last start and the last finish.

        Returns:
            None when either date is missing or when the finish is before the start
        """
        start = self.last_run_start_date
        finish = self.last_run_finish_date
        check = (start and finish) and finish >= start
        return trim_float(value=(finish - start).total_seconds() / 60) if check else None

    @property
    def last_run_minutes_ago(self) -> t.Optional[float]:
        """Get the minutes elapsed since the last finish, or None if there is no finish date."""
        finish = self.last_run_finish_date
        return trim_float(value=(dt_now() - finish).total_seconds() / 60) if finish else None

    @property
    def next_run_starts_in_minutes(self) -> float:
        """Get ``raw["next_run_time"]`` (treated as seconds) converted to minutes."""
        return trim_float(value=self.raw["next_run_time"] / 60)

    @property
    def next_run_start_date(self) -> datetime.datetime:
        """Get the current time plus ``raw["next_run_time"]`` seconds."""
        return dt_now() + datetime.timedelta(seconds=self.raw["next_run_time"])

    @property
    def correlation_stage(self) -> str:
        """Get the raw name of the phase that marks correlation as finished."""
        return "Post_Correlation"

    @property
    def correlation_phase(self) -> t.Optional[DiscoverPhase]:
        """Get the phase named by :attr:`correlation_stage`, or None if it is not present."""
        return self.phases_dict.get(self.correlation_stage)

    @property
    def is_correlation_finished(self) -> bool:
        """Check if correlation is finished.

        Returns:
            True when discover is not running, or when the correlation phase exists
            and is done; False otherwise
        """
        if not self.is_running:
            return True
        # look the phase up once; each access to correlation_phase rebuilds all phases
        phase = self.correlation_phase
        return bool(isinstance(phase, DiscoverPhase) and phase.is_done)

    @property
    def running_status_map(self) -> dict:
        """Get the mapping of lifecycle status to whether that status counts as running."""
        return {
            "done": False,
            "stopping": False,
            "starting": True,
            "running": True,
        }

    @property
    def is_running(self) -> bool:
        """Check if the status counts as running; statuses not in the map count as False."""
        return self.running_status_map.get(self.status, False)

    @property
    def status(self) -> str:
        """Get the lifecycle status exactly as found in ``raw["status"]``."""
        return self.raw["status"]

    @property
    def progress(self) -> t.List[dict]:
        """Get one entry per plugin name listed in the progress of every phase.

        Returns:
            list of dicts with keys ``node``, ``adapter``, ``status`` and ``str``, where
            ``str`` is a display string built from the other three values
        """
        plugin_map = {x["name_plugin"]: x for x in self.adapters}
        ret = []
        for phase in self.phases:
            for status, plugin_names in phase.progress.items():
                for plugin_name in plugin_names:  # pragma: no cover
                    # plugin names without matching adapter metadata fall back to
                    # an "unknown" node and the plugin name itself
                    adapter = plugin_map.get(plugin_name, {})
                    value = {
                        "node": adapter.get("node_name", "unknown"),
                        "adapter": adapter.get("name", plugin_name),
                        "status": status,
                    }
                    value["str"] = ", ".join(f"{self._human_key(k)}: {v}" for k, v in value.items())
                    ret.append(value)
        return ret

    @property
    def phases(self) -> t.List[DiscoverPhase]:
        """Get a new :obj:`DiscoverPhase` for each item in ``raw["sub_phases"]``.

        Each phase gets a ``status`` attribute: ``n/a`` when discover is not running,
        ``done`` when the phase is done, ``running`` for the first phase that is not
        done and ``pending`` for every not-done phase after it.
        """
        # scratch flag kept on the instance and reset on every access
        self._has_running = False

        def get_status(phase):  # pragma: no cover
            if not self.is_running:
                return "n/a"
            if phase.is_done:
                return "done"
            if self._has_running:
                return "pending"
            self._has_running = True
            return "running"

        def get_phase(raw):
            phase = DiscoverPhase(raw=raw)
            phase.status = get_status(phase)
            return phase

        return [get_phase(x) for x in self.raw["sub_phases"]]

    def next_run_within_minutes(self, value: t.Union[int, str]) -> bool:
        """Check if the next run starts within a number of minutes.

        Args:
            value: number of minutes, coerced by ``coerce_int`` with a minimum of 0

        Returns:
            True when value is at least the whole minutes left until the next run
        """
        return coerce_int(obj=value, min_value=0) >= int(self.next_run_starts_in_minutes)

    def get_stability(
        self, for_next_minutes: t.Optional[int] = None, start_check: t.Union[int, float] = 0.5
    ) -> t.Tuple[str, bool]:
        """Get a reason and a flag describing whether discover data is currently stable.

        Args:
            for_next_minutes: when an int and discover is not running, report unstable if
                the next run starts within this many minutes
            start_check: when discover is running, report unstable if the current run has
                lasted this many minutes or less

        Returns:
            tuple of (reason string, is stable)
        """
        current_run = self.current_run_duration_in_minutes

        if self.is_running:
            pre = f"Discover is running ({current_run} minutes so far)"

            # only compare when both sides are numbers; current_run may be None
            if (
                isinstance(current_run, (int, float)) and isinstance(start_check, (int, float))
            ) and current_run <= start_check:
                return f"{pre} - started less than {start_check} minutes ago", False

            if self.is_correlation_finished:  # pragma: no cover
                return f"{pre} - correlation has finished", True

            return f"{pre} - correlation has NOT finished", False

        next_mins = self.next_run_starts_in_minutes
        reason = f"Discover is not running and next is in {next_mins} minutes"

        if isinstance(for_next_minutes, int):
            if self.next_run_within_minutes(for_next_minutes):
                return f"{reason} (less than {for_next_minutes} minutes)", False
            return f"{reason} (more than {for_next_minutes} minutes)", True
        return reason, True
class Dashboard(ModelMixins):
    """API for working with discovery lifecycle.

    Examples:
        * Get discover lifecycle metadata: :meth:`get`
        * See if a lifecycle is currently running: :meth:`is_running`
        * Start a discover lifecycle: :meth:`start`
        * Stop a discover lifecycle: :meth:`stop`

    """

    def get(self) -> DiscoverData:
        """Get lifecycle metadata.

        Examples:
            Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

            >>> data = client.dashboard.get()
            >>> data.next_run_starts_in_minutes
            551
            >>> data.is_running
            False

        """
        # fetch the lifecycle first, then the adapters, same order as a single call would
        lifecycle = self._get().to_dict()
        adapters = self.adapters.get(get_clients=False)
        return DiscoverData(raw=lifecycle, adapters=adapters)

    @property
    def is_running(self) -> bool:
        """Check if discovery cycle is running.

        Examples:
            Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

            >>> client.dashboard.is_running
            False

        """
        current = self.get()
        return current.is_running

    def start(self) -> DiscoverData:
        """Start a discovery cycle if one is not running.

        Examples:
            Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

            >>> data = client.dashboard.start()
            >>> data.is_running
            True

        """
        if self.is_running:
            return self.get()

        self._start()
        # brief pause after the request before fetching fresh metadata
        time.sleep(2)
        return self.get()

    def stop(self) -> DiscoverData:
        """Stop a discovery cycle if one is running.

        Examples:
            Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

            >>> data = client.dashboard.stop()
            >>> data.is_running
            False

        """
        if not self.is_running:
            return self.get()

        self._stop()
        # brief pause after the request before fetching fresh metadata
        time.sleep(2)
        return self.get()

    def _get(self) -> json_api.lifecycle.Lifecycle:
        """Direct API method to get discovery cycle metadata."""
        return ApiEndpoints.lifecycle.get.perform_request(http=self.auth.http)

    def _start(self) -> str:
        """Direct API method to start a discovery cycle."""
        return ApiEndpoints.lifecycle.start.perform_request(http=self.auth.http)

    def _stop(self) -> str:
        """Direct API method to stop a discovery cycle."""
        return ApiEndpoints.lifecycle.stop.perform_request(http=self.auth.http)

    def _init(self, **kwargs):
        """Post init method for subclasses to use for extra setup."""
        # imported here rather than at module level, as in the original
        from ..adapters.adapters import Adapters

        self.adapters: Adapters = Adapters(auth=self.auth)
        """Work with adapters"""