 e8affa7db5
			
		
	
	e8affa7db5
	
	
	
		
			
			When the proxy passes the container-update headers to the object server now include the db_state, which it already had in hand. This will be written to async_pending and allow the object-updater to know more about a container rather then just relying on container_path attribute. This patch also cleans up the PUT, POST and DELETE _get_update_target paths refactoring the call into _backend_requests, only used by these methods, so it only happens once. Change-Id: Ie665e5c656c7fb27b45ee7427fe4b07ad466e3e2
		
			
				
	
	
		
			834 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			834 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2010-2012 OpenStack Foundation
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | |
| # implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| import errno
 | |
| import gc
 | |
| import json
 | |
| import mock
 | |
| import os
 | |
| from subprocess import Popen, PIPE
 | |
| import sys
 | |
| from tempfile import mkdtemp
 | |
| from textwrap import dedent
 | |
| from time import sleep, time
 | |
| from collections import defaultdict
 | |
| import unittest
 | |
| from uuid import uuid4
 | |
| import shutil
 | |
| import six
 | |
| from six.moves.http_client import HTTPConnection
 | |
| from six.moves.urllib.parse import urlparse
 | |
| 
 | |
| from swiftclient import get_auth, head_account, client
 | |
| from swift.common import internal_client, direct_client, utils
 | |
| from swift.common.direct_client import DirectClientException
 | |
| from swift.common.ring import Ring
 | |
| from swift.common.utils import hash_path, md5, \
 | |
|     readconf, renamer, rsync_module_interpolation
 | |
| from swift.common.manager import Manager
 | |
| from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
 | |
| from swift.obj.diskfile import get_data_dir
 | |
| from test.debug_logger import capture_logger
 | |
| 
 | |
| from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC, PROXY_BASE_URL
 | |
| 
 | |
| 
 | |
| ENABLED_POLICIES = [p for p in POLICIES if not p.is_deprecated]
 | |
| POLICIES_BY_TYPE = defaultdict(list)
 | |
| for p in POLICIES:
 | |
|     POLICIES_BY_TYPE[p.policy_type].append(p)
 | |
| 
 | |
| 
 | |
| def get_server_number(ipport, ipport2server):
 | |
|     server_number = ipport2server[ipport]
 | |
|     server, number = server_number[:-1], server_number[-1:]
 | |
|     try:
 | |
|         number = int(number)
 | |
|     except ValueError:
 | |
|         # probably the proxy
 | |
|         return server_number, None
 | |
|     return server, number
 | |
| 
 | |
| 
 | |
| def start_server(ipport, ipport2server):
 | |
|     server, number = get_server_number(ipport, ipport2server)
 | |
|     err = Manager([server]).start(number=number, wait=True)
 | |
|     if err:
 | |
|         raise Exception('unable to start %s' % (
 | |
|             server if not number else '%s%s' % (server, number)))
 | |
|     return check_server(ipport, ipport2server)
 | |
| 
 | |
| 
 | |
| def _check_storage(ipport, path):
 | |
|     conn = HTTPConnection(*ipport)
 | |
|     conn.request('GET', path)
 | |
|     resp = conn.getresponse()
 | |
|     # 404 because it's a nonsense path (and mount_check is false)
 | |
|     # 507 in case the test target is a VM using mount_check
 | |
|     if resp.status not in (404, 507):
 | |
|         raise Exception(
 | |
|             'Unexpected status %s' % resp.status)
 | |
|     return resp
 | |
| 
 | |
| 
 | |
| def _check_proxy(user, key):
 | |
|     url, token = get_auth(PROXY_BASE_URL + '/auth/v1.0',
 | |
|                           user, key)
 | |
|     account = url.split('/')[-1]
 | |
|     head_account(url, token)
 | |
|     return url, token, account
 | |
| 
 | |
| 
 | |
| def _retry_timeout(f, args=None, kwargs=None, timeout=CHECK_SERVER_TIMEOUT):
 | |
|     args = args or ()
 | |
|     kwargs = kwargs or {}
 | |
|     try_until = time() + timeout
 | |
|     while True:
 | |
|         try:
 | |
|             return f(*args, **kwargs)
 | |
|         except Exception as err:
 | |
|             if time() > try_until:
 | |
|                 print(err)
 | |
|                 fsignature = '%s(*%r, **%r)' % (f.__name__, args, kwargs)
 | |
|                 print('Giving up on %s after %s seconds.' % (
 | |
|                     fsignature, timeout))
 | |
|                 raise err
 | |
|             sleep(0.1)
 | |
| 
 | |
| 
 | |
| def check_server(ipport, ipport2server):
 | |
|     server = ipport2server[ipport]
 | |
|     if server[:-1] in ('account', 'container', 'object'):
 | |
|         if int(server[-1]) > 4:
 | |
|             return None
 | |
|         path = '/connect/1/2'
 | |
|         if server[:-1] == 'container':
 | |
|             path += '/3'
 | |
|         elif server[:-1] == 'object':
 | |
|             path += '/3/4'
 | |
|         rv = _retry_timeout(_check_storage, args=(ipport, path))
 | |
|     else:
 | |
|         rv = _retry_timeout(_check_proxy, args=(
 | |
|             'test:tester', 'testing'))
 | |
|     return rv
 | |
| 
 | |
| 
 | |
| def kill_server(ipport, ipport2server):
 | |
|     server, number = get_server_number(ipport, ipport2server)
 | |
|     err = Manager([server]).kill(number=number)
 | |
|     if err:
 | |
|         raise Exception('unable to kill %s' % (server if not number else
 | |
|                                                '%s%s' % (server, number)))
 | |
|     return wait_for_server_to_hangup(ipport)
 | |
| 
 | |
| 
 | |
| def wait_for_server_to_hangup(ipport):
 | |
|     try_until = time() + 30
 | |
|     while True:
 | |
|         try:
 | |
|             conn = HTTPConnection(*ipport)
 | |
|             conn.request('GET', '/')
 | |
|             conn.getresponse()
 | |
|         except Exception:
 | |
|             break
 | |
|         if time() > try_until:
 | |
|             raise Exception(
 | |
|                 'Still answering on %s:%s after 30 seconds' % ipport)
 | |
|         sleep(0.1)
 | |
| 
 | |
| 
 | |
| def kill_nonprimary_server(primary_nodes, ipport2server):
 | |
|     primary_ipports = [(n['ip'], n['port']) for n in primary_nodes]
 | |
|     for ipport, server in ipport2server.items():
 | |
|         if ipport in primary_ipports:
 | |
|             server_type = server[:-1]
 | |
|             break
 | |
|     else:
 | |
|         raise Exception('Cannot figure out server type for %r' % primary_nodes)
 | |
|     for ipport, server in list(ipport2server.items()):
 | |
|         if server[:-1] == server_type and ipport not in primary_ipports:
 | |
|             kill_server(ipport, ipport2server)
 | |
|             return ipport
 | |
| 
 | |
| 
 | |
| def add_ring_devs_to_ipport2server(ring, server_type, ipport2server,
 | |
|                                    servers_per_port=0):
 | |
|     # We'll number the servers by order of unique occurrence of:
 | |
|     #   IP, if servers_per_port > 0 OR there > 1 IP in ring
 | |
|     #   ipport, otherwise
 | |
|     unique_ip_count = len({dev['ip'] for dev in ring.devs if dev})
 | |
|     things_to_number = {}
 | |
|     number = 0
 | |
|     for dev in filter(None, ring.devs):
 | |
|         ip = dev['ip']
 | |
|         ipport = (ip, dev['port'])
 | |
|         unique_by = ip if servers_per_port or unique_ip_count > 1 else ipport
 | |
|         if unique_by not in things_to_number:
 | |
|             number += 1
 | |
|             things_to_number[unique_by] = number
 | |
|         ipport2server[ipport] = '%s%d' % (server_type,
 | |
|                                           things_to_number[unique_by])
 | |
| 
 | |
| 
 | |
| def store_config_paths(name, configs):
 | |
|     server_names = [name, '%s-replicator' % name]
 | |
|     if name == 'container':
 | |
|         server_names.append('container-sharder')
 | |
|     elif name == 'object':
 | |
|         server_names.append('object-reconstructor')
 | |
|     for server_name in server_names:
 | |
|         for server in Manager([server_name]):
 | |
|             for i, conf in enumerate(server.conf_files(), 1):
 | |
|                 configs[server.server][i] = conf
 | |
| 
 | |
| 
 | |
| def get_ring(ring_name, required_replicas, required_devices,
 | |
|              server=None, force_validate=None, ipport2server=None,
 | |
|              config_paths=None):
 | |
|     if not server:
 | |
|         server = ring_name
 | |
|     ring = Ring('/etc/swift', ring_name=ring_name)
 | |
|     if ipport2server is None:
 | |
|         ipport2server = {}  # used internally, even if not passed in
 | |
|     if config_paths is None:
 | |
|         config_paths = defaultdict(dict)
 | |
|     store_config_paths(server, config_paths)
 | |
| 
 | |
|     repl_name = '%s-replicator' % server
 | |
|     repl_configs = {i: readconf(c, section_name=repl_name)
 | |
|                     for i, c in config_paths[repl_name].items()}
 | |
|     servers_per_port = any(int(c.get('servers_per_port', '0'))
 | |
|                            for c in repl_configs.values())
 | |
| 
 | |
|     add_ring_devs_to_ipport2server(ring, server, ipport2server,
 | |
|                                    servers_per_port=servers_per_port)
 | |
|     if not VALIDATE_RSYNC and not force_validate:
 | |
|         return ring
 | |
|     # easy sanity checks
 | |
|     if ring.replica_count != required_replicas:
 | |
|         raise unittest.SkipTest('%s has %s replicas instead of %s' % (
 | |
|             ring.serialized_path, ring.replica_count, required_replicas))
 | |
| 
 | |
|     devs = [dev for dev in ring.devs if dev is not None]
 | |
|     if len(devs) != required_devices:
 | |
|         raise unittest.SkipTest('%s has %s devices instead of %s' % (
 | |
|             ring.serialized_path, len(devs), required_devices))
 | |
|     for dev in devs:
 | |
|         # verify server is exposing mounted device
 | |
|         ipport = (dev['ip'], dev['port'])
 | |
|         _, server_number = get_server_number(ipport, ipport2server)
 | |
|         conf = repl_configs[server_number]
 | |
|         for device in os.listdir(conf['devices']):
 | |
|             if device == dev['device']:
 | |
|                 dev_path = os.path.join(conf['devices'], device)
 | |
|                 full_path = os.path.realpath(dev_path)
 | |
|                 if not os.path.exists(full_path):
 | |
|                     raise unittest.SkipTest(
 | |
|                         'device %s in %s was not found (%s)' %
 | |
|                         (device, conf['devices'], full_path))
 | |
|                 break
 | |
|         else:
 | |
|             raise unittest.SkipTest(
 | |
|                 "unable to find ring device %s under %s's devices (%s)" % (
 | |
|                     dev['device'], server, conf['devices']))
 | |
|         # verify server is exposing rsync device
 | |
|         rsync_export = conf.get('rsync_module', '').rstrip('/')
 | |
|         if not rsync_export:
 | |
|             rsync_export = '{replication_ip}::%s' % server
 | |
|         cmd = "rsync %s" % rsync_module_interpolation(rsync_export, dev)
 | |
|         p = Popen(cmd, shell=True, stdout=PIPE)
 | |
|         stdout, _stderr = p.communicate()
 | |
|         if p.returncode:
 | |
|             raise unittest.SkipTest('unable to connect to rsync '
 | |
|                                     'export %s (%s)' % (rsync_export, cmd))
 | |
|         for line in stdout.decode().splitlines():
 | |
|             if line.rsplit(None, 1)[-1] == dev['device']:
 | |
|                 break
 | |
|         else:
 | |
|             raise unittest.SkipTest("unable to find ring device %s under "
 | |
|                                     "rsync's exported devices for %s (%s)" %
 | |
|                                     (dev['device'], rsync_export, cmd))
 | |
|     return ring
 | |
| 
 | |
| 
 | |
| def get_policy(**kwargs):
 | |
|     kwargs.setdefault('is_deprecated', False)
 | |
|     # go through the policies and make sure they match the
 | |
|     # requirements of kwargs
 | |
|     for policy in POLICIES:
 | |
|         # TODO: for EC, pop policy type here and check it first
 | |
|         matches = True
 | |
|         for key, value in kwargs.items():
 | |
|             try:
 | |
|                 if getattr(policy, key) != value:
 | |
|                     matches = False
 | |
|             except AttributeError:
 | |
|                 matches = False
 | |
|         if matches:
 | |
|             return policy
 | |
|     raise unittest.SkipTest('No policy matching %s' % kwargs)
 | |
| 
 | |
| 
 | |
| def run_cleanup(cmd):
 | |
|     p = Popen(cmd + " 2>&1", shell=True, stdout=PIPE)
 | |
|     stdout, _stderr = p.communicate()
 | |
|     if p.returncode:
 | |
|         raise AssertionError(
 | |
|             'Cleanup with %r failed: stdout: %s, stderr: %s'
 | |
|             % (cmd, stdout, _stderr))
 | |
| 
 | |
|     print(stdout)
 | |
|     Manager(['all']).stop()
 | |
| 
 | |
| 
 | |
| def resetswift():
 | |
|     run_cleanup("resetswift")
 | |
| 
 | |
| 
 | |
| def kill_orphans():
 | |
|     run_cleanup("swift-orphans -a 0 -k 9")
 | |
| 
 | |
| 
 | |
| class Body(object):
 | |
| 
 | |
|     def __init__(self, total=3.5 * 2 ** 20):
 | |
|         self.length = int(total)
 | |
|         self.hasher = md5(usedforsecurity=False)
 | |
|         self.read_amount = 0
 | |
|         self.chunk = uuid4().hex.encode('ascii') * 2 ** 10
 | |
|         self.buff = b''
 | |
| 
 | |
|     @property
 | |
|     def etag(self):
 | |
|         return self.hasher.hexdigest()
 | |
| 
 | |
|     def __len__(self):
 | |
|         return self.length
 | |
| 
 | |
|     def read(self, amount):
 | |
|         if len(self.buff) < amount:
 | |
|             try:
 | |
|                 self.buff += next(self)
 | |
|             except StopIteration:
 | |
|                 pass
 | |
|         rv, self.buff = self.buff[:amount], self.buff[amount:]
 | |
|         return rv
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return self
 | |
| 
 | |
|     def __next__(self):
 | |
|         if self.buff:
 | |
|             rv, self.buff = self.buff, b''
 | |
|             return rv
 | |
|         if self.read_amount >= self.length:
 | |
|             raise StopIteration()
 | |
|         rv = self.chunk[:int(self.length - self.read_amount)]
 | |
|         self.read_amount += len(rv)
 | |
|         self.hasher.update(rv)
 | |
|         return rv
 | |
| 
 | |
|     # for py2 compat:
 | |
|     next = __next__
 | |
| 
 | |
| 
 | |
| def exclude_nodes(nodes, *excludes):
 | |
|     """
 | |
|     Iterate over ``nodes`` yielding only those not in ``excludes``.
 | |
| 
 | |
|     The index key of the node dicts is ignored when matching nodes against the
 | |
|     ``excludes`` nodes. Index is not a fundamental property of a node but a
 | |
|     variable annotation added by the Ring depending upon the partition for
 | |
|     which the nodes were generated.
 | |
| 
 | |
|     :param nodes: an iterable of node dicts.
 | |
|     :param *excludes: one or more node dicts that should not be yielded.
 | |
|     :return: yields node dicts.
 | |
|     """
 | |
|     for node in nodes:
 | |
|         match_node = {k: mock.ANY if k == 'index' else v
 | |
|                       for k, v in node.items()}
 | |
|         if any(exclude == match_node for exclude in excludes):
 | |
|             continue
 | |
|         yield node
 | |
| 
 | |
| 
 | |
| class ProbeTest(unittest.TestCase):
 | |
|     """
 | |
|     Don't instantiate this directly, use a child class instead.
 | |
|     """
 | |
| 
 | |
|     def _load_rings_and_configs(self):
 | |
|         self.ipport2server = {}
 | |
|         self.configs = defaultdict(dict)
 | |
|         self.account_ring = get_ring(
 | |
|             'account',
 | |
|             self.acct_cont_required_replicas,
 | |
|             self.acct_cont_required_devices,
 | |
|             ipport2server=self.ipport2server,
 | |
|             config_paths=self.configs)
 | |
|         self.container_ring = get_ring(
 | |
|             'container',
 | |
|             self.acct_cont_required_replicas,
 | |
|             self.acct_cont_required_devices,
 | |
|             ipport2server=self.ipport2server,
 | |
|             config_paths=self.configs)
 | |
|         self.policy = get_policy(**self.policy_requirements)
 | |
|         self.object_ring = get_ring(
 | |
|             self.policy.ring_name,
 | |
|             self.obj_required_replicas,
 | |
|             self.obj_required_devices,
 | |
|             server='object',
 | |
|             ipport2server=self.ipport2server,
 | |
|             config_paths=self.configs)
 | |
|         for server in Manager(['proxy-server']):
 | |
|             for conf in server.conf_files():
 | |
|                 self.configs['proxy-server'] = conf
 | |
| 
 | |
|     def setUp(self):
 | |
|         # previous test may have left DatabaseBroker instances in garbage with
 | |
|         # open connections to db files which will prevent unmounting devices in
 | |
|         # resetswift, so collect garbage now
 | |
|         gc.collect()
 | |
|         resetswift()
 | |
|         kill_orphans()
 | |
|         self._load_rings_and_configs()
 | |
|         try:
 | |
|             self.servers_per_port = any(
 | |
|                 int(readconf(c, section_name='object-replicator').get(
 | |
|                     'servers_per_port', '0'))
 | |
|                 for c in self.configs['object-replicator'].values())
 | |
| 
 | |
|             Manager(['main']).start(wait=True)
 | |
|             for ipport in self.ipport2server:
 | |
|                 check_server(ipport, self.ipport2server)
 | |
|             proxy_conf = readconf(self.configs['proxy-server'],
 | |
|                                   section_name='app:proxy-server')
 | |
|             proxy_ipport = (proxy_conf.get('bind_ip', '127.0.0.1'),
 | |
|                             int(proxy_conf.get('bind_port', 8080)))
 | |
|             self.ipport2server[proxy_ipport] = 'proxy'
 | |
|             self.url, self.token, self.account = check_server(
 | |
|                 proxy_ipport, self.ipport2server)
 | |
|             self.account_1 = {
 | |
|                 'url': self.url, 'token': self.token, 'account': self.account}
 | |
| 
 | |
|             rv = _retry_timeout(_check_proxy, args=(
 | |
|                 'test2:tester2', 'testing2'))
 | |
|             self.account_2 = {
 | |
|                 k: v for (k, v) in zip(('url', 'token', 'account'), rv)}
 | |
| 
 | |
|             self.replicators = Manager(
 | |
|                 ['account-replicator', 'container-replicator',
 | |
|                  'object-replicator'])
 | |
|             self.updaters = Manager(['container-updater', 'object-updater'])
 | |
|         except BaseException:
 | |
|             try:
 | |
|                 raise
 | |
|             finally:
 | |
|                 try:
 | |
|                     Manager(['all']).kill()
 | |
|                 except Exception:
 | |
|                     pass
 | |
|         info_url = "%s://%s/info" % (urlparse(self.url).scheme,
 | |
|                                      urlparse(self.url).netloc)
 | |
|         proxy_conn = client.http_connection(info_url)
 | |
|         self.cluster_info = client.get_capabilities(proxy_conn)
 | |
| 
 | |
|     def tearDown(self):
 | |
|         Manager(['all']).kill()
 | |
| 
 | |
|     def assertLengthEqual(self, obj, length):
 | |
|         obj_len = len(obj)
 | |
|         self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
 | |
|             obj, obj_len, length))
 | |
| 
 | |
|     def device_dir(self, node):
 | |
|         server_type, config_number = get_server_number(
 | |
|             (node['ip'], node['port']), self.ipport2server)
 | |
|         repl_server = '%s-replicator' % server_type
 | |
|         conf = readconf(self.configs[repl_server][config_number],
 | |
|                         section_name=repl_server)
 | |
|         return os.path.join(conf['devices'], node['device'])
 | |
| 
 | |
|     def storage_dir(self, node, part=None, policy=None):
 | |
|         policy = policy or self.policy
 | |
|         device_path = self.device_dir(node)
 | |
|         path_parts = [device_path, get_data_dir(policy)]
 | |
|         if part is not None:
 | |
|             path_parts.append(str(part))
 | |
|         return os.path.join(*path_parts)
 | |
| 
 | |
|     def config_number(self, node):
 | |
|         _server_type, config_number = get_server_number(
 | |
|             (node['ip'], node['port']), self.ipport2server)
 | |
|         return config_number
 | |
| 
 | |
|     def is_local_to(self, node1, node2):
 | |
|         """
 | |
|         Return True if both ring devices are "local" to each other (on the same
 | |
|         "server".
 | |
|         """
 | |
|         if self.servers_per_port:
 | |
|             return node1['ip'] == node2['ip']
 | |
| 
 | |
|         # Without a disambiguating IP, for SAIOs, we have to assume ports
 | |
|         # uniquely identify "servers".  SAIOs should be configured to *either*
 | |
|         # have unique IPs per node (e.g. 127.0.0.1, 127.0.0.2, etc.) OR unique
 | |
|         # ports per server (i.e. sdb1 & sdb5 would have same port numbers in
 | |
|         # the 8-disk EC ring).
 | |
|         return node1['port'] == node2['port']
 | |
| 
 | |
|     def get_to_final_state(self):
 | |
|         # these .stop()s are probably not strictly necessary,
 | |
|         # but may prevent race conditions
 | |
|         self.replicators.stop()
 | |
|         self.updaters.stop()
 | |
| 
 | |
|         self.replicators.once()
 | |
|         self.updaters.once()
 | |
|         self.replicators.once()
 | |
| 
 | |
|     def kill_drive(self, device):
 | |
|         if os.path.ismount(device):
 | |
|             os.system('sudo umount %s' % device)
 | |
|         else:
 | |
|             renamer(device, device + "X")
 | |
| 
 | |
|     def revive_drive(self, device):
 | |
|         disabled_name = device + "X"
 | |
|         if os.path.isdir(disabled_name):
 | |
|             renamer(disabled_name, device)
 | |
|         else:
 | |
|             os.system('sudo mount %s' % device)
 | |
| 
 | |
|     def make_internal_client(self):
 | |
|         tempdir = mkdtemp()
 | |
|         try:
 | |
|             conf_path = os.path.join(tempdir, 'internal_client.conf')
 | |
|             conf_body = """
 | |
|             [DEFAULT]
 | |
|             swift_dir = /etc/swift
 | |
| 
 | |
|             [pipeline:main]
 | |
|             pipeline = catch_errors cache copy proxy-server
 | |
| 
 | |
|             [app:proxy-server]
 | |
|             use = egg:swift#proxy
 | |
|             allow_account_management = True
 | |
| 
 | |
|             [filter:copy]
 | |
|             use = egg:swift#copy
 | |
| 
 | |
|             [filter:cache]
 | |
|             use = egg:swift#memcache
 | |
| 
 | |
|             [filter:catch_errors]
 | |
|             use = egg:swift#catch_errors
 | |
|             """
 | |
|             with open(conf_path, 'w') as f:
 | |
|                 f.write(dedent(conf_body))
 | |
|             return internal_client.InternalClient(conf_path, 'test', 1)
 | |
|         finally:
 | |
|             shutil.rmtree(tempdir)
 | |
| 
 | |
|     def get_all_object_nodes(self):
 | |
|         """
 | |
|         Returns a list of all nodes in all object storage policies.
 | |
| 
 | |
|         :return: a list of node dicts.
 | |
|         """
 | |
|         all_obj_nodes = {}
 | |
|         for policy in ENABLED_POLICIES:
 | |
|             for dev in policy.object_ring.devs:
 | |
|                 all_obj_nodes[dev['device']] = dev
 | |
|         return list(all_obj_nodes.values())
 | |
| 
 | |
|     def gather_async_pendings(self, onodes=None):
 | |
|         """
 | |
|         Returns a list of paths to async pending files found on given nodes.
 | |
| 
 | |
|         :param onodes: a list of nodes. If None, check all object nodes.
 | |
|         :return: a list of file paths.
 | |
|         """
 | |
|         async_pendings = []
 | |
|         if onodes is None:
 | |
|             onodes = self.get_all_object_nodes()
 | |
|         for onode in onodes:
 | |
|             device_dir = self.device_dir(onode)
 | |
|             for ap_pol_dir in os.listdir(device_dir):
 | |
|                 if not ap_pol_dir.startswith('async_pending'):
 | |
|                     # skip 'objects', 'containers', etc.
 | |
|                     continue
 | |
|                 async_pending_dir = os.path.join(device_dir, ap_pol_dir)
 | |
|                 try:
 | |
|                     ap_dirs = os.listdir(async_pending_dir)
 | |
|                 except OSError as err:
 | |
|                     if err.errno == errno.ENOENT:
 | |
|                         pass
 | |
|                     else:
 | |
|                         raise
 | |
|                 else:
 | |
|                     for ap_dir in ap_dirs:
 | |
|                         ap_dir_fullpath = os.path.join(
 | |
|                             async_pending_dir, ap_dir)
 | |
|                         async_pendings.extend([
 | |
|                             os.path.join(ap_dir_fullpath, ent)
 | |
|                             for ent in os.listdir(ap_dir_fullpath)])
 | |
|         return async_pendings
 | |
| 
 | |
|     def run_custom_daemon(self, klass, conf_section, conf_index,
 | |
|                           custom_conf, **kwargs):
 | |
|         conf_file = self.configs[conf_section][conf_index]
 | |
|         conf = utils.readconf(conf_file, conf_section)
 | |
|         conf.update(custom_conf)
 | |
|         # Use a CaptureLogAdapter in order to preserve the pattern of tests
 | |
|         # calling the log accessor methods (e.g. get_lines_for_level) directly
 | |
|         # on the logger instance
 | |
|         with capture_logger(conf, conf.get('log_name', conf_section),
 | |
|                             log_to_console=kwargs.pop('verbose', False),
 | |
|                             log_route=conf_section) as log_adapter:
 | |
|             daemon = klass(conf, log_adapter)
 | |
|             daemon.run_once(**kwargs)
 | |
|         return daemon
 | |
| 
 | |
| 
 | |
| def _get_db_file_path(obj_dir):
 | |
|     files = sorted(os.listdir(obj_dir), reverse=True)
 | |
|     for filename in files:
 | |
|         if filename.endswith('db'):
 | |
|             return os.path.join(obj_dir, filename)
 | |
| 
 | |
| 
 | |
| class ReplProbeTest(ProbeTest):
 | |
| 
 | |
|     acct_cont_required_replicas = 3
 | |
|     acct_cont_required_devices = 4
 | |
|     obj_required_replicas = 3
 | |
|     obj_required_devices = 4
 | |
|     policy_requirements = {'policy_type': REPL_POLICY}
 | |
| 
 | |
|     def direct_container_op(self, func, account=None, container=None,
 | |
|                             expect_failure=False):
 | |
|         account = account if account else self.account
 | |
|         container = container if container else self.container_to_shard
 | |
|         cpart, cnodes = self.container_ring.get_nodes(account, container)
 | |
|         unexpected_responses = []
 | |
|         results = {}
 | |
|         for cnode in cnodes:
 | |
|             try:
 | |
|                 results[cnode['id']] = func(cnode, cpart, account, container)
 | |
|             except DirectClientException as err:
 | |
|                 if not expect_failure:
 | |
|                     unexpected_responses.append((cnode, err))
 | |
|             else:
 | |
|                 if expect_failure:
 | |
|                     unexpected_responses.append((cnode, 'success'))
 | |
|         if unexpected_responses:
 | |
|             self.fail('Unexpected responses: %s' % unexpected_responses)
 | |
|         return results
 | |
| 
 | |
|     def direct_delete_container(self, account=None, container=None,
 | |
|                                 expect_failure=False):
 | |
|         self.direct_container_op(direct_client.direct_delete_container,
 | |
|                                  account, container, expect_failure)
 | |
| 
 | |
|     def direct_head_container(self, account=None, container=None,
 | |
|                               expect_failure=False):
 | |
|         return self.direct_container_op(direct_client.direct_head_container,
 | |
|                                         account, container, expect_failure)
 | |
| 
 | |
|     def direct_get_container(self, account=None, container=None,
 | |
|                              expect_failure=False):
 | |
|         return self.direct_container_op(direct_client.direct_get_container,
 | |
|                                         account, container, expect_failure)
 | |
| 
 | |
|     def get_container_db_files(self, container):
 | |
|         opart, onodes = self.container_ring.get_nodes(self.account, container)
 | |
|         db_files = []
 | |
|         for onode in onodes:
 | |
|             node_id = self.config_number(onode)
 | |
|             device = onode['device']
 | |
|             hash_str = hash_path(self.account, container)
 | |
|             server_conf = readconf(self.configs['container-server'][node_id])
 | |
|             devices = server_conf['app:container-server']['devices']
 | |
|             obj_dir = '%s/%s/containers/%s/%s/%s/' % (devices,
 | |
|                                                       device, opart,
 | |
|                                                       hash_str[-3:], hash_str)
 | |
|             db_files.append(_get_db_file_path(obj_dir))
 | |
| 
 | |
|         return db_files
 | |
| 
 | |
| 
 | |
| class ECProbeTest(ProbeTest):
 | |
| 
 | |
|     acct_cont_required_replicas = 3
 | |
|     acct_cont_required_devices = 4
 | |
|     obj_required_replicas = 6
 | |
|     obj_required_devices = 8
 | |
|     policy_requirements = {'policy_type': EC_POLICY}
 | |
| 
 | |
|     def _make_name(self, prefix):
 | |
|         return ('%s%s' % (prefix, uuid4())).encode()
 | |
| 
 | |
|     def setUp(self):
 | |
|         super(ECProbeTest, self).setUp()
 | |
|         self.container_name = self._make_name('container-')
 | |
|         self.object_name = self._make_name('object-')
 | |
|         # sanity
 | |
|         self.assertEqual(self.policy.policy_type, EC_POLICY)
 | |
|         self.reconstructor = Manager(["object-reconstructor"])
 | |
| 
 | |
|     def proxy_put(self, extra_headers=None):
 | |
|         contents = Body()
 | |
|         headers = {
 | |
|             self._make_name('x-object-meta-').decode('utf8'):
 | |
|                 self._make_name('meta-foo-').decode('utf8'),
 | |
|         }
 | |
|         if extra_headers:
 | |
|             headers.update(extra_headers)
 | |
|         self.etag = client.put_object(self.url, self.token,
 | |
|                                       self.container_name,
 | |
|                                       self.object_name,
 | |
|                                       contents=contents, headers=headers)
 | |
| 
 | |
|     def proxy_get(self):
 | |
|         # GET object
 | |
|         headers, body = client.get_object(self.url, self.token,
 | |
|                                           self.container_name,
 | |
|                                           self.object_name,
 | |
|                                           resp_chunk_size=64 * 2 ** 10)
 | |
|         resp_checksum = md5(usedforsecurity=False)
 | |
|         for chunk in body:
 | |
|             resp_checksum.update(chunk)
 | |
|         return headers, resp_checksum.hexdigest()
 | |
| 
 | |
|     def direct_get(self, node, part, require_durable=True, extra_headers=None):
 | |
|         req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
 | |
|         if extra_headers:
 | |
|             req_headers.update(extra_headers)
 | |
|         if not require_durable:
 | |
|             req_headers.update(
 | |
|                 {'X-Backend-Fragment-Preferences': json.dumps([])})
 | |
|         # node dict has unicode values so utf8 decode our path parts too in
 | |
|         # case they have non-ascii characters
 | |
|         if six.PY2:
 | |
|             acc, con, obj = (s.decode('utf8') for s in (
 | |
|                 self.account, self.container_name, self.object_name))
 | |
|         else:
 | |
|             acc, con, obj = self.account, self.container_name, self.object_name
 | |
|         headers, data = direct_client.direct_get_object(
 | |
|             node, part, acc, con, obj, headers=req_headers,
 | |
|             resp_chunk_size=64 * 2 ** 20)
 | |
|         hasher = md5(usedforsecurity=False)
 | |
|         for chunk in data:
 | |
|             hasher.update(chunk)
 | |
|         return headers, hasher.hexdigest()
 | |
| 
 | |
|     def assert_direct_get_fails(self, onode, opart, status,
 | |
|                                 require_durable=True):
 | |
|         try:
 | |
|             self.direct_get(onode, opart, require_durable=require_durable)
 | |
|         except direct_client.DirectClientException as err:
 | |
|             self.assertEqual(err.http_status, status)
 | |
|             return err
 | |
|         else:
 | |
|             self.fail('Node data on %r was not fully destroyed!' % (onode,))
 | |
| 
 | |
|     def assert_direct_get_succeeds(self, onode, opart, require_durable=True,
 | |
|                                    extra_headers=None):
 | |
|         try:
 | |
|             return self.direct_get(onode, opart,
 | |
|                                    require_durable=require_durable,
 | |
|                                    extra_headers=extra_headers)
 | |
|         except direct_client.DirectClientException as err:
 | |
|             self.fail('Node data on %r was not available: %s' % (onode, err))
 | |
| 
 | |
|     def break_nodes(self, nodes, opart, failed, non_durable):
 | |
|         # delete partitions on the failed nodes and remove durable marker from
 | |
|         # non-durable nodes
 | |
|         made_non_durable = 0
 | |
|         for i, node in enumerate(nodes):
 | |
|             part_dir = self.storage_dir(node, part=opart)
 | |
|             if i in failed:
 | |
|                 shutil.rmtree(part_dir, True)
 | |
|                 try:
 | |
|                     self.direct_get(node, opart)
 | |
|                 except direct_client.DirectClientException as err:
 | |
|                     self.assertEqual(err.http_status, 404)
 | |
|             elif i in non_durable:
 | |
|                 for dirs, subdirs, files in os.walk(part_dir):
 | |
|                     for fname in sorted(files, reverse=True):
 | |
|                         # make the newest durable be non-durable
 | |
|                         if fname.endswith('.data'):
 | |
|                             made_non_durable += 1
 | |
|                             non_durable_fname = fname.replace('#d', '')
 | |
|                             os.rename(os.path.join(dirs, fname),
 | |
|                                       os.path.join(dirs, non_durable_fname))
 | |
| 
 | |
|                             break
 | |
|                 headers, etag = self.direct_get(node, opart,
 | |
|                                                 require_durable=False)
 | |
|                 self.assertNotIn('X-Backend-Durable-Timestamp', headers)
 | |
|             try:
 | |
|                 os.remove(os.path.join(part_dir, 'hashes.pkl'))
 | |
|             except OSError as e:
 | |
|                 if e.errno != errno.ENOENT:
 | |
|                     raise
 | |
|         return made_non_durable
 | |
| 
 | |
|     def make_durable(self, nodes, opart):
 | |
|         # ensure all data files on the specified nodes are durable
 | |
|         made_durable = 0
 | |
|         for i, node in enumerate(nodes):
 | |
|             part_dir = self.storage_dir(node, part=opart)
 | |
|             for dirs, subdirs, files in os.walk(part_dir):
 | |
|                 for fname in sorted(files, reverse=True):
 | |
|                     # make the newest non-durable be durable
 | |
|                     if (fname.endswith('.data') and
 | |
|                             not fname.endswith('#d.data')):
 | |
|                         made_durable += 1
 | |
|                         non_durable_fname = fname.replace('.data', '#d.data')
 | |
|                         os.rename(os.path.join(dirs, fname),
 | |
|                                   os.path.join(dirs, non_durable_fname))
 | |
| 
 | |
|                         break
 | |
|             headers, etag = self.assert_direct_get_succeeds(node, opart)
 | |
|             self.assertIn('X-Backend-Durable-Timestamp', headers)
 | |
|             try:
 | |
|                 os.remove(os.path.join(part_dir, 'hashes.pkl'))
 | |
|             except OSError as e:
 | |
|                 if e.errno != errno.ENOENT:
 | |
|                     raise
 | |
|         return made_durable
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     for server in ('account', 'container'):
 | |
|         try:
 | |
|             get_ring(server, 3, 4,
 | |
|                      force_validate=True)
 | |
|         except unittest.SkipTest as err:
 | |
|             sys.exit('%s ERROR: %s' % (server, err))
 | |
|         print('%s OK' % server)
 | |
|     for policy in POLICIES:
 | |
|         try:
 | |
|             get_ring(policy.ring_name, 3, 4,
 | |
|                      server='object', force_validate=True)
 | |
|         except unittest.SkipTest as err:
 | |
|             sys.exit('object ERROR (%s): %s' % (policy.name, err))
 | |
|         print('object OK (%s)' % policy.name)
 |