#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

from unittest import main
from uuid import uuid4
import random
from hashlib import md5
from collections import defaultdict
import os
import socket
import errno

from swiftclient import client

from swift.common import direct_client
from swift.common.exceptions import ClientException
from swift.common.manager import Manager
from test.probe.common import (kill_server, start_server, ReplProbeTest,
                               ECProbeTest, Body)


class TestObjectHandoff(ReplProbeTest):

    def test_main(self):
        # Create container
        container = 'container-%s' % uuid4()
        client.put_container(self.url, self.token, container,
                             headers={'X-Storage-Policy':
                                      self.policy.name})

        # Kill one container/obj primary server
        cpart, cnodes = self.container_ring.get_nodes(self.account, container)
        cnode = cnodes[0]
        obj = 'object-%s' % uuid4()
        opart, onodes = self.object_ring.get_nodes(
            self.account, container, obj)
        onode = onodes[0]
        kill_server((onode['ip'], onode['port']), self.ipport2server)

        # Create container/obj (goes to two primary servers and one handoff)
        client.put_object(self.url, self.token, container, obj, b'VERIFY')
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        if odata != b'VERIFY':
            raise Exception('Object GET did not return VERIFY, instead it '
                            'returned: %s' % repr(odata))

        # Stash the on disk data from a primary for future comparison with the
        # handoff - this may not equal 'VERIFY' if for example the proxy has
        # crypto enabled
        direct_get_data = direct_client.direct_get_object(
            onodes[1], opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]

        # Kill other two container/obj primary servers
        #   to ensure GET handoff works
        for node in onodes[1:]:
            kill_server((node['ip'], node['port']), self.ipport2server)

        # Indirectly through proxy assert we can get container/obj
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        if odata != b'VERIFY':
            raise Exception('Object GET did not return VERIFY, instead it '
                            'returned: %s' % repr(odata))

        # Restart those other two container/obj primary servers
        for node in onodes[1:]:
            start_server((node['ip'], node['port']), self.ipport2server)

        # We've indirectly verified the handoff node has the container/object,
        #   but let's directly verify it.
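        # (get_more_nodes() walks the ring's handoff nodes for this
        # partition, so next() picks the first handoff candidate)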
        another_onode = next(self.object_ring.get_more_nodes(opart))
        odata = direct_client.direct_get_object(
            another_onode, opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
        self.assertEqual(direct_get_data, odata)

        # drop a tempfile in the handoff's datadir, like it might have
        # had if there was an rsync failure while it was previously a
        # primary
        handoff_device_path = self.device_dir('object', another_onode)
        data_filename = None
        for root, dirs, files in os.walk(handoff_device_path):
            for filename in files:
                if filename.endswith('.data'):
                    data_filename = filename
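                    # (rsync stages partial transfers in dot-prefixed temp
                    # files named '.<name>.XXXXXX'; the literal suffix below
                    # just stands in for one of those random suffixes)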
                    temp_filename = '.%s.6MbL6r' % data_filename
                    temp_filepath = os.path.join(root, temp_filename)
        if not data_filename:
            self.fail('Did not find any data files on %r' %
                      handoff_device_path)
        open(temp_filepath, 'w')

        # Assert container listing (via proxy and directly) has container/obj
        objs = [o['name'] for o in
                client.get_container(self.url, self.token, container)[1]]
        if obj not in objs:
            raise Exception('Container listing did not know about object')
        for cnode in cnodes:
            objs = [o['name'] for o in
                    direct_client.direct_get_container(
                        cnode, cpart, self.account, container)[1]]
            if obj not in objs:
                raise Exception(
                    'Container server %s:%s did not know about object' %
                    (cnode['ip'], cnode['port']))

        # Bring the first container/obj primary server back up
        start_server((onode['ip'], onode['port']), self.ipport2server)

        # Assert that it doesn't have container/obj yet
        try:
            direct_client.direct_get_object(
                onode, opart, self.account, container, obj, headers={
                    'X-Backend-Storage-Policy-Index': self.policy.idx})
        except ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")

        # Run object replication, ensuring we run the handoff node last so it
        #   will remove its extra handoff partition
        for node in onodes:
            try:
                port_num = node['replication_port']
            except KeyError:
                port_num = node['port']
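            # NOTE: this arithmetic assumes the classic SAIO-style port
            # layout (object server N listening on port 60N0), so a server's
            # config number can be derived from its port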
            node_id = (port_num - 6000) // 10
            Manager(['object-replicator']).once(number=node_id)
        try:
            another_port_num = another_onode['replication_port']
        except KeyError:
            another_port_num = another_onode['port']
        another_num = (another_port_num - 6000) // 10
        Manager(['object-replicator']).once(number=another_num)

        # Assert the first container/obj primary server now has container/obj
        odata = direct_client.direct_get_object(
            onode, opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
        self.assertEqual(direct_get_data, odata)

        # and that it does *not* have a temporary rsync dropping!
        found_data_filename = False
        primary_device_path = self.device_dir('object', onode)
        for root, dirs, files in os.walk(primary_device_path):
            for filename in files:
                if filename.endswith('.6MbL6r'):
                    self.fail('Found unexpected file %s' %
                              os.path.join(root, filename))
                if filename == data_filename:
                    found_data_filename = True
        self.assertTrue(found_data_filename,
                        'Did not find data file %r on %r' % (
                            data_filename, primary_device_path))

        # Assert the handoff server no longer has container/obj
        try:
            direct_client.direct_get_object(
                another_onode, opart, self.account, container, obj, headers={
                    'X-Backend-Storage-Policy-Index': self.policy.idx})
        except ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")

        # Kill the first container/obj primary server again (we have two
        #   primaries and the handoff up now)
        kill_server((onode['ip'], onode['port']), self.ipport2server)

        # Delete container/obj
        try:
            client.delete_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            if self.object_ring.replica_count > 2:
                raise
            # Object DELETE returning 503 for (404, 204)
            # remove this with fix for
            # https://bugs.launchpad.net/swift/+bug/1318375
            self.assertEqual(503, err.http_status)

        # Assert we can't head container/obj
        try:
            client.head_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")

        # Assert container/obj is not in the container listing, both indirectly
        #   and directly
        objs = [o['name'] for o in
                client.get_container(self.url, self.token, container)[1]]
        if obj in objs:
            raise Exception('Container listing still knew about object')
        for cnode in cnodes:
            objs = [o['name'] for o in
                    direct_client.direct_get_container(
                        cnode, cpart, self.account, container)[1]]
            if obj in objs:
                raise Exception(
                    'Container server %s:%s still knew about object' %
                    (cnode['ip'], cnode['port']))

        # Restart the first container/obj primary server again
        start_server((onode['ip'], onode['port']), self.ipport2server)

        # Assert it still has container/obj
        direct_client.direct_get_object(
            onode, opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})

        # Run object replication, ensuring we run the handoff node last so it
        #   will remove its extra handoff partition
        for node in onodes:
            try:
                port_num = node['replication_port']
            except KeyError:
                port_num = node['port']
            node_id = (port_num - 6000) // 10
            Manager(['object-replicator']).once(number=node_id)
        another_node_id = (another_port_num - 6000) // 10
        Manager(['object-replicator']).once(number=another_node_id)

        # Assert the handoff node no longer has container/obj
        try:
            direct_client.direct_get_object(
                another_onode, opart, self.account, container, obj, headers={
                    'X-Backend-Storage-Policy-Index': self.policy.idx})
        except ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")

    def test_stale_reads(self):
        # Create container
        container = 'container-%s' % uuid4()
        client.put_container(self.url, self.token, container,
                             headers={'X-Storage-Policy':
                                      self.policy.name})

        # Kill one primary obj server
        obj = 'object-%s' % uuid4()
        opart, onodes = self.object_ring.get_nodes(
            self.account, container, obj)
        onode = onodes[0]
        kill_server((onode['ip'], onode['port']), self.ipport2server)

        # Create container/obj (goes to two primaries and one handoff)
        client.put_object(self.url, self.token, container, obj, b'VERIFY')
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        if odata != b'VERIFY':
            raise Exception('Object GET did not return VERIFY, instead it '
                            'returned: %s' % repr(odata))

        # Stash the on disk data from a primary for future comparison with the
        # handoff - this may not equal 'VERIFY' if for example the proxy has
        # crypto enabled
        direct_get_data = direct_client.direct_get_object(
            onodes[1], opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]

        # Restart the first container/obj primary server again
        start_server((onode['ip'], onode['port']), self.ipport2server)

        # send a delete request to primaries
        client.delete_object(self.url, self.token, container, obj)

        # there should be .ts files in all primaries now
        for node in onodes:
            try:
                direct_client.direct_get_object(
                    node, opart, self.account, container, obj, headers={
                        'X-Backend-Storage-Policy-Index': self.policy.idx})
            except ClientException as err:
                self.assertEqual(err.http_status, 404)
            else:
                self.fail("Expected ClientException but didn't get it")

        # verify that handoff still has the data, DELETEs should have gone
        # only to primaries
        another_onode = next(self.object_ring.get_more_nodes(opart))
        handoff_data = direct_client.direct_get_object(
            another_onode, opart, self.account, container, obj, headers={
                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
        self.assertEqual(handoff_data, direct_get_data)

        # Indirectly (i.e., through the proxy) try to GET the object; it
        # should return a 404. Before bug #1560574 was fixed, the proxy
        # would return the stale object from the handoff.
        try:
            client.get_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")

    def test_missing_primaries(self):
        # Create container
        container = 'container-%s' % uuid4()
        client.put_container(self.url, self.token, container,
                             headers={'X-Storage-Policy':
                                      self.policy.name})

        # Create container/obj (goes to all three primaries)
        obj = 'object-%s' % uuid4()
        client.put_object(self.url, self.token, container, obj, b'VERIFY')
        odata = client.get_object(self.url, self.token, container, obj)[-1]
        if odata != b'VERIFY':
            raise Exception('Object GET did not return VERIFY, instead it '
                            'returned: %s' % repr(odata))

        # Kill all primary obj servers
        obj = 'object-%s' % uuid4()
        opart, onodes = self.object_ring.get_nodes(
            self.account, container, obj)
        for onode in onodes:
            kill_server((onode['ip'], onode['port']), self.ipport2server)

        # Indirectly (i.e., through the proxy) try to GET the object; it
        # should return a 503, since all primaries will Timeout and handoffs
        # return a 404.
        try:
            client.get_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 503)
        else:
            self.fail("Expected ClientException but didn't get it")

        # Restart the first container/obj primary server again
        onode = onodes[0]
        start_server((onode['ip'], onode['port']), self.ipport2server)

        # Send a delete that will reach the first primary and a handoff.
        # The DELETE will return a 404 since the handoff doesn't have a
        # .data file, but the object server will still write a tombstone
        # on the handoff node!
        try:
            client.delete_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 404)

        # kill the first container/obj primary server again
        kill_server((onode['ip'], onode['port']), self.ipport2server)

        # a new GET should return a 404, since all primaries will Timeout
        # and the handoff will return a 404 but this time with a tombstone
        try:
            client.get_object(self.url, self.token, container, obj)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")


class TestECObjectHandoff(ECProbeTest):

    def get_object(self, container_name, object_name):
        headers, body = client.get_object(self.url, self.token,
                                          container_name,
                                          object_name,
                                          resp_chunk_size=64 * 2 ** 10)
        resp_checksum = md5()
        for chunk in body:
            resp_checksum.update(chunk)
        return resp_checksum.hexdigest()

    def test_ec_handoff_overwrite(self):
        container_name = 'container-%s' % uuid4()
        object_name = 'object-%s' % uuid4()

        # create EC container
        headers = {'X-Storage-Policy': self.policy.name}
        client.put_container(self.url, self.token, container_name,
                             headers=headers)

        # PUT object
        old_contents = Body()
        client.put_object(self.url, self.token, container_name,
                          object_name, contents=old_contents)

        # get our node lists
        opart, onodes = self.object_ring.get_nodes(
            self.account, container_name, object_name)

        # shutdown one of the primary data nodes
        failed_primary = random.choice(onodes)
        failed_primary_device_path = self.device_dir('object', failed_primary)
        # first read its ec etag value for future reference - this may not
        # equal old_contents.etag if for example the proxy has crypto enabled
        req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
        headers = direct_client.direct_head_object(
            failed_primary, opart, self.account, container_name,
            object_name, headers=req_headers)
        old_backend_etag = headers['X-Object-Sysmeta-EC-Etag']

        self.kill_drive(failed_primary_device_path)

        # overwrite our object with some new data
        new_contents = Body()
        client.put_object(self.url, self.token, container_name,
                          object_name, contents=new_contents)
        self.assertNotEqual(new_contents.etag, old_contents.etag)

        # restore failed primary device
        self.revive_drive(failed_primary_device_path)

        # sanity - failed node has old contents
        req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
        headers = direct_client.direct_head_object(
            failed_primary, opart, self.account, container_name,
            object_name, headers=req_headers)
        self.assertEqual(headers['X-Object-Sysmeta-EC-Etag'],
                         old_backend_etag)

        # we have 1 primary with the wrong old etag, and we should have 5
        # with the new etag plus a handoff with the new etag, so killing 2
        # other primaries forces the proxy to try to GET from all primaries
        # plus the handoff.
        other_nodes = [n for n in onodes if n != failed_primary]
        random.shuffle(other_nodes)
        # grab the value of the new content's ec etag for future reference
        headers = direct_client.direct_head_object(
            other_nodes[0], opart, self.account, container_name,
            object_name, headers=req_headers)
        new_backend_etag = headers['X-Object-Sysmeta-EC-Etag']
        for node in other_nodes[:2]:
            self.kill_drive(self.device_dir('object', node))

        # sanity, after taking out two primaries we should be down to
        # only four primaries, one of which has the old etag - but we
        # also have a handoff with the new etag out there
        found_frags = defaultdict(int)
        req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
        for node in onodes + list(self.object_ring.get_more_nodes(opart)):
            try:
                headers = direct_client.direct_head_object(
                    node, opart, self.account, container_name,
                    object_name, headers=req_headers)
            except Exception:
                continue
            found_frags[headers['X-Object-Sysmeta-EC-Etag']] += 1
        self.assertEqual(found_frags, {
            new_backend_etag: 4,  # this should be enough to rebuild!
            old_backend_etag: 1,
        })

        # clear node error limiting
        Manager(['proxy']).restart()

        resp_etag = self.get_object(container_name, object_name)
        self.assertEqual(resp_etag, new_contents.etag)

    def _check_nodes(self, opart, onodes, container_name, object_name):
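        # Tally how many copies of each EC fragment index are visible across
        # the primaries and the handoffs; nodes that are down (connection
        # refused) or that return 404 simply don't contribute to the count.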
        found_frags = defaultdict(int)
        req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
        for node in onodes + list(self.object_ring.get_more_nodes(opart)):
            try:
                headers = direct_client.direct_head_object(
                    node, opart, self.account, container_name,
                    object_name, headers=req_headers)
            except socket.error as e:
                if e.errno != errno.ECONNREFUSED:
                    raise
            except direct_client.DirectClientException as e:
                if e.http_status != 404:
                    raise
            else:
                found_frags[headers['X-Object-Sysmeta-Ec-Frag-Index']] += 1
        return found_frags

    def test_ec_handoff_duplicate_available(self):
        container_name = 'container-%s' % uuid4()
        object_name = 'object-%s' % uuid4()

        # create EC container
        headers = {'X-Storage-Policy': self.policy.name}
        client.put_container(self.url, self.token, container_name,
                             headers=headers)

        # get our node lists
        opart, onodes = self.object_ring.get_nodes(
            self.account, container_name, object_name)

        # find both primary servers that have both of their devices in
        # the primary node list
        group_nodes_by_config = defaultdict(list)
        for n in onodes:
            group_nodes_by_config[self.config_number(n)].append(n)
        double_disk_primary = []
        for config_number, node_list in group_nodes_by_config.items():
            if len(node_list) > 1:
                double_disk_primary.append((config_number, node_list))

        # sanity, in a 4+2 with 8 disks two servers will be doubled
        self.assertEqual(len(double_disk_primary), 2)

        # shutdown the first double primary
        primary0_config_number, primary0_node_list = double_disk_primary[0]
        Manager(['object-server']).stop(number=primary0_config_number)

        # PUT object
        contents = Body()
        client.put_object(self.url, self.token, container_name,
                          object_name, contents=contents)

        # sanity fetch two frags on handoffs
        handoff_frags = []
        for node in self.object_ring.get_more_nodes(opart):
            headers, data = direct_client.direct_get_object(
                node, opart, self.account, container_name, object_name,
                headers={'X-Backend-Storage-Policy-Index': int(self.policy)}
            )
            handoff_frags.append((node, headers, data))

        # bring the first double primary back, and fail the other one
        Manager(['object-server']).start(number=primary0_config_number)
        primary1_config_number, primary1_node_list = double_disk_primary[1]
        Manager(['object-server']).stop(number=primary1_config_number)

        # we can still GET the object
        resp_etag = self.get_object(container_name, object_name)
        self.assertEqual(resp_etag, contents.etag)

        # now start to "revert" the first handoff frag
        node = primary0_node_list[0]
        handoff_node, headers, data = handoff_frags[0]
        # N.B. object server api returns quoted ETag
        headers['ETag'] = headers['Etag'].strip('"')
        headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
        direct_client.direct_put_object(
            node, opart,
            self.account, container_name, object_name,
            contents=data, headers=headers)

        # sanity - check available frags
        frag2count = self._check_nodes(opart, onodes,
                                       container_name, object_name)
        # ... five frags total
        self.assertEqual(sum(frag2count.values()), 5)
        # ... only 4 unique indexes
        self.assertEqual(len(frag2count), 4)

        # we can still GET the object
        resp_etag = self.get_object(container_name, object_name)
        self.assertEqual(resp_etag, contents.etag)

        # ... but we need both handoffs or we get an error
        for handoff_node, hdrs, data in handoff_frags:
            Manager(['object-server']).stop(
                number=self.config_number(handoff_node))
            with self.assertRaises(Exception) as cm:
                self.get_object(container_name, object_name)
            self.assertIn(cm.exception.http_status, (404, 503))
            Manager(['object-server']).start(
                number=self.config_number(handoff_node))

        # fix everything
        Manager(['object-server']).start(number=primary1_config_number)
        Manager(["object-reconstructor"]).once()

        # sanity - check available frags
        frag2count = self._check_nodes(opart, onodes,
                                       container_name, object_name)
        # ... six frags total
        self.assertEqual(sum(frag2count.values()), 6)
        # ... all six unique
        self.assertEqual(len(frag2count), 6)

    def test_ec_primary_timeout(self):
        container_name = 'container-%s' % uuid4()
        object_name = 'object-%s' % uuid4()

        # create EC container
        headers = {'X-Storage-Policy': self.policy.name}
        client.put_container(self.url, self.token, container_name,
                             headers=headers)

        # PUT object, should go to primary nodes
        old_contents = Body()
        client.put_object(self.url, self.token, container_name,
                          object_name, contents=old_contents)

        # get our node lists
        opart, onodes = self.object_ring.get_nodes(
            self.account, container_name, object_name)

        # shutdown three of the primary data nodes
        for i in range(3):
            failed_primary = onodes[i]
            failed_primary_device_path = self.device_dir('object',
                                                         failed_primary)
            self.kill_drive(failed_primary_device_path)

        # Indirectly (i.e., through the proxy) try to GET the object; it
        # should return a 503, since all primaries will Timeout and handoffs
        # return a 404.
        try:
            client.get_object(self.url, self.token, container_name,
                              object_name)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 503)
        else:
            self.fail("Expected ClientException but didn't get it")

        # Send a delete to write down tombstones in the handoff nodes
        client.delete_object(self.url, self.token, container_name, object_name)

        # Now a new GET should return 404 because the handoff nodes
        # return a 404 with a Tombstone.
        try:
            client.get_object(self.url, self.token, container_name,
                              object_name)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 404)
        else:
            self.fail("Expected ClientException but didn't get it")


if __name__ == '__main__':
    main()