Source code for cvpConfigletUploader

#!/usr/local/bin/python

import sys
import argparse
import json
import logging
import os
import time
from datetime import datetime
from datetime import timedelta
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
from cvprac.cvp_client_errors import CvpApiError
import requests
# Import to optionnaly disable HTTPS cert validation
from requests.packages.urllib3.exceptions import InsecureRequestWarning

r"""Script to manage configlet on a CVP server.

It provides mechanismes to create / update / delete configlet
through CVP APIs. It is based on cvprack module with its API
method to manage CVP.
Idea is to make a generic script to enhance reuse factor and
provides a generic tools to implement automation workflow

CLI Examples
------------
- Short Update method::

    $ python cvp-configlet-uploader.py -c configlet.examples/VLANs

- Generic method using JSON inputs::

    $ python cvp-configlet-uploader.py -j actions.json

Python Examples
---------------
>>> import cvprack
>>> client = CvpClient()
>>> try:
>>>     client.connect([CVP_HOST], CVP_USER, CVP_PASS, 10, CVP_PROTO,
    CVP_PORT)
>>>     logging.info('Connected to %s', CVP_HOST )
>>> except CvpLoginError, e :
>>>     # If error, then, printout message and quit program
>>>     # If server cannot be reached, then no need to go further
>>>     logging.error('Can\'t connect to %s', CVP_HOST )
>>>     logging.error('Error message is: %s',str(e).replace('\n',' '))
>>>     quit()
>>> logging.info('- Starting working with configlet.examples/VLANs')
>>> my_configlet = CvpConfiglet(cvp_server=client,
        configlet_file='configlet.examples/VLANs')
>>> logging.info('Start to deploy new version of VLANs configlet')
>>> my_configlet.update_configlet()
>>> my_configlet.deploy_bulk(at=None)

WARNING
-------

Due to a change in CVP API, change-control needs to get snapshot referenced per
task. Current version of ``cvprack`` does not support it in version 1.0.1

Fix is available in develop version. To install development version, use pip::

    $ pip install git+https://github.com/aristanetworks/cvprac.git@develop

Attributes:
-----------
    ``CERT_VALIDATION``: bool
        Describe if we have to validate or not server certificate

    ``CVP_HOST``: str
        Hostname / IP address of CVP server

    ``CVP_PASS``: str
        Password to connect to CVP

    ``CVP_PORT``: int
        Port to use for connection

    ``CVP_PROTO``: str
        HTTP or HTTPS transport layer

    ``CVP_USER``: str
        Username to use for CVP connection

    ``CVP_TZ``: str
        Time-zone to use to configure change-control

    ``CVP_COUNTRY``: str
        Country to use to schedule change-control task

    ``LOG_LEVEL``: str
        Log level for getting information on stdout
        (DEBUG / INFO / WARNING / ERROR)

    ``JSON_CONFIG``: str
        PATH to default JSON configuration file

Todo
----
    Following features are planned to be implemented in coming versions:

    - Implement method to ADD/REMOVE device to configlet.
    - Fallback to update if configlet exists.

"""

# Code version
__version__ = '0.9.4'

# Author information
__author__ = 'Thomas Grimonet'
__email__ = 'tgrimonet@arista.com'

# LOG_LEVEL definition (DEBUG / INFO / WARNING / ERROR)
LOG_LEVEL = logging.DEBUG

# Default JSON file to get list of actions
JSON_CONFIG = 'actions.json'

###############################################################################


[docs]class CvpInventory(object): """CVP Inventory Class. Get complete inventory from CVP and expose some functions to get data. It is RO only and nothing is pushed to CVP with this object. """
[docs] def __init__(self, cvp_server): """Class Constructor. Instantiate an Inventory with a REST call to get device list Parameters ---------- cvp_server : cvprack.CvpClient() Your CVP Rack server """ self._cvp_server = cvp_server self._inventory = self._cvp_server.api.get_inventory()
[docs] def get_device_dict(self, name): """Get information for a give device. Parameters ---------- ``name``: str Hostname to lookup Returns ------- dict: Complete dictionnary sent by CVP """ for device in self._inventory: if device['hostname'] == name: return device return None
[docs] def get_devices(self): """Give a dict of all devices. Returns ------- dict: All devices attached to CVP inventory """ return self._inventory
###############################################################################
[docs]class CvpConfiglet(object): """Configlet class to provide generic method to manage CVP configlet. **Data Structure** Configlet structure is a name based dictionnary with following keys: - ``name``: Name of configlet. This name is built from filename - ``file``: Complete path of the local configlet file - ``content``: Local Configlet content read from ``configlet['file']`` - ``key``: Key ID defined by CVP to identify configlet. it is found by our instance during update, addition or deletion - ``devices``: List of devices structure compliant with ``CvpApi.get_device_by_name()`` It can be found by using ``CvpInventory`` object. **List of attributes:** Attributes ---------- _cvp_server cvprac.CvpClient() object to manage CVP connection _devices_configlet List of devices attached to configlet _configlet Dictionary with all configlet information: ``name``, ``file``, ``content``, ``key``, ``devices`` _cvp_found Boolean to get status of configlet on CVP: True if configlet is on server, False other cases **List of Available methods:** Methods ------- get_devices() Get list of devices for this specific configlet update_configlet() Start update process for that configlet. Do not deploy content to devices deploy_configlet() Start configlet creation process. Do not deploy content to devices delete_configlet() Start configlet deletion process. Do not deploy content to devices deploy() Deploy (add/update) change to a single device deploy_bulk() Deploy (add/update) change to all devices on_cvp() Inform about configlet available on CVP Note ---- This class use call to ``cvprac`` to get and push data to CVP server. """
[docs] def __init__(self, cvp_server, configlet_file=None, configlet_name=None): r"""Class Constructor. Parameters ---------- ``cvp_server``: CvpClient CvpClient object from cvprack. Gives methods to manage CVP API ``configlet_file``: str Path to configlet file. """ self._cvp_server = cvp_server # create data storage for configlet # and tasks related to deployment self._configlet_init() self._task_init() self._devices_configlet = list() # Extract information from configlet filename if configlet_file is not None: self._configlet['file'] = configlet_file self._configlet['name'] = os.path.basename(configlet_file) logging.debug('configlet basename is [%s]', self._configlet['name']) if configlet_name is not None: self._configlet['name'] = configlet_name # check if configlet is present on CVP server if self._configlet_lookup(): logging.info('Configlet [%s] found on %s', self._configlet['name'], CVP_HOST) logging.info('Get list of applied devices from server') self.get_devices(refresh=True) self._cvp_found = True else: self._cvp_found = False logging.warning('Configlet NOT found on %s', CVP_HOST)
[docs] def _configlet_lookup(self): """Check if a configlet is already present on CVP. Check if CVP has already a configlet configured with the same name. If yes return True and report key under self._configlet['key'] If no, return False Returns ------- bool Return ``True`` or ``False`` if configlet name is already configured on CVP """ try: result = self._cvp_server.api.get_configlet_by_name(name=self._configlet['name']) # noqa E501 if 'key' in self._configlet: self._configlet['key'] = result['key'] return True except CvpApiError: # noqa E722 return False
[docs] def _configlet_init(self): """Create an empty dict for configlet.""" logging.debug('Init configlet object') self._configlet = dict() self._configlet['name'] = None self._configlet['file'] = None self._configlet['content'] = None self._configlet['key'] = None self._configlet['devices'] = list()
[docs] def _task_init(self): """Create an empty dict for task.""" logging.debug('Init task object') self._task = dict() self._task['id'] = None self._task['run_at'] = None self._task['result'] = None
[docs] def name(self): """Expose name of the configlet. Returns ------- str Name of configlet built by ``__init__`` """ return self._configlet['name']
[docs] def on_cvp(self): """Expose flag about configlet configured on CVP. Return True if configlet is configured on CVP and can be updated. If configlet is not present, then, False Returns ------- bool True if configlet already configured on CVP, False otherwise """ return self._cvp_found
[docs] def _retireve_devices(self): r"""Get list of devices attached to the configlet. If configlet exists, then, retrieve a complete list of devices attached to it. Returns ------- list List of devices from CVP """ self._devices_configlet = list() inventory = CvpInventory(self._cvp_server).get_devices() logging.info('Start looking for devices attached to [%s]', self._configlet['name']) for device in inventory: if 'systemMacAddress' in device: try: configlet_list = self._cvp_server.api.get_configlets_by_device_id(mac=device['systemMacAddress']) # noqa E501 except CvpLoginError, e: logging.error('Error when getting list of configlet for device %s: %s', device['hostname'], str(e).replace('\n', ' ')) # noqa E501 quit() for configlet in configlet_list: logging.debug(' > Found configlet: %s for device [%s]', configlet['name'], device['hostname']) # noqa E501 if configlet['name'] == self._configlet['name']: self._configlet['devices'].append(device) logging.info(' > Configlet [%s] is applied to %s with sysMacAddr %s', self._configlet['name'], device['hostname'], device['systemMacAddress']) # noqa E501 return self._devices_configlet
[docs] def get_configlet_info(self): """To share configlet information. Returns ------- dict dictionnary with configlet information """ return self._configlet
[docs] def get_devices(self, refresh=False): r"""To share list of devices attached to the configlet. If list is empty or if refresh trigger is active, function will get a new list of device from self._retireve_devices() Otherwise, just send back list to the caller Parameters ---------- refresh : bool Update device list from CVP (Optional) Returns ------- list List of devices from CVP """ if refresh or 'devices' not in self._configlet: self._retireve_devices() return self._configlet['devices']
[docs] def update_configlet(self): """Update configlet on CVP with content from object. Check if configlet is configured on CVP server before pushing an update. If configlet is not there, then, stop method execution. Returns ------- str : message from server with result """ logging.info('%s is going to be updated with local config', self._configlet['name']) try: with open(self._configlet['file'], 'r') as content: configlet = content.read() status_deployment = self._cvp_server.api.update_configlet(key=self._configlet['key'], # noqa E501 name=self._configlet['name'], # noqa E501 config=configlet) # noqa E501 logging.warning('Server response: %s', status_deployment['data']) return status_deployment['data'] except IOError: logging.critical('!! File not found: %s', self._configlet['file']) sys.exit()
[docs] def deploy_configlet(self, device_hostnames): """Create configlet on CVP with content from object. Create a new configlet on CVP server and attached it to all devices you provide in your JSON file. Device attachement is managed with a CvpInventory call to get all information from CVP. It means you just have to provide existing hostname in your JSON Each time a device is attached to configlet on CVP, it is also added in CvpConfiglet object for futur use Parameters ---------- devices_hostname : list List of hostname to attached to configlet """ logging.info('Create configlet %s', self._configlet['name']) with open(self._configlet['file'], 'r') as content: self._configlet['content'] = content.read() self._configlet['key'] = self._cvp_server.api.add_configlet(name=self._configlet['name'], # noqa E501 config=self._configlet['content']) # noqa E501 # Use method to attach device to configlet. self.add_device(device_hostnames=device_hostnames)
[docs] def delete_configlet(self): """Delete a configlet from CVP. To protect, function first check if configlet exists, if not, we stop and return to next action out of this function. Remove configlet from all devices where it is configured Then if configlet exist, remove configlet from CVP DB Returns ------- bool ``True`` if able to remove configlet / False otherwise """ logging.info('start to remove %s from CVP', self._configlet['name']) if self._configlet['key'] is None: logging.critical('Configlet not configured. Can\'t remove it') return False impacted_devices = self.get_devices() for device in impacted_devices: logging.info('[%s] - remove configlet %s', device['hostname'], self._configlet['name']) self._cvp_server.api.remove_configlets_from_device(app_name='CVP Configlet Python Manager', dev=device, del_configlets=[self._configlet]) # noqa E501 logging.info('remove %s from CVP', self._configlet['name']) self._cvp_server.api.delete_configlet(name=self._configlet['name'], key=self._configlet['key']) return True
[docs] def add_device(self, device_hostnames): """Remove device(s) from a configlet. Remove device from configlet and create a task on CVP to remove configuration generated by configlet from device. For every hostname defined in devices_hostnames, a lookup is done to get a complete data set for that device and a call to remove device is sent. Warnings -------- This function never send a call to execute task. it is managed by logic out of that object Arguments: devices_hostnames {list} -- List of devices hostname to remove from the configlet. """ cvp_inventory = CvpInventory(cvp_server=self._cvp_server) logging.info('add devices to configlet %s', self._configlet['name']) for host in device_hostnames: eos = cvp_inventory.get_device_dict(name=host) if eos is not None: logging.info('Apply configlet %s to %s', self._configlet['name'], eos['hostname']) self._configlet['devices'].append(eos) self._cvp_server.api.apply_configlets_to_device(app_name='Apply configlet to device', dev=eos, new_configlets=[self._configlet]) # noqa E501 logging.info('Configlet %s has been applied to all devices', self._configlet['name'])
[docs] def remove_device(self, devices_hostnames): """Remove device(s) from a configlet. Remove device from configlet and create a task on CVP to remove configuration generated by configlet from device. For every hostname defined in devices_hostnames, a lookup is done to get a complete data set for that device and a call to remove device is sent. Warnings -------- This function never send a call to execute task. it is managed by logic out of that object Arguments: devices_hostnames {list} -- List of devices hostname to remove from the configlet. """ cvp_inventory = CvpInventory(cvp_server=self._cvp_server) logging.info('remove devices from configlet %s', self._configlet['name']) for host in devices_hostnames: eos = cvp_inventory.get_device_dict(name=host) if eos is not None: logging.info('remove configlet %s from %s', self._configlet['name'], eos['hostname']) self._cvp_server.api.remove_configlets_from_device(app_name='Python Configlet Remove device', dev=eos, del_configlets=[self._configlet]) # noqa E501 logging.info('Configlet %s has been updated by removing some devices', self._configlet['name'])
@classmethod def _get_task_id(cls, task_reply): """Extract task ID from server reply. When running api.apply_configlets_to_device() CVP server sends back a complete message. This function extract specific taskId field to then track it in the task manager Parameters ---------- task_reply : dict Server reply to a api.apply_configlets_to_device Returns ------- str taskID sent by server """ if 'data' in task_reply: if 'taskIds' in task_reply['data']: logging.debug('Task ID is %s', task_reply['data']['taskIds']) return task_reply['data']['taskIds'][0] return None
[docs] def _wait_task(self, task_id, timeout=10): """Wait for Task execution. As API call is asynchronous, task will run avec after receiving a status. This function implement a wait_for to get final status of a task As we have to protect against application timeout or task issue, a basic timeout has been implemented Parameters ---------- task_id : str ID of the task provided by self._get_task_id() timeout : int optional - Timeout to wait for before assuming task failed Returns: -------- dict Last status message collected from the server """ state = dict() state['taskStatus'] = None loop_timer = 0 while str(state['taskStatus']) != 'COMPLETED' and loop_timer < timeout: time.sleep(1) state = self._cvp_server.api.get_task_by_id(int(task_id)) logging.debug(' ** Wait for task completion (status: %s) / waiting for %s sec', # noqa E501 state['taskStatus'], str(loop_timer)) loop_timer += 1 return state
[docs] def deploy(self, device, schedule_at=None, task_timeout=10): r"""Deploy One configlet to One device. This function manage a deployment this configlet to a given device already attached to the configlet. Parameters ---------- device : dict dict representing a device schedule_at : str Optional - scheduler to run deployment at a given time task_timeout : int Optional - Timeout for task execution default is 10 seconds Warnings -------- ``schedule_at`` option is not yet implemented and shall not be used Returns ------- dict message from server """ if schedule_at is None: logging.warning('[%s] - Configlet %s is going to be deployed \ immediately', device['hostname'], self._configlet['name']) configlet_deploy = self._cvp_server.api.apply_configlets_to_device( app_name='Update devices', dev=device, new_configlets=[self._configlet], create_task=True) # Create task on CVP to update a device task_id = self._get_task_id(task_reply=configlet_deploy) task_status = self._cvp_server.api.get_task_by_id(int(task_id)) logging.info('[%s] - Task %s status : %s', device['hostname'], str(task_id), str(task_status['taskStatus']).upper()) # Execute task self._cvp_server.api.execute_task(task_id=task_id) task_status = self._wait_task(task_id=task_id, timeout=task_timeout) logging.info('[%s] - Task %s status : %s', device['hostname'], str(task_id), str(task_status['taskStatus']).upper()) task = dict() task['id'] = task_id task['status'] = task_status return task return None
[docs] def deploy_bulk(self, device_list=None, schedule_at=None, task_timeout=10): """Run configlet deployment against all devices. Run configlet deployment over all devices attached to this configlet. Every single deployment are managed by function self.deploy() Parameters ---------- device_list : list List of devices if it is set to None, then, fallback is to use devices discover initially at : str Optional scheduler to run deployment at a given time task_timeout : int Optional - Timeout for task execution. Default is 10 seconds Warnings -------- ``schedule_at`` option is not yet implemented and shall not be used Returns ------- list A list of tasks executed for the deployment """ tasks = list() logging.warning('Start tasks to deploy configlet %s to devices', self._configlet['name']) if device_list is None: device_list = self._configlet['devices'] for dev in device_list: if 'systemMacAddress' not in dev or 'hostname' not in dev: # If information are missing, then we can't deploy using API logging.warning('[%s] - information are missing to run \ deployment of %s', dev['hostname'], self._configlet['name']) break # Assuming we have enought information to deploy logging.info('[%s] - Start to update device with configlet %s', dev['hostname'], self._configlet['name']) task = self.deploy(device=dev, schedule_at=None, task_timeout=task_timeout) # Parsing task content to see if we can append to the list or not. if 'taskStatus' in task['status']: if task['status']['taskStatus'].upper() == 'COMPLETED': logging.info('[%s] - Updated', dev['hostname']) tasks.append(task) else: logging.error('[%s] - Not updated as expected, \ please check your CVP', dev['hostname']) logging.error('[%s] - Status is: %s', dev['hostname'], task['status']) return tasks
###############################################################################
[docs]class CvpChangeControl(object): """Change-control class to provide generic method for CVP CC mechanism. Change Control structure is based on: - A name to identify change - A list of tasks already created on CVP and on pending state - An optional scheduling. If no schedule is defined, then task will be run 3 minutes after creatio of CC **List of Available methods:** Methods ------- add_task() Append a task to self._list_changes get_tasks() Return list of of available tasks for this CC get_list_changes() Return list of tasks attached to this CC create() Create change-control on CVP server Todo ---- - Implement a way to get snapshot IDs based on name Warnings -------- - Change Control execution is not running snapshot before and after """
[docs] def __init__(self, cvp_server, name='Automated_Change_Control'): """Class Constructor. Build class content with followinactivities: - save cvp_server information - save name for CC - instanciate list for tasks - Collect tasks available from CVP Parameters ---------- cvp_server : CvpClient CVP Server information name : str Optional - Name of the Change Control. Default is ``Automated_Change_Control`` """ logging.debug('create instance of CvpChangeControl') self._cvp_server = cvp_server self._name = name # List of available tasks from server self._available = list() # List to save tasks to run with their order # Ex: [{'taskId': '100', 'taskOrder': 1}, # {'taskId': '101', 'taskOrder': 1}, # {'taskId': '102', 'taskOrder': 2}] self._list_changes = list() self._retrieve_tasks()
[docs] def _retrieve_tasks(self): """Extract tasks from CVP Server. Connect to CVP server and collect tasks in pending state These tasks are saved in self._available structure dedicated to pending tasks. """ logging.debug('getting list of available task for change control') self._available = self._cvp_server.api.change_control_available_tasks()
[docs] def add_task(self, task): """Add a tasks to available list. This task attach this new tasks to the pending tasks list. Parameters ---------- task : str TaskID from CVP server """ self._available.append(task)
[docs] def get_tasks(self, refresh=False): """Provide list of all available tasks. Return list of all tasks getting from CVP and/or attached with add_task method. Parameters ---------- refresh : bool Optional - Make a call to CVP to get latest list of tasks Returns ------- list List of available tasks found in this CC """ logging.debug('extractig list of available tasks out of our instance') if refresh: logging.debug('refreshing list of tasks available for change control') # noqa E501 self._retrieve_tasks() return self._available
[docs] def _build_change_dictionnary(self, order_mode='linear'): """Build ordered list to schedule changes. CVP Change Control expect a list with an order to run tasks. By default, all tasks are executed at the same time. But using order_mode set to incremental every task will be scheduled sequentially in this change-control Parameters ---------- order_mode : str Optional - Method to build task list. Shall be ``linear`` or ``incremental``. Note ---- Only linear has been tested. """ logging.info('Building a dictionary of changes') change_position = 1 for task in self._available: change = dict() change['taskId'] = task['workOrderId'] change['taskOrder'] = (change_position) logging.debug(' > Adding task %s to position %s', change['taskId'], change['taskOrder']) self._list_changes.append(change) if order_mode == 'incremental': change_position += 1
[docs] def get_list_changes(self, mode='linear'): """Return list of tasks and their execution order. Parameters ---------- mode : str Information about tasks scheduling. Shall be ``linear`` or ``incremental``. Note ---- Only linear has been tested. Returns ------- list List of changes and their order """ if len(self._list_changes) == 0: self._build_change_dictionnary(order_mode=mode) return self._list_changes
# TODO: manage way to retrieve Template ID
[docs] def create(self, mode='linear', country='France', tz='Europe/Paris', schedule=False, schedule_at='', snap_template='1708dd89-ff4b-4d1e-b09e-ee490b3e27f0', change_type='Custom', stop_on_error="true"): """Create a change-control. Parameters ---------- mode : str Optional - method to order tasks (default : linear) country : str Optional - Country requested by CVP API (default:France) tz : str Optional - Timezone required by CVP (default: Europe/Paris) schedule : bool Optional - Enable CC scheduling (default: False) schedule_at : str Optional - Time to execute CC if scheduled snap_template : str Optional - Snapshot template ID to run before / after tasks change_type : str Optional - CVP definition for CC Might be Custom or Rollback. (default: Custom) stop_on_error : str Optional - boolean string to stop CVP on errors Returns ------- dict CVP creation result (None if error occurs) """ # If scheduling is not enable, then we create cahnge control # to be run now+3 minutes by default if schedule is False: schedule_at = (datetime.now() + timedelta(seconds=180)).strftime("%Y-%m-%d %H:%M") # noqa E501 logging.debug('configure execution time in +3 minutes (%s)', schedule_at) # If list of changes to apply hsa not been built already, # then we do it before creating change request if len(self._list_changes) == 0: self._build_change_dictionnary(order_mode=mode) if LOGLEVEL == logging.DEBUG: logging.debug('Tasks to attach to current change-control:') for entry in self._list_changes: logging.debug(' * Found task %s w/ position %s', entry['taskId'], entry['taskOrder']) print(json.dumps(self._list_changes, indent=2, sort_keys=True)) # FIXME: change-control does not set snapshot ID correctly and this one is not run before and after change # Fix implemented in develop version : # https://github.com/aristanetworks/cvprac/blob/develop/cvprac/cvp_api.py#L1633 # pip install pip install git+https://github.com/aristanetworks/cvprac.git@develop # Should solve problem try: creation_request = self._cvp_server.api.create_change_control(name=self._name, # noqa E501 change_control_tasks=self._list_changes, timezone=tz, country_id=country, date_time=schedule_at, snapshot_template_key=snap_template, change_control_type=change_type, stop_on_error=stop_on_error) return creation_request except CvpApiError as err: logging.error('Cannot create change-control - error message is %s', format(err)) return None
###############################################################################
[docs]def load_constant(key_name, default='UNSET', verbose=False): r"""Set up constant value from OS Environment. Help to define CONSTANT from OS Environment. If it is not defined, then, fallback to default value provided within parameters :Example: >>> USERNAME = load_constant(key_name='USERNAME_1', default='myUser') >>> print USERNAME myUsername :param key_name: VAR to lookup in os.environment :type key_name: str :param default: Default value to use if key_name is not defined. :type default: str :param verbose: Boolean to activate verbos mode (Optional, expected values: debug level) :type verbose: bool :returns: Value for variable :rtype: str """ if key_name in os.environ: if verbose: logging.debug("%s is set to %s", key_name, os.environ.get(key_name)) return os.environ[key_name] else: if verbose: logging.debug('%s is not set - using default (%s)', key_name, str(default)) # noqa E501 return default
[docs]def config_read(config_file="actions.json"): r"""Read JSON configuration. Load information from JSON file defined in ``config_file`` First, method check if file exists or not and then try to load content using ``json.load()`` If file is not a JSON or if file does not exist, method return None **Data structure to read**:: [ { "name": "new CVP Configlet", "action": "add", "configlet": "configlet.example/VLANsTEMP", "devices": [ "leaf1", "leaf2", "leaf3" ] }, { "name": "new CVP Configlet", "action": "update", "configlet": "configlet.examples/VLANs" } ] Parameters ---------- config_file : str Path to the configuration file Returns ------- json Json structure with all actions to execute """ try: os.path.isfile(config_file) logging.debug('loading config from %s', config_file) with open(config_file) as file_content: config = json.load(file_content) return config except IOError: logging.critical('!! File not found: %s', config_file) return None
[docs]def connect_to_cvp(parameters): """Create a CVP connection. parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- parameters : dict Object with all information to create connection Returns ------- cvprack.CvpClient cvp client object """ client = CvpClient() try: client.connect([parameters.cvp], parameters.username, parameters.password, 10, CVP_PROTO, CVP_PORT) logging.info('Connected to %s', CVP_HOST) except CvpLoginError, e: # If error, then, printout message and quit program # If server cannot be reached, then no need to go further logging.error('Can\'t connect to %s', parameters.cvp) logging.error('Error message is: %s', str(e).replace('\n', ' ')) quit() return client
[docs]def action_update(configlet_def, parameters): """Manage actions to UPDATE and existing configlet. Create CVP connection and instantiate a CvpConfiglet object Then call appropriate method to start object update And finally run tasks Parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ # Create an HTTP session to the CVP server client = connect_to_cvp(parameters) logging.info('*************') logging.info('Starting working with %s', configlet_def['configlet']) my_configlet = CvpConfiglet(cvp_server=client, configlet_file=configlet_def['configlet']) if my_configlet.on_cvp(): logging.info('Start to deploy new version of %s configlet', configlet_def['configlet']) my_configlet.update_configlet() else: logging.warning('%s is not configured on CVP server, fallback to ADD method', # noqa E501 my_configlet.name()) action_add(configlet_def=configlet_def, parameters=parameters) # Now check if configlet has to be applied on devices. # If not, it means a task should be run manually # or a Change Control should be defined as a next item. if 'apply' in configlet_def: if configlet_def['apply']: logging.info('configlet is gonna be deployed') if 'devices' not in configlet_def: logging.info('deploy target is set to all attached devices') my_configlet.deploy_bulk() else: logging.info('configlet will be deployed on some devices') cvp_inventory = CvpInventory(client) for device in configlet_def['devices']: dev_inventory = cvp_inventory.get_device_dict(device) if dev_inventory is not None: logging.info(' > Depoy %s on %s', my_configlet.name, device) my_configlet.deploy(dev_inventory) else: logging.warning('deploy option has not been set for the configlet') logging.warning('--> doing nothing') else: logging.warning('deploy option has not been set for the configlet') logging.warning('--> doing nothing')
[docs]def action_remove_devices(configlet_def, parameters): """Manage actions to remove devices from an existing configlet. Create CVP connection to check if configlet exists. if it exists, then call CvpConfiglet.remove_device() to remove all devices listed in the task. If apply option is set to true, then, generated tasks are applied by CVP. Otherwise, user has to do it manually Parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ # Create an HTTP session to the CVP server client = connect_to_cvp(parameters) logging.info('*************') logging.info('Starting working with %s', configlet_def['configlet']) my_configlet = CvpConfiglet(cvp_server=client, configlet_name=configlet_def['configlet']) if my_configlet.on_cvp(): logging.info('%s configlet has been found on CVP', configlet_def['configlet']) my_configlet.remove_device(devices_hostnames=configlet_def['devices']) # Check if configlet must be deployed to devices if 'apply' in configlet_def: action_apply(configlet_def=configlet_def, my_configlet=my_configlet) else: logging.warning('deploy option has not been set for the configlet') logging.warning('--> doing nothing') return True
[docs]def action_add_devices(configlet_def, parameters): """Manage actions to add devices to an existing configlet. Create CVP connection to check if configlet exists. if it exists, then call CvpConfiglet.add_device() to add all devices listed in the json task. If apply option is set to true, then, generated tasks are applied by CVP. Otherwise, user has to do it manually Parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ # Create an HTTP session to the CVP server client = connect_to_cvp(parameters) logging.info('*************') logging.info('Starting working with %s', configlet_def['configlet']) my_configlet = CvpConfiglet(cvp_server=client, configlet_name=configlet_def['configlet']) if my_configlet.on_cvp(): logging.info('%s configlet has been found on CVP', configlet_def['configlet']) my_configlet.add_device(device_hostnames=configlet_def['devices']) # Check if configlet must be deployed to devices if 'apply' in configlet_def: action_apply(configlet_def=configlet_def, my_configlet=my_configlet) else: logging.warning('deploy option has not been set for the configlet') logging.warning('--> doing nothing') return True
[docs]def action_apply(configlet_def, my_configlet): """Instruct CvpConfiglet to run tasks. Method to execute repetitive action instructing CvpConfiglet to execute pending tasks Parameters ---------- configlet_def : dict Dictionnary loaded from JSON file. represent one action and must have an 'apply' field defined my_configlet : CvpConfiglet A CvpConfiglet object already instantiated. It is used to call deploy_bulk method """ if configlet_def['apply']: logging.info('configlet is gonna be deployed') logging.info('deploy target is set to all attached devices') my_configlet.deploy_bulk() else: logging.warning('deploy option has not been set for the configlet') # noqa E501 logging.warning('--> doing nothing')
[docs]def action_add(configlet_def, parameters): """Manage actions to ADD a configlet. Create CVP connection and instantiate a CvpConfiglet object Then call appropriate method to start object creation If apply option is set to true, then, generated tasks are applied by CVP. Otherwise, user has to do it manually Parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ # Create an HTTP session to the CVP server client = connect_to_cvp(parameters) logging.info('*************') logging.info('Start working with %s', configlet_def['configlet']) my_configlet = CvpConfiglet(cvp_server=client, configlet_file=configlet_def['configlet']) logging.info('Start to create new configlet: %s', configlet_def['configlet']) if my_configlet.on_cvp(): logging.warning('%s is already configured on CVP server, fallback to UPDATE method', # noqa E501 my_configlet.name()) action_update(configlet_def=configlet_def, parameters=parameters) else: # a list of device must be available to create configlet # if list is missing, then we have to break process if 'devices' not in configlet_def: logging.error('Configlet has no devices configured,\ cannot create configlet on server') return False # Now create configlet on CVP server my_configlet.deploy_configlet(device_hostnames=configlet_def['devices']) # Check if configlet must be deployed to devices if 'apply' in configlet_def: action_apply(configlet_def=configlet_def, my_configlet=my_configlet) else: logging.warning('deploy option has not been set for the configlet') logging.warning('--> doing nothing') return True
[docs]def action_delete(configlet_def, parameters): """Manage actions to DELTE a configlet. Create CVP connection and instantiate a CvpConfiglet object Then call appropriate method to start object deletion Parameters option should at least contain following elements: - username - password - cvp (server IP or DNS hostname) Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ client = connect_to_cvp(parameters) logging.info('*************') logging.info('Start working with %s', configlet_def['configlet']) my_configlet = CvpConfiglet(cvp_server=client, configlet_file=configlet_def['configlet']) logging.info('Start to delete existing configlet: %s', configlet_def['configlet']) my_configlet.delete_configlet()
# FIXME: Apply is not supported for configlet deletion # Please see issue #16 on github # https://github.com/titom73/configlet-cvp-uploader/issues/16
[docs]def action_create_change_control(parameters, data): """Create a Change-Control. Create a change-control on CVP based on a JSON definition. Current version supports following entries in JSON: - name: change-control name configured on CVP - type: change-control (Must be set with this vaue to engage CC) - country: Country required by CVP for CC - timezone: Timezone required by CVP to run changes **Expected inputs data** JSON file:: [ { "name": "Python_CC", "type": "change-control", "country": "France", "timezone": "Europe/Paris" } ] Todo ---- Manage way to retrieve Template ID / As feature is not part of CVPRAC, ``snapid`` shall be part of the job definition. If not, then we configure it to ``None`` Parameters ---------- configlet_def : dict Data from JSON to describe configlet parameters : dict Object with all information to create connection """ client = connect_to_cvp(parameters) logging.info('start change-control creation') change_control = CvpChangeControl(cvp_server=client, name=data['name'].replace(' ', '_')) if 'schedule_at' not in data: data['schedule_at'] = (datetime.now() + timedelta(seconds=180)).strftime("%Y-%m-%d %H:%M") # noqa E501 # Check if mandatory values are set. # Otherwise load default if 'timezone' not in data: logging.debug('Timezone not set in json, using default one: %s', CVP_TZ) data['timezone'] = CVP_TZ if 'country' not in data: logging.debug('Country not set in json, using default one: %s', CVP_COUNTRY) data['country'] = CVP_COUNTRY if 'snapid' not in data: logging.debug('Snapshot ID not configured') data['snapid'] = 'None' if 'apply' in data and data['apply'] is True: logging.info('Scheduling change control to be executed at %s', data['schedule_at']) result = change_control.create(tz=data['timezone'], country=data['country'], schedule=True, schedule_at=data['schedule_at'], snap_template=data['snapid'], change_type='Custom', stop_on_error="true") # noqa E501 else: logging.info('change control must be executed manually') result = change_control.create(tz=data['timezone'], country=data['country'], schedule=True, snap_template=data['snapid'], change_type='Custom', stop_on_error="true") # noqa E501 if result is not None: logging.info('!change-control creation is %s (id %s)', result['data'], result['ccId']) # noqa E501
# Main part of the script if __name__ == '__main__': # CVP information # Load information from your shell environment (export CVP_USER=cvpadmin) # If not set, then use default values. # It can be manually override with CLI option using argparse CVP_HOST = load_constant(key_name='CVP_HOST', default='127.0.0.1') CVP_PORT = int(load_constant(key_name='CVP_PORT', default=443)) CVP_PROTO = load_constant(key_name='CVP_PROTO', default='https') CVP_USER = load_constant(key_name='CVP_USER', default='username') CVP_PASS = load_constant(key_name='CVP_PASS', default='password') CVP_TZ = load_constant(key_name='CVP_TZ', default='France') CVP_COUNTRY = load_constant(key_name='CVP_COUNTRY', default='France') LOG_LEVEL = load_constant(key_name='LOG_LEVEL', default='info') JSON_CONFIG = load_constant(key_name='CVP_JSON', default='actions.json') CERT_VALIDATION = bool(load_constant(key_name='CERT_VALIDATION', default=False)) # Argaparser to load information using CLI PARSER = argparse.ArgumentParser(description="Configlet Uploader to CVP ", version=__version__) PARSER.add_argument('-c', '--configlet', help='Configlet path to use on CVP', type=str, default=None) PARSER.add_argument('-u', '--username', help='Username for CVP', type=str, default=CVP_USER) PARSER.add_argument('-p', '--password', help='Password for CVP', type=str, default=CVP_PASS) PARSER.add_argument('-s', '--cvp', help='Address of CVP server', type=str, default=CVP_HOST) PARSER.add_argument('-d', '--debug_level', help='Verbose level (debug / info / war\ ning / error / critical)', type=str, default=LOG_LEVEL) PARSER.add_argument('-j', '--json', help='File with list of actions to execute)', type=str, default=JSON_CONFIG) # PARSER.add_argument('-j', '--json', # help='File with list of actions to execute)', # type=str, default=JSON_CONFIG) OPTIONS = PARSER.parse_args() # Logging configuration LEVELS = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL} LOGLEVEL = LEVELS.get(OPTIONS.debug_level, logging.NOTSET) logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s', level=LOGLEVEL, datefmt='%Y-%m-%d %H:%M:%S') # activate SSL enforcement or not # (should be disabled for self-signed certs) if CERT_VALIDATION is False: requests.packages.urllib3.disable_warnings(InsecureRequestWarning) logging.debug('CVP Client inputs:') logging.debug(' > Username is %s', OPTIONS.username) logging.debug(' > Server is %s', OPTIONS.cvp) logging.debug(' > Port is %s', CVP_PORT) logging.debug(' > Proto is %s', CVP_PROTO) print '\n--------------------\n' # If OPTIONS.configlet is defined we assume user wants to make short update # Then we don't care about JSON # If OPTIONS.configlet is not defined, # Then, OPTIONS.json MUST be configured if OPTIONS.configlet is not None: CONFIGLET = dict() CONFIGLET['name'] = 'Short path update' CONFIGLET['configlet'] = OPTIONS.configlet CONFIGLET['action'] = 'update' logging.info('task %s is going to update %s', CONFIGLET['name'], CONFIGLET['configlet']) action_update(configlet_def=CONFIGLET, parameters=OPTIONS) else: # Parse actions defined in OPTIONS.json # Based on action field, script will execute different # set of actions LIST_ACTIONS = config_read(config_file=OPTIONS.json) for ACTION in LIST_ACTIONS: if 'type' not in ACTION: logging.critical('task %s does not have type defined - skipping', ACTION['name']) # noqa E501 break if 'action' in ACTION and ACTION['type'] == 'configlet': if ACTION['action'] == 'add': logging.info('configlet %s is going to be created %s', ACTION['name'], ACTION['configlet']) action_add(configlet_def=ACTION, parameters=OPTIONS) elif ACTION['action'] == 'delete': logging.info('configlet %s is going to be deleted %s', ACTION['name'], ACTION['configlet']) action_delete(configlet_def=ACTION, parameters=OPTIONS) elif ACTION['action'] == 'update': logging.info('configlet %s is going to be updated %s', ACTION['name'], ACTION['configlet']) action_update(configlet_def=ACTION, parameters=OPTIONS) elif ACTION['action'] == 'remove-devices': logging.info('configlet %s is going to be removed from devices', ACTION['name']) action_remove_devices(configlet_def=ACTION, parameters=OPTIONS) elif ACTION['action'] == 'add-devices': logging.info('configlet %s is going to be added to devices', ACTION['name']) action_add_devices(configlet_def=ACTION, parameters=OPTIONS) else: logging.error('Unsupported action.\ Please use add / delete / update only') print '\n--------------------\n' elif ACTION['type'] == 'change-control': logging.info('Implementation in progress -- expect some issues') action_create_change_control(OPTIONS, ACTION) else: logging.warning('task is not supported -- skipping') logging.info('Wait 10 sec before next action') time.sleep(10)