Python源码示例:testtools.ExpectedException()

示例1
def test_get_ssh_connection_timeout(self):
        """Connecting must give up with SSHTimeout once the timeout elapses.

        Every ``connect`` attempt on the mocked client raises
        ``socket.error``; with ``timeout=2`` the elapsed whole seconds are
        expected to be at least 2 and below 5.
        """
        ssh_class_mock, _auto_add_mock, ssh_client_mock = (
            self._set_ssh_connection_mocks())

        ssh_class_mock.return_value = ssh_client_mock
        # Three consecutive connection failures.
        ssh_client_mock.connect.side_effect = [socket.error] * 3

        client = ssh.Client('localhost', 'root', timeout=2)
        started = int(time.time())
        with testtools.ExpectedException(exceptions.SSHTimeout):
            client._get_ssh_connection()
        finished = int(time.time())

        elapsed = finished - started
        self.assertLess(elapsed, 5)
        self.assertGreaterEqual(elapsed, 2)
示例2
def test_is_valid_ttl(self):
        """Check ``_is_valid_ttl`` against the configured ``min_ttl``.

        With ``min_ttl`` set to 10, a TTL of 20 is accepted outright. TTLs of
        ``None`` and 1 are accepted while ``policy.check`` does not raise, and
        a TTL of 3 must raise ``InvalidTTL`` once ``policy.check`` raises
        ``Forbidden``.

        ``policy.check`` is patched with ``mock.patch.object`` so the original
        callable is restored when each ``with`` block exits, instead of
        leaving a mock on the module for later tests.
        """
        self.CONF.set_override('min_ttl', 10, 'service:central')
        self.service._is_valid_ttl(self.context, 20)

        policy = designate.central.service.policy

        # policy.check() not to raise: the user is allowed to create low TTLs
        with mock.patch.object(policy, 'check',
                               mock.Mock(return_value=None)):
            self.service._is_valid_ttl(self.context, None)
            self.service._is_valid_ttl(self.context, 1)

        # policy.check() to raise
        with mock.patch.object(policy, 'check',
                               mock.Mock(side_effect=exceptions.Forbidden)):
            with testtools.ExpectedException(exceptions.InvalidTTL):
                self.service._is_valid_ttl(self.context, 3)
示例3
def test_is_valid_recordset_placement_failing_2(self):
        """Placement must fail when storage returns two recordsets.

        ``find_recordsets`` is stubbed to return two objects; the call is
        expected to raise ``InvalidRecordSetLocation`` with the CNAME-sharing
        message.

        The message check runs after the raising call has returned control.
        Placed inside the ``with`` body after that call, as before, it could
        never execute.
        """
        zone = RoObject(name='example.org.', id=CentralZoneTestCase.zone__id)
        self.service.storage.find_recordsets.return_value = [
            RoObject(),
            RoObject()
        ]

        # assertRaises hands back the raised exception (as the other tests in
        # this file rely on), so its message can be inspected afterwards.
        exc = self.assertRaises(
            exceptions.InvalidRecordSetLocation,
            self.service._is_valid_recordset_placement,
            self.context,
            zone,
            'example.org.',
            'A',
        )

        self.assertEqual(
            'CNAME recordsets may not share a name with any other records',
            six.text_type(exc))
示例4
def test_update_recordset_action_fail_on_managed(self):
        """Updating a managed recordset without permission is a BadRequest.

        The recordset is flagged ``managed`` while the context has
        ``edit_managed_records`` disabled.
        """
        stored_zone = RoObject(
            type='foo',
            name='example.org.',
            tenant_id='2',
            action='bogus',
        )
        self.service.storage.get_zone.return_value = stored_zone

        managed_recordset = mock.Mock()
        managed_recordset.obj_get_changes.return_value = ['foo']
        managed_recordset.managed = True

        self.context = mock.Mock()
        self.context.edit_managed_records = False

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.update_recordset,
            self.context,
            managed_recordset,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.BadRequest, raised_type)
示例5
def test_delete_recordset_not_found(self):
        """Deleting must report RecordSetNotFound for a foreign recordset.

        The stored zone uses ``zone__id_2`` while the stored recordset points
        at ``zone__id``.
        """
        stored_zone = RoObject(
            action='bogus',
            id=CentralZoneTestCase.zone__id_2,
            name='example.org.',
            tenant_id='2',
            type='foo',
        )
        stored_recordset = RoObject(
            zone_id=CentralZoneTestCase.zone__id,
            id=CentralZoneTestCase.recordset__id,
            managed=False,
        )
        self.service.storage.get_zone.return_value = stored_zone
        self.service.storage.get_recordset.return_value = stored_recordset

        self.context = mock.Mock()
        self.context.edit_managed_records = False

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.delete_recordset,
            self.context,
            CentralZoneTestCase.zone__id_2,
            CentralZoneTestCase.recordset__id,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordSetNotFound, raised_type)
示例6
def test_delete_recordset_action_delete(self):
        """Deleting a recordset in a zone whose action is DELETE fails.

        The expected wrapped exception type is ``BadRequest``.
        """
        stored_zone = RoObject(
            action='DELETE',
            id=CentralZoneTestCase.zone__id_2,
            name='example.org.',
            tenant_id='2',
            type='foo',
        )
        stored_recordset = RoObject(
            zone_id=CentralZoneTestCase.zone__id_2,
            id=CentralZoneTestCase.recordset__id,
            managed=False,
        )
        self.service.storage.get_zone.return_value = stored_zone
        self.service.storage.get_recordset.return_value = stored_recordset

        self.context = mock.Mock()
        self.context.edit_managed_records = False

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.delete_recordset,
            self.context,
            CentralZoneTestCase.zone__id_2,
            CentralZoneTestCase.recordset__id,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.BadRequest, raised_type)
示例7
def test_create_record_fail_on_delete(self):
        """Creating a record in a zone whose action is DELETE fails.

        The expected wrapped exception type is ``BadRequest``.
        """
        stored_zone = RoObject(
            action='DELETE',
            id=CentralZoneTestCase.zone__id_2,
            name='example.org.',
            tenant_id='2',
            type='foo',
        )
        self.service.storage.get_zone.return_value = stored_zone

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.create_record,
            self.context,
            CentralZoneTestCase.zone__id,
            CentralZoneTestCase.recordset__id,
            RoObject(),
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.BadRequest, raised_type)
示例8
def test_get_record_not_found(self):
        """``get_record`` must report RecordNotFound for a mismatched zone.

        The stored recordset carries a ``zone_id`` taken from
        ``recordset__id`` rather than from the zone returned by storage.
        """
        stored_zone = RoObject(
            id=CentralZoneTestCase.zone__id_2,
        )
        stored_recordset = RoObject(
            zone_id=CentralZoneTestCase.recordset__id
        )
        self.service.storage.get_zone.return_value = stored_zone
        self.service.storage.get_recordset.return_value = stored_recordset

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.get_record,
            self.context,
            CentralZoneTestCase.zone__id_2,
            CentralZoneTestCase.recordset__id,
            CentralZoneTestCase.record__id,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordNotFound, raised_type)
示例9
def test_get_record_not_found_2(self):
        """``get_record`` must report RecordNotFound on a recordset mismatch.

        Zone ids agree everywhere, but the stored recordset id (999) differs
        from the ``recordset_id`` carried by the stored record.
        """
        storage = self.service.storage

        storage.get_zone.return_value = RoObject(
            id=CentralZoneTestCase.zone__id_2,
            name='example.org.',
            tenant_id=2,
        )
        storage.get_recordset.return_value = RoObject(
            zone_id=CentralZoneTestCase.zone__id_2,
            id=999,  # not matching record.recordset_id
            name='foo'
        )
        storage.get_record.return_value = RoObject(
            id=CentralZoneTestCase.record__id,
            zone_id=CentralZoneTestCase.zone__id_2,
            recordset_id=CentralZoneTestCase.recordset__id
        )

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.service.get_record,
            self.context,
            CentralZoneTestCase.zone__id_2,
            CentralZoneTestCase.recordset__id,
            CentralZoneTestCase.record__id,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordNotFound, raised_type)
示例10
def test_is_valid_zone_name(self):
        """Zone names are validated with ``max_zone_name_len`` set to 10.

        ``'valid.org.'`` is accepted; each of the remaining names must raise
        ``InvalidZoneName``.
        """
        self.config(max_zone_name_len=10,
                    group='service:central')

        ctx = self.get_context()

        self.central_service._is_valid_zone_name(ctx, 'valid.org.')

        # Checked in the same order as before; the first unexpected pass
        # still fails the test immediately.
        rejected_names = ('example.org.', 'example.tld.', 'tld.')
        for zone_name in rejected_names:
            with testtools.ExpectedException(exceptions.InvalidZoneName):
                self.central_service._is_valid_zone_name(ctx, zone_name)
示例11
def test_is_valid_zone_name_with_tlds(self):
        """A zone name that is itself a known TLD must be rejected.

        The service is restarted while ``storage.find_tlds`` is patched to
        return a TLD list of ``com``, ``biz`` and ``z``. ``'biz.'`` is then
        expected to raise ``InvalidZoneName`` while ``storage.find_tld``
        returns the ``biz`` TLD.
        """
        # Stop Service
        self.central_service.stop()

        # Named ``tlds`` rather than ``list`` so the builtin is not shadowed.
        tlds = objects.TldList()
        tlds.append(objects.Tld(name='com'))
        tlds.append(objects.Tld(name='biz'))
        tlds.append(objects.Tld(name='z'))

        with mock.patch.object(self.central_service.storage, 'find_tlds',
                return_value=tlds):
            self.central_service.start()

        context = self.get_context()
        with mock.patch.object(self.central_service.storage, 'find_tld',
                return_value=objects.Tld(name='biz')):
            with testtools.ExpectedException(exceptions.InvalidZoneName):
                self.central_service._is_valid_zone_name(context, 'biz.')
示例12
def test_create_blacklisted_zone_fail(self):
        """Creating a blacklisted zone fails when policy denies the override.

        The expected wrapped exception type is ``InvalidZoneName``.
        """
        self.create_blacklist()

        # Set the policy to reject the authz
        self.policy({'use_blacklisted_zone': '!'})

        zone_values = {
            'name': 'blacklisted.com.',
            'email': 'info@blacklisted.com',
        }

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.create_zone,
            self.admin_context,
            objects.Zone.from_dict(zone_values),
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.InvalidZoneName, raised_type)
示例13
def test_create_invalid_recordset_location_cname_at_apex(self):
        """A CNAME recordset named like the zone itself must be rejected.

        The expected wrapped exception type is ``InvalidRecordSetLocation``.
        """
        zone = self.create_zone()

        recordset_values = {
            'name': zone['name'],
            'type': 'CNAME',
        }

        # Attempt to create a CNAME record at the apex
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.create_recordset,
            self.admin_context,
            zone['id'],
            recordset=objects.RecordSet.from_dict(recordset_values),
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.InvalidRecordSetLocation, raised_type)
示例14
def test_create_invalid_recordset_location_cname_sharing(self):
        """A CNAME recordset reusing an existing recordset name is rejected.

        The expected wrapped exception type is ``InvalidRecordSetLocation``.
        """
        zone = self.create_zone()
        existing = self.create_recordset(zone)

        recordset_values = {
            'name': existing['name'],
            'type': 'CNAME',
        }

        # Attempt to create a CNAME record alongside another record
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.create_recordset,
            self.admin_context,
            zone['id'],
            recordset=objects.RecordSet.from_dict(recordset_values),
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.InvalidRecordSetLocation, raised_type)
示例15
def test_create_invalid_recordset_location_wrong_zone(self):
        """A recordset named after another zone must be rejected.

        The expected wrapped exception type is ``InvalidRecordSetLocation``.
        """
        zone = self.create_zone()
        second_zone = self.create_zone(fixture=1)

        recordset_values = {
            'name': second_zone['name'],
            'type': 'A',
        }

        # Attempt to create a record in the incorrect zone
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.create_recordset,
            self.admin_context,
            zone['id'],
            recordset=objects.RecordSet.from_dict(recordset_values),
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.InvalidRecordSetLocation, raised_type)
示例16
def test_delete_recordset(self):
        """Deleting a recordset removes it and bumps the zone serial.

        After deletion, fetching the recordset must raise
        ``RecordSetNotFound`` and the zone serial must be greater than it was
        before the recordset existed.
        """
        zone = self.create_zone()
        serial_before = zone.serial

        # Create a recordset
        recordset = self.create_recordset(zone)

        # Delete the recordset
        self.central_service.delete_recordset(
            self.admin_context, zone['id'], recordset['id'])

        # Fetch the recordset again, ensuring an exception is raised
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.get_recordset,
            self.admin_context,
            zone['id'],
            recordset['id'],
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordSetNotFound, raised_type)

        # Fetch the zone again to verify serial number increased
        refreshed_zone = self.central_service.get_zone(
            self.admin_context, zone.id)
        self.assertThat(refreshed_zone.serial, GreaterThan(serial_before))
示例17
def test_get_record_incorrect_recordset_id(self):
        """Fetching a record through the wrong recordset is RecordNotFound."""
        zone = self.create_zone()
        owning_recordset = self.create_recordset(zone)
        unrelated_recordset = self.create_recordset(zone, fixture=1)

        # Create a record
        record = self.create_record(zone, owning_recordset)

        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.get_record,
            self.admin_context,
            zone['id'],
            unrelated_recordset['id'],
            record['id'],
        )

        # Ensure we get a 404 if we use the incorrect recordset_id
        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordNotFound, raised_type)
示例18
def test_update_record_immutable_zone_id(self):
        """Changing a record's ``zone_id`` on update is a BadRequest."""
        zone = self.create_zone()
        recordset = self.create_recordset(zone)
        second_zone = self.create_zone(fixture=1)

        # Create a record
        record = self.create_record(zone, recordset)

        # Update the record
        record.zone_id = second_zone.id

        # Ensure we get a BadRequest if we change the zone_id
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.update_record,
            self.admin_context,
            record,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.BadRequest, raised_type)
示例19
def test_update_record_immutable_recordset_id(self):
        """Changing a record's ``recordset_id`` on update is a BadRequest."""
        zone = self.create_zone()
        recordset = self.create_recordset(zone)
        second_recordset = self.create_recordset(zone, fixture=1)

        # Create a record
        record = self.create_record(zone, recordset)

        # Update the record
        record.recordset_id = second_recordset.id

        # Ensure we get a BadRequest if we change the recordset_id
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.update_record,
            self.admin_context,
            record,
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.BadRequest, raised_type)
示例20
def test_delete_record_incorrect_recordset_id(self):
        """Deleting a record through the wrong recordset is RecordNotFound."""
        zone = self.create_zone()
        owning_recordset = self.create_recordset(zone)
        unrelated_recordset = self.create_recordset(zone, fixture=1)

        # Create a record
        record = self.create_record(zone, owning_recordset)

        # Ensure we get a 404 if we use the incorrect recordset_id
        raised = self.assertRaises(
            rpc_dispatcher.ExpectedException,
            self.central_service.delete_record,
            self.admin_context,
            zone['id'],
            unrelated_recordset['id'],
            record['id'],
        )

        raised_type = raised.exc_info[0]
        self.assertEqual(exceptions.RecordNotFound, raised_type)
示例21
def test_mod_with_wrong_field_type(self):
        """Formatting a ``%d`` placeholder with a string raises TypeError."""
        template = "Test that we handle unused args %(arg1)d"
        substitutions = {'arg1': 'test1'}

        message = _message.Message(template)
        with testtools.ExpectedException(TypeError):
            message % substitutions
示例22
def test_mod_with_missing_arg(self):
        """Formatting with a missing key raises KeyError naming ``arg2``."""
        template = "Test that we handle missing args %(arg1)s %(arg2)s"
        substitutions = {'arg1': 'test1'}

        message = _message.Message(template)
        with testtools.ExpectedException(KeyError, '.*arg2.*'):
            message % substitutions
示例23
def test_proxy_request_other_code(self):
        """The proxy-request helper must raise for a 302 response code."""
        expect_failure = testtools.ExpectedException(Exception)
        with expect_failure:
            self._proxy_request_test_helper(response_code=302)
示例24
def test_timeout_in_exec_command(self):
        """``exec_command`` raises TimeoutException when polling times out.

        After the failure, the mocked channel and poller must each have been
        used exactly once in the way asserted below.
        """
        channel, poller, _ = self._set_mocks_for_select([0, 0, 0], True)

        # Test for a timeout condition immediately raised
        ssh_client = ssh.Client('localhost', 'root', timeout=2)
        with testtools.ExpectedException(exceptions.TimeoutException):
            ssh_client.exec_command("test")

        # Channel interactions.
        channel.fileno.assert_called_once_with()
        channel.exec_command.assert_called_once_with("test")
        channel.shutdown_write.assert_called_once_with()

        # Poller interactions.
        poller.register.assert_called_once_with(channel, self.SELECT_POLLIN)
        poller.poll.assert_called_once_with(10)
示例25
def assertLinePasses(self, func, *args):
        """Assert that ``func(*args)`` yields nothing.

        Pulling the first item from the returned iterator is expected to
        raise ``StopIteration``.
        """
        expect_exhausted = testtools.ExpectedException(StopIteration)
        with expect_exhausted:
            next(func(*args))
示例26
def test_from_list(self):
        """``TestObject.from_list`` must raise NotImplementedError."""
        expect_unimplemented = testtools.ExpectedException(NotImplementedError)
        with expect_unimplemented:
            TestObject.from_list([])
示例27
def test_init_invalid(self):
        """Constructing ``TestObject`` with ``extra_field`` raises TypeError."""
        expect_type_error = testtools.ExpectedException(TypeError)
        with expect_type_error:
            TestObject(extra_field='Fail')
示例28
def test_setattr_neg(self):
        """Assigning ``badthing`` on ``TestObject`` raises AttributeError."""
        instance = TestObject()

        expect_attr_error = testtools.ExpectedException(AttributeError)
        with expect_attr_error:
            instance.badthing = 'demons'
示例29
def test_update_unexpected_attribute(self):
        """``update`` with the extra key ``new_key`` raises AttributeError."""
        instance = TestObject(id=1, name='test')
        changes = {'id': 'new_id', 'new_key': 3}

        with testtools.ExpectedException(AttributeError):
            instance.update(changes)
示例30
def test_validate(self):
        """Validation fails without an ID and passes once a valid one is set.

        Assigning ``'MyID'`` must raise ``ValueError``; the UUID-formatted
        string is accepted and ``validate()`` then succeeds.
        """
        instance = TestValidatableObject()

        # ID is required, so the object is not valid
        expect_invalid = testtools.ExpectedException(exceptions.InvalidObject)
        with expect_invalid:
            instance.validate()

        expect_value_error = testtools.ExpectedException(ValueError)
        with expect_value_error:
            instance.id = 'MyID'

        # Set the ID field to a valid value
        instance.id = 'ffded5c4-e4f6-4e02-a175-48e13c5c12a0'
        instance.validate()