Merge "Add a read-only role to keystoneauth"
This commit is contained in:
@@ -498,6 +498,14 @@ user_test5_tester5 = testing5 service
|
||||
# move between domains, you should disable backwards compatible name matching
|
||||
# in ACLs by setting allow_names_in_acls to false:
|
||||
# allow_names_in_acls = true
|
||||
#
|
||||
# In OpenStack terms, these reader roles are scoped for system: they
|
||||
# can read anything across projects and domains.
|
||||
# They are used for auditing and compliance fuctions.
|
||||
# In Swift terms, these roles are as powerful as the reseller_admin_role,
|
||||
# only do not modify the cluster.
|
||||
# By default the list of reader roles is empty.
|
||||
# system_reader_roles =
|
||||
|
||||
[filter:s3api]
|
||||
use = egg:swift#s3api
|
||||
|
||||
@@ -181,6 +181,9 @@ class KeystoneAuth(object):
|
||||
service_roles=[]))
|
||||
self.reseller_admin_role = conf.get('reseller_admin_role',
|
||||
'ResellerAdmin').lower()
|
||||
self.system_reader_roles = {role.lower() for role in list_from_csv(
|
||||
conf.get('system_reader_roles', ''))}
|
||||
|
||||
config_is_admin = conf.get('is_admin', "false").lower()
|
||||
if swift_utils.config_true_value(config_is_admin):
|
||||
self.logger.warning("The 'is_admin' option for keystoneauth is no "
|
||||
@@ -423,6 +426,19 @@ class KeystoneAuth(object):
|
||||
req.environ['swift_owner'] = True
|
||||
return
|
||||
|
||||
# The system_reader_role is almost as good as reseller_admin.
|
||||
if self.system_reader_roles.intersection(user_roles):
|
||||
# Note that if a system reader is trying to write, we're letting
|
||||
# the request fall on other access checks below. This way,
|
||||
# a compliance auditor can write a log file as a normal member.
|
||||
if req.method in ('GET', 'HEAD'):
|
||||
msg = 'User %s has system reader authorizing'
|
||||
self.logger.debug(msg, tenant_id)
|
||||
# We aren't setting 'swift_owner' nor 'reseller_request'
|
||||
# because they are only ever used for something that modifies
|
||||
# the contents of the cluster (setting ACL, deleting accounts).
|
||||
return
|
||||
|
||||
# If we are not reseller admin and user is trying to delete its own
|
||||
# account then deny it.
|
||||
if not container and not obj and req.method == 'DELETE':
|
||||
|
||||
@@ -593,7 +593,7 @@ class BaseTestAuthorize(unittest.TestCase):
|
||||
return self.test_auth._keystone_identity(env)
|
||||
|
||||
|
||||
class TestAuthorize(BaseTestAuthorize):
|
||||
class BaseTestAuthorizeCheck(BaseTestAuthorize):
|
||||
def _check_authenticate(self, account=None, identity=None, headers=None,
|
||||
exception=None, acl=None, env=None, path=None):
|
||||
if not identity:
|
||||
@@ -626,6 +626,8 @@ class TestAuthorize(BaseTestAuthorize):
|
||||
self.assertIsNone(result)
|
||||
return req
|
||||
|
||||
|
||||
class TestAuthorize(BaseTestAuthorizeCheck):
|
||||
def test_authorize_fails_for_unauthorized_user(self):
|
||||
self._check_authenticate(exception=HTTP_FORBIDDEN)
|
||||
|
||||
@@ -1508,6 +1510,65 @@ class TestSetProjectDomain(BaseTestAuthorize):
|
||||
sysmeta_project_domain_id='test_id')
|
||||
|
||||
|
||||
class TestAuthorizeReader(BaseTestAuthorizeCheck):
|
||||
|
||||
system_reader_role_1 = 'compliance'
|
||||
system_reader_role_2 = 'integrity'
|
||||
|
||||
# This cannot be in SetUp because it takes arguments from tests.
|
||||
def _setup(self, system_reader_roles):
|
||||
self.test_auth = keystoneauth.filter_factory(
|
||||
{}, system_reader_roles=system_reader_roles)(FakeApp())
|
||||
self.test_auth.logger = FakeLogger()
|
||||
|
||||
# Zero test: make sure that reader role has no default access
|
||||
# when not in the list of system_reader_roles[].
|
||||
def test_reader_none(self):
|
||||
# We could rifle in the KeystoneAuth internals and tweak the list,
|
||||
# but to create the middleware fresh is a clean, future-resistant way.
|
||||
self._setup(None)
|
||||
identity = self._get_identity(roles=[self.system_reader_role_1])
|
||||
self._check_authenticate(exception=HTTP_FORBIDDEN,
|
||||
identity=identity)
|
||||
|
||||
# HEAD is the same, right? No need to check, right?
|
||||
def test_reader_get(self):
|
||||
# While we're at it, test that our parsing of CSV works.
|
||||
self._setup("%s, %s" %
|
||||
(self.system_reader_role_1, self.system_reader_role_2))
|
||||
identity = self._get_identity(roles=[self.system_reader_role_1])
|
||||
self._check_authenticate(identity=identity)
|
||||
|
||||
def test_reader_put(self):
|
||||
self._setup(self.system_reader_role_1)
|
||||
identity = self._get_identity(roles=[self.system_reader_role_1])
|
||||
self._check_authenticate(exception=HTTP_FORBIDDEN,
|
||||
identity=identity,
|
||||
env={'REQUEST_METHOD': 'PUT'})
|
||||
self._check_authenticate(exception=HTTP_FORBIDDEN,
|
||||
identity=identity,
|
||||
env={'REQUEST_METHOD': 'POST'})
|
||||
|
||||
def test_reader_put_to_own(self):
|
||||
roles = operator_roles(self.test_auth) + [self.system_reader_role_1]
|
||||
identity = self._get_identity(roles=roles)
|
||||
req = self._check_authenticate(identity=identity,
|
||||
env={'REQUEST_METHOD': 'PUT'})
|
||||
self.assertTrue(req.environ.get('swift_owner'))
|
||||
|
||||
# This should not be happening, but let's make sure that reader did not
|
||||
# obtain any extra authorizations by combining with swiftoperator,
|
||||
# because that is how reader is going to be used in practice.
|
||||
def test_reader_put_elsewhere_fails(self):
|
||||
roles = operator_roles(self.test_auth) + [self.system_reader_role_1]
|
||||
identity = self._get_identity(roles=roles)
|
||||
account = "%s%s" % (self._get_account(identity), "2")
|
||||
self._check_authenticate(exception=HTTP_FORBIDDEN,
|
||||
identity=identity,
|
||||
account=account,
|
||||
env={'REQUEST_METHOD': 'PUT'})
|
||||
|
||||
|
||||
class ResellerInInfo(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
Reference in New Issue
Block a user