Merge "py3: Port common/constraints"

This commit is contained in:
Zuul
2018-06-22 11:48:41 +00:00
committed by Gerrit Code Review
4 changed files with 108 additions and 69 deletions

View File

@@ -24,7 +24,7 @@ from six.moves import urllib
from swift.common import utils, exceptions from swift.common import utils, exceptions
from swift.common.swob import HTTPBadRequest, HTTPLengthRequired, \ from swift.common.swob import HTTPBadRequest, HTTPLengthRequired, \
HTTPRequestEntityTooLarge, HTTPPreconditionFailed, HTTPNotImplemented, \ HTTPRequestEntityTooLarge, HTTPPreconditionFailed, HTTPNotImplemented, \
HTTPException HTTPException, str_from_wsgi
MAX_FILE_SIZE = 5368709122 MAX_FILE_SIZE = 5368709122
MAX_META_NAME_LENGTH = 128 MAX_META_NAME_LENGTH = 128
@@ -141,8 +141,8 @@ def check_metadata(req, target_type):
if not key: if not key:
return HTTPBadRequest(body='Metadata name cannot be empty', return HTTPBadRequest(body='Metadata name cannot be empty',
request=req, content_type='text/plain') request=req, content_type='text/plain')
bad_key = not check_utf8(key) bad_key = not check_utf8(str_from_wsgi(key))
bad_value = value and not check_utf8(value) bad_value = value and not check_utf8(str_from_wsgi(value))
if target_type in ('account', 'container') and (bad_key or bad_value): if target_type in ('account', 'container') and (bad_key or bad_value):
return HTTPBadRequest(body='Metadata must be valid UTF-8', return HTTPBadRequest(body='Metadata must be valid UTF-8',
request=req, content_type='text/plain') request=req, content_type='text/plain')
@@ -215,7 +215,7 @@ def check_object_creation(req, object_name):
return HTTPBadRequest(request=req, body=e.body, return HTTPBadRequest(request=req, body=e.body,
content_type='text/plain') content_type='text/plain')
if not check_utf8(req.headers['Content-Type']): if not check_utf8(str_from_wsgi(req.headers['Content-Type'])):
return HTTPBadRequest(request=req, body='Invalid Content-Type', return HTTPBadRequest(request=req, body='Invalid Content-Type',
content_type='text/plain') content_type='text/plain')
return check_metadata(req, 'object') return check_metadata(req, 'object')
@@ -362,16 +362,18 @@ def check_utf8(string):
return False return False
try: try:
if isinstance(string, six.text_type): if isinstance(string, six.text_type):
string.encode('utf-8') encoded = string.encode('utf-8')
decoded = string
else: else:
encoded = string
decoded = string.decode('UTF-8') decoded = string.decode('UTF-8')
if decoded.encode('UTF-8') != string: if decoded.encode('UTF-8') != encoded:
return False return False
# A UTF-8 string with surrogates in it is invalid. # A UTF-8 string with surrogates in it is invalid.
if any(0xD800 <= ord(codepoint) <= 0xDFFF if any(0xD800 <= ord(codepoint) <= 0xDFFF
for codepoint in decoded): for codepoint in decoded):
return False return False
return '\x00' not in string return b'\x00' not in encoded
# If string is unicode, decode() will raise UnicodeEncodeError # If string is unicode, decode() will raise UnicodeEncodeError
# So, we should catch both UnicodeDecodeError & UnicodeEncodeError # So, we should catch both UnicodeDecodeError & UnicodeEncodeError
except UnicodeError: except UnicodeError:
@@ -395,7 +397,7 @@ def check_name_format(req, name, target_type):
body='%s name cannot be empty' % target_type) body='%s name cannot be empty' % target_type)
if isinstance(name, six.text_type): if isinstance(name, six.text_type):
name = name.encode('utf-8') name = name.encode('utf-8')
if '/' in name: if b'/' in name:
raise HTTPPreconditionFailed( raise HTTPPreconditionFailed(
request=req, request=req,
body='%s name cannot contain slashes' % target_type) body='%s name cannot contain slashes' % target_type)

View File

@@ -219,7 +219,11 @@ def _header_int_property(header):
def header_to_environ_key(header_name): def header_to_environ_key(header_name):
header_name = 'HTTP_' + header_name.replace('-', '_').upper() # Why the to/from wsgi dance? Headers that include something like b'\xff'
# on the wire get translated to u'\u00ff' on py3, which gets upper()ed to
# u'\u0178', which is nonsense in a WSGI string.
real_header = str_from_wsgi(header_name)
header_name = 'HTTP_' + str_to_wsgi(real_header.upper()).replace('-', '_')
if header_name == 'HTTP_CONTENT_LENGTH': if header_name == 'HTTP_CONTENT_LENGTH':
return 'CONTENT_LENGTH' return 'CONTENT_LENGTH'
if header_name == 'HTTP_CONTENT_TYPE': if header_name == 'HTTP_CONTENT_TYPE':
@@ -251,8 +255,10 @@ class HeaderEnvironProxy(MutableMapping):
def __setitem__(self, key, value): def __setitem__(self, key, value):
if value is None: if value is None:
self.environ.pop(header_to_environ_key(key), None) self.environ.pop(header_to_environ_key(key), None)
elif isinstance(value, six.text_type): elif six.PY2 and isinstance(value, six.text_type):
self.environ[header_to_environ_key(key)] = value.encode('utf-8') self.environ[header_to_environ_key(key)] = value.encode('utf-8')
elif not six.PY2 and isinstance(value, six.binary_type):
self.environ[header_to_environ_key(key)] = value.decode('latin1')
else: else:
self.environ[header_to_environ_key(key)] = str(value) self.environ[header_to_environ_key(key)] = str(value)
@@ -263,7 +269,8 @@ class HeaderEnvironProxy(MutableMapping):
del self.environ[header_to_environ_key(key)] del self.environ[header_to_environ_key(key)]
def keys(self): def keys(self):
keys = [key[5:].replace('_', '-').title() # See the to/from WSGI comment in header_to_environ_key
keys = [str_to_wsgi(str_from_wsgi(key[5:]).replace('_', '-').title())
for key in self.environ if key.startswith('HTTP_')] for key in self.environ if key.startswith('HTTP_')]
if 'CONTENT_LENGTH' in self.environ: if 'CONTENT_LENGTH' in self.environ:
keys.append('Content-Length') keys.append('Content-Length')
@@ -272,6 +279,18 @@ class HeaderEnvironProxy(MutableMapping):
return keys return keys
def str_from_wsgi(wsgi_str):
if six.PY2:
return wsgi_str
return wsgi_str.encode('latin1').decode('utf8', errors='surrogateescape')
def str_to_wsgi(native_str):
if six.PY2:
return native_str
return native_str.encode('utf8', errors='surrogateescape').decode('latin1')
def _resp_status_property(): def _resp_status_property():
""" """
Set and retrieve the value of Response.status Set and retrieve the value of Response.status

View File

@@ -44,22 +44,23 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'object') '/', headers=headers), 'object')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Metadata name cannot be empty', resp.body) self.assertIn(b'Metadata name cannot be empty', resp.body)
def test_check_metadata_non_utf8(self): def test_check_metadata_non_utf8(self):
headers = {'X-Account-Meta-Foo': b'\xff'} # Consciously using native "WSGI strings" in headers
headers = {'X-Account-Meta-Foo': '\xff'}
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'account') '/', headers=headers), 'account')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Metadata must be valid UTF-8', resp.body) self.assertIn(b'Metadata must be valid UTF-8', resp.body)
headers = {b'X-Container-Meta-\xff': 'foo'} headers = {'X-Container-Meta-\xff': 'foo'}
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'container') '/', headers=headers), 'container')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Metadata must be valid UTF-8', resp.body) self.assertIn(b'Metadata must be valid UTF-8', resp.body)
# Object's OK; its metadata isn't serialized as JSON # Object's OK; its metadata isn't serialized as JSON
headers = {'X-Object-Meta-Foo': b'\xff'} headers = {'X-Object-Meta-Foo': '\xff'}
self.assertIsNone(constraints.check_metadata(Request.blank( self.assertIsNone(constraints.check_metadata(Request.blank(
'/', headers=headers), 'object')) '/', headers=headers), 'object'))
@@ -75,8 +76,8 @@ class TestConstraints(unittest.TestCase):
'/', headers=headers), 'object') '/', headers=headers), 'object')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn( self.assertIn(
('X-Object-Meta-%s' % name).lower(), resp.body.lower()) b'x-object-meta-%s' % name.encode('ascii'), resp.body.lower())
self.assertIn('Metadata name too long', resp.body) self.assertIn(b'Metadata name too long', resp.body)
def test_check_metadata_value_length(self): def test_check_metadata_value_length(self):
value = 'a' * constraints.MAX_META_VALUE_LENGTH value = 'a' * constraints.MAX_META_VALUE_LENGTH
@@ -89,10 +90,10 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'object') '/', headers=headers), 'object')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('x-object-meta-name', resp.body.lower()) self.assertIn(b'x-object-meta-name', resp.body.lower())
self.assertIn( self.assertIn(
str(constraints.MAX_META_VALUE_LENGTH), resp.body) str(constraints.MAX_META_VALUE_LENGTH).encode('ascii'), resp.body)
self.assertIn('Metadata value longer than 256', resp.body) self.assertIn(b'Metadata value longer than 256', resp.body)
def test_check_metadata_count(self): def test_check_metadata_count(self):
headers = {} headers = {}
@@ -105,7 +106,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'object') '/', headers=headers), 'object')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Too many metadata items', resp.body) self.assertIn(b'Too many metadata items', resp.body)
def test_check_metadata_size(self): def test_check_metadata_size(self):
headers = {} headers = {}
@@ -132,7 +133,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_metadata(Request.blank( resp = constraints.check_metadata(Request.blank(
'/', headers=headers), 'object') '/', headers=headers), 'object')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Total metadata too large', resp.body) self.assertIn(b'Total metadata too large', resp.body)
def test_check_object_creation_content_length(self): def test_check_object_creation_content_length(self):
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE), headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),
@@ -160,7 +161,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation(Request.blank( resp = constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name') '/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Invalid Transfer-Encoding header value', resp.body) self.assertIn(b'Invalid Transfer-Encoding header value', resp.body)
headers = {'Content-Type': 'text/plain', headers = {'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())} 'X-Timestamp': str(time.time())}
@@ -174,7 +175,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation(Request.blank( resp = constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name') '/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Invalid Content-Length header value', resp.body) self.assertIn(b'Invalid Content-Length header value', resp.body)
headers = {'Transfer-Encoding': 'gzip,chunked', headers = {'Transfer-Encoding': 'gzip,chunked',
'Content-Type': 'text/plain', 'Content-Type': 'text/plain',
@@ -195,7 +196,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation( resp = constraints.check_object_creation(
Request.blank('/', headers=headers), name) Request.blank('/', headers=headers), name)
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Object name length of %d longer than %d' % self.assertIn(b'Object name length of %d longer than %d' %
(MAX_OBJECT_NAME_LENGTH + 1, MAX_OBJECT_NAME_LENGTH), (MAX_OBJECT_NAME_LENGTH + 1, MAX_OBJECT_NAME_LENGTH),
resp.body) resp.body)
@@ -211,7 +212,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation( resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name') Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('No content type', resp.body) self.assertIn(b'No content type', resp.body)
def test_check_object_creation_bad_content_type(self): def test_check_object_creation_bad_content_type(self):
headers = {'Transfer-Encoding': 'chunked', headers = {'Transfer-Encoding': 'chunked',
@@ -220,7 +221,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation( resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name') Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Content-Type', resp.body) self.assertIn(b'Content-Type', resp.body)
def test_check_object_creation_bad_delete_headers(self): def test_check_object_creation_bad_delete_headers(self):
headers = {'Transfer-Encoding': 'chunked', headers = {'Transfer-Encoding': 'chunked',
@@ -230,7 +231,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation( resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name') Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-After', resp.body) self.assertIn(b'Non-integer X-Delete-After', resp.body)
t = str(int(time.time() - 60)) t = str(int(time.time() - 60))
headers = {'Transfer-Encoding': 'chunked', headers = {'Transfer-Encoding': 'chunked',
@@ -240,7 +241,7 @@ class TestConstraints(unittest.TestCase):
resp = constraints.check_object_creation( resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name') Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST) self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-At in past', resp.body) self.assertIn(b'X-Delete-At in past', resp.body)
def test_check_delete_headers(self): def test_check_delete_headers(self):
# x-delete-at value should be relative to the request timestamp rather # x-delete-at value should be relative to the request timestamp rather
@@ -265,7 +266,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-After', cm.exception.body) self.assertIn(b'Non-integer X-Delete-After', cm.exception.body)
headers = {'X-Delete-After': '60.1', headers = {'X-Delete-After': '60.1',
'X-Timestamp': ts.internal} 'X-Timestamp': ts.internal}
@@ -273,7 +274,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-After', cm.exception.body) self.assertIn(b'Non-integer X-Delete-After', cm.exception.body)
headers = {'X-Delete-After': '-1', headers = {'X-Delete-After': '-1',
'X-Timestamp': ts.internal} 'X-Timestamp': ts.internal}
@@ -281,7 +282,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body) self.assertIn(b'X-Delete-After in past', cm.exception.body)
headers = {'X-Delete-After': '0', headers = {'X-Delete-After': '0',
'X-Timestamp': ts.internal} 'X-Timestamp': ts.internal}
@@ -289,7 +290,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body) self.assertIn(b'X-Delete-After in past', cm.exception.body)
# x-delete-after = 0 disallowed when it results in x-delete-at equal to # x-delete-after = 0 disallowed when it results in x-delete-at equal to
# the timestamp # the timestamp
@@ -299,7 +300,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body) self.assertIn(b'X-Delete-After in past', cm.exception.body)
# X-Delete-At # X-Delete-At
delete_at = str(int(ts) + 100) delete_at = str(int(ts) + 100)
@@ -317,7 +318,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-At', cm.exception.body) self.assertIn(b'Non-integer X-Delete-At', cm.exception.body)
delete_at = str(int(ts) + 100) + '.1' delete_at = str(int(ts) + 100) + '.1'
headers = {'X-Delete-At': delete_at, headers = {'X-Delete-At': delete_at,
@@ -326,7 +327,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-At', cm.exception.body) self.assertIn(b'Non-integer X-Delete-At', cm.exception.body)
delete_at = str(int(ts) - 1) delete_at = str(int(ts) - 1)
headers = {'X-Delete-At': delete_at, headers = {'X-Delete-At': delete_at,
@@ -335,7 +336,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-At in past', cm.exception.body) self.assertIn(b'X-Delete-At in past', cm.exception.body)
# x-delete-at disallowed when exactly equal to timestamp # x-delete-at disallowed when exactly equal to timestamp
delete_at = str(int(ts)) delete_at = str(int(ts))
@@ -345,7 +346,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_delete_headers( constraints.check_delete_headers(
Request.blank('/', headers=headers)) Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST) self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-At in past', cm.exception.body) self.assertIn(b'X-Delete-At in past', cm.exception.body)
def test_check_delete_headers_removes_delete_after(self): def test_check_delete_headers_removes_delete_after(self):
ts = utils.Timestamp.now() ts = utils.Timestamp.now()
@@ -505,30 +506,45 @@ class TestConstraints(unittest.TestCase):
def test_check_utf8(self): def test_check_utf8(self):
unicode_sample = u'\uc77c\uc601' unicode_sample = u'\uc77c\uc601'
valid_utf8_str = unicode_sample.encode('utf-8')
invalid_utf8_str = unicode_sample.encode('utf-8')[::-1]
unicode_with_null = u'abc\u0000def' unicode_with_null = u'abc\u0000def'
utf8_with_null = unicode_with_null.encode('utf-8')
for false_argument in [None, # Some false-y values
'', self.assertFalse(constraints.check_utf8(None))
invalid_utf8_str, self.assertFalse(constraints.check_utf8(''))
unicode_with_null, self.assertFalse(constraints.check_utf8(b''))
utf8_with_null]: self.assertFalse(constraints.check_utf8(u''))
self.assertFalse(constraints.check_utf8(false_argument))
for true_argument in ['this is ascii and utf-8, too', # invalid utf8 bytes
unicode_sample, self.assertFalse(constraints.check_utf8(
valid_utf8_str]: unicode_sample.encode('utf-8')[::-1]))
self.assertTrue(constraints.check_utf8(true_argument)) # unicode with null
self.assertFalse(constraints.check_utf8(unicode_with_null))
# utf8 bytes with null
self.assertFalse(constraints.check_utf8(
unicode_with_null.encode('utf8')))
self.assertTrue(constraints.check_utf8('this is ascii and utf-8, too'))
self.assertTrue(constraints.check_utf8(unicode_sample))
self.assertTrue(constraints.check_utf8(unicode_sample.encode('utf8')))
def test_check_utf8_non_canonical(self): def test_check_utf8_non_canonical(self):
self.assertFalse(constraints.check_utf8('\xed\xa0\xbc\xed\xbc\xb8')) self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbc\xed\xbc\xb8'))
self.assertFalse(constraints.check_utf8('\xed\xa0\xbd\xed\xb9\x88')) self.assertTrue(constraints.check_utf8(u'\U0001f338'))
self.assertTrue(constraints.check_utf8(b'\xf0\x9f\x8c\xb8'))
self.assertTrue(constraints.check_utf8(u'\U0001f338'.encode('utf8')))
self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbd\xed\xb9\x88'))
self.assertTrue(constraints.check_utf8(u'\U0001f648'))
def test_check_utf8_lone_surrogates(self): def test_check_utf8_lone_surrogates(self):
self.assertFalse(constraints.check_utf8('\xed\xa0\xbc')) self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbc'))
self.assertFalse(constraints.check_utf8('\xed\xb9\x88')) self.assertFalse(constraints.check_utf8(u'\ud83c'))
self.assertFalse(constraints.check_utf8(b'\xed\xb9\x88'))
self.assertFalse(constraints.check_utf8(u'\ude48'))
self.assertFalse(constraints.check_utf8(u'\ud800'))
self.assertFalse(constraints.check_utf8(u'\udc00'))
self.assertFalse(constraints.check_utf8(u'\udcff'))
self.assertFalse(constraints.check_utf8(u'\udfff'))
def test_validate_bad_meta(self): def test_validate_bad_meta(self):
req = Request.blank( req = Request.blank(
@@ -537,8 +553,9 @@ class TestConstraints(unittest.TestCase):
'ab' * constraints.MAX_HEADER_SIZE}) 'ab' * constraints.MAX_HEADER_SIZE})
self.assertEqual(constraints.check_metadata(req, 'object').status_int, self.assertEqual(constraints.check_metadata(req, 'object').status_int,
HTTP_BAD_REQUEST) HTTP_BAD_REQUEST)
self.assertIn('x-object-meta-hello', constraints.check_metadata(req, resp = constraints.check_metadata(req, 'object')
'object').body.lower()) self.assertIsNotNone(resp)
self.assertIn(b'x-object-meta-hello', resp.body.lower())
def test_validate_constraints(self): def test_validate_constraints(self):
c = constraints c = constraints
@@ -574,7 +591,7 @@ class TestConstraints(unittest.TestCase):
constraints.check_container_format( constraints.check_container_format(
req, req.headers['X-Versions-Location']) req, req.headers['X-Versions-Location'])
self.assertTrue(cm.exception.body.startswith( self.assertTrue(cm.exception.body.startswith(
'Container name cannot')) b'Container name cannot'))
def test_valid_api_version(self): def test_valid_api_version(self):
version = 'v1' version = 'v1'
@@ -616,10 +633,10 @@ class TestConstraintsConfig(unittest.TestCase):
def test_override_constraints(self): def test_override_constraints(self):
try: try:
with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile() as f:
f.write('[swift-constraints]\n') f.write(b'[swift-constraints]\n')
# set everything to 1 # set everything to 1
for key in constraints.DEFAULT_CONSTRAINTS: for key in constraints.DEFAULT_CONSTRAINTS:
f.write('%s = 1\n' % key) f.write(b'%s = 1\n' % key.encode('ascii'))
f.flush() f.flush()
with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name): with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name):
constraints.reload_constraints() constraints.reload_constraints()
@@ -640,10 +657,10 @@ class TestConstraintsConfig(unittest.TestCase):
def test_reload_reset(self): def test_reload_reset(self):
try: try:
with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile() as f:
f.write('[swift-constraints]\n') f.write(b'[swift-constraints]\n')
# set everything to 1 # set everything to 1
for key in constraints.DEFAULT_CONSTRAINTS: for key in constraints.DEFAULT_CONSTRAINTS:
f.write('%s = 1\n' % key) f.write(b'%s = 1\n' % key.encode('ascii'))
f.flush() f.flush()
with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name): with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name):
constraints.reload_constraints() constraints.reload_constraints()
@@ -656,7 +673,7 @@ class TestConstraintsConfig(unittest.TestCase):
# no constraints have been loaded from non-existent swift.conf # no constraints have been loaded from non-existent swift.conf
self.assertFalse(constraints.SWIFT_CONSTRAINTS_LOADED) self.assertFalse(constraints.SWIFT_CONSTRAINTS_LOADED)
# no constraints are in OVERRIDE # no constraints are in OVERRIDE
self.assertEqual([], constraints.OVERRIDE_CONSTRAINTS.keys()) self.assertEqual([], list(constraints.OVERRIDE_CONSTRAINTS.keys()))
# the EFFECTIVE constraints mirror DEFAULT # the EFFECTIVE constraints mirror DEFAULT
self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS, self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS,
constraints.DEFAULT_CONSTRAINTS) constraints.DEFAULT_CONSTRAINTS)

View File

@@ -38,6 +38,7 @@ commands =
test/unit/cli/test_ringbuilder.py \ test/unit/cli/test_ringbuilder.py \
test/unit/common/middleware/test_gatekeeper.py \ test/unit/common/middleware/test_gatekeeper.py \
test/unit/common/ring \ test/unit/common/ring \
test/unit/common/test_constraints.py \
test/unit/common/test_daemon.py \ test/unit/common/test_daemon.py \
test/unit/common/test_exceptions.py \ test/unit/common/test_exceptions.py \
test/unit/common/test_header_key_dict.py \ test/unit/common/test_header_key_dict.py \