# -*- coding: utf-8 -*-
"""API for working with system users."""
from typing import Generator, List, Optional, Tuple, Union
from ...constants.api import MAX_PAGE_SIZE
from ...exceptions import ApiError, NotFoundError
from ...parsers.tables import tablize_users
from .. import json_api
from ..api_endpoints import ApiEndpoints
from ..mixins import ModelMixins
[docs]class SystemUsers(ModelMixins):
    """API for working with system users.

    Public methods fetch users (:meth:`get`, :meth:`get_by_name`, :meth:`get_by_uuid`),
    create them (:meth:`add`), change single attributes (the ``set_*`` methods), delete
    them (:meth:`delete_by_name`) and produce password reset links.

    Underscore-prefixed methods build a request object from an ``ApiEndpoints`` entry and
    perform it over ``self.auth.http``.
    """
def get(self, generator: bool = False) -> Union[Generator[dict, None, None], List[dict]]:
    """Get Axonius system users.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> users = client.system_users.get()
        >>> [x['user_name'] for x in users]
        ['admin']

    Args:
        generator: when True, return the lazy iterator produced by :meth:`get_generator`
            instead of collecting every user into a list

    Returns:
        the iterator itself if ``generator`` is truthy, otherwise a list built from it
    """
    users = self.get_generator()
    if generator:
        return users
    return list(users)
def get_generator(self) -> Generator[dict, None, None]:
    """Get Axonius system users using a generator.

    Roles are fetched once via ``self.roles.get()`` and handed to every row's
    ``to_dict_old``. Pages are requested through :meth:`_get`, advancing the offset by
    the size of each page, until an empty page comes back.

    Yields:
        the result of ``to_dict_old(system_roles=...)`` for each fetched row
    """
    roles = self.roles.get()
    fetched = 0
    page = self._get(offset=fetched)
    while page:
        fetched += len(page)
        for user in page:
            yield user.to_dict_old(system_roles=roles)
        # Ask for the next page starting right after everything seen so far.
        page = self._get(offset=fetched)
def get_by_name(self, name: str) -> dict:
    """Get a user by name.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.get_by_name(name="admin")
        >>> user["uuid"]
        '5f76721bebd8d8b5459b56c8'

    Args:
        name: user name, compared for equality against each user's ``user_name``

    Returns:
        the first user whose ``user_name`` equals ``name``

    Raises:
        :exc:`NotFoundError`: if user not found
    """
    users = self.get()
    matches = list(filter(lambda user: user["user_name"] == name, users))
    if not matches:
        err = f"User with name of {name!r} not found"
        # The error text is built from all fetched users plus the message.
        raise NotFoundError(tablize_users(users=users, err=err))
    return matches[0]
def get_by_uuid(self, uuid: str) -> dict:
    """Get a user by uuid.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.get_by_uuid(uuid="5f76721bebd8d8b5459b56cc")
        >>> user["name"]
        'administrator'

    Args:
        uuid: uuid, compared for equality against each user's ``uuid``

    Returns:
        the first user whose ``uuid`` equals ``uuid``

    Raises:
        :exc:`NotFoundError`: if user not found
    """
    users = self.get()
    matches = list(filter(lambda user: user["uuid"] == uuid, users))
    if not matches:
        err = f"User with uuid of {uuid!r} not found"
        # The error text is built from all fetched users plus the message.
        raise NotFoundError(tablize_users(users=users, err=err))
    return matches[0]
def add(
    self,
    name: str,
    password: str,
    role_name: str,
    first: str = "",
    last: str = "",
    email: str = "",
) -> dict:
    """Create a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        Create a user with the Admin role

        >>> user = client.system_users.add(
        ...     name="test1",
        ...     password="test1",
        ...     role_name="Admin",
        ...     first="bad",
        ...     last="wolf",
        ...     email="badwolf@badwolf.com",
        ... )
        >>> user['uuid']
        '5f7ca7f1e4557d5cba415f12'

    Args:
        name: user name
        password: password
        role_name: role name
        first: first name
        last: last name
        email: email address

    Returns:
        the newly created user, re-fetched by name after creation

    Raises:
        :exc:`ApiError`: if user already exists matching name
    """
    # The role is looked up first, so any error from that lookup happens before a user
    # is created.
    role = self.roles.get_by_name(name=role_name)

    # Only the lookup is guarded: the "already exists" error is raised from the else
    # clause so it can never be swallowed by the NotFoundError handler.
    try:
        self.get_by_name(name=name)
    except NotFoundError:
        # Expected path: no user with this name exists yet.
        pass
    else:
        raise ApiError(f"User with name of {name!r} already exists")

    self._add(
        user_name=name,
        role_id=role["uuid"],
        password=password,
        first_name=first,
        last_name=last,
        email=email,
    )
    return self.get_by_name(name=name)
def set_role(self, name: str, role_name: str) -> dict:
    """Change the role of a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.set_role(name='test1', role_name='Viewer')
        >>> user['role_name']
        'Viewer'

    Args:
        name: user name
        role_name: name of role

    Returns:
        whatever :meth:`_update_user_attr` returns for the ``role_id`` update
    """
    role_uuid = self.roles.get_by_name(name=role_name)["uuid"]
    change = {"name": name, "must_be_internal": True, "attr": "role_id", "value": role_uuid}
    return self._update_user_attr(**change)
def set_first_last(self, name: str, first: str, last: str) -> dict:
    """Change the first and last name of a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.set_first_last(name='test1', first='Bad', last='Wolf')
        >>> user['first_name']
        'Bad'
        >>> user['last_name']
        'Wolf'
        >>> user['full_name']
        'Bad Wolf'

    Args:
        name: user name
        first: first name
        last: last name

    Returns:
        whatever :meth:`_update_user_attr` returns for the update
    """
    # A dict value is merged into the user by _update_user_attr; the attr string is
    # only used as a label there.
    names = dict(first_name=first, last_name=last)
    return self._update_user_attr(
        name=name,
        must_be_internal=True,
        attr="first_name and last_name",
        value=names,
    )
def set_password(self, name: str, password: str) -> dict:
    """Change the password for a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.set_password(name='test1', password='super_secure')

    Args:
        name: user name
        password: password

    Returns:
        whatever :meth:`_update_user_attr` returns for the ``password`` update
    """
    change = {"name": name, "must_be_internal": True, "attr": "password", "value": password}
    return self._update_user_attr(**change)
def set_email(self, name: str, email: str) -> dict:
    """Change the email address for a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.set_email(name='test1', email='bad@wolf.com')
        >>> user['email']
        'bad@wolf.com'

    Args:
        name: user name
        email: email

    Returns:
        whatever :meth:`_update_user_attr` returns for the ``email`` update
    """
    change = {"name": name, "must_be_internal": True, "attr": "email", "value": email}
    return self._update_user_attr(**change)
def set_ignore_role_assignment_rules(self, name: str, enabled: bool) -> dict:
    """Set the ignore role assignment rules flag of an external user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> user = client.system_users.set_ignore_role_assignment_rules(
        ...     name='test1', enabled=True
        ... )

    Args:
        name: user name
        enabled: enable/disable the flag

    Returns:
        whatever :meth:`_update_user_attr` returns for the update
    """
    # must_be_internal=False: _update_user_attr rejects internal users for this attribute.
    change = {
        "name": name,
        "must_be_internal": False,
        "attr": "ignore_role_assignment_rules",
        "value": enabled,
    }
    return self._update_user_attr(**change)
def delete_by_name(self, name: str) -> str:
    """Delete a user by name.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> client.system_users.delete_by_name(name='test1')

    Args:
        name: user name

    Returns:
        the ``document_meta`` attribute of the object returned by :meth:`_delete`

    Raises:
        :exc:`ApiError`: if name is "admin"
    """
    # The literal name "admin" is refused before any lookup is made.
    if name == "admin":
        raise ApiError(f"Unable to delete {name!r} user")

    target_uuid = self.get_by_name(name=name)["uuid"]
    response = self._delete(uuid=target_uuid)
    return response.document_meta
def get_password_reset_link(self, name: str) -> str:
    """Generate a password reset link for a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        >>> link = client.system_users.get_password_reset_link(name='test1')
        >>> link
        'https://10.20.0.61/login?token=LUayznSkfLUjm3A2fIs_zAF-4CzcxRZc7DHOfhOkMRI'

    Notes:
        This link can be provided to the user and they can browse to it, or they can use
        ``axonshell tools use-password-reset-token`` or use
        :meth:`axonius_api_client.api.system.signup.Signup.use_password_reset_token`

    Args:
        name: user name

    Returns:
        the value returned by :meth:`_tokens_generate` for the user's uuid and user name
    """
    user = self.get_by_name(name=name)
    user_uuid, user_name = user["uuid"], user["user_name"]
    return self._tokens_generate(uuid=user_uuid, user_name=user_name)
def email_password_reset_link(
    self,
    name: str,
    email: Optional[str] = None,
    for_new_user: bool = False,
) -> Tuple[str, str]:
    """Email a password reset link for a user.

    Examples:
        Create a ``client`` using :obj:`axonius_api_client.connect.Connect`

        Use the email address defined in the users account

        >>> link, email_used = client.system_users.email_password_reset_link(name='test1')
        >>> link
        'https://10.20.0.61/login?token=V-YW8DnCverKTgvsToMAJwGPqXtQQeeNuBFkN4dE4ew'
        >>> email_used
        'jim@badwolf.com'

        Use a custom email address

        >>> link, email_used = client.system_users.email_password_reset_link(
        ...     name='test1', email='james@badwolf.com'
        ... )
        >>> link
        'https://10.20.0.61/login?token=V-YW8DnCverKTgvsToMAJwGPqXtQQeeNuBFkN4dE4ew'
        >>> email_used
        'james@badwolf.com'

        Use the template for a new user instead of password reset

        >>> link, email_used = client.system_users.email_password_reset_link(
        ...     name='test1', email='james@badwolf.com', for_new_user=True
        ... )

    Notes:
        This returns the password reset link like :meth:`get_password_reset_link` and the
        email address that was used to send the email.

    Args:
        name: user name
        email: use a custom email address to send the email instead of the users defined
            email address. required if the user has no defined email address.
        for_new_user: use the new user email template instead of the password reset email
            template

    Returns:
        tuple of (reset link, email address the link was sent to)

    Raises:
        :exc:`ApiError`: if no ``email`` is supplied and the user has no email defined
    """
    user = self.get_by_name(name=name)

    # A supplied address wins; otherwise fall back to the one stored on the user.
    recipient = email if email else user.get("email")
    if recipient:
        user_uuid = user["uuid"]
        link = self._tokens_generate(uuid=user_uuid, user_name=user["user_name"])
        self._tokens_notify(uuid=user_uuid, email=recipient, invite=for_new_user)
        return link, recipient

    raise ApiError(
        f"User {name!r} has no email address defined, must supply a custom email address"
    )
def _get(
    self, limit: int = MAX_PAGE_SIZE, offset: int = 0
) -> List[json_api.system_users.SystemUser]:
    """Direct API method to get all users.

    Args:
        limit: limit to N rows per page
        offset: start at row N

    Returns:
        the result of performing the ``ApiEndpoints.system_users.get`` request
    """
    paging = {"limit": limit, "offset": offset}
    endpoint = ApiEndpoints.system_users.get
    request = endpoint.load_request(page=paging)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _add(
    self,
    user_name: str,
    role_id: str,
    password: str = "",  # nosec
    auto_generated_password: bool = False,
    first_name: str = "",
    last_name: str = "",
    email: str = "",
) -> json_api.system_users.SystemUser:
    """Direct API method to add a user.

    Args:
        user_name: user name
        role_id: role UUID
        password: password
        auto_generated_password: generate a password
        first_name: first name
        last_name: last name
        email: email address

    Returns:
        the result of performing the ``ApiEndpoints.system_users.create`` request
    """
    fields = {
        "user_name": user_name,
        "first_name": first_name,
        "last_name": last_name,
        "password": password,
        "email": email,
        "role_id": role_id,
        "auto_generated_password": auto_generated_password,
    }
    endpoint = ApiEndpoints.system_users.create
    request = endpoint.load_request(**fields)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _delete(self, uuid: str) -> json_api.system_users.SystemUser:
    """Direct API method to delete a user.

    Args:
        uuid: user UUID

    Returns:
        the result of performing the ``ApiEndpoints.system_users.delete`` request
    """
    endpoint = ApiEndpoints.system_users.delete
    request = endpoint.load_request(uuid=uuid)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _update(
    self,
    uuid: str,
    user_name: str,
    role_id: str,
    password: str,
    first_name: str,
    last_name: str,
    email: str,
    last_updated: str,
    source: str,
    pic_name: str,
    **kwargs,
) -> json_api.system_users.SystemUser:
    """Direct API method to update a user.

    Args:
        uuid: user UUID
        user_name: user name
        role_id: role UUID
        password: password
        first_name: first name
        last_name: last name
        email: email address
        last_updated: sent to the endpoint unchanged
        source: sent to the endpoint unchanged
        pic_name: sent to the endpoint unchanged
        **kwargs: accepted so a full user dict can be splatted in; any extra keys are
            ignored and not sent in the request

    Returns:
        the result of performing the ``ApiEndpoints.system_users.update`` request
    """
    # NOTE(review): because **kwargs is dropped, a key such as
    # "ignore_role_assignment_rules" set by _update_user_attr never reaches
    # load_request -- confirm whether that is intended.
    fields = {
        "user_name": user_name,
        "first_name": first_name,
        "last_name": last_name,
        "password": password,
        "email": email,
        "role_id": role_id,
        "uuid": uuid,
        "last_updated": last_updated,
        "source": source,
        "pic_name": pic_name,
    }
    endpoint = ApiEndpoints.system_users.update
    request = endpoint.load_request(**fields)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _tokens_generate(self, uuid: str, user_name: str) -> str:
    """Direct API method to generate a password reset link for a user.

    Args:
        uuid: user UUID, sent as ``user_id``
        user_name: user name

    Returns:
        the result of performing the ``ApiEndpoints.password_reset.create`` request
    """
    endpoint = ApiEndpoints.password_reset.create
    request = endpoint.load_request(user_id=uuid, user_name=user_name)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _tokens_notify(self, uuid: str, email: str, invite: bool = False) -> str:
    """Direct API method to send a password reset link to a user.

    Args:
        uuid: user UUID, sent as ``user_id``
        email: email address to send link to
        invite: use welcome template instead of reset password template in email body

    Returns:
        the result of performing the ``ApiEndpoints.password_reset.send`` request
    """
    endpoint = ApiEndpoints.password_reset.send
    request = endpoint.load_request(user_id=uuid, email=email, invite=invite)
    return endpoint.perform_request(http=self.auth.http, request_obj=request)
def _init(self, **kwargs):
    """Post init method for subclasses to use for extra setup.

    Creates the ``roles`` attribute, a ``SystemRoles`` API object sharing ``self.auth``.
    """
    # Imported inside the method rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from .system_roles import SystemRoles

    roles_api = SystemRoles(auth=self.auth)
    #: Work with roles
    self.roles: SystemRoles = roles_api
[docs] def _update_user_attr(
self, name: str, must_be_internal: bool, attr: str, value: Union[str, bool, dict]
) -> dict:
"""Set an attribute on a user.
Args:
name: user name
must_be_internal: user must be internal or external for this attr to be set
attr: attribute of user object to set
value: value to set on attribute
Raises:
:exc:`ApiError`:
* if user is external but must be internal for the attribute to be set
* if user is internal but must be external for the attribute to be set
"""
user = self.get_by_name(name=name)
source = user.get("source", "")
name = user["user_name"]
is_internal = source == "internal"
pre = f"Unable to set {attr} for user {name!r} with source {source}"
if isinstance(must_be_internal, bool):
if is_internal and not must_be_internal:
raise ApiError(f"{pre}, must be external user")
if not is_internal and must_be_internal:
raise ApiError(f"{pre}, must be internal user")
if isinstance(value, dict):
user.update(value)
else:
user[attr] = value
self.LOG.debug(f"Updating user {name!r} attribute {attr!r}: {value!r}")
self._update(**user)
return self.get_by_name(name=name)