Source code for tenable.ie.ad_object.api

'''
AD Object
=============

Methods described in this section relate to the ad object API.
These methods can be accessed at ``TenableIE.ad_object``.

.. rst-class:: hide-signature
.. autoclass:: ADObjectAPI
    :members:
'''
from typing import List, Dict, Mapping
from restfly.utils import dict_clean
from tenable.ie.ad_object.schema import ADObjectSchema, ADObjectChangesSchema
from tenable.ie.base.iterator import ADIterator
from tenable.base.endpoint import APIEndpoint


class ADObjectIterator(ADIterator):
    '''
    The ad object iterator provides a scalable way to work through alert list
    result sets of any size. The iterator will walk through each page of data,
    returning one record at a time. If it reaches the end of a page of
    records, then it will request the next page of information and then
    continue to return records from the next page (and the next, and the next)
    until the counter reaches the total number of records that the API has
    reported.
    '''


[docs] class ADObjectAPI(APIEndpoint): _schema = ADObjectSchema()
[docs] def details(self, directory_id: str, infrastructure_id: str, ad_object_id: str ) -> Dict: ''' Retrieves the details of a specific AD object. Args: directory_id (str): The directory instance identifier. infrastructure_id (str): The infrastructure instance identifier. ad_object_id (str): The AD Object identifier. Returns: dict: The AD object. Examples: >>> tie.ad_object.details( ... directory_id='1', ... infrastructure_id='1', ... ad_object_id='1' ... ) ''' return self._schema.load( self._api.get(f"infrastructures/{infrastructure_id}/" f"directories/{directory_id}/" f"ad-objects/{ad_object_id}"))
[docs] def details_by_profile_and_checker(self, profile_id: str, checker_id: str, ad_object_id: str ) -> Dict: ''' Retrieves an AD object details by id that have deviances for a specific profile and checker Args: profile_id (str): The profile instance identifier. checker_id (str): The checker instance identifier. ad_object_id (str): The AD Object identifier. Returns: dict: The AD object. Examples: >>> tie.ad_object.details_by_profile_and_checker( ... profile_id='1', ... checker_id='1', ... ad_object_id='1' ... ) ''' return self._schema.load( self._api.get(f"profiles/{profile_id}/" f"checkers/{checker_id}/" f"ad-objects/{ad_object_id}"))
[docs] def details_by_event(self, directory_id: str, infrastructure_id: str, ad_object_id: str, event_id: str ) -> Dict: ''' Retrieves the details of a specific AD object. Args: directory_id (str): The directory instance identifier. infrastructure_id (str): The infrastructure instance identifier. ad_object_id (str): The AD Object identifier. event_id (str): The event identifier. Returns: dict: The AD object. Examples: >>> tie.ad_object.details_by_event( ... directory_id='1', ... infrastructure_id='1', ... ad_object_id='1', ... event_id='1' ... ) ''' return self._schema.load( self._api.get(f"infrastructures/{infrastructure_id}/" f"directories/{directory_id}/" f"events/{event_id}/" f"ad-objects/{ad_object_id}"))
[docs] def get_changes(self, directory_id: str, infrastructure_id: str, ad_object_id: str, event_id: str, **kwargs ) -> List[Dict]: ''' Get the AD object changes between a given event and event which precedes it. Args: directory_id (str): The directory instance identifier. infrastructure_id (str): The infrastructure instance identifier. ad_object_id (str): The AD Object identifier. event_id (str): The event identifier. wanted_values (optional, list[str]): Which values user wants to include. ``before`` to include the values just before the event, ``after`` to include the values just after the event or ``current`` to include the current values. Returns: list[dict]: The list of AD objects. Examples: >>> tie.ad_object.get_changes( ... directory_id='1', ... infrastructure_id='1', ... ad_object_id='1', ... event_id='1', ... wanted_values=['current', 'after'] ... ) ''' schema = ADObjectChangesSchema() params = self._schema.dump(self._schema.load(kwargs)) return schema.load( self._api.get(f"infrastructures/{infrastructure_id}/" f"directories/{directory_id}/" f"events/{event_id}/" f"ad-objects/{ad_object_id}/changes", params=params), many=True)
[docs] def search_all(self, profile_id: str, checker_id: str, expression: Mapping, directories: List[int], reasons: List[int], show_ignored: bool, **kwargs ) -> ADObjectIterator: ''' Search all AD objects having deviances by profile by checker Args: profile_id (str): The profile instance identifier. checker_id (str): The checker instance identifier. expression (mapping): An object describing a filter for searched items. directories (list[int]): The list of directory instance identifiers. reasons (list[int]): The list of reasons identifiers. show_ignored (bool): Whether AD Object that only have ignored deviances should be included? date_start (optional, str): The date after which the AD object deviances should have been emitted. date_end (optional, str): The date before which the AD object deviances should have been emitted. page (optional, int): The page number user wants to retrieve. per_page (optional, int): The number of records per page user wants to retrieve. max_items (optional, int): The maximum number of records to return before stopping iteration. max_pages (optional, int): The maximum number of pages to request before throwing stopping iteration. Returns: :obj:`ADObjectIterator`: An iterator that handles the page management of the requested records. Examples: >>> for ado in tie.ad_object.search_all( ... profile_id='1', ... checker_id='1', ... show_ignored=False, ... reasons=[1, 2], ... directories=[1], ... expression={'OR': [{ ... 'whencreated': '2021-07-29T12:27:50.0000000Z' ... }]}, ... date_end='2022-12-31T18:30:00.000Z', ... date_start='2021-12-31T18:30:00.000Z', ... page=1, ... per_page=20, ... max_pages=10, ... max_items=200 ... ): ... pprint(ado) ''' params = self._schema.dump(self._schema.load({ 'page': kwargs.get('page') or 1, 'perPage': kwargs.get('per_page'), 'maxItems': kwargs.get('max_items'), 'maxPages': kwargs.get('max_pages') })) payload = self._schema.dump(self._schema.load( dict_clean({ 'expression': expression, 'directories': directories, 'reasons': reasons, 'dateStart': kwargs.get('date_start'), 'dateEnd': kwargs.get('date_end'), 'showIgnored': show_ignored }) )) return ADObjectIterator( api=self._api, _path=f'profiles/{profile_id}/' f'checkers/{checker_id}/' f'ad-objects/search', _method='post', num_pages=params.get('page'), _per_page=params.get('perPage'), _query=params, _payload=payload, _schema=self._schema, max_pages=params.pop('maxPages', None), max_items=params.pop('maxItems', None) )