# -*- coding: utf-8 -*-
"""Parser for AQL queries and GUI expressions from python objects."""
import logging
from typing import List, Optional, Tuple, Union
from ...constants.fields import ALL_NAME, CUSTOM_FIELDS_MAP, Operator, OperatorTypeMaps
from ...constants.logs import LOG_LEVEL_WIZARD
from ...constants.wizards import (
Docs,
Entry,
Expr,
Fields,
Flags,
Patterns,
Results,
Sources,
Templates,
Types,
)
from ...exceptions import WizardError
from ...logs import get_obj_log
from ...parsers.wizards import WizardParser
from ...tools import check_type, listify
[docs]class Wizard:
"""Parser for AQL queries and GUI expressions from python objects.
Examples:
Create a ``client`` using :obj:`axonius_api_client.connect.Connect` and assume
``apiobj`` is either ``client.devices`` or ``client.users``
>>> apiobj = client.devices # or client.users
Define some entries to parse
>>> entries = [
... {
... 'value': 'hostname contains test',
... 'type': 'simple',
... },
... {
... 'value': 'installed_software // name contains chrome // version earlier_than 82',
... 'type': 'complex'
... },
... ]
Parse the entries into a query and GUI expressions
>>> parsed = apiobj.wizard.parse(entries=entries)
>>> list(parsed)
['expressions', 'query']
Get the query produced by the wizard
>>> query = parsed["query"]
>>> query[:80]
'(specific_data.data.hostname == regex("test", "i")) and (specific_data.data.inst'
Get the GUI expressions produced by the wizard
>>> expressions = parsed["expressions"]
>>> expressions[0]['filter']
'(specific_data.data.hostname == regex("test", "i"))'
>>> expressions[1]['filter'][:80]
'and (specific_data.data.installed_software == match([(name == regex("chrome", "i'
Use the query to get assets
>>> assets = apiobj.get(query=query)
>>> len(assets)
2
Use the query to get a count of assets
>>> count = apiobj.count(query=query)
>>> count
2
Use the query and expressions to create a saved query that the GUI understands
>>> sq = apiobj.saved_query.add(name="test", query=query, expressions=expressions)
"""
DOCS: str = Docs.DICT
[docs] def __init__(self, apiobj, log_level: Union[str, int] = LOG_LEVEL_WIZARD):
"""Query wizard builder.
Args:
apiobj (:obj:`axonius_api_client.api.assets.asset_mixin.AssetMixin`): Asset object
log_level: logging level for this object
"""
self.LOG: logging.Logger = get_obj_log(obj=self, level=log_level)
"""Logger for this object."""
self.APIOBJ = apiobj
""":obj:`axonius_api_client.api.assets.asset_mixin.AssetMixin`: Asset object."""
self.PARSER = WizardParser(apiobj=apiobj)
""":obj:`axonius_api_client.parsers.wizards.WizardParser`: Value parser."""
self._init()
[docs] def parse(self, entries: List[dict], source: str = Sources.LOD) -> dict:
"""Parse a list of entries into a query and the associated GUI query wizard expressions.
Args:
entries: list of entries to parse
source: where entries came from
"""
check_type(value=entries, exp=(list, tuple), exp_items=dict)
entries = [x for x in entries if x]
entries = self._parse_entries(entries=entries, source=source)
exprs = self._parse_exprs(entries=entries)
query = Expr.get_query(exprs=exprs)
return {Results.EXPRS: exprs, Results.QUERY: query}
[docs] def _parse_entries(self, entries: List[dict], source: str) -> List[dict]:
"""Parse a list of entries into a query and the associated GUI query wizard expressions.
Args:
entries: list of entries to parse
source: where entries came from
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if an entry fails to be parsed
"""
is_open, tracker = False, 0
for idx, entry in enumerate(entries):
src = f"{source} entry #{idx + 1}/{len(entries)}"
entry[Entry.SRC] = entry.get(Entry.SRC, src)
try:
self._check_entry_keys(entry=entry, keys=Entry.REQ)
etype = entry[Entry.TYPE]
entry[Entry.TYPE] = self._check_entry_type(etype=etype, types=Types.DICT)
entry, is_open, tracker = self._parse_flags(
entry=entry,
idx=idx,
entries=entries,
is_open=is_open,
tracker=tracker,
)
except Exception as exc:
err = f"Error parsing entry from {src}:\n{exc}"
raise WizardError("\n".join([err, self.DOCS, err]))
return entries
[docs] def _parse_flags(
self, entry: dict, idx: int, entries: List[dict], tracker: int, is_open: bool
) -> dict:
"""Parse flags from an entry.
Args:
entry: entry to parse with `Entry.VALUE` key
idx: index of this entry
entries: all entries
tracker: tracker for `Entry.WEIGHT` key
is_open: parenthesis are currently open
"""
value_raw = entry[Entry.VALUE]
flags = listify(entry.get(Entry.FLAGS) or [])
flags, value = self._split_flags(value_raw=value_raw, flags=flags)
entry[Entry.VALUE] = value
entry[Entry.FLAGS] = flags
is_last = len(entries) - 1 == idx
msgs = [
f"parsing flags of entry {idx + 1}/{len(entries)}",
f"flags={entry[Entry.FLAGS]}",
f"is_open={is_open}",
f"tracker={tracker}",
]
if is_open and Flags.LEFTB in entry[Entry.FLAGS]:
prev_idx = idx - 1
msgs += [
f"is_open=True but {Flags.LEFTB} in flags",
f"adding {Flags.RIGHTB} to previous entry flags",
]
entries[prev_idx][Entry.FLAGS].append(Flags.RIGHTB)
if not is_open and Flags.RIGHTB in entry[Entry.FLAGS]:
entry[Entry.FLAGS].append(Flags.LEFTB)
entry[Entry.WEIGHT] = -1
tracker = 0
msgs += [
f"is_open=False but {Flags.RIGHTB} in flags",
f"adding {Flags.LEFTB} to this entries flags",
"set final weight=-1",
"set tracker=0",
]
if is_open:
tracker += 1
entry[Entry.WEIGHT] = tracker
msgs += ["increment tracker", f"set weight={tracker}"]
if not is_open and Flags.LEFTB not in entry[Entry.FLAGS]:
msgs += ["set final weight=0"]
entry[Entry.WEIGHT] = 0
if Flags.LEFTB in entry[Entry.FLAGS]:
msgs += ["set weight=-1", "set tracker=0", "set is_open=True"]
entry[Entry.WEIGHT] = -1
tracker = 0
is_open = True
if Flags.RIGHTB in entry[Entry.FLAGS]:
msgs += ["set is_open=False", "set tracker=0"]
is_open = False
tracker = 0
if is_last and is_open and Flags.RIGHTB not in entry[Entry.FLAGS]:
msgs += [
"last entry but bracket is left open",
"set tracker=0",
f"adding {Flags.RIGHTB} to last entries flags",
]
entry[Entry.FLAGS].append(Flags.RIGHTB)
tracker = 0
msgs.append(f"final weight={entry[Entry.WEIGHT]}")
self.LOG.debug("\n".join(msgs))
return entry, is_open, tracker
[docs] def _parse_exprs(self, entries: List[dict]) -> List[dict]:
"""Parse a list of entries into GUI expressions.
Args:
entries: list of entries to parse
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if an entry fails to be parsed
"""
exprs = []
for idx, entry in enumerate(entries):
src = f"entry #{idx + 1}/{len(entries)}"
src = entry.get(Entry.SRC, src)
try:
method = getattr(self, f"_parse_{entry[Entry.TYPE]}")
exprs.append(method(entry=entry, idx=idx))
except Exception as exc:
err = f"Error parsing expression from {src}:\n{exc}"
raise WizardError("\n".join([err, self.DOCS, err]))
return exprs
[docs] def _parse_simple(self, entry: dict, idx: int) -> dict:
"""Parse an entry of type simple into GUI expressions.
Args:
entry: entry to parse
idx: index of this entry
"""
value_raw = entry[Entry.VALUE]
field, operator, value = self._split_simple(value_raw=value_raw)
field = self._get_field(value=field, value_raw=value_raw)
operator = self._get_operator(operator=operator, field=field, value_raw=value_raw)
aql_value, expr_value = self.PARSER(
enum=field.get("enum", []),
enum_items=field.get("items", {}).get("enum"),
parser=operator.parser.name,
value=value,
)
query = operator.template.format(field=field[Fields.NAME], aql_value=aql_value)
expr = Expr.build(
entry=entry,
field=field,
idx=idx,
value=expr_value,
op_comp=operator.name_map.op,
field_name_override=operator.field_name_override,
query=query,
)
return expr
[docs] def _parse_complex(self, entry: dict, idx: int) -> dict:
"""Parse an entry of type complex into GUI expressions.
Args:
entry: entry to parse
idx: index of this entry
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if sub expr fails to parse
"""
value_raw = entry[Entry.VALUE]
field, subs_raw = self._split_complex(value_raw=value_raw)
field = self._get_field_complex(value=field, value_raw=value_raw)
sub_exprs = []
for sub_idx, sub_raw in enumerate(subs_raw):
try:
sub_expr = self._parse_sub(field=field, idx=sub_idx, value_raw=sub_raw)
sub_exprs.append(sub_expr)
except Exception as exc:
raise WizardError(f"Error parsing sub field from '{value_raw}'\n{exc}")
sub_queries = Expr.get_subs_query(sub_exprs=sub_exprs)
query = Templates.COMPLEX.format(field=field[Fields.NAME], sub_queries=sub_queries)
expr = Expr.build(
entry=entry,
field=field,
idx=idx,
query=query,
value=None,
op_comp="",
children=sub_exprs,
is_complex=True,
)
return expr
[docs] def _parse_sub(self, field: dict, value_raw: str, idx: int) -> dict:
"""Parse sub expression of an entry of type complex.
Args:
field: complex field schema
value_raw: the value split from the complex filter line
idx: index of this entry
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if sub-field supplied is not a valid sub-field of the complex field
"""
sub_field, operator, sub_value = self._split_simple(value_raw=value_raw)
field_subs = {x[Fields.NAME]: x for x in field[Fields.SUBS]}
if sub_field not in field_subs:
fname = field[Fields.NAME]
valid = ", ".join(list(field_subs))
err = (
f"Unable to find SUB-FIELD named {sub_field!r} of COMPLEX field {fname}"
f" from value '{value_raw}'\n\nValid sub_fields: {valid}"
)
raise WizardError(err)
sub_field = field_subs[sub_field]
operator = self._get_operator(operator=operator, field=sub_field, value_raw=value_raw)
aql_value, expr_value = self.PARSER(
enum=sub_field.get("enum", []),
enum_items=sub_field.get("items", {}).get("enum"),
parser=operator.parser.name,
value=sub_value,
)
query = operator.template.format(field=sub_field[Fields.NAME], aql_value=aql_value)
expr = Expr.build_child(
field=sub_field[Fields.NAME],
idx=idx,
value=expr_value,
op_comp=operator.name_map.op,
query=query,
)
return expr
[docs] def _split_flags(
self, value_raw: str, flags: Optional[List[str]] = None
) -> Tuple[List[str], str]:
"""Parse an expression and get the flags from the beginning and end.
Args:
value_raw: the value to get the flags from
flags: flags from the entry
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if value is empty after removing flags
"""
value = value_raw
flags = flags or []
pattern = Patterns.FLAGS.pattern
match = Patterns.FLAGS.search(value_raw)
self.LOG.debug(f"Value {value_raw!r} regex match {match} using pattern {pattern}")
check_right = [f" {Flags.RIGHTB}", Flags.RIGHTB]
if match:
groups = match.groupdict()
self.LOG.debug(f"Parsed value {value_raw!r} into {groups}")
if not groups.get("value"):
raise WizardError(f"Empty value after parsing {value_raw!r} into {groups}")
value = groups.get("value")
flags += [x.strip() for x in list(groups.get("flags", []) or []) if x.strip()]
for check in check_right:
if value.endswith(check):
plen = len(check)
value = value[:-plen]
flags.append(Flags.RIGHTB)
break
return flags, value
[docs] def _split_simple(self, value_raw: str) -> Tuple[str, str, str]:
"""Split a simple query wizard expression into field, operator, and value.
Args:
value_raw: the raw unparsed value to parse
"""
splitter = Entry.SPLIT
split = value_raw.split(splitter, maxsplit=2)
field = ""
operator = ""
value = ""
if split:
field = split.pop(0).strip()
self.LOG.debug(f"Got field {field!r} from {split} from '{value_raw}'")
if split:
operator = split.pop(0).lower().strip()
self.LOG.debug(f"Got operator {operator!r} from {split} from '{value_raw}'")
if split:
value = split.pop(0).lstrip()
self.LOG.debug(f"Got value {value!r} from {split} from '{value_raw}'")
self._check_patterns(
value_raw=value_raw,
value=field,
src="FIELD",
patterns=Patterns.FIELD,
)
self._check_patterns(
value_raw=value_raw,
value=operator,
src="OPERATOR",
patterns=Patterns.OP,
)
return field, operator, value
[docs] def _split_complex(self, value_raw: str) -> Tuple[str, List[str]]:
"""Split a complex query wizard expression into field and sub-field expressions.
Args:
value_raw: the raw unparsed value to parse
"""
splitter = Entry.CSPLIT
if splitter not in value_raw:
raise WizardError(f"No {splitter} found in value '{value_raw}'")
split = value_raw.split(splitter)
field = ""
subs_raw = []
if split:
field = split.pop(0).strip()
self.LOG.debug(f"Got complex field {field!r} from {split} from '{value_raw}'")
if split:
subs_raw = [x.lstrip() for x in split if x.lstrip()]
self.LOG.debug(f"Got sub fields {subs_raw!r} from {split} from '{value_raw}'")
self._check_patterns(
value_raw=value_raw,
value=field,
src="FIELD",
patterns=Patterns.FIELD,
)
self._check_patterns(value_raw=value_raw, value=subs_raw, src="SUB-FIELD(s)", patterns=[])
return field, subs_raw
[docs] def _get_operator(self, operator: str, field: dict, value_raw: str) -> Operator:
"""Validate the supplied operator for an expression for the type of field.
Args:
operator: operator supplied by user
field: field schema to check that operator is valid for
value_raw: raw unparsed value where operator and field came from
"""
err = f"Invalid OPERATOR name {operator!r} from value '{value_raw}'\n\n"
return OperatorTypeMaps.get_operator(operator=operator, field=field, err=err)
[docs] def _get_field(self, value: str, value_raw: str) -> dict:
"""Find a field schema for the supplied field name.
Args:
value: name of field to find schema for
value_raw: raw unparsed value where field name came from
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if field can not be found or is "all" field
"""
try:
field = self.APIOBJ.fields.get_field_name(
value=value,
key=None,
fields_custom=CUSTOM_FIELDS_MAP,
)
except Exception as exc:
msg = f"Unable to find FIELD named {value!r} from value '{value_raw}'"
raise WizardError(f"{msg}\n\n{exc}\n\n{msg}")
if field[Fields.IS_ALL]:
name = field[Fields.NAME]
raise WizardError(f"Can not use {name!r} field in queries")
return field
[docs] def _get_field_complex(self, value: str, value_raw: str) -> dict:
"""Find a field schema for the supplied complex field name.
Args:
value: name of complex field to find schema for
value_raw: raw unparsed value where complex field name came from
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if field can not be found, is not a complex field, or is "all" field
"""
try:
field = self.APIOBJ.fields.get_field_name(
value=value,
key=None,
fields_custom=CUSTOM_FIELDS_MAP,
)
except Exception as exc:
msg = f"Unable to find COMPLEX-FIELD named {value!r} from value '{value_raw}'"
raise WizardError(f"{msg}\n\n{exc}\n\n{msg}")
if field[Fields.NAME] == ALL_NAME:
raise WizardError(f"Can not use {ALL_NAME!r} field in queries")
if not field[Fields.IS_COMPLEX]:
aname = field[Fields.ANAME]
fname = field[Fields.NAME]
afields = self.APIOBJ.fields.get()[aname]
schemas = [
x
for x in afields
if x[Fields.IS_COMPLEX] and not x[Fields.IS_ALL] and not x[Fields.IS_DETAILS]
]
msg = [
f"Invalid COMPLEX-FIELD {fname!r} for adapter {aname!r}, valids:",
*self.APIOBJ.fields._prettify_schemas(schemas=schemas),
]
raise WizardError("\n".join(msg))
return field
[docs] def _check_entry_type(self, etype: str, types: List[str]) -> str:
"""Check that a supplied entry type is valid.
Args:
etype: entry type to check
types: valid entry types to check etype against
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if etype is not on of types
"""
etype = etype.lower().strip()
if etype not in types:
valid = ", ".join(types)
raise WizardError(f"Invalid type {etype!r}, valid types: {valid}")
return etype
[docs] def _check_entry_keys(self, entry: dict, keys: List[str]) -> dict:
"""Check that an entry has the required keys.
Args:
entry: entry to check keys against
keys: list of keys that entry must have
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if key is not in entry, value is empty, or value is not a string
"""
for key in keys:
if key not in entry:
found = ", ".join(list(entry))
raise WizardError(f"Missing required key {key!r}, found keys: {found}")
value = entry[key]
if not value:
raise WizardError(f"Empty required key {key!r} with value {value}")
if not isinstance(value, str):
vtype = type(value).__name__
raise WizardError(
f"Invalid type {vtype} for required key {key!r} with value {value}"
)
return entry
[docs] def _check_patterns(self, value_raw: str, value: str, src: str, patterns: List[str]):
"""Check that a value matches a list of regex patterns.
Args:
value_raw: raw unparsed value where value came from
value: value to check patterns against
src: identifier of what value represents
patterns: list of regex patterns to check value against
Raises:
:exc:`axonius_api_client.exceptions.WizardError`:
if value is empty or does not match one of the patterns
"""
if not value:
raise WizardError(f"Empty required {src} as {value!r} from value '{value_raw}'")
for check in patterns:
match = check.search(value)
if not match:
continue
chars = "".join(list(match.groups()))
raise WizardError(
f"Using regex: {check.pattern}\n"
f"Found invalid characters '{chars}' in {src} from value '{value}' "
f"from '{value_raw}'"
)
[docs] def _init(self):
"""Post init setup."""
pass