diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 67f8a392b6..8e399e6512 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -1258,7 +1258,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): remote_broker.initialize(put_timestamp, POLICIES.default.idx) def check_replicate(expected_shard_ranges, from_broker, to_broker): - daemon = replicator.ContainerReplicator({}) + daemon = replicator.ContainerReplicator({}, logger=FakeLogger()) part, node = self._get_broker_part_node(to_broker) info = broker.get_replication_info() success = daemon._repl_to_node(node, from_broker, part, info) @@ -1267,9 +1267,6 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): expected_shard_ranges, to_broker.get_all_shard_range_data() ) - self.assertEqual(1, daemon.stats['deferred']) - self.assertEqual(0, daemon.stats['rsync']) - self.assertEqual(0, daemon.stats['diff']) local_info = self._get_broker( 'a', 'c', node_index=0).get_info() remote_info = self._get_broker( @@ -1280,6 +1277,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertEqual(remote_info[k], v, "mismatch remote %s %r != %r" % ( k, remote_info[k], v)) + return daemon bounds = (('', 'g'), ('g', 'r'), ('r', '')) shard_ranges = [ @@ -1291,12 +1289,50 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): for shard_range in shard_ranges[:2]: for db in (broker, remote_broker): db.merge_shard_ranges(shard_range) - # now add a shard range to the "local" broker only - own_sr = broker.enable_sharding(Timestamp.now()) + # now add a shard range and an object to the "local" broker only broker.merge_shard_ranges(shard_ranges[2]) broker_ranges = broker.get_all_shard_range_data() + self.assertShardRangesEqual(shard_ranges, broker_ranges) + broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain', + EMPTY_ETAG) + # sharding not yet enabled so replication not deferred + daemon = check_replicate(broker_ranges, broker, remote_broker) + self.assertEqual(0, daemon.stats['deferred']) + self.assertEqual(0, daemon.stats['no_change']) + self.assertEqual(0, daemon.stats['rsync']) + self.assertEqual(1, daemon.stats['diff']) + self.assertEqual({'diffs': 1}, + daemon.logger.get_increment_counts()) + + # update one shard range + shard_ranges[1].update_meta(50, 50) + # sharding not yet enabled so replication not deferred, but the two + # brokers' object tables are in sync so no rsync or usync either + daemon = check_replicate(broker_ranges, broker, remote_broker) + self.assertEqual(0, daemon.stats['deferred']) + self.assertEqual(1, daemon.stats['no_change']) + self.assertEqual(0, daemon.stats['rsync']) + self.assertEqual(0, daemon.stats['diff']) + self.assertEqual({'no_changes': 1}, + daemon.logger.get_increment_counts()) + + # now enable local broker for sharding + own_sr = broker.enable_sharding(Timestamp.now()) + # update one shard range + shard_ranges[1].update_meta(13, 123) + broker.merge_shard_ranges(shard_ranges[1]) + broker_ranges = broker.get_all_shard_range_data() self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges) - check_replicate(broker_ranges, broker, remote_broker) + + def check_stats(daemon): + self.assertEqual(1, daemon.stats['deferred']) + self.assertEqual(0, daemon.stats['no_change']) + self.assertEqual(0, daemon.stats['rsync']) + self.assertEqual(0, daemon.stats['diff']) + self.assertFalse(daemon.logger.get_increments()) + + daemon = check_replicate(broker_ranges, broker, remote_broker) + check_stats(daemon) # update one shard range shard_ranges[1].update_meta(99, 0) @@ -1304,7 +1340,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # sanity check broker_ranges = broker.get_all_shard_range_data() self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges) - check_replicate(broker_ranges, broker, remote_broker) + daemon = check_replicate(broker_ranges, broker, remote_broker) + check_stats(daemon) # delete one shard range shard_ranges[0].deleted = 1 @@ -1313,7 +1350,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # sanity check broker_ranges = broker.get_all_shard_range_data() self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges) - check_replicate(broker_ranges, broker, remote_broker) + daemon = check_replicate(broker_ranges, broker, remote_broker) + check_stats(daemon) # put a shard range again shard_ranges[2].timestamp = Timestamp.now() @@ -1322,7 +1360,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): # sanity check broker_ranges = broker.get_all_shard_range_data() self.assertShardRangesEqual(shard_ranges + [own_sr], broker_ranges) - check_replicate(broker_ranges, broker, remote_broker) + daemon = check_replicate(broker_ranges, broker, remote_broker) + check_stats(daemon) # update same shard range on local and remote, remote later shard_ranges[-1].meta_timestamp = Timestamp.now() @@ -1338,7 +1377,8 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertShardRangesEqual(remote_shard_ranges + [own_sr], remote_broker_ranges) self.assertShardRangesNotEqual(shard_ranges, remote_shard_ranges) - check_replicate(remote_broker_ranges, broker, remote_broker) + daemon = check_replicate(remote_broker_ranges, broker, remote_broker) + check_stats(daemon) # undelete shard range *on the remote* deleted_ranges = [sr for sr in remote_shard_ranges if sr.deleted] @@ -1351,10 +1391,12 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): self.assertShardRangesEqual(remote_shard_ranges + [own_sr], remote_broker_ranges) self.assertShardRangesNotEqual(shard_ranges, remote_shard_ranges) - check_replicate(remote_broker_ranges, broker, remote_broker) + daemon = check_replicate(remote_broker_ranges, broker, remote_broker) + check_stats(daemon) # reverse replication direction and expect syncs to propagate - check_replicate(remote_broker_ranges, remote_broker, broker) + daemon = check_replicate(remote_broker_ranges, remote_broker, broker) + check_stats(daemon) def test_sync_shard_ranges_error(self): # verify that replication is not considered successful if