# -*- coding: utf-8 -*-
"""API for working with enforcements."""
import typing as t
import warnings
from cachetools import TTLCache, cached
from ...constants.api import MAX_PAGE_SIZE
from ...exceptions import AlreadyExists, ApiError, ApiWarning, NotAllowedError, NotFoundError
from ...parsers.tables import tablize
from ...tools import listify
from .. import json_api
from ..api_endpoints import ApiEndpoints
from ..folders import FoldersEnforcements
from ..json_api.enforcements import (
ActionCategory,
ActionType,
CreateEnforcementModel,
EnforcementBasicModel,
EnforcementDefaults,
EnforcementFullModel,
EnforcementSchedule,
MoveEnforcementsRequestModel,
MoveEnforcementsResponseModel,
UpdateEnforcementResponseModel,
)
from ..json_api.folders.base import FolderDefaults
from ..json_api.folders.enforcements import Folder, FolderModel, FoldersModel
from ..json_api.saved_queries import QueryTypes, SavedQuery
from ..mixins import ModelMixins
from .tasks import Tasks
MULTI_SQ = t.Union[str, dict, SavedQuery]
MULTI_SET = t.Union[
str, dict, EnforcementBasicModel, EnforcementFullModel, UpdateEnforcementResponseModel
]
MULTI_ACTION_TYPE = t.Union[str, dict, ActionType]
CACHE_GET = TTLCache(maxsize=1024, ttl=60)
[docs]class Enforcements(ModelMixins):
"""API working with enforcements.
What's the deal with BASIC vs FULL?
The REST API exposes an endpoint to page through the enforcement sets,
but the details about the actions and triggers configured are limited.
In order to get the full details of the actions and triggers, one call
must be made to fetch the FULL details of each enforcement set by UUID.
To make things more fun, the FULL model is lacking details that are
only provided by the BASIC model:
triggers_last_triggered, triggers_times_triggered, updated_by, last_triggered,
last_updated.
Finally, the "human friendly" description of trigger schedule is only available from BASIC.
We overcome this by getting the FULL object and attaching the BASIC object as FULL.BASIC.
"""
@property
def folders(self) -> FoldersEnforcements:
"""Get the folders api for this object type."""
# noinspection PyUnresolvedReferences
return self.auth.http.CLIENT.folders.enforcements
[docs] def get_set(
self,
value: MULTI_SET,
refetch: bool = True,
cache: t.Optional[t.List[EnforcementFullModel]] = None,
) -> EnforcementFullModel:
"""Get an enforcement set by name or UUID.
Args:
value: enforcement set model or str with name or uuid
refetch: refetch the enforcement set from the API
cache: cache of enforcement sets
Raises:
ApiError: if invalid type supplied for value
NotFoundError: if not found
Returns:
EnforcementFullModel: enforcement set model
"""
if isinstance(value, str):
name = value
uuid = value
elif isinstance(
value,
(EnforcementBasicModel, EnforcementFullModel, UpdateEnforcementResponseModel),
):
if not refetch and isinstance(value, EnforcementFullModel):
return value
name = value.name
uuid = value.uuid
elif isinstance(value, dict) and value.get("name") and value.get("uuid"):
name = value["name"]
uuid = value["uuid"]
else:
raise ApiError(f"Unknown type {type(value)}, must be {MULTI_SET}")
items = []
if (
isinstance(cache, list)
and cache
and all(
[
isinstance(
x,
(
EnforcementBasicModel,
EnforcementFullModel,
UpdateEnforcementResponseModel,
),
)
for x in cache
]
)
):
data = cache
else:
data = self.get_sets_generator(full=True)
for item in data:
if name == item.name or uuid == item.uuid:
return item
items.append(item)
raise NotFoundError(
tablize(
value=[x.to_tablize() for x in items],
err=f"Enforcement Set with name of {name!r} or UUID of {uuid!r} not found",
)
)
[docs] @cached(cache=CACHE_GET)
def get_sets_cached(
self, **kwargs
) -> t.List[t.Union[EnforcementBasicModel, EnforcementFullModel]]:
"""Get all enforcements cached."""
return list(self.get_sets_generator(**kwargs))
[docs] def get_sets(
self, generator: bool = False, full: bool = True
) -> t.Union[
t.Generator[t.Union[EnforcementBasicModel, EnforcementFullModel], None, None],
t.List[t.Union[EnforcementBasicModel, EnforcementFullModel]],
]:
"""Get all enforcement sets.
Args:
generator: return an iterator for objects
full: get the full model of each enforcement set
"""
gen = self.get_sets_generator(full=full)
return gen if generator else list(gen)
[docs] def get_sets_generator(
self, full: bool = True
) -> t.Generator[t.Union[EnforcementBasicModel, EnforcementFullModel], None, None]:
"""Get all enforcement sets using a generator.
Args:
full: get the full model of each enforcement set
Yields:
t.Generator[t.Union[EnforcementBasicModel, EnforcementFullModel], None, None]: Generator
"""
offset = 0
while True:
rows = self._get_sets(offset=offset)
offset += len(rows)
if not rows:
break
for row in rows:
yield row.get_full() if full else row
[docs] def check_set_exists(self, value: MULTI_SET):
"""Check if an enforcement set already exists.
Args:
value: enforcement set model or str with name or uuid
Raises:
ApiError: if enforcement set already exists
"""
try:
existing = self.get_set(value=value)
except NotFoundError:
return
exc = AlreadyExists(f"enforcement set matching {value!r} already exists:\n{existing}")
exc.obj = existing
raise exc
[docs] def get_action_type(self, value: MULTI_ACTION_TYPE) -> ActionType:
"""Get an action type.
Args:
value: action type model or str with name
Returns:
ActionType: action type model
Raises:
ApiError: if value is incorrect type
NotFoundError: if not found
"""
if isinstance(value, str):
name = value
elif isinstance(value, dict) and value.get("id"):
name = value["id"]
elif isinstance(value, ActionType):
name = value.name
else:
raise ApiError(f"Unknown type {type(value)}, must be str, dict, or {ActionType}")
items = self.get_action_types()
for item in items:
if item.name == name:
return item
raise NotFoundError(
tablize(
value=[x.to_tablize() for x in items],
err=f"Action Type with name of {name!r} not found",
)
)
[docs] def get_action_types(self) -> t.List[ActionType]:
"""Get all action types.
Returns:
t.List[ActionType]: action type models
"""
return self._get_action_types()
[docs] def copy(
self,
value: MULTI_SET,
name: str,
copy_triggers: bool = True,
folder: t.Optional[t.Union[str, Folder]] = None,
create: bool = FolderDefaults.create_action,
echo: bool = FolderDefaults.echo_action,
) -> EnforcementFullModel:
"""Copy an enforcement set.
Args:
value: enforcement set model or str with name or uuid
name: name to assign to copy
copy_triggers: copy triggers to new set
folder: folder to put object in
create: if folder supplied does not exist, create it
echo: echo output to console during create/etc
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
root: FoldersModel = self.folders.get()
folder: FolderModel = root.resolve_folder(folder=folder, create=create, echo=echo)
created = self._copy(uuid=existing.uuid, name=name, clone_triggers=copy_triggers)
# NEAT: can not move to/from public /private in enforcements
if folder.depth > 1 and folder.id != created.folder.id:
self._move_sets(folder_id=folder.id, enforcements_ids=created.uuid)
return self.get_set(value=created)
[docs] def update_folder(
self,
value: MULTI_SET,
folder: t.Union[str, FolderModel],
create: bool = FolderDefaults.create_action,
echo: bool = FolderDefaults.echo_action,
) -> EnforcementFullModel:
"""Update the name of an enforcement set.
Args:
value: enforcement set model or str with name or uuid
folder: folder to put object in
create: if folder supplied does not exist, create it
echo: echo output to console during create/etc
Returns:
EnforcementFullModel: updated enforcement set
"""
existing: EnforcementFullModel = self.get_set(value=value)
updated_obj: EnforcementFullModel = existing.move(
folder=folder,
create=create,
echo=echo,
refresh=False,
)
return updated_obj
[docs] def update_name(self, value: MULTI_SET, name: str) -> EnforcementFullModel:
"""Update the name of an enforcement set.
Args:
value: enforcement set model or str with name or uuid
name: name to update
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
self.check_set_exists(value=name)
existing.name = name
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_description(
self, value: MULTI_SET, description: str, append: bool = False
) -> EnforcementFullModel:
"""Update the description of an enforcement set.
Args:
value: enforcement set model or str with name or uuid
description: description to update
append: append to existing description
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.update_description(value=description, append=append)
return self.get_set(value=existing)
[docs] def update_action_main(
self, value: MULTI_SET, name: str, action_type: str, config: t.Optional[dict] = None
) -> EnforcementFullModel:
"""Update the main action of an enforcement set.
Args:
value: enforcement set model or str with name or uuid
name: name to assign to action
action_type: action type
config: action configuration
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
action = self.get_set_action(name=name, action_type=action_type, config=config)
existing.main_action = action
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_action_add(
self,
value: MULTI_SET,
category: t.Union[ActionCategory, str],
name: str,
action_type: str,
config: t.Optional[dict] = None,
) -> EnforcementFullModel:
"""Add an action to an enforcement set.
Args:
value: enforcement set model or str with name or uuid
category: action category to add action to
name: name of action to add
action_type: action type
config: action configuration
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
action = self.get_set_action(name=name, action_type=action_type, config=config)
existing.add_action(category=category, action=action)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_action_remove(
self, value: MULTI_SET, category: t.Union[ActionCategory, str], name: str
) -> EnforcementFullModel:
"""Remove an action from an enforcement set.
Args:
value: enforcement set model or str with name or uuid
category: action category to remove action from
name: name of action to remove
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.remove_action(category=category, name=name)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_query(
self,
value: MULTI_SET,
query_name: str,
query_type: t.Union[QueryTypes, str] = EnforcementDefaults.query_type,
) -> EnforcementFullModel:
"""Update the query of an enforcement set.
Args:
value: enforcement set model or str with name or uuid
query_name: name of saved query
query_type: type of saved query
Returns:
EnforcementFullModel: updated enforcement set
"""
sq = self.get_trigger_view(query_name=query_name, query_type=query_type)
existing = self.get_set(value=value)
existing.query_update(query_uuid=sq.uuid, query_type=query_type)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_query_remove(self, value: MULTI_SET) -> EnforcementFullModel:
"""Remove the query from an enforcement set.
Args:
value: enforcement set model or str with name or uuid
Returns:
EnforcementFullModel: updated enforcement set
"""
raise NotAllowedError("Enforcement Sets now require a Saved Query to be defined")
[docs] def update_schedule_never(self, value: MULTI_SET) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to never run.
Args:
value: enforcement set model or str with name or uuid
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_never()
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_schedule_discovery(self, value: MULTI_SET) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to run every discovery.
Args:
value: enforcement set model or str with name or uuid
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_discovery()
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_schedule_hourly(self, value: MULTI_SET, recurrence: int) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to run hourly.
Args:
value: enforcement set model or str with name or uuid
recurrence: run schedule every N hours (N = 1-24)
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_hourly(recurrence=recurrence)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_schedule_daily(
self,
value: MULTI_SET,
recurrence: int,
hour: int = EnforcementDefaults.schedule_hour,
minute: int = EnforcementDefaults.schedule_minute,
) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to run daily.
Args:
value: enforcement set model or str with name or uuid
recurrence: run enforcement every N days (N = 1-~)
hour: hour of day to run schedule
minute: minute of hour to run schedule
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_daily(recurrence=recurrence, hour=hour, minute=minute)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_schedule_weekly(
self,
value: MULTI_SET,
recurrence: t.Union[str, t.List[t.Union[str, int]]],
hour: int = EnforcementDefaults.schedule_hour,
minute: int = EnforcementDefaults.schedule_minute,
) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to run weekly.
Args:
value: enforcement set model or str with name or uuid
recurrence: run enforcement on days of week
hour: hour of day to run schedule
minute: minute of hour to run schedule
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_weekly(recurrence=recurrence, hour=hour, minute=minute)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_schedule_monthly(
self,
value: MULTI_SET,
recurrence: t.Union[str, t.List[t.Union[int, str]]],
hour: int = EnforcementDefaults.schedule_hour,
minute: int = EnforcementDefaults.schedule_minute,
) -> EnforcementFullModel:
"""Set the schedule of an enforcement set to run monthly.
Args:
value: enforcement set model or str with name or uuid
recurrence: run enforcement on days of month
hour: hour of day to run schedule
minute: minute of hour to run schedule
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.set_schedule_monthly(recurrence=recurrence, hour=hour, minute=minute)
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_only_new_assets(self, value: MULTI_SET, update: bool) -> EnforcementFullModel:
"""Update enforcement set to only run against newly added assets from last automated run.
Args:
value: enforcement set model or str with name or uuid
update: False=run against all assets each automated run, True=only run against
newly added assets from last automated run
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.only_new_assets = update
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_on_count_increased(self, value: MULTI_SET, update: bool) -> EnforcementFullModel:
"""Update enforcement set to only run if asset count increased from last automated run.
Args:
value: enforcement set model or str with name or uuid
update: False=run regardless if asset count increased, True=only perform
automated run if asset count increased
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.on_count_increased = update
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_on_count_decreased(self, value: MULTI_SET, update: bool) -> EnforcementFullModel:
"""Update enforcement set to only run if asset count decreased from last automated run.
Args:
value: enforcement set model or str with name or uuid
update: False=run regardless if asset count decreased, True=only perform
automated run if asset count decreased
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.on_count_decreased = update
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_on_count_above(
self, value: MULTI_SET, update: t.Optional[int]
) -> EnforcementFullModel:
"""Update enforcement set to only run automatically if asset count is above N.
Args:
value: enforcement set model or str with name or uuid
update: None to always run automatically regardless of asset count,
integer to only run automatically if asset count is above N
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.on_count_above = update
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_on_count_below(
self, value: MULTI_SET, update: t.Optional[int]
) -> EnforcementFullModel:
"""Update enforcement set to only run automatically if asset count is below N.
Args:
value: enforcement set model or str with name or uuid
update: None to always run automatically regardless of asset count,
integer to only run automatically if asset count is below N
Returns:
EnforcementFullModel: updated enforcement set
"""
existing = self.get_set(value=value)
existing.on_count_below = update
updated = self.update_from_model(value=existing)
return self.get_set(value=updated)
[docs] def update_from_model(self, value: EnforcementFullModel) -> EnforcementFullModel:
"""Update an enforcement set from the values in a model.
Args:
value: enforcement set model to update
Returns:
EnforcementFullModel: updated enforcement set
"""
return self._update(
uuid=value.uuid,
name=value.name,
actions=value.actions,
triggers=value.triggers,
folder_id=value.folder_id,
)
# noinspection PyDefaultArgument
[docs] def create(
self,
name: str,
main_action_name: str,
main_action_type: str,
query_name: t.Optional[str] = EnforcementDefaults.query_name,
query_type: str = EnforcementDefaults.query_type,
main_action_config: t.Optional[dict] = None,
description: t.Optional[str] = "",
schedule_type: t.Union[EnforcementSchedule, str] = EnforcementDefaults.schedule_type,
schedule_hour: int = EnforcementDefaults.schedule_hour,
schedule_minute: int = EnforcementDefaults.schedule_minute,
schedule_recurrence: t.Optional[
t.Union[int, t.List[str]]
] = EnforcementDefaults.schedule_recurrence,
only_new_assets: bool = EnforcementDefaults.only_new_assets,
on_count_above: t.Optional[int] = EnforcementDefaults.on_count_above,
on_count_below: t.Optional[int] = EnforcementDefaults.on_count_below,
on_count_increased: bool = EnforcementDefaults.on_count_increased,
on_count_decreased: bool = EnforcementDefaults.on_count_decreased,
folder: t.Optional[t.Union[str, FolderModel]] = None,
create: bool = FolderDefaults.create_action,
echo: bool = FolderDefaults.echo_action,
) -> EnforcementFullModel:
"""Create an enforcement set.
Args:
name: Name to assign to enforcement set
description: Description to assign to enforcement set
query_name: Saved Query name to use for trigger
query_type: Saved Query type
main_action_name: name to assign to main action
main_action_type: action type to use for main action
main_action_config: action config for main action
schedule_type: EnforcementSchedule type
for automation
schedule_hour: Hour of day to use for schedule_type
schedule_minute: Minute of hour to use for schedule_type
schedule_recurrence: recurrence value to use for schedule_type
only_new_assets: only run set against assets added since last run
on_count_above: only run if asset count above N
on_count_below: only run if asset count below N
on_count_increased: only run if asset count increased since last run
on_count_decreased: only run if asset count decreased since last run
folder: folder to create set in
create: create folder if it doesn't exist
echo: echo folder creation
Returns:
EnforcementFullModel: created enforcement set
"""
self.check_set_exists(value=name)
root: FoldersModel = self.folders.get()
folder: FolderModel = root.resolve_folder(folder=folder, create=create, echo=echo)
main = self.get_set_action(
name=main_action_name, action_type=main_action_type, config=main_action_config
)
triggers = []
sq = self.get_trigger_view(query_name=query_name, query_type=query_type)
if sq:
triggers.append(
EnforcementSchedule.get_trigger(
query_uuid=sq.uuid,
query_type=query_type,
schedule_type=schedule_type,
schedule_hour=schedule_hour,
schedule_minute=schedule_minute,
schedule_recurrence=schedule_recurrence,
only_new_assets=only_new_assets,
on_count_above=on_count_above,
on_count_below=on_count_below,
on_count_increased=on_count_increased,
on_count_decreased=on_count_decreased,
)
)
create_response = self._create(
name=name, main=main, triggers=triggers, description=description, folder_id=folder.id
)
# NEAT: can not move to/from public /private in enforcements
if folder.depth > 1 and folder.id != create_response.folder.id:
self._move_sets(folder_id=folder.id, enforcements_ids=create_response.uuid)
created = self.get_set(value=create_response)
return created
[docs] def delete(self, value: MULTI_SET) -> EnforcementFullModel:
"""Delete an enforcement set.
Args:
value: enforcement set model or str with name or uuid
Returns:
EnforcementFullModel: deleted enforcement set
"""
obj = self.get_set(value=value)
self._delete(uuid=obj.uuid)
return obj
[docs] def get_set_action(
self,
name: str,
action_type: MULTI_ACTION_TYPE,
config: t.Optional[dict] = None,
) -> dict:
"""Get the action dictionary needed to add an action to an enforcement set.
Args:
name: name to assign to action
action_type: action type to use
config: action config
Returns:
dict: action dictionary
"""
action_type = self.get_action_type(value=action_type)
config = action_type.check_config(config=config)
msg = """CAUTION:
Enforcement Sets will be forced to the /Drafts folder if:
- Any configuration supplied for any action is invalid
- No Saved Query is supplied for the trigger
Easiest way to learn the correct configuration for an action:
- Create an Enforcement Set using the GUI
- export the Enforcement Set in JSON format using:
axonshell enforcements get -v "enforcement name" -xf json
- Get the 'config' dictionary for the action you want to use from the JSON output
- Use the 'config' dictionary as appropriate 'config' argument for this method
"""
warnings.warn(message=msg, category=ApiWarning)
return EnforcementFullModel.get_action_obj(
name=name, action_type=action_type, config=config
)
[docs] def get_trigger_view(
self,
query_name: t.Optional[MULTI_SQ] = EnforcementDefaults.query_name,
query_type: t.Union[QueryTypes, str] = EnforcementDefaults.query_type,
) -> t.Optional[SavedQuery]:
"""Get the saved query for use in adding a query to an enforcement.
Args:
query_name: Name of Saved Query
query_type: Type of Saved Query
Returns:
SavedQuery: None if query_name is not supplied, otherwise saved query model
"""
if query_name:
query_type = QueryTypes.get_value_by_value(query_type)
return self._triggers_map[query_type].saved_query.get_by_multi(
sq=query_name, as_dataclass=True
)
return None
[docs] def _init(self, **kwargs):
"""Post init method for subclasses to use for extra setup."""
from .. import Devices, Instances, Users, Vulnerabilities
self.api_devices: Devices = Devices(auth=self.auth, **kwargs)
"""API model for cross reference."""
self.api_users: Users = Users(auth=self.auth, **kwargs)
"""API model for cross reference."""
self.api_vulnerabilities: Vulnerabilities = Vulnerabilities(auth=self.auth, **kwargs)
"""API model for cross reference."""
self.api_instances: Instances = Instances(auth=self.auth, **kwargs)
"""API model for cross reference."""
self.tasks: Tasks = Tasks(auth=self.auth, **kwargs)
"""API model for working with tasks for enforcements."""
@property
def _triggers_map(self) -> dict:
"""Map of query types to api objects."""
return {
QueryTypes.devices.value: self.api_devices,
QueryTypes.users.value: self.api_users,
QueryTypes.vulnerabilities.value: self.api_vulnerabilities,
}
[docs] def _delete(self, uuid: str) -> json_api.generic.Deleted:
"""Delete an enforcement set by UUID.
Args:
uuid: UUID of set to delete
Returns:
json_api.generic.Deleted: deleted model
"""
api_endpoint = ApiEndpoints.enforcements.delete_set
request_obj = api_endpoint.load_request(value={"ids": [uuid], "include": True})
response = api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
self.get_sets_cached.cache_clear()
return response
[docs] def _copy(self, uuid: str, name: str, clone_triggers: bool) -> EnforcementFullModel:
"""Copy an enforcement set.
Args:
uuid: UUID of set to copy
name: name to give copy
clone_triggers: copy triggers into new set
Returns:
EnforcementFullModel: copied set
"""
api_endpoint = ApiEndpoints.enforcements.copy_set
request_obj = api_endpoint.load_request(
id=uuid, uuid=uuid, name=name, clone_triggers=clone_triggers
)
response = api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
self.get_sets_cached.cache_clear()
return response
[docs] def _update_description(self, uuid: str, description: str) -> None:
"""Update the description of an enforcement set.
Args:
uuid: UUID of set to update
description: description to set
"""
api_endpoint = ApiEndpoints.enforcements.update_description
request_obj = api_endpoint.load_request(description=description)
response = api_endpoint.perform_request(
http=self.auth.http, request_obj=request_obj, uuid=uuid
)
self.get_sets_cached.cache_clear()
return response
[docs] def _update(
self,
uuid: str,
name: str,
actions: dict,
folder_id: str = "",
triggers: t.Optional[t.List[dict]] = None,
) -> EnforcementFullModel:
"""Update an enforcement set.
Args:
uuid: UUID of set to update
name: name of set
actions: actions of set
triggers: triggers of set
Returns:
EnforcementFullModel: updated set
"""
api_endpoint = ApiEndpoints.enforcements.update_set
request_obj = api_endpoint.load_request(
uuid=uuid,
id=uuid,
name=name,
actions=actions,
triggers=triggers or [],
folder_id=folder_id,
)
response = api_endpoint.perform_request(
http=self.auth.http, request_obj=request_obj, uuid=uuid
)
self.get_sets_cached.cache_clear()
return response
[docs] def _create_from_model(self, request_obj: CreateEnforcementModel) -> EnforcementFullModel:
"""Pass."""
api_endpoint = ApiEndpoints.enforcements.create_set
response: EnforcementFullModel = api_endpoint.perform_request(
http=self.auth.http, request_obj=request_obj
)
self.get_sets_cached.cache_clear()
return response
[docs] def _create(
self,
name: str,
main: dict,
folder_id: str = "",
description: str = "",
success: t.Optional[t.List[dict]] = None,
failure: t.Optional[t.List[dict]] = None,
post: t.Optional[t.List[dict]] = None,
triggers: t.Optional[t.List[dict]] = None,
) -> EnforcementFullModel:
"""Create an enforcement set.
Args:
name: name of enforcement to create
main: main action
success: success actions
failure: failure actions
post: post actions
triggers: saved query trigger
Returns:
EnforcementFullModel: created set
"""
actions = {
"main": main,
"success": success or [],
"failure": failure or [],
"post": post or [],
}
api_endpoint = ApiEndpoints.enforcements.create_set
request_obj = api_endpoint.load_request(
name=name,
actions=actions,
triggers=triggers or [],
description=description,
folder_id=folder_id,
)
response = api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
self.get_sets_cached.cache_clear()
return response
# noinspection PyShadowingBuiltins
[docs] def _get_sets(
self,
limit: int = MAX_PAGE_SIZE,
offset: int = 0,
sort: t.Optional[str] = None,
filter: t.Optional[str] = None,
search: str = "",
) -> t.List[EnforcementBasicModel]:
"""Get enforcement sets in basic model.
Args:
limit: limit to N rows per page
offset: start at row N
sort: sort based on a model attribute
filter: AQL filter
search: search string
Returns:
t.List[EnforcementBasicModel]: basic models
"""
api_endpoint = ApiEndpoints.enforcements.get_sets
request_obj = api_endpoint.load_request(
page={"limit": limit, "offset": offset}, filter=filter, search=search, sort=sort
)
return api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
[docs] def _get_set(self, uuid: str) -> EnforcementFullModel:
"""Get an enforcement set in full model.
Args:
uuid: UUID of set to get
Returns:
EnforcementFullModel: full model
"""
api_endpoint = ApiEndpoints.enforcements.get_set
return api_endpoint.perform_request(http=self.auth.http, uuid=uuid)
[docs] def _get_action_types(self) -> t.List[ActionType]:
"""Get all action types.
Returns:
t.List[ActionType]: action type models
"""
api_endpoint = ApiEndpoints.enforcements.get_action_types
return api_endpoint.perform_request(http=self.auth.http)
[docs] def _run_set_against_trigger(
self, uuid: str, ec_page_run: bool = False, use_conditions: bool = False
) -> json_api.generic.Name:
"""Run an enforcement set against its trigger.
Args:
uuid: UUID of enforcement set to trigger
ec_page_run: this was triggered from the EC Page in the GUI
use_conditions: use conditions configured on enforcement set to determine execution
"""
api_endpoint = ApiEndpoints.enforcements.run_set_against_trigger
request_obj = api_endpoint.load_request(
ec_page_run=ec_page_run, use_conditions=use_conditions
)
return api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj, uuid=uuid)
[docs] def _move_sets(
self, folder_id: str, enforcements_ids: t.Union[str, t.List[str]]
) -> MoveEnforcementsResponseModel:
"""Move enforcements to a folder.
Args:
folder_id: ID of folder to move enforcements to
enforcements_ids: list of uuids to move
"""
api_endpoint = ApiEndpoints.enforcements.move_sets
request_obj: MoveEnforcementsRequestModel = api_endpoint.load_request(
folder_id=folder_id, enforcements_ids=listify(enforcements_ids)
)
response: MoveEnforcementsResponseModel = api_endpoint.perform_request(
http=self.auth.http, request_obj=request_obj
)
return response
[docs] def _run_sets_against_trigger(
self, uuids: t.List[str], include: bool = True, use_conditions: bool = False
) -> json_api.generic.ListDictValue:
"""Run enforcement sets against their triggers.
Args:
uuids: UUIDs of enforcement sets to trigger
include: select UUIDs in DB or UUIDs NOT in DB
use_conditions: use conditions configured on enforcement set to determine execution
"""
api_endpoint = ApiEndpoints.enforcements.run_sets_against_trigger
value = {"ids": listify(uuids), "include": include}
request_obj = api_endpoint.load_request(
value=value,
include=include,
use_conditions=use_conditions,
)
return api_endpoint.perform_request(http=self.auth.http, request_obj=request_obj)
[docs] def run(
self, values: t.List[str], use_conditions: bool = False, error: bool = True
) -> t.List[EnforcementFullModel]:
"""Run enforcement sets against their triggers.
Args:
values: names or UUIDs of enforcement sets to trigger
use_conditions: use conditions
configured on enforcement set to determine execution
error: throw error if an enforcement set has no trigger
"""
cache = self.get_sets()
items = [self.get_set(value=x, cache=cache) for x in listify(values)]
to_run = []
for item in items:
if item.check_trigger_exists(msg="run enforcement set", error=error):
to_run.append(item)
if not to_run:
raise ApiError(f"No enforcement sets with triggers found from values: {values!r}")
uuids = [x.uuid for x in to_run]
if len(uuids) == 1:
result = self._run_set_against_trigger(uuid=uuids[0], use_conditions=use_conditions)
else:
result = self._run_sets_against_trigger(uuids=uuids, use_conditions=use_conditions)
self.LOG.info(
f"Ran enforcement sets uuids {uuids} with use_conditions={use_conditions}\n"
f"Result: {result}"
)
return to_run