Reverted the wal+index code.
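This backs out the SQLite WAL experiment in the account/container brokers and restores the original `.pending`-file write path. In outline, per the hunks below: `get_db_connection()` drops `journal_mode = WAL` (along with its connect-retry loop and `wal_autocheckpoint`) and returns to `journal_mode = DELETE`; `DatabaseBroker` regains `pending_file`, `pending_timeout`, and `stale_reads_ok`, with buffered puts again landing in a colon-delimited, base64-pickled `.pending` file that `_commit_puts()` folds into SQLite; the `object`/`container` tables get back `name TEXT UNIQUE` and the old indexes, so `merge_items()` can rely on `INSERT` raising `sqlite3.IntegrityError` instead of the `_get_db_version()`/`ix_*_deleted_name` machinery; `_preallocate()` (using `fallocate`) returns; and the replicator stops probing for `-wal`/`-shm` side files.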
--- a/swift/account/server.py
+++ b/swift/account/server.py
@@ -86,6 +86,8 @@ class AccountController(object):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
         if container:   # put account container
+            if 'x-cf-trans-id' in req.headers:
+                broker.pending_timeout = 3
             if req.headers.get('x-account-override-deleted', 'no').lower() != \
                     'yes' and broker.is_deleted():
                 return HTTPNotFound(request=req)
@@ -138,6 +140,9 @@ class AccountController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
+        if not container:
+            broker.pending_timeout = 0.1
+            broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()
@@ -166,6 +171,8 @@ class AccountController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_account_broker(drive, part, account)
+        broker.pending_timeout = 0.1
+        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()
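The `pending_timeout = 0.1` / `stale_reads_ok = True` pairing above makes read requests nearly non-blocking: the broker gives the `.pending` lock a tenth of a second, and on timeout serves whatever is already committed instead of hanging. A minimal sketch of the pattern, with hypothetical stand-in classes (not Swift code; `LockTimeout` stands in for `swift.common.exceptions.LockTimeout`):

```python
class LockTimeout(Exception):
    pass


class BrokerSketch(object):
    def __init__(self, pending_timeout=10, stale_reads_ok=False):
        self.pending_timeout = pending_timeout
        self.stale_reads_ok = stale_reads_ok

    def _commit_puts(self):
        # Pretend the .pending lock is contended; a real broker would
        # block for up to self.pending_timeout seconds first.
        raise LockTimeout()

    def get_info(self):
        try:
            self._commit_puts()    # fold .pending records into SQLite
        except LockTimeout:
            if not self.stale_reads_ok:
                raise              # writers and strict readers propagate
        return {'object_count': 'possibly stale'}


broker = BrokerSketch(pending_timeout=0.1, stale_reads_ok=True)
print(broker.get_info())           # served without waiting on the lock
```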
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -27,14 +27,13 @@ import cPickle as pickle
 import errno
 from random import randint
 from tempfile import mkstemp
-import traceback

 from eventlet import sleep
 import simplejson as json
 import sqlite3

 from swift.common.utils import normalize_timestamp, renamer, \
-    mkdirs, lock_parent_directory
+    mkdirs, lock_parent_directory, fallocate
 from swift.common.exceptions import LockTimeout


@@ -42,9 +41,8 @@ from swift.common.exceptions import LockTimeout
 BROKER_TIMEOUT = 25
 #: Pickle protocol to use
 PICKLE_PROTOCOL = 2
-CONNECT_ATTEMPTS = 4
-PENDING_COMMIT_TIMEOUT = 900
-AUTOCHECKPOINT = 8192
+#: Max number of pending entries
+PENDING_CAP = 131072


 class DatabaseConnectionError(sqlite3.DatabaseError):
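Note on the restored constants: despite the "Max number of pending entries" comment, `PENDING_CAP` is compared against `os.path.getsize(self.pending_file)` and `fp.tell()` in the hunks below, so it is effectively a size cap of 131072 bytes (128 KiB) on the `.pending` file before a put forces `_commit_puts()`.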
@@ -125,48 +123,48 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
     :param okay_to_create: if True, create the DB if it doesn't exist
     :returns: DB connection object
     """
-    # retry logic to address:
-    # http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html
-    for attempt in xrange(CONNECT_ATTEMPTS):
-        try:
-            connect_time = time.time()
-            conn = sqlite3.connect(path, check_same_thread=False,
-                                   factory=GreenDBConnection, timeout=timeout)
-            # attempt to detect and fail when connect creates the db file
-            if path != ':memory:' and not okay_to_create:
-                stat = os.stat(path)
-                if stat.st_size == 0 and stat.st_ctime >= connect_time:
-                    os.unlink(path)
-                    raise DatabaseConnectionError(path,
-                        'DB file created by connect?')
-            conn.execute('PRAGMA journal_mode = WAL')
-            conn.execute('PRAGMA synchronous = NORMAL')
-            conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT)
-            conn.execute('PRAGMA count_changes = OFF')
-            conn.execute('PRAGMA temp_store = MEMORY')
-            conn.create_function('chexor', 3, chexor)
-            conn.row_factory = sqlite3.Row
-            conn.text_factory = str
-            return conn
-        except sqlite3.DatabaseError, e:
-            errstr = traceback.format_exc()
-    raise DatabaseConnectionError(path, errstr, timeout=timeout)
+    try:
+        connect_time = time.time()
+        conn = sqlite3.connect(path, check_same_thread=False,
+                               factory=GreenDBConnection, timeout=timeout)
+        if path != ':memory:' and not okay_to_create:
+            # attempt to detect and fail when connect creates the db file
+            stat = os.stat(path)
+            if stat.st_size == 0 and stat.st_ctime >= connect_time:
+                os.unlink(path)
+                raise DatabaseConnectionError(path,
+                    'DB file created by connect?')
+        conn.row_factory = sqlite3.Row
+        conn.text_factory = str
+        conn.execute('PRAGMA synchronous = NORMAL')
+        conn.execute('PRAGMA count_changes = OFF')
+        conn.execute('PRAGMA temp_store = MEMORY')
+        conn.execute('PRAGMA journal_mode = DELETE')
+        conn.create_function('chexor', 3, chexor)
+    except sqlite3.DatabaseError:
+        import traceback
+        raise DatabaseConnectionError(path, traceback.format_exc(),
+                                      timeout=timeout)
+    return conn


 class DatabaseBroker(object):
     """Encapsulates working with a database."""

     def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
-                 account=None, container=None):
+                 account=None, container=None, pending_timeout=10,
+                 stale_reads_ok=False):
         """ Encapsulates working with a database. """
         self.conn = None
         self.db_file = db_file
+        self.pending_file = self.db_file + '.pending'
+        self.pending_timeout = pending_timeout
+        self.stale_reads_ok = stale_reads_ok
         self.db_dir = os.path.dirname(db_file)
         self.timeout = timeout
         self.logger = logger or logging.getLogger()
         self.account = account
         self.container = container
-        self._db_version = -1

     def initialize(self, put_timestamp=None):
         """
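For context on the journal-mode flip: WAL keeps committed pages in `<db>-wal`/`<db>-shm` side files next to the database, which is exactly what the db_replicator hunk near the end stops checking for once `DELETE` mode is back. A quick standalone illustration (plain `sqlite3`, not Swift code):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'test.db')
conn = sqlite3.connect(path)
conn.execute('PRAGMA journal_mode = WAL')   # the setting being reverted
conn.execute('CREATE TABLE t (x INTEGER)')
conn.execute('INSERT INTO t VALUES (1)')
conn.commit()
# WAL leaves its write-ahead log and shared-memory index on disk:
print(sorted(os.listdir(os.path.dirname(path))))
# ['test.db', 'test.db-shm', 'test.db-wal']
```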
@@ -235,7 +233,7 @@ class DatabaseBroker(object):
             conn.close()
             with open(tmp_db_file, 'r+b') as fp:
                 os.fsync(fp.fileno())
-            with lock_parent_directory(self.db_file, self.timeout):
+            with lock_parent_directory(self.db_file, self.pending_timeout):
                 if os.path.exists(self.db_file):
                     # It's as if there was a "condition" where different parts
                     # of the system were "racing" each other.
@@ -287,7 +285,6 @@ class DatabaseBroker(object):
         self.conn = None
         orig_isolation_level = conn.isolation_level
         conn.isolation_level = None
-        conn.execute('PRAGMA journal_mode = DELETE')  # remove journal files
         conn.execute('BEGIN IMMEDIATE')
         try:
             yield True
@@ -295,7 +292,6 @@ class DatabaseBroker(object):
             pass
         try:
             conn.execute('ROLLBACK')
-            conn.execute('PRAGMA journal_mode = WAL')  # back to WAL mode
             conn.isolation_level = orig_isolation_level
             self.conn = conn
         except Exception:
@@ -352,6 +348,11 @@ class DatabaseBroker(object):
         :param count: number to get
         :returns: list of objects between start and end
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             curs = conn.execute('''
                 SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
@@ -400,7 +401,11 @@ class DatabaseBroker(object):
         :returns: dict containing keys: hash, id, created_at, put_timestamp,
                   delete_timestamp, count, max_row, and metadata
         """
-        self._commit_puts()
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         query_part1 = '''
             SELECT hash, id, created_at, put_timestamp, delete_timestamp,
                 %s_count AS count,
@@ -450,6 +455,34 @@ class DatabaseBroker(object):
                          (rec['sync_point'], rec['remote_id']))
             conn.commit()

+    def _preallocate(self):
+        """
+        The idea is to allocate space in front of an expanding db.  If it gets
+        within 512k of a boundary, it allocates to the next boundary.
+        Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after.
+        """
+        if self.db_file == ':memory:':
+            return
+        MB = (1024 * 1024)
+
+        def prealloc_points():
+            for pm in (1, 2, 5, 10, 25, 50):
+                yield pm * MB
+            while True:
+                pm += 50
+                yield pm * MB
+
+        stat = os.stat(self.db_file)
+        file_size = stat.st_size
+        allocated_size = stat.st_blocks * 512
+        for point in prealloc_points():
+            if file_size <= point - MB / 2:
+                prealloc_size = point
+                break
+        if allocated_size < prealloc_size:
+            with open(self.db_file, 'rb+') as fp:
+                fallocate(fp.fileno(), int(prealloc_size))
+
     @property
     def metadata(self):
         """
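The `_preallocate()` boundary rule reads more easily outside the diff: grow to the first boundary that still leaves at least 512 KiB of headroom, with boundaries at 1/2/5/10/25/50 MB and then every 50 MB after. A standalone rendering of the same arithmetic (sketch only, mirroring the generator above):

```python
MB = 1024 * 1024


def prealloc_points():
    # same schedule as the generator in _preallocate()
    for pm in (1, 2, 5, 10, 25, 50):
        yield pm * MB
    while True:
        pm += 50
        yield pm * MB


def choose_prealloc_size(file_size):
    # first boundary with at least 512 KiB of headroom
    for point in prealloc_points():
        if file_size <= point - MB / 2:
            return point


for size in (int(0.3 * MB), int(1.8 * MB), 4 * MB, 60 * MB):
    print(size, '->', choose_prealloc_size(size))
# 0.3 MB -> 1 MB, 1.8 MB -> 5 MB, 4 MB -> 5 MB, 60 MB -> 100 MB
```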
@@ -574,7 +607,7 @@ class ContainerBroker(DatabaseBroker):
             conn.executescript("""
                 CREATE TABLE object (
                     ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
-                    name TEXT,
+                    name TEXT UNIQUE,
                     created_at TEXT,
                     size INTEGER,
                     content_type TEXT,
@@ -582,7 +615,7 @@ class ContainerBroker(DatabaseBroker):
                     deleted INTEGER DEFAULT 0
                 );

-                CREATE INDEX ix_object_deleted_name ON object (deleted, name);
+                CREATE INDEX ix_object_deleted ON object (deleted);

                 CREATE TRIGGER object_insert AFTER INSERT ON object
                 BEGIN
@@ -645,15 +678,6 @@ class ContainerBroker(DatabaseBroker):
         ''', (self.account, self.container, normalize_timestamp(time.time()),
               str(uuid4()), put_timestamp))

-    def _get_db_version(self, conn):
-        if self._db_version == -1:
-            self._db_version = 0
-            for row in conn.execute('''
-                    SELECT name FROM sqlite_master
-                    WHERE name = 'ix_object_deleted_name' '''):
-                self._db_version = 1
-        return self._db_version
-
     def _newid(self, conn):
         conn.execute('''
             UPDATE container_stat
@@ -693,6 +717,11 @@ class ContainerBroker(DatabaseBroker):

         :returns: True if the database has no active objects, False otherwise
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             row = conn.execute(
                 'SELECT object_count from container_stat').fetchone()
@@ -700,16 +729,17 @@ class ContainerBroker(DatabaseBroker):

     def _commit_puts(self, item_list=None):
         """Handles commiting rows in .pending files."""
-        pending_file = self.db_file + '.pending'
-        if self.db_file == ':memory:' or not os.path.exists(pending_file):
-            return
-        if not os.path.getsize(pending_file):
-            os.unlink(pending_file)
+        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
             return
         if item_list is None:
             item_list = []
-        with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
-            with open(pending_file, 'r+b') as fp:
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            self._preallocate()
+            if not os.path.getsize(self.pending_file):
+                if item_list:
+                    self.merge_items(item_list)
+                return
+            with open(self.pending_file, 'r+b') as fp:
                 for entry in fp.read().split(':'):
                     if entry:
                         try:
@@ -722,11 +752,11 @@ class ContainerBroker(DatabaseBroker):
                         except Exception:
                             self.logger.exception(
                                 _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': pending_file, 'entry': entry})
+                                {'file': self.pending_file, 'entry': entry})
                 if item_list:
                     self.merge_items(item_list)
                 try:
-                    os.unlink(pending_file)
+                    os.ftruncate(fp.fileno(), 0)
                 except OSError, err:
                     if err.errno != errno.ENOENT:
                         raise
@@ -744,6 +774,7 @@ class ContainerBroker(DatabaseBroker):
                 delete
         :param sync_timestamp: max update_at timestamp of sync rows to delete
         """
+        self._commit_puts()
         with self.get() as conn:
             conn.execute("""
                 DELETE FROM object
@@ -787,9 +818,30 @@ class ContainerBroker(DatabaseBroker):
         record = {'name': name, 'created_at': timestamp, 'size': size,
                   'content_type': content_type, 'etag': etag,
                   'deleted': deleted}
-        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
+        if self.db_file == ':memory:':
+            self.merge_items([record])
+            return
+        if not os.path.exists(self.db_file):
             raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
-        self.merge_items([record])
+        pending_size = 0
+        try:
+            pending_size = os.path.getsize(self.pending_file)
+        except OSError, err:
+            if err.errno != errno.ENOENT:
+                raise
+        if pending_size > PENDING_CAP:
+            self._commit_puts([record])
+        else:
+            with lock_parent_directory(
+                    self.pending_file, self.pending_timeout):
+                with open(self.pending_file, 'a+b') as fp:
+                    # Colons aren't used in base64 encoding; so they are our
+                    # delimiter
+                    fp.write(':')
+                    fp.write(pickle.dumps(
+                        (name, timestamp, size, content_type, etag, deleted),
+                        protocol=PICKLE_PROTOCOL).encode('base64'))
+                    fp.flush()

     def is_deleted(self, timestamp=None):
         """
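What actually lands in the `.pending` file, per the `put_object()` hunk above: a ':' delimiter followed by a base64-encoded protocol-2 pickle of the record tuple, which `_commit_puts()` later splits on ':' and unpickles. A round-trip sketch in modern Python (`base64.b64encode` standing in for the patch's py2-only `.encode('base64')`; the record values are made up):

```python
import base64
import pickle

PICKLE_PROTOCOL = 2


def encode_record(record):
    # what put_object() appends after each ':' delimiter
    return base64.b64encode(pickle.dumps(record, protocol=PICKLE_PROTOCOL))


def decode_pending(blob):
    # what _commit_puts() does with fp.read().split(':')
    return [pickle.loads(base64.b64decode(entry))
            for entry in blob.split(b':') if entry]


rec = ('obj1', '1288400273.77', 17, 'text/plain',
       'd41d8cd98f00b204e9800998ecf8427e', 0)
blob = b':' + encode_record(rec) + b':' + encode_record(rec)
print(decode_pending(blob))   # both tuples come back out intact
```

The base64 alphabet never produces ':', which is why it is safe as the record delimiter.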
@@ -799,6 +851,11 @@ class ContainerBroker(DatabaseBroker):
         """
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             return True
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             row = conn.execute('''
                 SELECT put_timestamp, delete_timestamp, object_count
@@ -821,6 +878,11 @@ class ContainerBroker(DatabaseBroker):
                   reported_put_timestamp, reported_delete_timestamp,
                   reported_object_count, reported_bytes_used, hash, id)
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             return conn.execute('''
                 SELECT account, container, created_at, put_timestamp,
@@ -857,6 +919,11 @@ class ContainerBroker(DatabaseBroker):

         :returns: list of object names
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         rv = []
         with self.get() as conn:
             row = conn.execute('''
@@ -893,6 +960,11 @@ class ContainerBroker(DatabaseBroker):
         :returns: list of tuples of (name, created_at, size, content_type,
                   etag)
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         if path is not None:
             prefix = path
             if path:
@@ -916,10 +988,7 @@ class ContainerBroker(DatabaseBroker):
             elif prefix:
                 query += ' name >= ? AND'
                 query_args.append(prefix)
-            if self._get_db_version(conn) < 1:
-                query += ' +deleted = 0 ORDER BY name LIMIT ?'
-            else:
-                query += ' deleted = 0 ORDER BY name LIMIT ?'
+            query += ' +deleted = 0 ORDER BY name LIMIT ?'
             query_args.append(limit - len(results))
             curs = conn.execute(query, query_args)
             curs.row_factory = None
@@ -967,19 +1036,18 @@ class ContainerBroker(DatabaseBroker):
             max_rowid = -1
             for rec in item_list:
                 conn.execute('''
-                    DELETE FROM object WHERE name = ? AND created_at < ? AND
-                        deleted IN (0, 1)
+                    DELETE FROM object WHERE name = ? AND
+                        (created_at < ?)
                 ''', (rec['name'], rec['created_at']))
-                if not conn.execute('''
-                        SELECT name FROM object WHERE name = ? AND
-                            deleted IN (0, 1)
-                        ''', (rec['name'],)).fetchall():
+                try:
                     conn.execute('''
                         INSERT INTO object (name, created_at, size,
                             content_type, etag, deleted)
                         VALUES (?, ?, ?, ?, ?, ?)
                     ''', ([rec['name'], rec['created_at'], rec['size'],
                            rec['content_type'], rec['etag'], rec['deleted']]))
+                except sqlite3.IntegrityError:
+                    pass
                 if source:
                     max_rowid = max(max_rowid, rec['ROWID'])
             if source:
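The `merge_items()` rewrite leans on the restored `name TEXT UNIQUE` column: instead of SELECTing to see whether a row already exists, it INSERTs and treats `sqlite3.IntegrityError` as "already there". The shape of the idiom, reduced to a standalone sketch (the real method resolves timestamp conflicts rather than blindly updating):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE object (name TEXT UNIQUE, created_at TEXT)')


def upsert(name, created_at):
    try:
        conn.execute('INSERT INTO object VALUES (?, ?)', (name, created_at))
    except sqlite3.IntegrityError:
        # row already exists; the real code decides which copy wins here
        conn.execute('UPDATE object SET created_at = ? WHERE name = ?',
                     (created_at, name))


upsert('o1', '100.1')
upsert('o1', '100.2')
print(conn.execute('SELECT * FROM object').fetchall())  # [('o1', '100.2')]
```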
@@ -1023,7 +1091,7 @@ class AccountBroker(DatabaseBroker):
             conn.executescript("""
                 CREATE TABLE container (
                     ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
-                    name TEXT,
+                    name TEXT UNIQUE,
                     put_timestamp TEXT,
                     delete_timestamp TEXT,
                     object_count INTEGER,
@@ -1031,9 +1099,8 @@ class AccountBroker(DatabaseBroker):
                     deleted INTEGER DEFAULT 0
                 );

-                CREATE INDEX ix_container_deleted_name ON
-                    container (deleted, name);
-
+                CREATE INDEX ix_container_deleted ON container (deleted);
+                CREATE INDEX ix_container_name ON container (name);
                 CREATE TRIGGER container_insert AFTER INSERT ON container
                 BEGIN
                     UPDATE account_stat
@@ -1097,15 +1164,6 @@ class AccountBroker(DatabaseBroker):
         ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
               put_timestamp))

-    def _get_db_version(self, conn):
-        if self._db_version == -1:
-            self._db_version = 0
-            for row in conn.execute('''
-                    SELECT name FROM sqlite_master
-                    WHERE name = 'ix_container_deleted_name' '''):
-                self._db_version = 1
-        return self._db_version
-
     def update_put_timestamp(self, timestamp):
         """
         Update the put_timestamp.  Only modifies it if it is greater than
@@ -1135,16 +1193,17 @@ class AccountBroker(DatabaseBroker):

     def _commit_puts(self, item_list=None):
         """Handles commiting rows in .pending files."""
-        pending_file = self.db_file + '.pending'
-        if self.db_file == ':memory:' or not os.path.exists(pending_file):
-            return
-        if not os.path.getsize(pending_file):
-            os.unlink(pending_file)
+        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
             return
         if item_list is None:
             item_list = []
-        with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT):
-            with open(pending_file, 'r+b') as fp:
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            self._preallocate()
+            if not os.path.getsize(self.pending_file):
+                if item_list:
+                    self.merge_items(item_list)
+                return
+            with open(self.pending_file, 'r+b') as fp:
                 for entry in fp.read().split(':'):
                     if entry:
                         try:
@@ -1160,11 +1219,11 @@ class AccountBroker(DatabaseBroker):
                         except Exception:
                             self.logger.exception(
                                 _('Invalid pending entry %(file)s: %(entry)s'),
-                                {'file': pending_file, 'entry': entry})
+                                {'file': self.pending_file, 'entry': entry})
                 if item_list:
                     self.merge_items(item_list)
                 try:
-                    os.unlink(pending_file)
+                    os.ftruncate(fp.fileno(), 0)
                 except OSError, err:
                     if err.errno != errno.ENOENT:
                         raise
@@ -1175,6 +1234,11 @@ class AccountBroker(DatabaseBroker):

         :returns: True if the database has no active containers.
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             row = conn.execute(
                 'SELECT container_count from account_stat').fetchone()
@@ -1194,6 +1258,7 @@ class AccountBroker(DatabaseBroker):
         :param sync_timestamp: max update_at timestamp of sync rows to delete
         """

+        self._commit_puts()
         with self.get() as conn:
             conn.execute('''
                 DELETE FROM container WHERE
@@ -1221,6 +1286,11 @@ class AccountBroker(DatabaseBroker):

         :returns: put_timestamp of the container
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             ret = conn.execute('''
                 SELECT put_timestamp FROM container
@@ -1241,8 +1311,6 @@ class AccountBroker(DatabaseBroker):
         :param object_count: number of objects in the container
         :param bytes_used: number of bytes used by the container
         """
-        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
-            raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
         if delete_timestamp > put_timestamp and \
                 object_count in (None, '', 0, '0'):
             deleted = 1
@@ -1253,7 +1321,24 @@ class AccountBroker(DatabaseBroker):
             'object_count': object_count,
             'bytes_used': bytes_used,
             'deleted': deleted}
-        self.merge_items([record])
+        if self.db_file == ':memory:':
+            self.merge_items([record])
+            return
+        commit = False
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            with open(self.pending_file, 'a+b') as fp:
+                # Colons aren't used in base64 encoding; so they are our
+                # delimiter
+                fp.write(':')
+                fp.write(pickle.dumps(
+                    (name, put_timestamp, delete_timestamp, object_count,
+                     bytes_used, deleted),
+                    protocol=PICKLE_PROTOCOL).encode('base64'))
+                fp.flush()
+                if fp.tell() > PENDING_CAP:
+                    commit = True
+        if commit:
+            self._commit_puts()

     def can_delete_db(self, cutoff):
         """
@@ -1261,6 +1346,7 @@ class AccountBroker(DatabaseBroker):

         :returns: True if the account can be deleted, False otherwise
         """
+        self._commit_puts()
         with self.get() as conn:
             row = conn.execute('''
                 SELECT status, put_timestamp, delete_timestamp, container_count
@@ -1286,6 +1372,11 @@ class AccountBroker(DatabaseBroker):
         """
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
             return True
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             row = conn.execute('''
                 SELECT put_timestamp, delete_timestamp, container_count, status
@@ -1310,6 +1401,11 @@ class AccountBroker(DatabaseBroker):
                   delete_timestamp, container_count, object_count,
                   bytes_used, hash, id)
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         with self.get() as conn:
             return conn.execute('''
                 SELECT account, created_at, put_timestamp, delete_timestamp,
@@ -1326,6 +1422,11 @@ class AccountBroker(DatabaseBroker):

         :returns: list of container names
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         rv = []
         with self.get() as conn:
             row = conn.execute('''
@@ -1359,6 +1460,11 @@ class AccountBroker(DatabaseBroker):

         :returns: list of tuples of (name, object_count, bytes_used, 0)
         """
+        try:
+            self._commit_puts()
+        except LockTimeout:
+            if not self.stale_reads_ok:
+                raise
         if delimiter and not prefix:
             prefix = ''
         orig_marker = marker
@@ -1379,10 +1485,7 @@ class AccountBroker(DatabaseBroker):
             elif prefix:
                 query += ' name >= ? AND'
                 query_args.append(prefix)
-            if self._get_db_version(conn) < 1:
-                query += ' +deleted = 0 ORDER BY name LIMIT ?'
-            else:
-                query += ' deleted = 0 ORDER BY name LIMIT ?'
+            query += ' +deleted = 0 ORDER BY name LIMIT ?'
             query_args.append(limit - len(results))
             curs = conn.execute(query, query_args)
             curs.row_factory = None
@@ -1426,39 +1529,51 @@ class AccountBroker(DatabaseBroker):
                 record = [rec['name'], rec['put_timestamp'],
                           rec['delete_timestamp'], rec['object_count'],
                           rec['bytes_used'], rec['deleted']]
-                curs = conn.execute('''
-                    SELECT name, put_timestamp, delete_timestamp,
-                        object_count, bytes_used, deleted
-                    FROM container WHERE name = ? AND
-                        deleted IN (0, 1)
-                ''', (rec['name'],))
-                curs.row_factory = None
-                row = curs.fetchone()
-                if row:
-                    row = list(row)
-                    for i in xrange(5):
-                        if record[i] is None and row[i] is not None:
-                            record[i] = row[i]
-                    if row[1] > record[1]:  # Keep newest put_timestamp
-                        record[1] = row[1]
-                    if row[2] > record[2]:  # Keep newest delete_timestamp
-                        record[2] = row[2]
-                    # If deleted, mark as such
-                    if record[2] > record[1] and \
-                            record[3] in (None, '', 0, '0'):
-                        record[5] = 1
-                    else:
-                        record[5] = 0
-                conn.execute('''
-                    DELETE FROM container WHERE name = ? AND
-                        deleted IN (0, 1)
-                ''', (record[0],))
-                conn.execute('''
-                    INSERT INTO container (name, put_timestamp,
-                        delete_timestamp, object_count, bytes_used,
-                        deleted)
-                    VALUES (?, ?, ?, ?, ?, ?)
-                ''', record)
+                try:
+                    conn.execute('''
+                        INSERT INTO container (name, put_timestamp,
+                            delete_timestamp, object_count, bytes_used,
+                            deleted)
+                        VALUES (?, ?, ?, ?, ?, ?)
+                    ''', record)
+                except sqlite3.IntegrityError:
+                    curs = conn.execute('''
+                        SELECT name, put_timestamp, delete_timestamp,
+                            object_count, bytes_used, deleted
+                        FROM container WHERE name = ? AND
+                            (put_timestamp < ? OR delete_timestamp < ? OR
+                             object_count != ? OR bytes_used != ?)''',
+                        (rec['name'], rec['put_timestamp'],
+                         rec['delete_timestamp'], rec['object_count'],
+                         rec['bytes_used']))
+                    curs.row_factory = None
+                    row = curs.fetchone()
+                    if row:
+                        row = list(row)
+                        for i in xrange(5):
+                            if record[i] is None and row[i] is not None:
+                                record[i] = row[i]
+                        if row[1] > record[1]:  # Keep newest put_timestamp
+                            record[1] = row[1]
+                        if row[2] > record[2]:  # Keep newest delete_timestamp
+                            record[2] = row[2]
+                        conn.execute('DELETE FROM container WHERE name = ?',
+                                     (record[0],))
+                        # If deleted, mark as such
+                        if record[2] > record[1] and \
+                                record[3] in (None, '', 0, '0'):
+                            record[5] = 1
+                        else:
+                            record[5] = 0
+                        try:
+                            conn.execute('''
+                                INSERT INTO container (name, put_timestamp,
+                                    delete_timestamp, object_count, bytes_used,
+                                    deleted)
+                                VALUES (?, ?, ?, ?, ?, ?)
+                            ''', record)
+                        except sqlite3.IntegrityError:
+                            continue
                 if source:
                     max_rowid = max(max_rowid, rec['ROWID'])
             if source:
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -180,9 +180,7 @@ class Replicator(Daemon):
             return False
         # perform block-level sync if the db was modified during the first sync
         if os.path.exists(broker.db_file + '-journal') or \
-                os.path.exists(broker.db_file + '-wal') or \
-                os.path.exists(broker.db_file + '-shm') or \
                 os.path.getmtime(broker.db_file) > mtime:
             # grab a lock so nobody else can modify it
             with broker.lock():
                 if not self._rsync_file(broker.db_file, remote_file, False):
@@ -318,7 +316,7 @@ class Replicator(Daemon):
         self.logger.debug(_('Replicating db %s'), object_file)
         self.stats['attempted'] += 1
         try:
-            broker = self.brokerclass(object_file)
+            broker = self.brokerclass(object_file, pending_timeout=30)
             broker.reclaim(time.time() - self.reclaim_age,
                            time.time() - (self.reclaim_age * 2))
             info = broker.get_replication_info()
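With `journal_mode = DELETE` restored, a cleanly checkpointed database is a single file again, so the replicator only needs the `-journal` check and the mtime comparison above to decide whether a block-level rsync is required; the `-wal`/`-shm` probes become dead code. The `pending_timeout=30` passed to the broker also gives replication a far more patient claim on the `.pending` lock than the servers' 0.1-second reads.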
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -219,6 +219,8 @@ class ContainerController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_container_broker(drive, part, account, container)
+        broker.pending_timeout = 0.1
+        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()
@@ -244,6 +246,8 @@ class ContainerController(object):
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
         broker = self._get_container_broker(drive, part, account, container)
+        broker.pending_timeout = 0.1
+        broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
         info = broker.get_info()