Files
cinder/cinder/volume/drivers/datera/datera_api2.py
Sean McGinnis 3eb9b422f4 Introduce flake8-import-order extension
This adds usage of the flake8-import-order extension to our flake8
checks to enforce consistency on our import ordering to follow the
overall OpenStack code guidelines.

Since we have now dropped Python 2, this also cleans up a few cases for
things that were third party libs but became part of the standard
library such as mock, which is now a standard part of unittest.

Some questions, in order of importance:

Q: Are you insane?
A: Potentially.

Q: Why should we touch all of these files?
A: This adds consistency to our imports. The extension makes sure that
   all imports follow our published guidelines of having imports ordered
   by standard lib, third party, and local. This will be a one time
   churn, then we can ensure consistency over time.

Q: Why bother. this doesn't really matter?
A: I agree - but...

We have the issue that we have less people actively involved and less
time to perform thorough code reviews. This will make it objective and
automated to catch these kinds of issues.

But part of this, even though it maybe seems a little annoying, is for
making it easier for contributors. Right now, we may or may not notice
if something is following the guidelines or not. And we may or may not
comment in a review to ask for a contributor to make adjustments to
follow the guidelines.

But then further along into the review process, someone decides to be
thorough, and after the contributor feels like they've had to deal with
other change requests and things are in really good shape, they get a -1
on something mostly meaningless as far as the functionality of their
code. It can be a frustrating and disheartening thing.

I believe this actually helps avoid that by making it an objective thing
that they find out right away up front - either the code is following
the guidelines and everything is happy, or it's not and running local
jobs or the pep8 CI job will let them know right away and they can fix
it. No guessing on whether or not someone is going to take a stand on
following the guidelines or not.

This will also make it easier on the code reviewers. The more we can
automate, the more time we can spend in code reviews making sure the
logic of the change is correct and less time looking at trivial coding
and style things.

Q: Should we use our hacking extensions for this?
A: Hacking has had to keep back linter requirements for a long time now.
   Current versions of the linters actually don't work with the way
   we've been hooking into them for our hacking checks. We will likely
   need to do away with those at some point so we can move on to the
   current linter releases. This will help ensure we have something in
   place when that time comes to make sure some checks are automated.

Q: Didn't you spend more time on this than the benefit we'll get from
   it?
A: Yeah, probably.

Change-Id: Ic13ba238a4a45c6219f4de131cfe0366219d722f
Signed-off-by: Sean McGinnis <sean.mcginnis@gmail.com>
2020-01-06 09:59:35 -06:00

764 lines
30 KiB
Python

# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import re
import uuid
import eventlet
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import units
import six
from cinder import exception
from cinder.i18n import _
import cinder.volume.drivers.datera.datera_common as datc
from cinder.volume import volume_utils
LOG = logging.getLogger(__name__)
class DateraApi(object):
# =================
# = Create Volume =
# =================
def _create_volume_2(self, volume):
# Generate App Instance, Storage Instance and Volume
# Volume ID will be used as the App Instance Name
# Storage Instance and Volumes will have standard names
policies = self._get_policies_for_resource(volume)
num_replicas = int(policies['replica_count'])
storage_name = policies['default_storage_name']
volume_name = policies['default_volume_name']
template = policies['template']
if template:
app_params = (
{
'create_mode': "openstack",
# 'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'app_template': '/app_templates/{}'.format(template)
})
else:
app_params = (
{
'create_mode': "openstack",
'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'access_control_mode': 'deny_all',
'storage_instances': {
storage_name: {
'name': storage_name,
'volumes': {
volume_name: {
'name': volume_name,
'size': volume['size'],
'replica_count': num_replicas,
'snapshot_policies': {
}
}
}
}
}
})
self._issue_api_request(
datc.URL_TEMPLATES['ai'](),
'post',
body=app_params,
api_version='2')
self._update_qos(volume, policies)
# =================
# = Extend Volume =
# =================
def _extend_volume_2(self, volume, new_size):
# Current product limitation:
# If app_instance is bound to template resizing is not possible
# Once policies are implemented in the product this can go away
policies = self._get_policies_for_resource(volume)
template = policies['template']
if template:
LOG.warning("Volume size not extended due to template binding:"
" volume: %(volume)s, template: %(template)s",
volume=volume, template=template)
return
# Offline App Instance, if necessary
reonline = False
app_inst = self._issue_api_request(
datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id'])),
api_version='2')
if app_inst['admin_state'] == 'online':
reonline = True
self._detach_volume_2(None, volume)
# Change Volume Size
app_inst = datc._get_name(volume['id'])
data = {
'size': new_size
}
store_name, vol_name = self._scrape_template(policies)
self._issue_api_request(
datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name).format(app_inst),
method='put',
body=data,
api_version='2')
# Online Volume, if it was online before
if reonline:
self._create_export_2(None, volume, None)
# =================
# = Cloned Volume =
# =================
def _create_cloned_volume_2(self, volume, src_vref):
policies = self._get_policies_for_resource(volume)
store_name, vol_name = self._scrape_template(policies)
src = "/" + datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name).format(datc._get_name(src_vref['id']))
data = {
'create_mode': 'openstack',
'name': datc._get_name(volume['id']),
'uuid': str(volume['id']),
'clone_src': src,
}
self._issue_api_request(
datc.URL_TEMPLATES['ai'](), 'post', body=data, api_version='2')
if volume['size'] > src_vref['size']:
self._extend_volume_2(volume, volume['size'])
# =================
# = Delete Volume =
# =================
def _delete_volume_2(self, volume):
self.detach_volume(None, volume)
app_inst = datc._get_name(volume['id'])
try:
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
app_inst),
method='delete',
api_version='2')
except exception.NotFound:
LOG.info("Tried to delete volume %s, but it was not found in the "
"Datera cluster. Continuing with delete.",
datc._get_name(volume['id']))
# =================
# = Ensure Export =
# =================
def _ensure_export_2(self, context, volume, connector):
return self._create_export_2(context, volume, connector)
# =========================
# = Initialize Connection =
# =========================
def _initialize_connection_2(self, volume, connector):
# Now online the app_instance (which will online all storage_instances)
multipath = connector.get('multipath', False)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'online'
}
app_inst = self._issue_api_request(
url, method='put', body=data, api_version='2')
storage_instances = app_inst["storage_instances"]
si_names = list(storage_instances.keys())
portal = storage_instances[si_names[0]]['access']['ips'][0] + ':3260'
iqn = storage_instances[si_names[0]]['access']['iqn']
if multipath:
portals = [p + ':3260' for p in
storage_instances[si_names[0]]['access']['ips']]
iqns = [iqn for _ in
storage_instances[si_names[0]]['access']['ips']]
lunids = [self._get_lunid() for _ in
storage_instances[si_names[0]]['access']['ips']]
return {
'driver_volume_type': 'iscsi',
'data': {
'target_discovered': False,
'target_iqn': iqn,
'target_iqns': iqns,
'target_portal': portal,
'target_portals': portals,
'target_lun': self._get_lunid(),
'target_luns': lunids,
'volume_id': volume['id'],
'discard': False}}
else:
return {
'driver_volume_type': 'iscsi',
'data': {
'target_discovered': False,
'target_iqn': iqn,
'target_portal': portal,
'target_lun': self._get_lunid(),
'volume_id': volume['id'],
'discard': False}}
# =================
# = Create Export =
# =================
def _create_export_2(self, context, volume, connector):
# Online volume in case it hasn't been already
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'online'
}
self._issue_api_request(url, method='put', body=data, api_version='2')
# Check if we've already setup everything for this volume
url = (datc.URL_TEMPLATES['si']().format(datc._get_name(volume['id'])))
storage_instances = self._issue_api_request(url, api_version='2')
# Handle adding initiator to product if necessary
# Then add initiator to ACL
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
if (connector and
connector.get('initiator') and
not policies['acl_allow_all']):
initiator_name = "OpenStack_{}_{}".format(
self.driver_prefix, str(uuid.uuid4())[:4])
initiator_group = datc.INITIATOR_GROUP_PREFIX + volume['id']
found = False
initiator = connector['initiator']
current_initiators = self._issue_api_request(
'initiators', api_version='2')
for iqn, values in current_initiators.items():
if initiator == iqn:
found = True
break
# If we didn't find a matching initiator, create one
if not found:
data = {'id': initiator, 'name': initiator_name}
# Try and create the initiator
# If we get a conflict, ignore it because race conditions
self._issue_api_request("initiators",
method="post",
body=data,
conflict_ok=True,
api_version='2')
# Create initiator group with initiator in it
initiator_path = "/initiators/{}".format(initiator)
initiator_group_path = "/initiator_groups/{}".format(
initiator_group)
ig_data = {'name': initiator_group, 'members': [initiator_path]}
self._issue_api_request("initiator_groups",
method="post",
body=ig_data,
conflict_ok=True,
api_version='2')
# Create ACL with initiator group as reference for each
# storage_instance in app_instance
# TODO(_alastor_): We need to avoid changing the ACLs if the
# template already specifies an ACL policy.
for si_name in storage_instances.keys():
acl_url = (datc.URL_TEMPLATES['si']() +
"/{}/acl_policy").format(
datc._get_name(volume['id']), si_name)
existing_acl = self._issue_api_request(acl_url,
method="get",
api_version='2')
data = {}
data['initiators'] = existing_acl['initiators']
data['initiator_groups'] = existing_acl['initiator_groups']
data['initiator_groups'].append(initiator_group_path)
self._issue_api_request(acl_url,
method="put",
body=data,
api_version='2')
if connector and connector.get('ip'):
try:
# Case where volume_type has non default IP Pool info
if policies['ip_pool'] != 'default':
initiator_ip_pool_path = self._issue_api_request(
"access_network_ip_pools/{}".format(
policies['ip_pool']), api_version='2')['path']
# Fallback to trying reasonable IP based guess
else:
initiator_ip_pool_path = self._get_ip_pool_for_string_ip(
connector['ip'])
ip_pool_url = datc.URL_TEMPLATES['si_inst'](
store_name).format(datc._get_name(volume['id']))
ip_pool_data = {'ip_pool': initiator_ip_pool_path}
self._issue_api_request(ip_pool_url,
method="put",
body=ip_pool_data,
api_version='2')
except exception.DateraAPIException:
# Datera product 1.0 support
pass
# Check to ensure we're ready for go-time
self._si_poll(volume, policies)
# =================
# = Detach Volume =
# =================
def _detach_volume_2(self, context, volume, attachment=None):
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'offline',
'force': True
}
try:
self._issue_api_request(url, method='put', body=data,
api_version='2')
except exception.NotFound:
msg = ("Tried to detach volume %s, but it was not found in the "
"Datera cluster. Continuing with detach.")
LOG.info(msg, volume['id'])
# TODO(_alastor_): Make acl cleaning multi-attach aware
self._clean_acl_2(volume)
def _check_for_acl_2(self, initiator_path):
"""Returns True if an acl is found for initiator_path """
# TODO(_alastor_) when we get a /initiators/:initiator/acl_policies
# endpoint use that instead of this monstrosity
initiator_groups = self._issue_api_request("initiator_groups",
api_version='2')
for ig, igdata in initiator_groups.items():
if initiator_path in igdata['members']:
LOG.debug("Found initiator_group: %s for initiator: %s",
ig, initiator_path)
return True
LOG.debug("No initiator_group found for initiator: %s", initiator_path)
return False
def _clean_acl_2(self, volume):
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
acl_url = (datc.URL_TEMPLATES["si_inst"](
store_name) + "/acl_policy").format(datc._get_name(volume['id']))
try:
initiator_group = self._issue_api_request(
acl_url, api_version='2')['initiator_groups'][0]
initiator_iqn_path = self._issue_api_request(
initiator_group.lstrip("/"))["members"][0]
# Clear out ACL and delete initiator group
self._issue_api_request(acl_url,
method="put",
body={'initiator_groups': []},
api_version='2')
self._issue_api_request(initiator_group.lstrip("/"),
method="delete",
api_version='2')
if not self._check_for_acl_2(initiator_iqn_path):
self._issue_api_request(initiator_iqn_path.lstrip("/"),
method="delete",
api_version='2')
except (IndexError, exception.NotFound):
LOG.debug("Did not find any initiator groups for volume: %s",
volume)
# ===================
# = Create Snapshot =
# ===================
def _create_snapshot_2(self, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
url_template = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
url = url_template.format(datc._get_name(snapshot['volume_id']))
snap_params = {
'uuid': snapshot['id'],
}
snap = self._issue_api_request(url, method='post', body=snap_params,
api_version='2')
snapu = "/".join((url, snap['timestamp']))
self._snap_poll(snapu)
# ===================
# = Delete Snapshot =
# ===================
def _delete_snapshot_2(self, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
snap_temp = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
snapu = snap_temp.format(datc._get_name(snapshot['volume_id']))
snapshots = self._issue_api_request(snapu, method='get',
api_version='2')
try:
for ts, snap in snapshots.items():
if snap['uuid'] == snapshot['id']:
url_template = snapu + '/{}'
url = url_template.format(ts)
self._issue_api_request(url, method='delete',
api_version='2')
break
else:
raise exception.NotFound
except exception.NotFound:
msg = ("Tried to delete snapshot %s, but was not found in "
"Datera cluster. Continuing with delete.")
LOG.info(msg, datc._get_name(snapshot['id']))
# ========================
# = Volume From Snapshot =
# ========================
def _create_volume_from_snapshot_2(self, volume, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
snap_temp = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
snapu = snap_temp.format(datc._get_name(snapshot['volume_id']))
snapshots = self._issue_api_request(snapu, method='get',
api_version='2')
for ts, snap in snapshots.items():
if snap['uuid'] == snapshot['id']:
found_ts = ts
break
else:
raise exception.NotFound
snap_url = (snap_temp + '/{}').format(
datc._get_name(snapshot['volume_id']), found_ts)
self._snap_poll(snap_url)
src = "/" + snap_url
app_params = (
{
'create_mode': 'openstack',
'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'clone_src': src,
})
self._issue_api_request(
datc.URL_TEMPLATES['ai'](),
method='post',
body=app_params,
api_version='2')
if (volume['size'] > snapshot['volume_size']):
self._extend_volume_2(volume, volume['size'])
# ==========
# = Manage =
# ==========
def _manage_existing_2(self, volume, existing_ref):
existing_ref = existing_ref['source-name']
if existing_ref.count(":") != 2:
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format:"
"app_inst_name:storage_inst_name:vol_name"))
app_inst_name = existing_ref.split(":")[0]
LOG.debug("Managing existing Datera volume %s. "
"Changing name to %s",
datc._get_name(volume['id']),
existing_ref)
data = {'name': datc._get_name(volume['id'])}
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
app_inst_name), method='put', body=data, api_version='2')
# ===================
# = Manage Get Size =
# ===================
def _manage_existing_get_size_2(self, volume, existing_ref):
existing_ref = existing_ref['source-name']
if existing_ref.count(":") != 2:
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format:"
"app_inst_name:storage_inst_name:vol_name"))
app_inst_name, si_name, vol_name = existing_ref.split(":")
app_inst = self._issue_api_request(
datc.URL_TEMPLATES['ai_inst']().format(app_inst_name),
api_version='2')
return self._get_size_2(volume, app_inst, si_name, vol_name)
def _get_size_2(self, volume, app_inst=None, si_name=None, vol_name=None):
"""Helper method for getting the size of a backend object
If app_inst is provided, we'll just parse the dict to get
the size instead of making a separate http request
"""
policies = self._get_policies_for_resource(volume)
si_name = si_name if si_name else policies['default_storage_name']
vol_name = vol_name if vol_name else policies['default_volume_name']
if not app_inst:
vol_url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
app_inst = self._issue_api_request(vol_url)
size = app_inst[
'storage_instances'][si_name]['volumes'][vol_name]['size']
return size
# =========================
# = Get Manageable Volume =
# =========================
def _get_manageable_volumes_2(self, cinder_volumes, marker, limit, offset,
sort_keys, sort_dirs):
LOG.debug("Listing manageable Datera volumes")
app_instances = self._issue_api_request(
datc.URL_TEMPLATES['ai'](), api_version='2').values()
results = []
cinder_volume_ids = [vol['id'] for vol in cinder_volumes]
for ai in app_instances:
ai_name = ai['name']
reference = None
size = None
safe_to_manage = False
reason_not_safe = None
cinder_id = None
extra_info = None
if re.match(datc.UUID4_RE, ai_name):
cinder_id = ai_name.lstrip(datc.OS_PREFIX)
if (not cinder_id and
ai_name.lstrip(datc.OS_PREFIX) not in cinder_volume_ids):
safe_to_manage = self._is_manageable(ai)
if safe_to_manage:
si = list(ai['storage_instances'].values())[0]
si_name = si['name']
vol = list(si['volumes'].values())[0]
vol_name = vol['name']
size = vol['size']
reference = {"source-name": "{}:{}:{}".format(
ai_name, si_name, vol_name)}
results.append({
'reference': reference,
'size': size,
'safe_to_manage': safe_to_manage,
'reason_not_safe': reason_not_safe,
'cinder_id': cinder_id,
'extra_info': extra_info})
page_results = volume_utils.paginate_entries_list(
results, marker, limit, offset, sort_keys, sort_dirs)
return page_results
# ============
# = Unmanage =
# ============
def _unmanage_2(self, volume):
LOG.debug("Unmanaging Cinder volume %s. Changing name to %s",
volume['id'], datc._get_unmanaged(volume['id']))
data = {'name': datc._get_unmanaged(volume['id'])}
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id'])),
method='put',
body=data,
api_version='2')
# ================
# = Volume Stats =
# ================
def _get_volume_stats_2(self, refresh=False):
if refresh or not self.cluster_stats:
try:
LOG.debug("Updating cluster stats info.")
results = self._issue_api_request('system', api_version='2')
if 'uuid' not in results:
LOG.error(
'Failed to get updated stats from Datera Cluster.')
backend_name = self.configuration.safe_get(
'volume_backend_name')
stats = {
'volume_backend_name': backend_name or 'Datera',
'vendor_name': 'Datera',
'driver_version': self.VERSION,
'storage_protocol': 'iSCSI',
'total_capacity_gb': (
int(results['total_capacity']) / units.Gi),
'free_capacity_gb': (
int(results['available_capacity']) / units.Gi),
'reserved_percentage': 0,
}
self.cluster_stats = stats
except exception.DateraAPIException:
LOG.error('Failed to get updated stats from Datera cluster.')
return self.cluster_stats
def _is_manageable(self, app_inst):
if len(app_inst['storage_instances']) == 1:
si = list(app_inst['storage_instances'].values())[0]
if len(si['volumes']) == 1:
return True
return False
# =========
# = Login =
# =========
def _login_2(self):
"""Use the san_login and san_password to set token."""
body = {
'name': self.username,
'password': self.password
}
# Unset token now, otherwise potential expired token will be sent
# along to be used for authorization when trying to login.
self.datera_api_token = None
try:
LOG.debug('Getting Datera auth token.')
results = self._issue_api_request('login', 'put', body=body,
sensitive=True, api_version='2')
self.datera_api_token = results['key']
except exception.NotAuthorized:
with excutils.save_and_reraise_exception():
LOG.error('Logging into the Datera cluster failed. Please '
'check your username and password set in the '
'cinder.conf and start the cinder-volume '
'service again.')
# ===========
# = Polling =
# ===========
def _snap_poll(self, url):
eventlet.sleep(datc.DEFAULT_SNAP_SLEEP)
TIMEOUT = 10
retry = 0
poll = True
while poll and retry < TIMEOUT:
retry += 1
snap = self._issue_api_request(url, api_version='2')
if snap['op_state'] == 'available':
poll = False
else:
eventlet.sleep(1)
if retry >= TIMEOUT:
raise exception.VolumeDriverException(
message=_('Snapshot not ready.'))
def _si_poll(self, volume, policies):
# Initial 4 second sleep required for some Datera versions
eventlet.sleep(datc.DEFAULT_SI_SLEEP_API_2)
TIMEOUT = 10
retry = 0
check_url = datc.URL_TEMPLATES['si_inst'](
policies['default_storage_name']).format(
datc._get_name(volume['id']))
poll = True
while poll and retry < TIMEOUT:
retry += 1
si = self._issue_api_request(check_url, api_version='2')
if si['op_state'] == 'available':
poll = False
else:
eventlet.sleep(1)
if retry >= TIMEOUT:
raise exception.VolumeDriverException(
message=_('Resource not ready.'))
# ============
# = IP Pools =
# ============
def _get_ip_pool_for_string_ip(self, ip):
"""Takes a string ipaddress and return the ip_pool API object dict """
pool = 'default'
ip_obj = ipaddress.ip_address(six.text_type(ip))
ip_pools = self._issue_api_request('access_network_ip_pools',
api_version='2')
for ip_pool, ipdata in ip_pools.items():
for access, adata in ipdata['network_paths'].items():
if not adata.get('start_ip'):
continue
pool_if = ipaddress.ip_interface(
"/".join((adata['start_ip'], str(adata['netmask']))))
if ip_obj in pool_if.network:
pool = ip_pool
return self._issue_api_request(
"access_network_ip_pools/{}".format(pool), api_version='2')['path']
# =============
# = Templates =
# =============
def _scrape_template(self, policies):
sname = policies['default_storage_name']
vname = policies['default_volume_name']
template = policies['template']
if template:
result = self._issue_api_request(
datc.URL_TEMPLATES['at']().format(template), api_version='2')
sname, st = list(result['storage_templates'].items())[0]
vname = list(st['volume_templates'].keys())[0]
return sname, vname
# =======
# = QoS =
# =======
def _update_qos(self, resource, policies):
url = datc.URL_TEMPLATES['vol_inst'](
policies['default_storage_name'],
policies['default_volume_name']) + '/performance_policy'
url = url.format(datc._get_name(resource['id']))
type_id = resource.get('volume_type_id', None)
if type_id is not None:
# Filter for just QOS policies in result. All of their keys
# should end with "max"
fpolicies = {k: int(v) for k, v in
policies.items() if k.endswith("max")}
# Filter all 0 values from being passed
fpolicies = dict(filter(lambda _v: _v[1] > 0, fpolicies.items()))
if fpolicies:
self._issue_api_request(url, 'post', body=fpolicies,
api_version='2')