ssync-receiver: terminate session if subreq read times out

If a PUT subrequest body iterator times out while the object server is
reading it, the object server will handle the timeout and return a 500
response to the ssync receiver.

Previously, the ssync receiver would attempt to drain the remainder of
the subrequest body iterator and then resume reading the next
subrequest from the SSYNC body. However, the attempt to drain the
subrequest iterator would fail (silently) because the timeout had
already caused the iterator to exit.

The ssync receiver would therefore treat any remaining subrequest body
as the preamble to the next subrequest. This remaining subrequest body
content was likely to cause the protocol parsing to fail, but in the
worst case could be erroneously interpreted as a valid subrequest.

(The exact failure mechanism depends on what state the
eventlet.wsgi.Input is left in when the timeout fired.)

This patch ensures that the ssync receiver will terminate processing
an SSYNC request if an exception occurs while reading a subrequest
body.

Closes-Bug: #2115991
Change-Id: I585e8a916d947c3da8d7c0e8a85cf0a8ab85f7f0
Co-authored-by: Tim Burke <tim.burke@gmail.com>
Signed-off-by: Alistair Coles <alistairncoles@gmail.com>
This commit is contained in:
Tim Burke
2025-06-27 10:21:31 -07:00
committed by Alistair Coles
parent 220f791466
commit 1dc3307eaf
4 changed files with 566 additions and 103 deletions

View File

@@ -16,7 +16,7 @@
import eventlet.greenio
import eventlet.wsgi
from eventlet import sleep
from eventlet import sleep, Timeout
import urllib
from swift.common import exceptions
@@ -104,6 +104,93 @@ def encode_wanted(remote, local):
return None
class SsyncInputProxy:
"""
Wraps a wsgi input to provide ssync specific read methods.
If any exception or timeout is raised while reading from the input then
subsequent calls will raise the same exception. Callers are thereby
prevented from reading the input after it has raised an exception, when its
state may be uncertain. This enables the input to be safely shared by
multiple callers (typically an ssync Receiver and an ObjectController) who
may otherwise each be unaware that the other has encountered an exception.
:param wsgi_input: a wsgi input
:param chunk_size: the number of bytes to read at a time
:param timeout: the timeout in seconds applied to each read
"""
def __init__(self, wsgi_input, chunk_size, timeout):
self.wsgi_input = wsgi_input
self.chunk_size = chunk_size
self.timeout = timeout
self.exception = None
def read_line(self, context):
"""
Try to read a line from the wsgi input; annotate any timeout or read
errors with a description of the calling context.
:param context: string to annotate any exception raised
"""
if self.exception:
raise self.exception
try:
try:
with exceptions.MessageTimeout(self.timeout, context):
line = self.wsgi_input.readline(self.chunk_size)
except (eventlet.wsgi.ChunkReadError, IOError) as err:
raise exceptions.ChunkReadError('%s: %s' % (context, err))
except (Exception, Timeout) as err:
self.exception = err
raise
if line and not line.endswith(b'\n'):
# Everywhere we would call readline, we should always get
# a clean end-of-line as we should be reading
# SSYNC-specific messages or HTTP request lines/headers.
# If we didn't, it indicates that the wsgi input readline reached a
# valid end of chunked body without finding a newline.
raise exceptions.ChunkReadError(
'%s: %s' % (context, 'missing newline'))
return line
def _read_chunk(self, context, size):
if self.exception:
raise self.exception
try:
try:
with exceptions.MessageTimeout(self.timeout, context):
chunk = self.wsgi_input.read(size)
except (eventlet.wsgi.ChunkReadError, IOError) as err:
raise exceptions.ChunkReadError('%s: %s' % (context, err))
if not chunk:
raise exceptions.ChunkReadError(
'Early termination for %s' % context)
except (Exception, Timeout) as err:
self.exception = err
raise
return chunk
def make_subreq_input(self, context, content_length):
"""
Returns a wsgi input that will read up to the given ``content-length``
from the wrapped wsgi input.
:param context: string to annotate any exception raised
:param content_length: maximum number of bytes to read
"""
def subreq_iter():
bytes_left = content_length
while bytes_left > 0:
size = min(bytes_left, self.chunk_size)
chunk = self._read_chunk(context, size)
bytes_left -= len(chunk)
yield chunk
return utils.FileLikeIter(subreq_iter())
class Receiver(object):
"""
Handles incoming SSYNC requests to the object server.
@@ -142,7 +229,7 @@ class Receiver(object):
self.request = request
self.device = None
self.partition = None
self.fp = None
self.input = None
# We default to dropping the connection in case there is any exception
# raised during processing because otherwise the sender could send for
# quite some time before realizing it was all in vain.
@@ -210,6 +297,8 @@ class Receiver(object):
'%s/%s/%s read failed in ssync.Receiver: %s' % (
self.request.remote_addr, self.device, self.partition,
err))
# Since the client (presumably) hung up, no point in trying to
# send anything about the error
except swob.HTTPException as err:
body = b''.join(err({}, lambda *args: None))
yield (':ERROR: %d %r\n' % (
@@ -260,18 +349,9 @@ class Receiver(object):
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if not self.diskfile_mgr.get_dev_path(self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
def _readline(self, context):
# try to read a line from the wsgi input; annotate any timeout or read
# errors with a description of the calling context
with exceptions.MessageTimeout(
self.app.client_timeout, context):
try:
line = self.fp.readline(self.app.network_chunk_size)
except (eventlet.wsgi.ChunkReadError, IOError) as err:
raise exceptions.ChunkReadError('%s: %s' % (context, err))
return line
self.input = SsyncInputProxy(self.request.environ['wsgi.input'],
self.app.network_chunk_size,
self.app.client_timeout)
def _check_local(self, remote, make_durable=True):
"""
@@ -382,7 +462,7 @@ class Receiver(object):
have to read while it writes to ensure network buffers don't
fill up and block everything.
"""
line = self._readline('missing_check start')
line = self.input.read_line('missing_check start')
if not line:
# Guess they hung up
raise SsyncClientDisconnected
@@ -393,7 +473,7 @@ class Receiver(object):
object_hashes = []
nlines = 0
while True:
line = self._readline('missing_check line')
line = self.input.read_line('missing_check line')
if not line or line.strip() == b':MISSING_CHECK: END':
break
want = self._check_missing(line)
@@ -446,7 +526,7 @@ class Receiver(object):
success. This is so the sender knows if it can remove an out
of place partition, for example.
"""
line = self._readline('updates start')
line = self.input.read_line('updates start')
if not line:
# Guess they hung up waiting for us to process the missing check
raise SsyncClientDisconnected
@@ -457,11 +537,12 @@ class Receiver(object):
failures = 0
updates = 0
while True:
line = self._readline('updates line')
line = self.input.read_line('updates line')
if not line or line.strip() == b':UPDATES: END':
break
# Read first line METHOD PATH of subrequest.
method, path = swob.bytes_to_wsgi(line.strip()).split(' ', 1)
context = swob.bytes_to_wsgi(line.strip())
method, path = context.split(' ', 1)
subreq = swob.Request.blank(
'/%s/%s%s' % (self.device, self.partition, path),
environ={'REQUEST_METHOD': method})
@@ -469,10 +550,9 @@ class Receiver(object):
content_length = None
replication_headers = []
while True:
line = self._readline('updates line')
line = self.input.read_line('updates line')
if not line:
raise Exception(
'Got no headers for %s %s' % (method, path))
raise Exception('Got no headers for %s' % context)
line = line.strip()
if not line:
break
@@ -500,24 +580,9 @@ class Receiver(object):
% (method, path))
elif method == 'PUT':
if content_length is None:
raise Exception(
'No content-length sent for %s %s' % (method, path))
def subreq_iter():
left = content_length
while left > 0:
with exceptions.MessageTimeout(
self.app.client_timeout,
'updates content'):
chunk = self.fp.read(
min(left, self.app.network_chunk_size))
if not chunk:
raise exceptions.ChunkReadError(
'Early termination for %s %s' % (method, path))
left -= len(chunk)
yield chunk
subreq.environ['wsgi.input'] = utils.FileLikeIter(
subreq_iter())
raise Exception('No content-length sent for %s' % context)
subreq.environ['wsgi.input'] = self.input.make_subreq_input(
context, content_length)
else:
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
@@ -535,8 +600,8 @@ class Receiver(object):
successes += 1
else:
self.app.logger.warning(
'ssync subrequest failed with %s: %s %s (%s)' %
(resp.status_int, method, subreq.path, resp.body))
'ssync subrequest failed with %s: %s (%s)' %
(resp.status_int, context, resp.body))
failures += 1
if failures >= self.app.replication_failure_threshold and (
not successes or
@@ -546,8 +611,8 @@ class Receiver(object):
'Too many %d failures to %d successes' %
(failures, successes))
# The subreq may have failed, but we want to read the rest of the
# body from the remote side so we can continue on with the next
# subreq.
# body from the remote side so we can either detect a broken input
# or continue on with the next subreq.
for junk in subreq.environ['wsgi.input']:
pass
if updates % 5 == 0:

View File

@@ -991,18 +991,22 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.logger.get_lines_for_level('error')
tx_error_log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
tx_error_log_lines[0])
self.assertFalse(tx_error_log_lines[1:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertEqual(1, len(log_lines), self.rx_logger.all_log_lines())
eventlet.sleep(0.001)
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertEqual(1, len(rx_warning_log_lines),
self.rx_logger.all_log_lines())
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
rx_warning_log_lines[0])
self.assertFalse(rx_warning_log_lines[1:])
rx_error_lines = self.rx_logger.get_lines_for_level('error')
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
'termination for PUT', rx_error_lines[0])
def test_sync_reconstructor_no_rebuilt_content(self):
# First fragment to sync gets no content in any response to
@@ -1026,17 +1030,20 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.logger.get_lines_for_level('error')
tx_error_log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
tx_error_log_lines[0])
self.assertFalse(tx_error_log_lines[1:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
eventlet.sleep(0.001)
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
rx_warning_log_lines[0])
self.assertFalse(rx_warning_log_lines[1:])
rx_error_lines = self.rx_logger.get_lines_for_level('error')
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
'termination for PUT', rx_error_lines[0])
def test_sync_reconstructor_exception_during_rebuild(self):
# First fragment to sync has some reconstructor get responses raise
@@ -1071,18 +1078,21 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Error trying to rebuild', log_lines[0])
tx_error_log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Error trying to rebuild', tx_error_log_lines[0])
self.assertIn('Sent data length does not match content-length',
log_lines[1])
self.assertFalse(log_lines[2:])
tx_error_log_lines[1])
self.assertFalse(tx_error_log_lines[2:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
eventlet.sleep(0.001)
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
self.assertIn('ssync subrequest failed with 499',
log_lines[0])
self.assertFalse(log_lines[1:])
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
rx_warning_log_lines[0])
self.assertFalse(rx_warning_log_lines[1:])
rx_error_lines = self.rx_logger.get_lines_for_level('error')
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
'termination for PUT', rx_error_lines[0])
def test_sync_reconstructor_no_responses(self):
# First fragment to sync gets no responses for reconstructor to rebuild
@@ -1131,7 +1141,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Unable to get enough responses', log_lines[0])
# trampoline for the receiver to write a log
eventlet.sleep(0)
eventlet.sleep(0.001)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
@@ -1234,7 +1244,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
fd.read())
# trampoline for the receiver to write a log
eventlet.sleep(0)
eventlet.sleep(0.001)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
@@ -1270,7 +1280,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
self.assertFalse(
self.logger.get_lines_for_level('error'))
# trampoline for the receiver to write a log
eventlet.sleep(0)
eventlet.sleep(0.001)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
self.assertFalse(self.rx_logger.get_lines_for_level('error'))

View File

@@ -20,19 +20,24 @@ import tempfile
import unittest
import eventlet
import eventlet.wsgi
from unittest import mock
import itertools
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import swob
from swift.common.exceptions import MessageTimeout, ChunkReadError
from swift.common.storage_policy import POLICIES
from swift.common import utils
from swift.common.swob import HTTPException
from swift.common.swob import HTTPException, HTTPCreated, Request, \
HTTPNoContent
from swift.common.utils import public
from swift.obj import diskfile
from swift.obj import server
from swift.obj import ssync_receiver, ssync_sender
from swift.obj.reconstructor import ObjectReconstructor
from swift.obj.ssync_receiver import SsyncInputProxy
from test import listen_zero, unit
from test.debug_logger import debug_logger
@@ -44,6 +49,70 @@ from test.unit.obj.common import write_diskfile
UNPACK_ERR = b":ERROR: 0 'not enough values to unpack (expected 2, got 1)'"
class FakeController(server.ObjectController):
def __init__(self, conf, logger=None):
super().__init__(conf, logger)
self.requests = []
def __call__(self, environ, start_response):
self.requests.append(Request(environ))
return super().__call__(environ, start_response)
@public
def PUT(self, req):
b''.join(req.environ['wsgi.input'])
return HTTPCreated()
@public
def DELETE(self, req):
b''.join(req.environ['wsgi.input'])
return HTTPNoContent()
class SlowBytesIO(io.BytesIO):
"""
A BytesIO that will sleep once for sleep_time before reading the byte at
sleep_index. If a read or readline call is completed by the byte at
(sleep_index - 1) then the call returns without sleeping, and the sleep
will occur at the start of the next read or readline call.
"""
def __init__(self, value, sleep_index=-1, sleep_time=0.1):
io.BytesIO.__init__(self, value)
self.sleep_index = sleep_index
self.sleep_time = sleep_time
self.bytes_read = []
self.num_bytes_read = 0
def _read(self, size=-1, readline=False):
size = -1 if size is None else size
num_read = 0
data = b''
self.bytes_read.append(data)
while True:
if self.num_bytes_read == self.sleep_index:
self.sleep_index = -1
eventlet.sleep(self.sleep_time)
next_byte = io.BytesIO.read(self, 1)
data = data + next_byte
self.bytes_read[-1] = data
num_read += 1
self.num_bytes_read += 1
if len(data) < num_read:
break
if readline and data[-1:] == b'\n':
break
if 0 <= size <= num_read:
break
return data
def read(self, size=-1):
return self._read(size, False)
def readline(self, size=-1):
return self._read(size, True)
@unit.patch_policies()
class TestReceiver(unittest.TestCase):
@@ -498,7 +567,7 @@ class TestReceiver(unittest.TestCase):
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\nBad content is here')
':UPDATES: START\r\nBad content is here\n')
req.remote_addr = '1.2.3.4'
mock_wsgi_input = _Wrapper(req.body)
req.environ['wsgi.input'] = mock_wsgi_input
@@ -533,7 +602,7 @@ class TestReceiver(unittest.TestCase):
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
':UPDATES: START\r\nBad content is here')
':UPDATES: START\r\nBad content is here\n')
req.remote_addr = mock.MagicMock()
req.remote_addr.__str__ = mock.Mock(
side_effect=Exception("can't stringify this"))
@@ -633,6 +702,22 @@ class TestReceiver(unittest.TestCase):
self.controller.logger.exception.assert_called_once_with(
'3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver')
def test_MISSING_CHECK_partial_line(self):
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC'},
# not sure this would ever be yielded by the wsgi input since the
# bytes read wouldn't match the chunk size that was sent
body=':MISSING_CHECK: START\r\nhash no_newline'
)
resp = req.get_response(self.controller)
self.assertFalse(self.body_lines(resp.body))
self.assertEqual(resp.status_int, 200)
lines = self.logger.get_lines_for_level('error')
self.assertEqual(
['None/sda1/1 read failed in ssync.Receiver: missing_check line: '
'missing newline'], lines)
def test_MISSING_CHECK_empty_list(self):
self.controller.logger = mock.MagicMock()
@@ -1308,6 +1393,133 @@ class TestReceiver(unittest.TestCase):
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
'0.01 seconds: updates line')
def test_UPDATES_timeout_reading_PUT_subreq_input_1(self):
# timeout reading from wsgi input part way through a PUT subreq body
body_chunks = [
':MISSING_CHECK: START\r\n',
':MISSING_CHECK: END\r\n',
':UPDATES: START\r\n',
'PUT /a/c/o\r\nContent-Length: 28\r\n\r\n',
'body_chunk_one',
'body_chunk_two',
':UPDATES: END\r\n',
''
]
chunked_body = ''.join([
'%x\r\n%s\r\n' % (len(line), line) for line in body_chunks
])
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
body=chunked_body)
req.remote_addr = '2.3.4.5'
slow_down_index = chunked_body.find('chunk_one')
slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index)
wsgi_input = eventlet.wsgi.Input(
rfile=slow_io, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
req.environ['wsgi.input'] = wsgi_input
controller = FakeController(self.conf, logger=self.logger)
controller.client_timeout = 0.01
with mock.patch.object(
ssync_receiver.eventlet.greenio, 'shutdown_safe') as \
mock_shutdown_safe:
resp = req.get_response(controller)
resp_body_lines = self.body_lines(resp.body)
self.assertEqual(resp.status_int, 200)
self.assertEqual(
[b':MISSING_CHECK: START',
b':MISSING_CHECK: END',
b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines)
self.assertEqual([
b'17\r\n',
b':MISSING_CHECK: START\r\n',
b'\r\n',
b'15\r\n',
b':MISSING_CHECK: END\r\n',
b'\r\n',
b'11\r\n',
b':UPDATES: START\r\n',
b'\r\n',
b'22\r\n',
b'PUT /a/c/o\r\n',
b'Content-Length: 28\r\n',
b'\r\n',
b'\r\n',
b'e\r\n',
b'body_',
], slow_io.bytes_read)
# oops,the subreq body was not drained
self.assertEqual(
b'chunk_one\r\ne\r\nbody_chunk_two\r\n'
b'f\r\n:UPDATES: END\r\n\r\n'
b'0\r\n\r\n', slow_io.read())
mock_shutdown_safe.assert_called_once_with(
wsgi_input.get_socket())
self.assertTrue(wsgi_input.get_socket().closed)
log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(
['ERROR __call__ error with PUT /device/partition/a/c/o : '
'MessageTimeout (0.01s) PUT /a/c/o',
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
'0.01 seconds: PUT /a/c/o'],
log_lines)
def test_UPDATES_timeout_reading_PUT_subreq_input_2(self):
# timeout immediately before reading PUT subreq chunk content
body_chunks = [
':MISSING_CHECK: START\r\n',
':MISSING_CHECK: END\r\n',
':UPDATES: START\r\n',
'PUT /a/c/o\r\nContent-Length: 99\r\n\r\n',
'first body chunk',
# NB: this is still the PUT subreq body, it just happens to look
# like the start of another subreq...
'DELETE /in/second/body chunk\r\n'
'X-Timestamp: 123456789.12345\r\nContent-Length: 0\r\n\r\n',
':UPDATES: END\r\n',
]
chunked_body = ''.join([
'%x\r\n%s\r\n' % (len(line), line) for line in body_chunks
])
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
body=chunked_body)
req.remote_addr = '2.3.4.5'
slow_down_index = chunked_body.find('DELETE /in/second/body chunk')
slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index)
wsgi_input = eventlet.wsgi.Input(
rfile=slow_io, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
req.environ['wsgi.input'] = wsgi_input
controller = FakeController(self.conf, logger=self.logger)
controller.client_timeout = 0.01
with mock.patch.object(
ssync_receiver.eventlet.greenio, 'shutdown_safe') as \
mock_shutdown_safe:
resp = req.get_response(controller)
resp_body_lines = self.body_lines(resp.body)
self.assertEqual(resp.status_int, 200)
self.assertEqual(['SSYNC', 'PUT'],
[req.method for req in controller.requests])
self.assertEqual(chunked_body.encode('utf-8')[:slow_down_index],
b''.join(slow_io.bytes_read))
self.assertEqual([
b':MISSING_CHECK: START',
b':MISSING_CHECK: END',
b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines)
mock_shutdown_safe.assert_called_once_with(
wsgi_input.get_socket())
self.assertTrue(wsgi_input.get_socket().closed)
log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(
['ERROR __call__ error with PUT /device/partition/a/c/o : '
'MessageTimeout (0.01s) PUT /a/c/o',
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
'0.01 seconds: PUT /a/c/o'],
log_lines)
def test_UPDATES_other_exception(self):
class _Wrapper(io.BytesIO):
@@ -1391,8 +1603,7 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(mock_shutdown_safe.called)
self.assertFalse(mock_wsgi_input.mock_socket.close.called)
def test_UPDATES_bad_subrequest_line(self):
self.controller.logger = mock.MagicMock()
def test_UPDATES_bad_subrequest_line_1(self):
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
@@ -1405,13 +1616,16 @@ class TestReceiver(unittest.TestCase):
[b':MISSING_CHECK: START', b':MISSING_CHECK: END',
UNPACK_ERR])
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in ssync.Receiver')
lines = self.logger.get_lines_for_level('error')
self.assertEqual(
['None/device/partition EXCEPTION in ssync.Receiver: '], lines)
def test_UPDATES_bad_subrequest_line_2(self):
# If there's no line feed, we probably read a partial buffer
# because the client hung up
with mock.patch.object(
self.controller, 'DELETE',
return_value=swob.HTTPNoContent()):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
@@ -1424,11 +1638,14 @@ class TestReceiver(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(
self.body_lines(resp.body),
[b':MISSING_CHECK: START', b':MISSING_CHECK: END',
UNPACK_ERR])
[b':MISSING_CHECK: START', b':MISSING_CHECK: END'])
# Since the client (presumably) hung up, no point in sending
# anything about the error
self.assertEqual(resp.status_int, 200)
self.controller.logger.exception.assert_called_once_with(
'None/device/partition EXCEPTION in ssync.Receiver')
lines = self.logger.get_lines_for_level('error')
self.assertEqual(
['None/device/partition read failed in ssync.Receiver: '
'updates line: missing newline'], lines)
def test_UPDATES_no_headers(self):
self.controller.logger = mock.MagicMock()
@@ -2675,5 +2892,162 @@ class TestModuleMethods(unittest.TestCase):
expected)
class TestSsyncInputProxy(unittest.TestCase):
def test_read_line(self):
body = io.BytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
b'10\r\nDELETE /a/c/o1\r\n\r\n'
b'13\r\nDELETE /a/c/oh my\r\n\r\n')
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=60)
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
self.assertEqual(b'DELETE /a/c/o1\r\n', inpt.read_line('ctxt'))
self.assertEqual(b'DELETE /a/c/oh my\r\n', inpt.read_line('ctxt'))
def test_read_line_timeout(self):
body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
b'10\r\nDELETE /a/c/o1\r\n\r\n',
# timeout reading second line...
sleep_index=23)
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
with self.assertRaises(MessageTimeout) as cm:
inpt.read_line('ctxt')
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
# repeat
with self.assertRaises(MessageTimeout) as cm:
inpt.read_line('ctxt')
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
# check subreq input will also fail
sub_input = inpt.make_subreq_input('ctxt2', 123)
with self.assertRaises(MessageTimeout) as cm:
sub_input.read()
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
def test_read_line_chunk_read_error(self):
body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
# bad chunk length...
b'x\r\nDELETE /a/c/o1\r\n\r\n',
sleep_index=23)
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
with self.assertRaises(ChunkReadError) as cm:
inpt.read_line('ctxt')
self.assertEqual(
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
str(cm.exception))
# repeat
with self.assertRaises(ChunkReadError) as cm:
inpt.read_line('ctxt')
self.assertEqual(
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
str(cm.exception))
# check subreq input will also fail
sub_input = inpt.make_subreq_input('ctxt2', 123)
with self.assertRaises(ChunkReadError) as cm:
sub_input.read()
self.assertEqual(
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
str(cm.exception))
def test_read_line_protocol_error(self):
body = io.BytesIO(
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'15\r\n:MISSING_CHECK: END\r\n\r\n'
b'11\r\n:UPDATES: START\r\n\r\n'
b'd\r\n:UPDATES: END\r\n' # note: chunk is missing its newline
b'0\r\n\r\n'
)
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
self.assertEqual(b':MISSING_CHECK: START\r\n', inpt.read_line('ctxt'))
self.assertEqual(b':MISSING_CHECK: END\r\n', inpt.read_line('ctxt'))
self.assertEqual(b':UPDATES: START\r\n', inpt.read_line('ctxt'))
with self.assertRaises(ChunkReadError) as cm:
inpt.read_line('ctxt')
self.assertEqual('ctxt: missing newline', str(cm.exception))
def test_subreq_input(self):
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
b'1b\r\nchunktwo \r\n'
b'1c\r\nchunkthree \r\n'
b'f\r\nDELETE /a/c/o\r\n\r\n')
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
self.assertEqual(b'chunk1 '
b'chunktwo '
b'chunkthree ',
sub_input.read())
# check next read_line (note: chunk_size needs to be big enough to read
# whole ssync protocol 'line'
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
def test_subreq_input_content_length_less_than_body(self):
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
b'1b\r\nchunktwo \r\n')
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
sub_input = inpt.make_subreq_input('ctxt', content_length=3)
self.assertEqual(b'chu', sub_input.read())
def test_subreq_input_content_length_more_than_body(self):
body = io.BytesIO(b'1a\r\nchunk1 \r\n')
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
with self.assertRaises(ChunkReadError) as cm:
sub_input.read()
self.assertEqual("ctxt: invalid literal for int() with base 16: b''",
str(cm.exception))
def test_subreq_input_early_termination(self):
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
b'0\r\n\r\n') # the sender disconnected
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
with self.assertRaises(ChunkReadError) as cm:
sub_input.read()
self.assertEqual('Early termination for ctxt', str(cm.exception))
def test_subreq_input_timeout(self):
body = SlowBytesIO(b'1a\r\nchunk1 \r\n'
b'1b\r\nchunktwo \r\n',
sleep_index=25)
wsgi_input = eventlet.wsgi.Input(
rfile=body, content_length=123, sock=mock.MagicMock(),
chunked_input=True)
inpt = SsyncInputProxy(wsgi_input, chunk_size=16, timeout=0.01)
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
self.assertEqual(b'chunk1 ', sub_input.read(16))
with self.assertRaises(MessageTimeout) as cm:
sub_input.read()
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
# repeat
self.assertEqual(b'', sub_input.read())
# check next read_line
with self.assertRaises(MessageTimeout) as cm:
inpt.read_line('ctxt2')
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
if __name__ == '__main__':
unittest.main()

View File

@@ -82,12 +82,15 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
class FakeConnection(object):
def __init__(self):
def __init__(self, sleeps=None):
self.sleeps = sleeps
self.sent = []
self.closed = False
def send(self, data):
self.sent.append(data)
if self.sleeps:
eventlet.sleep(self.sleeps.pop(0))
def close(self):
self.closed = True
@@ -791,18 +794,16 @@ class TestSender(BaseTest):
self.assertEqual(response.readline(), b'')
def test_missing_check_timeout_start(self):
connection = FakeConnection()
connection = FakeConnection(sleeps=[1])
response = FakeResponse()
self.sender.daemon.node_timeout = 0.01
self.assertFalse(self.sender.limited_by_max_objects)
with mock.patch.object(connection, 'send',
side_effect=lambda *args: eventlet.sleep(1)):
with self.assertRaises(exceptions.MessageTimeout) as cm:
self.sender.missing_check(connection, response)
with self.assertRaises(exceptions.MessageTimeout) as cm:
self.sender.missing_check(connection, response)
self.assertIn('0.01 seconds: missing_check start', str(cm.exception))
self.assertFalse(self.sender.limited_by_max_objects)
def test_missing_check_timeout_send_line(self):
def test_call_and_missing_check_timeout_send_line(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
yield (
'9d41d8cd98f00b204e9800998ecf0abc',
@@ -810,23 +811,36 @@ class TestSender(BaseTest):
yield (
'9d41d8cd98f00b204e9800998ecf0def',
{'ts_data': Timestamp(1380144471.00000)})
connection = FakeConnection()
response = FakeResponse()
# max_objects unlimited
self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
node = {'replication_ip': '1.2.3.4',
'replication_port': 5678,
'device': 'sda1'}
self.sender = ssync_sender.Sender(self.daemon, node, self.job, None,
max_objects=0)
self.sender.daemon.node_timeout = 0.01
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
# arrange for timeout while sending first missing check item
self.sender.daemon.node_timeout = 0.01
connection = FakeConnection(sleeps=[0, 1])
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.updates = mock.MagicMock()
self.assertFalse(self.sender.limited_by_max_objects)
sleeps = [0, 0, 1]
with mock.patch.object(
connection, 'send',
side_effect=lambda *args: eventlet.sleep(sleeps.pop(0))):
with self.assertRaises(exceptions.MessageTimeout) as cm:
self.sender.missing_check(connection, response)
self.assertIn('0.01 seconds: missing_check send line: '
'1 lines (57 bytes) sent', str(cm.exception))
success, candidates = self.sender()
self.assertFalse(success)
log_lines = self.daemon_logger.get_lines_for_level('error')
self.assertIn(
'1.2.3.4:5678/sda1/99 0.01 seconds: missing_check send line: '
'0 lines (0 bytes) sent', log_lines)
self.assertFalse(self.sender.limited_by_max_objects)
# only the first missing check item was sent, plus a disconnect line
self.assertEqual(
b''.join(connection.sent),
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
b'0\r\n\r\n')
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):