#!/usr/local/bin/python
import sys
import argparse
import json
import logging
import os
import time
from datetime import datetime
from datetime import timedelta
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpLoginError
from cvprac.cvp_client_errors import CvpApiError
import requests
# Import to optionnaly disable HTTPS cert validation
from requests.packages.urllib3.exceptions import InsecureRequestWarning
r"""Script to manage configlet on a CVP server.
It provides mechanismes to create / update / delete configlet
through CVP APIs. It is based on cvprack module with its API
method to manage CVP.
Idea is to make a generic script to enhance reuse factor and
provides a generic tools to implement automation workflow
CLI Examples
------------
- Short Update method::
$ python cvp-configlet-uploader.py -c configlet.examples/VLANs
- Generic method using JSON inputs::
$ python cvp-configlet-uploader.py -j actions.json
Python Examples
---------------
>>> import cvprack
>>> client = CvpClient()
>>> try:
>>> client.connect([CVP_HOST], CVP_USER, CVP_PASS, 10, CVP_PROTO,
CVP_PORT)
>>> logging.info('Connected to %s', CVP_HOST )
>>> except CvpLoginError, e :
>>> # If error, then, printout message and quit program
>>> # If server cannot be reached, then no need to go further
>>> logging.error('Can\'t connect to %s', CVP_HOST )
>>> logging.error('Error message is: %s',str(e).replace('\n',' '))
>>> quit()
>>> logging.info('- Starting working with configlet.examples/VLANs')
>>> my_configlet = CvpConfiglet(cvp_server=client,
configlet_file='configlet.examples/VLANs')
>>> logging.info('Start to deploy new version of VLANs configlet')
>>> my_configlet.update_configlet()
>>> my_configlet.deploy_bulk(at=None)
WARNING
-------
Due to a change in CVP API, change-control needs to get snapshot referenced per
task. Current version of ``cvprack`` does not support it in version 1.0.1
Fix is available in develop version. To install development version, use pip::
$ pip install git+https://github.com/aristanetworks/cvprac.git@develop
Attributes:
-----------
``CERT_VALIDATION``: bool
Describe if we have to validate or not server certificate
``CVP_HOST``: str
Hostname / IP address of CVP server
``CVP_PASS``: str
Password to connect to CVP
``CVP_PORT``: int
Port to use for connection
``CVP_PROTO``: str
HTTP or HTTPS transport layer
``CVP_USER``: str
Username to use for CVP connection
``CVP_TZ``: str
Time-zone to use to configure change-control
``CVP_COUNTRY``: str
Country to use to schedule change-control task
``LOG_LEVEL``: str
Log level for getting information on stdout
(DEBUG / INFO / WARNING / ERROR)
``JSON_CONFIG``: str
PATH to default JSON configuration file
Todo
----
Following features are planned to be implemented in coming versions:
- Implement method to ADD/REMOVE device to configlet.
- Fallback to update if configlet exists.
"""
# Code version
__version__ = '0.9.4'
# Author information
__author__ = 'Thomas Grimonet'
__email__ = 'tgrimonet@arista.com'
# LOG_LEVEL definition (DEBUG / INFO / WARNING / ERROR)
LOG_LEVEL = logging.DEBUG
# Default JSON file to get list of actions
JSON_CONFIG = 'actions.json'
###############################################################################
[docs]class CvpInventory(object):
"""CVP Inventory Class.
Get complete inventory from CVP and expose some functions to get data.
It is RO only and nothing is pushed to CVP with this object.
"""
[docs] def __init__(self, cvp_server):
"""Class Constructor.
Instantiate an Inventory with a REST call to get device list
Parameters
----------
cvp_server : cvprack.CvpClient()
Your CVP Rack server
"""
self._cvp_server = cvp_server
self._inventory = self._cvp_server.api.get_inventory()
[docs] def get_device_dict(self, name):
"""Get information for a give device.
Parameters
----------
``name``: str
Hostname to lookup
Returns
-------
dict:
Complete dictionnary sent by CVP
"""
for device in self._inventory:
if device['hostname'] == name:
return device
return None
[docs] def get_devices(self):
"""Give a dict of all devices.
Returns
-------
dict: All devices attached to CVP inventory
"""
return self._inventory
###############################################################################
[docs]class CvpConfiglet(object):
"""Configlet class to provide generic method to manage CVP configlet.
**Data Structure**
Configlet structure is a name based dictionnary with following keys:
- ``name``: Name of configlet. This name is built from filename
- ``file``: Complete path of the local configlet file
- ``content``: Local Configlet content read from ``configlet['file']``
- ``key``: Key ID defined by CVP to identify configlet.
it is found by our instance during update, addition or deletion
- ``devices``: List of devices structure compliant
with ``CvpApi.get_device_by_name()`` It can be found by
using ``CvpInventory`` object.
**List of attributes:**
Attributes
----------
_cvp_server
cvprac.CvpClient() object to manage CVP connection
_devices_configlet
List of devices attached to configlet
_configlet
Dictionary with all configlet information: ``name``, ``file``,
``content``, ``key``, ``devices``
_cvp_found
Boolean to get status of configlet on CVP: True if configlet is on
server, False other cases
**List of Available methods:**
Methods
-------
get_devices()
Get list of devices for this specific configlet
update_configlet()
Start update process for that configlet.
Do not deploy content to devices
deploy_configlet()
Start configlet creation process.
Do not deploy content to devices
delete_configlet()
Start configlet deletion process. Do not deploy content to devices
deploy()
Deploy (add/update) change to a single device
deploy_bulk()
Deploy (add/update) change to all devices
on_cvp()
Inform about configlet available on CVP
Note
----
This class use call to ``cvprac`` to get and push data to CVP server.
"""
[docs] def __init__(self, cvp_server, configlet_file=None, configlet_name=None):
r"""Class Constructor.
Parameters
----------
``cvp_server``: CvpClient
CvpClient object from cvprack. Gives methods to manage CVP API
``configlet_file``: str
Path to configlet file.
"""
self._cvp_server = cvp_server
# create data storage for configlet
# and tasks related to deployment
self._configlet_init()
self._task_init()
self._devices_configlet = list()
# Extract information from configlet filename
if configlet_file is not None:
self._configlet['file'] = configlet_file
self._configlet['name'] = os.path.basename(configlet_file)
logging.debug('configlet basename is [%s]',
self._configlet['name'])
if configlet_name is not None:
self._configlet['name'] = configlet_name
# check if configlet is present on CVP server
if self._configlet_lookup():
logging.info('Configlet [%s] found on %s', self._configlet['name'],
CVP_HOST)
logging.info('Get list of applied devices from server')
self.get_devices(refresh=True)
self._cvp_found = True
else:
self._cvp_found = False
logging.warning('Configlet NOT found on %s', CVP_HOST)
[docs] def _configlet_lookup(self):
"""Check if a configlet is already present on CVP.
Check if CVP has already a configlet configured with the same name.
If yes return True and report key under self._configlet['key']
If no, return False
Returns
-------
bool
Return ``True`` or ``False`` if configlet name is
already configured on CVP
"""
try:
result = self._cvp_server.api.get_configlet_by_name(name=self._configlet['name']) # noqa E501
if 'key' in self._configlet:
self._configlet['key'] = result['key']
return True
except CvpApiError: # noqa E722
return False
[docs] def _configlet_init(self):
"""Create an empty dict for configlet."""
logging.debug('Init configlet object')
self._configlet = dict()
self._configlet['name'] = None
self._configlet['file'] = None
self._configlet['content'] = None
self._configlet['key'] = None
self._configlet['devices'] = list()
[docs] def _task_init(self):
"""Create an empty dict for task."""
logging.debug('Init task object')
self._task = dict()
self._task['id'] = None
self._task['run_at'] = None
self._task['result'] = None
[docs] def name(self):
"""Expose name of the configlet.
Returns
-------
str
Name of configlet built by ``__init__``
"""
return self._configlet['name']
[docs] def on_cvp(self):
"""Expose flag about configlet configured on CVP.
Return True if configlet is configured on CVP and can be updated.
If configlet is not present, then, False
Returns
-------
bool
True if configlet already configured on CVP, False otherwise
"""
return self._cvp_found
[docs] def _retireve_devices(self):
r"""Get list of devices attached to the configlet.
If configlet exists, then, retrieve a complete list of devices
attached to it.
Returns
-------
list
List of devices from CVP
"""
self._devices_configlet = list()
inventory = CvpInventory(self._cvp_server).get_devices()
logging.info('Start looking for devices attached to [%s]',
self._configlet['name'])
for device in inventory:
if 'systemMacAddress' in device:
try:
configlet_list = self._cvp_server.api.get_configlets_by_device_id(mac=device['systemMacAddress']) # noqa E501
except CvpLoginError, e:
logging.error('Error when getting list of configlet for device %s: %s', device['hostname'], str(e).replace('\n', ' ')) # noqa E501
quit()
for configlet in configlet_list:
logging.debug(' > Found configlet: %s for device [%s]', configlet['name'], device['hostname']) # noqa E501
if configlet['name'] == self._configlet['name']:
self._configlet['devices'].append(device)
logging.info(' > Configlet [%s] is applied to %s with sysMacAddr %s', self._configlet['name'], device['hostname'], device['systemMacAddress']) # noqa E501
return self._devices_configlet
[docs] def get_configlet_info(self):
"""To share configlet information.
Returns
-------
dict
dictionnary with configlet information
"""
return self._configlet
[docs] def get_devices(self, refresh=False):
r"""To share list of devices attached to the configlet.
If list is empty or if refresh trigger is active,
function will get a new list of device from self._retireve_devices()
Otherwise, just send back list to the caller
Parameters
----------
refresh : bool
Update device list from CVP (Optional)
Returns
-------
list
List of devices from CVP
"""
if refresh or 'devices' not in self._configlet:
self._retireve_devices()
return self._configlet['devices']
[docs] def update_configlet(self):
"""Update configlet on CVP with content from object.
Check if configlet is configured on CVP server before pushing an update.
If configlet is not there, then, stop method execution.
Returns
-------
str : message from server with result
"""
logging.info('%s is going to be updated with local config',
self._configlet['name'])
try:
with open(self._configlet['file'], 'r') as content:
configlet = content.read()
status_deployment = self._cvp_server.api.update_configlet(key=self._configlet['key'], # noqa E501
name=self._configlet['name'], # noqa E501
config=configlet) # noqa E501
logging.warning('Server response: %s', status_deployment['data'])
return status_deployment['data']
except IOError:
logging.critical('!! File not found: %s',
self._configlet['file'])
sys.exit()
[docs] def deploy_configlet(self, device_hostnames):
"""Create configlet on CVP with content from object.
Create a new configlet on CVP server and attached it to all devices
you provide in your JSON file.
Device attachement is managed with a CvpInventory call
to get all information from CVP.
It means you just have to provide existing hostname in your JSON
Each time a device is attached to configlet on CVP, it is also added
in CvpConfiglet object for futur use
Parameters
----------
devices_hostname : list
List of hostname to attached to configlet
"""
logging.info('Create configlet %s', self._configlet['name'])
with open(self._configlet['file'], 'r') as content:
self._configlet['content'] = content.read()
self._configlet['key'] = self._cvp_server.api.add_configlet(name=self._configlet['name'], # noqa E501
config=self._configlet['content']) # noqa E501
# Use method to attach device to configlet.
self.add_device(device_hostnames=device_hostnames)
[docs] def delete_configlet(self):
"""Delete a configlet from CVP.
To protect, function first check if configlet exists, if not, we stop
and return to next action out of this function.
Remove configlet from all devices where it is configured
Then if configlet exist, remove configlet from CVP DB
Returns
-------
bool
``True`` if able to remove configlet / False otherwise
"""
logging.info('start to remove %s from CVP', self._configlet['name'])
if self._configlet['key'] is None:
logging.critical('Configlet not configured. Can\'t remove it')
return False
impacted_devices = self.get_devices()
for device in impacted_devices:
logging.info('[%s] - remove configlet %s', device['hostname'],
self._configlet['name'])
self._cvp_server.api.remove_configlets_from_device(app_name='CVP Configlet Python Manager', dev=device, del_configlets=[self._configlet]) # noqa E501
logging.info('remove %s from CVP', self._configlet['name'])
self._cvp_server.api.delete_configlet(name=self._configlet['name'],
key=self._configlet['key'])
return True
[docs] def add_device(self, device_hostnames):
"""Remove device(s) from a configlet.
Remove device from configlet and create a task on CVP to remove
configuration generated by configlet from device.
For every hostname defined in devices_hostnames, a lookup is done to get
a complete data set for that device and a call to remove device is sent.
Warnings
--------
This function never send a call to execute task. it is managed by logic
out of that object
Arguments:
devices_hostnames {list} -- List of devices hostname to remove from
the configlet.
"""
cvp_inventory = CvpInventory(cvp_server=self._cvp_server)
logging.info('add devices to configlet %s',
self._configlet['name'])
for host in device_hostnames:
eos = cvp_inventory.get_device_dict(name=host)
if eos is not None:
logging.info('Apply configlet %s to %s',
self._configlet['name'],
eos['hostname'])
self._configlet['devices'].append(eos)
self._cvp_server.api.apply_configlets_to_device(app_name='Apply configlet to device', dev=eos, new_configlets=[self._configlet]) # noqa E501
logging.info('Configlet %s has been applied to all devices',
self._configlet['name'])
[docs] def remove_device(self, devices_hostnames):
"""Remove device(s) from a configlet.
Remove device from configlet and create a task on CVP to remove
configuration generated by configlet from device.
For every hostname defined in devices_hostnames, a lookup is done to get
a complete data set for that device and a call to remove device is sent.
Warnings
--------
This function never send a call to execute task. it is managed by logic
out of that object
Arguments:
devices_hostnames {list} -- List of devices hostname to remove from
the configlet.
"""
cvp_inventory = CvpInventory(cvp_server=self._cvp_server)
logging.info('remove devices from configlet %s',
self._configlet['name'])
for host in devices_hostnames:
eos = cvp_inventory.get_device_dict(name=host)
if eos is not None:
logging.info('remove configlet %s from %s',
self._configlet['name'],
eos['hostname'])
self._cvp_server.api.remove_configlets_from_device(app_name='Python Configlet Remove device', dev=eos, del_configlets=[self._configlet]) # noqa E501
logging.info('Configlet %s has been updated by removing some devices',
self._configlet['name'])
@classmethod
def _get_task_id(cls, task_reply):
"""Extract task ID from server reply.
When running api.apply_configlets_to_device() CVP server
sends back a complete message. This function extract specific
taskId field to then track it in the task manager
Parameters
----------
task_reply : dict
Server reply to a api.apply_configlets_to_device
Returns
-------
str
taskID sent by server
"""
if 'data' in task_reply:
if 'taskIds' in task_reply['data']:
logging.debug('Task ID is %s', task_reply['data']['taskIds'])
return task_reply['data']['taskIds'][0]
return None
[docs] def _wait_task(self, task_id, timeout=10):
"""Wait for Task execution.
As API call is asynchronous, task will run avec after
receiving a status.
This function implement a wait_for to get final status of a task
As we have to protect against application timeout or task issue,
a basic timeout has been implemented
Parameters
----------
task_id : str
ID of the task provided by self._get_task_id()
timeout : int
optional - Timeout to wait for before assuming task failed
Returns:
--------
dict
Last status message collected from the server
"""
state = dict()
state['taskStatus'] = None
loop_timer = 0
while str(state['taskStatus']) != 'COMPLETED' and loop_timer < timeout:
time.sleep(1)
state = self._cvp_server.api.get_task_by_id(int(task_id))
logging.debug(' ** Wait for task completion (status: %s) / waiting for %s sec', # noqa E501
state['taskStatus'],
str(loop_timer))
loop_timer += 1
return state
[docs] def deploy(self, device, schedule_at=None, task_timeout=10):
r"""Deploy One configlet to One device.
This function manage a deployment this configlet to a
given device already attached to the configlet.
Parameters
----------
device : dict
dict representing a device
schedule_at : str
Optional - scheduler to run deployment at a given time
task_timeout : int
Optional - Timeout for task execution default is 10 seconds
Warnings
--------
``schedule_at`` option is not yet implemented and shall not be used
Returns
-------
dict
message from server
"""
if schedule_at is None:
logging.warning('[%s] - Configlet %s is going to be deployed \
immediately', device['hostname'], self._configlet['name'])
configlet_deploy = self._cvp_server.api.apply_configlets_to_device(
app_name='Update devices',
dev=device,
new_configlets=[self._configlet],
create_task=True)
# Create task on CVP to update a device
task_id = self._get_task_id(task_reply=configlet_deploy)
task_status = self._cvp_server.api.get_task_by_id(int(task_id))
logging.info('[%s] - Task %s status : %s',
device['hostname'],
str(task_id),
str(task_status['taskStatus']).upper())
# Execute task
self._cvp_server.api.execute_task(task_id=task_id)
task_status = self._wait_task(task_id=task_id,
timeout=task_timeout)
logging.info('[%s] - Task %s status : %s',
device['hostname'],
str(task_id),
str(task_status['taskStatus']).upper())
task = dict()
task['id'] = task_id
task['status'] = task_status
return task
return None
[docs] def deploy_bulk(self,
device_list=None,
schedule_at=None,
task_timeout=10):
"""Run configlet deployment against all devices.
Run configlet deployment over all devices attached to this configlet.
Every single deployment are managed by function self.deploy()
Parameters
----------
device_list : list
List of devices if it is set to None, then,
fallback is to use devices discover initially
at : str
Optional scheduler to run deployment at a given time
task_timeout : int
Optional - Timeout for task execution. Default is 10 seconds
Warnings
--------
``schedule_at`` option is not yet implemented and shall not be used
Returns
-------
list
A list of tasks executed for the deployment
"""
tasks = list()
logging.warning('Start tasks to deploy configlet %s to devices',
self._configlet['name'])
if device_list is None:
device_list = self._configlet['devices']
for dev in device_list:
if 'systemMacAddress' not in dev or 'hostname' not in dev:
# If information are missing, then we can't deploy using API
logging.warning('[%s] - information are missing to run \
deployment of %s',
dev['hostname'],
self._configlet['name'])
break
# Assuming we have enought information to deploy
logging.info('[%s] - Start to update device with configlet %s',
dev['hostname'],
self._configlet['name'])
task = self.deploy(device=dev, schedule_at=None,
task_timeout=task_timeout)
# Parsing task content to see if we can append to the list or not.
if 'taskStatus' in task['status']:
if task['status']['taskStatus'].upper() == 'COMPLETED':
logging.info('[%s] - Updated',
dev['hostname'])
tasks.append(task)
else:
logging.error('[%s] - Not updated as expected, \
please check your CVP',
dev['hostname'])
logging.error('[%s] - Status is: %s',
dev['hostname'],
task['status'])
return tasks
###############################################################################
[docs]class CvpChangeControl(object):
"""Change-control class to provide generic method for CVP CC mechanism.
Change Control structure is based on:
- A name to identify change
- A list of tasks already created on CVP and on pending state
- An optional scheduling. If no schedule is defined,
then task will be run 3 minutes after creatio of CC
**List of Available methods:**
Methods
-------
add_task()
Append a task to self._list_changes
get_tasks()
Return list of of available tasks for this CC
get_list_changes()
Return list of tasks attached to this CC
create()
Create change-control on CVP server
Todo
----
- Implement a way to get snapshot IDs based on name
Warnings
--------
- Change Control execution is not running snapshot before and after
"""
[docs] def __init__(self, cvp_server, name='Automated_Change_Control'):
"""Class Constructor.
Build class content with followinactivities:
- save cvp_server information
- save name for CC
- instanciate list for tasks
- Collect tasks available from CVP
Parameters
----------
cvp_server : CvpClient
CVP Server information
name : str
Optional - Name of the Change Control.
Default is ``Automated_Change_Control``
"""
logging.debug('create instance of CvpChangeControl')
self._cvp_server = cvp_server
self._name = name
# List of available tasks from server
self._available = list()
# List to save tasks to run with their order
# Ex: [{'taskId': '100', 'taskOrder': 1},
# {'taskId': '101', 'taskOrder': 1},
# {'taskId': '102', 'taskOrder': 2}]
self._list_changes = list()
self._retrieve_tasks()
[docs] def _retrieve_tasks(self):
"""Extract tasks from CVP Server.
Connect to CVP server and collect tasks in pending state
These tasks are saved in self._available structure dedicated
to pending tasks.
"""
logging.debug('getting list of available task for change control')
self._available = self._cvp_server.api.change_control_available_tasks()
[docs] def add_task(self, task):
"""Add a tasks to available list.
This task attach this new tasks to the pending tasks list.
Parameters
----------
task : str
TaskID from CVP server
"""
self._available.append(task)
[docs] def get_tasks(self, refresh=False):
"""Provide list of all available tasks.
Return list of all tasks getting from CVP and/or attached
with add_task method.
Parameters
----------
refresh : bool
Optional - Make a call to CVP to get latest list of tasks
Returns
-------
list
List of available tasks found in this CC
"""
logging.debug('extractig list of available tasks out of our instance')
if refresh:
logging.debug('refreshing list of tasks available for change control') # noqa E501
self._retrieve_tasks()
return self._available
[docs] def _build_change_dictionnary(self, order_mode='linear'):
"""Build ordered list to schedule changes.
CVP Change Control expect a list with an order to run tasks.
By default, all tasks are executed at the same time.
But using order_mode set to incremental every task will
be scheduled sequentially in this change-control
Parameters
----------
order_mode : str
Optional - Method to build task list.
Shall be ``linear`` or ``incremental``.
Note
----
Only linear has been tested.
"""
logging.info('Building a dictionary of changes')
change_position = 1
for task in self._available:
change = dict()
change['taskId'] = task['workOrderId']
change['taskOrder'] = (change_position)
logging.debug(' > Adding task %s to position %s',
change['taskId'],
change['taskOrder'])
self._list_changes.append(change)
if order_mode == 'incremental':
change_position += 1
[docs] def get_list_changes(self, mode='linear'):
"""Return list of tasks and their execution order.
Parameters
----------
mode : str
Information about tasks scheduling.
Shall be ``linear`` or ``incremental``.
Note
----
Only linear has been tested.
Returns
-------
list
List of changes and their order
"""
if len(self._list_changes) == 0:
self._build_change_dictionnary(order_mode=mode)
return self._list_changes
# TODO: manage way to retrieve Template ID
[docs] def create(self, mode='linear',
country='France',
tz='Europe/Paris',
schedule=False,
schedule_at='',
snap_template='1708dd89-ff4b-4d1e-b09e-ee490b3e27f0',
change_type='Custom',
stop_on_error="true"):
"""Create a change-control.
Parameters
----------
mode : str
Optional - method to order tasks (default : linear)
country : str
Optional - Country requested by CVP API (default:France)
tz : str
Optional - Timezone required by CVP (default: Europe/Paris)
schedule : bool
Optional - Enable CC scheduling (default: False)
schedule_at : str
Optional - Time to execute CC if scheduled
snap_template : str
Optional - Snapshot template ID to run before / after tasks
change_type : str
Optional - CVP definition for CC Might be Custom or Rollback.
(default: Custom)
stop_on_error : str
Optional - boolean string to stop CVP on errors
Returns
-------
dict
CVP creation result (None if error occurs)
"""
# If scheduling is not enable, then we create cahnge control
# to be run now+3 minutes by default
if schedule is False:
schedule_at = (datetime.now() + timedelta(seconds=180)).strftime("%Y-%m-%d %H:%M") # noqa E501
logging.debug('configure execution time in +3 minutes (%s)',
schedule_at)
# If list of changes to apply hsa not been built already,
# then we do it before creating change request
if len(self._list_changes) == 0:
self._build_change_dictionnary(order_mode=mode)
if LOGLEVEL == logging.DEBUG:
logging.debug('Tasks to attach to current change-control:')
for entry in self._list_changes:
logging.debug(' * Found task %s w/ position %s',
entry['taskId'],
entry['taskOrder'])
print(json.dumps(self._list_changes, indent=2, sort_keys=True))
# FIXME: change-control does not set snapshot ID correctly and this one is not run before and after change
# Fix implemented in develop version :
# https://github.com/aristanetworks/cvprac/blob/develop/cvprac/cvp_api.py#L1633
# pip install pip install git+https://github.com/aristanetworks/cvprac.git@develop
# Should solve problem
try:
creation_request = self._cvp_server.api.create_change_control(name=self._name, # noqa E501
change_control_tasks=self._list_changes,
timezone=tz,
country_id=country,
date_time=schedule_at,
snapshot_template_key=snap_template,
change_control_type=change_type,
stop_on_error=stop_on_error)
return creation_request
except CvpApiError as err:
logging.error('Cannot create change-control - error message is %s',
format(err))
return None
###############################################################################
[docs]def load_constant(key_name, default='UNSET', verbose=False):
r"""Set up constant value from OS Environment.
Help to define CONSTANT from OS Environment.
If it is not defined, then, fallback to default value
provided within parameters
:Example:
>>> USERNAME = load_constant(key_name='USERNAME_1', default='myUser')
>>> print USERNAME
myUsername
:param key_name: VAR to lookup in os.environment
:type key_name: str
:param default: Default value to use if key_name is not defined.
:type default: str
:param verbose: Boolean to activate verbos mode (Optional,
expected values: debug level)
:type verbose: bool
:returns: Value for variable
:rtype: str
"""
if key_name in os.environ:
if verbose:
logging.debug("%s is set to %s", key_name, os.environ.get(key_name))
return os.environ[key_name]
else:
if verbose:
logging.debug('%s is not set - using default (%s)', key_name, str(default)) # noqa E501
return default
[docs]def config_read(config_file="actions.json"):
r"""Read JSON configuration.
Load information from JSON file defined in ``config_file``
First, method check if file exists or not and then try to load
content using ``json.load()``
If file is not a JSON or if file does not exist, method return None
**Data structure to read**::
[
{
"name": "new CVP Configlet",
"action": "add",
"configlet": "configlet.example/VLANsTEMP",
"devices": [
"leaf1",
"leaf2",
"leaf3"
]
},
{
"name": "new CVP Configlet",
"action": "update",
"configlet": "configlet.examples/VLANs"
}
]
Parameters
----------
config_file : str
Path to the configuration file
Returns
-------
json
Json structure with all actions to execute
"""
try:
os.path.isfile(config_file)
logging.debug('loading config from %s', config_file)
with open(config_file) as file_content:
config = json.load(file_content)
return config
except IOError:
logging.critical('!! File not found: %s', config_file)
return None
[docs]def connect_to_cvp(parameters):
"""Create a CVP connection.
parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
parameters : dict
Object with all information to create connection
Returns
-------
cvprack.CvpClient
cvp client object
"""
client = CvpClient()
try:
client.connect([parameters.cvp], parameters.username,
parameters.password, 10, CVP_PROTO, CVP_PORT)
logging.info('Connected to %s', CVP_HOST)
except CvpLoginError, e:
# If error, then, printout message and quit program
# If server cannot be reached, then no need to go further
logging.error('Can\'t connect to %s', parameters.cvp)
logging.error('Error message is: %s', str(e).replace('\n', ' '))
quit()
return client
[docs]def action_update(configlet_def, parameters):
"""Manage actions to UPDATE and existing configlet.
Create CVP connection and instantiate a CvpConfiglet object
Then call appropriate method to start object update
And finally run tasks
Parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
# Create an HTTP session to the CVP server
client = connect_to_cvp(parameters)
logging.info('*************')
logging.info('Starting working with %s', configlet_def['configlet'])
my_configlet = CvpConfiglet(cvp_server=client,
configlet_file=configlet_def['configlet'])
if my_configlet.on_cvp():
logging.info('Start to deploy new version of %s configlet',
configlet_def['configlet'])
my_configlet.update_configlet()
else:
logging.warning('%s is not configured on CVP server, fallback to ADD method', # noqa E501
my_configlet.name())
action_add(configlet_def=configlet_def, parameters=parameters)
# Now check if configlet has to be applied on devices.
# If not, it means a task should be run manually
# or a Change Control should be defined as a next item.
if 'apply' in configlet_def:
if configlet_def['apply']:
logging.info('configlet is gonna be deployed')
if 'devices' not in configlet_def:
logging.info('deploy target is set to all attached devices')
my_configlet.deploy_bulk()
else:
logging.info('configlet will be deployed on some devices')
cvp_inventory = CvpInventory(client)
for device in configlet_def['devices']:
dev_inventory = cvp_inventory.get_device_dict(device)
if dev_inventory is not None:
logging.info(' > Depoy %s on %s',
my_configlet.name,
device)
my_configlet.deploy(dev_inventory)
else:
logging.warning('deploy option has not been set for the configlet')
logging.warning('--> doing nothing')
else:
logging.warning('deploy option has not been set for the configlet')
logging.warning('--> doing nothing')
[docs]def action_remove_devices(configlet_def, parameters):
"""Manage actions to remove devices from an existing configlet.
Create CVP connection to check if configlet exists. if it exists, then call
CvpConfiglet.remove_device() to remove all devices listed in the task.
If apply option is set to true, then, generated tasks are applied by CVP.
Otherwise, user has to do it manually
Parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
# Create an HTTP session to the CVP server
client = connect_to_cvp(parameters)
logging.info('*************')
logging.info('Starting working with %s', configlet_def['configlet'])
my_configlet = CvpConfiglet(cvp_server=client,
configlet_name=configlet_def['configlet'])
if my_configlet.on_cvp():
logging.info('%s configlet has been found on CVP',
configlet_def['configlet'])
my_configlet.remove_device(devices_hostnames=configlet_def['devices'])
# Check if configlet must be deployed to devices
if 'apply' in configlet_def:
action_apply(configlet_def=configlet_def, my_configlet=my_configlet)
else:
logging.warning('deploy option has not been set for the configlet')
logging.warning('--> doing nothing')
return True
[docs]def action_add_devices(configlet_def, parameters):
"""Manage actions to add devices to an existing configlet.
Create CVP connection to check if configlet exists. if it exists, then call
CvpConfiglet.add_device() to add all devices listed in the json task.
If apply option is set to true, then, generated tasks are applied by CVP.
Otherwise, user has to do it manually
Parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
# Create an HTTP session to the CVP server
client = connect_to_cvp(parameters)
logging.info('*************')
logging.info('Starting working with %s', configlet_def['configlet'])
my_configlet = CvpConfiglet(cvp_server=client,
configlet_name=configlet_def['configlet'])
if my_configlet.on_cvp():
logging.info('%s configlet has been found on CVP',
configlet_def['configlet'])
my_configlet.add_device(device_hostnames=configlet_def['devices'])
# Check if configlet must be deployed to devices
if 'apply' in configlet_def:
action_apply(configlet_def=configlet_def, my_configlet=my_configlet)
else:
logging.warning('deploy option has not been set for the configlet')
logging.warning('--> doing nothing')
return True
[docs]def action_apply(configlet_def, my_configlet):
"""Instruct CvpConfiglet to run tasks.
Method to execute repetitive action instructing CvpConfiglet to execute
pending tasks
Parameters
----------
configlet_def : dict
Dictionnary loaded from JSON file. represent one action and must have
an 'apply' field defined
my_configlet : CvpConfiglet
A CvpConfiglet object already instantiated.
It is used to call deploy_bulk method
"""
if configlet_def['apply']:
logging.info('configlet is gonna be deployed')
logging.info('deploy target is set to all attached devices')
my_configlet.deploy_bulk()
else:
logging.warning('deploy option has not been set for the configlet') # noqa E501
logging.warning('--> doing nothing')
[docs]def action_add(configlet_def, parameters):
"""Manage actions to ADD a configlet.
Create CVP connection and instantiate a CvpConfiglet object
Then call appropriate method to start object creation
If apply option is set to true, then, generated tasks are applied by CVP.
Otherwise, user has to do it manually
Parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
# Create an HTTP session to the CVP server
client = connect_to_cvp(parameters)
logging.info('*************')
logging.info('Start working with %s', configlet_def['configlet'])
my_configlet = CvpConfiglet(cvp_server=client,
configlet_file=configlet_def['configlet'])
logging.info('Start to create new configlet: %s',
configlet_def['configlet'])
if my_configlet.on_cvp():
logging.warning('%s is already configured on CVP server, fallback to UPDATE method', # noqa E501
my_configlet.name())
action_update(configlet_def=configlet_def, parameters=parameters)
else:
# a list of device must be available to create configlet
# if list is missing, then we have to break process
if 'devices' not in configlet_def:
logging.error('Configlet has no devices configured,\
cannot create configlet on server')
return False
# Now create configlet on CVP server
my_configlet.deploy_configlet(device_hostnames=configlet_def['devices'])
# Check if configlet must be deployed to devices
if 'apply' in configlet_def:
action_apply(configlet_def=configlet_def, my_configlet=my_configlet)
else:
logging.warning('deploy option has not been set for the configlet')
logging.warning('--> doing nothing')
return True
[docs]def action_delete(configlet_def, parameters):
"""Manage actions to DELTE a configlet.
Create CVP connection and instantiate a CvpConfiglet object
Then call appropriate method to start object deletion
Parameters option should at least contain following elements:
- username
- password
- cvp (server IP or DNS hostname)
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
client = connect_to_cvp(parameters)
logging.info('*************')
logging.info('Start working with %s', configlet_def['configlet'])
my_configlet = CvpConfiglet(cvp_server=client,
configlet_file=configlet_def['configlet'])
logging.info('Start to delete existing configlet: %s',
configlet_def['configlet'])
my_configlet.delete_configlet()
# FIXME: Apply is not supported for configlet deletion
# Please see issue #16 on github
# https://github.com/titom73/configlet-cvp-uploader/issues/16
[docs]def action_create_change_control(parameters, data):
"""Create a Change-Control.
Create a change-control on CVP based on a JSON definition.
Current version supports following entries in JSON:
- name: change-control name configured on CVP
- type: change-control (Must be set with this vaue to engage CC)
- country: Country required by CVP for CC
- timezone: Timezone required by CVP to run changes
**Expected inputs data**
JSON file::
[
{
"name": "Python_CC",
"type": "change-control",
"country": "France",
"timezone": "Europe/Paris"
}
]
Todo
----
Manage way to retrieve Template ID / As feature is not part of CVPRAC,
``snapid`` shall be part of the job definition.
If not, then we configure it to ``None``
Parameters
----------
configlet_def : dict
Data from JSON to describe configlet
parameters : dict
Object with all information to create connection
"""
client = connect_to_cvp(parameters)
logging.info('start change-control creation')
change_control = CvpChangeControl(cvp_server=client,
name=data['name'].replace(' ', '_'))
if 'schedule_at' not in data:
data['schedule_at'] = (datetime.now() + timedelta(seconds=180)).strftime("%Y-%m-%d %H:%M") # noqa E501
# Check if mandatory values are set.
# Otherwise load default
if 'timezone' not in data:
logging.debug('Timezone not set in json, using default one: %s',
CVP_TZ)
data['timezone'] = CVP_TZ
if 'country' not in data:
logging.debug('Country not set in json, using default one: %s',
CVP_COUNTRY)
data['country'] = CVP_COUNTRY
if 'snapid' not in data:
logging.debug('Snapshot ID not configured')
data['snapid'] = 'None'
if 'apply' in data and data['apply'] is True:
logging.info('Scheduling change control to be executed at %s',
data['schedule_at'])
result = change_control.create(tz=data['timezone'],
country=data['country'],
schedule=True,
schedule_at=data['schedule_at'],
snap_template=data['snapid'],
change_type='Custom', stop_on_error="true") # noqa E501
else:
logging.info('change control must be executed manually')
result = change_control.create(tz=data['timezone'],
country=data['country'],
schedule=True,
snap_template=data['snapid'],
change_type='Custom', stop_on_error="true") # noqa E501
if result is not None:
logging.info('!change-control creation is %s (id %s)', result['data'], result['ccId']) # noqa E501
# Main part of the script
if __name__ == '__main__':
# CVP information
# Load information from your shell environment (export CVP_USER=cvpadmin)
# If not set, then use default values.
# It can be manually override with CLI option using argparse
CVP_HOST = load_constant(key_name='CVP_HOST', default='127.0.0.1')
CVP_PORT = int(load_constant(key_name='CVP_PORT', default=443))
CVP_PROTO = load_constant(key_name='CVP_PROTO', default='https')
CVP_USER = load_constant(key_name='CVP_USER', default='username')
CVP_PASS = load_constant(key_name='CVP_PASS', default='password')
CVP_TZ = load_constant(key_name='CVP_TZ', default='France')
CVP_COUNTRY = load_constant(key_name='CVP_COUNTRY',
default='France')
LOG_LEVEL = load_constant(key_name='LOG_LEVEL', default='info')
JSON_CONFIG = load_constant(key_name='CVP_JSON',
default='actions.json')
CERT_VALIDATION = bool(load_constant(key_name='CERT_VALIDATION',
default=False))
# Argaparser to load information using CLI
PARSER = argparse.ArgumentParser(description="Configlet Uploader to CVP ",
version=__version__)
PARSER.add_argument('-c', '--configlet',
help='Configlet path to use on CVP',
type=str, default=None)
PARSER.add_argument('-u', '--username',
help='Username for CVP', type=str, default=CVP_USER)
PARSER.add_argument('-p', '--password',
help='Password for CVP', type=str, default=CVP_PASS)
PARSER.add_argument('-s', '--cvp',
help='Address of CVP server',
type=str, default=CVP_HOST)
PARSER.add_argument('-d', '--debug_level',
help='Verbose level (debug / info / war\
ning / error / critical)',
type=str, default=LOG_LEVEL)
PARSER.add_argument('-j', '--json',
help='File with list of actions to execute)',
type=str, default=JSON_CONFIG)
# PARSER.add_argument('-j', '--json',
# help='File with list of actions to execute)',
# type=str, default=JSON_CONFIG)
OPTIONS = PARSER.parse_args()
# Logging configuration
LEVELS = {'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL}
LOGLEVEL = LEVELS.get(OPTIONS.debug_level, logging.NOTSET)
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=LOGLEVEL,
datefmt='%Y-%m-%d %H:%M:%S')
# activate SSL enforcement or not
# (should be disabled for self-signed certs)
if CERT_VALIDATION is False:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
logging.debug('CVP Client inputs:')
logging.debug(' > Username is %s', OPTIONS.username)
logging.debug(' > Server is %s', OPTIONS.cvp)
logging.debug(' > Port is %s', CVP_PORT)
logging.debug(' > Proto is %s', CVP_PROTO)
print '\n--------------------\n'
# If OPTIONS.configlet is defined we assume user wants to make short update
# Then we don't care about JSON
# If OPTIONS.configlet is not defined,
# Then, OPTIONS.json MUST be configured
if OPTIONS.configlet is not None:
CONFIGLET = dict()
CONFIGLET['name'] = 'Short path update'
CONFIGLET['configlet'] = OPTIONS.configlet
CONFIGLET['action'] = 'update'
logging.info('task %s is going to update %s',
CONFIGLET['name'],
CONFIGLET['configlet'])
action_update(configlet_def=CONFIGLET, parameters=OPTIONS)
else:
# Parse actions defined in OPTIONS.json
# Based on action field, script will execute different
# set of actions
LIST_ACTIONS = config_read(config_file=OPTIONS.json)
for ACTION in LIST_ACTIONS:
if 'type' not in ACTION:
logging.critical('task %s does not have type defined - skipping', ACTION['name']) # noqa E501
break
if 'action' in ACTION and ACTION['type'] == 'configlet':
if ACTION['action'] == 'add':
logging.info('configlet %s is going to be created %s',
ACTION['name'],
ACTION['configlet'])
action_add(configlet_def=ACTION, parameters=OPTIONS)
elif ACTION['action'] == 'delete':
logging.info('configlet %s is going to be deleted %s',
ACTION['name'],
ACTION['configlet'])
action_delete(configlet_def=ACTION, parameters=OPTIONS)
elif ACTION['action'] == 'update':
logging.info('configlet %s is going to be updated %s',
ACTION['name'],
ACTION['configlet'])
action_update(configlet_def=ACTION, parameters=OPTIONS)
elif ACTION['action'] == 'remove-devices':
logging.info('configlet %s is going to be removed from devices',
ACTION['name'])
action_remove_devices(configlet_def=ACTION, parameters=OPTIONS)
elif ACTION['action'] == 'add-devices':
logging.info('configlet %s is going to be added to devices',
ACTION['name'])
action_add_devices(configlet_def=ACTION, parameters=OPTIONS)
else:
logging.error('Unsupported action.\
Please use add / delete / update only')
print '\n--------------------\n'
elif ACTION['type'] == 'change-control':
logging.info('Implementation in progress -- expect some issues')
action_create_change_control(OPTIONS, ACTION)
else:
logging.warning('task is not supported -- skipping')
logging.info('Wait 10 sec before next action')
time.sleep(10)