self.mocked_running_inst_state = "running"
self.mocked_running_inst_time = 10938474
+ self.mocked_disk_uuid = "mock_uuid_1134"
+ self.mocked_disk_name = "mock_disk_1134"
+
bootid = "mock_bootid"
storage_info = [
{
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ADD, -1,
{
- "uuid": "mock_uuid_1134"
+ "uuid": self.mocked_disk_uuid
}]])
self.ExecOpCodeExpectException(
op, errors.TypeEnforcementError, "Unknown parameter 'uuid'")
def testAttachDiskWrongTemplate(self):
msg = "Instance has '%s' template while disk has '%s' template" % \
(constants.DT_PLAIN, constants.DT_BLOCK)
- self.cfg.AddOrphanDisk(name="mock_disk_1134", dev_type=constants.DT_BLOCK)
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, dev_type=constants.DT_BLOCK)
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
)
self.ExecOpCodeExpectOpPrereqError(op, msg)
def testAttachDiskWrongNodes(self):
msg = "Disk nodes are \['mock_node_1134'\]"
- self.cfg.AddOrphanDisk(name="mock_disk_1134", primary_node="mock_node_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, primary_node="mock_node_1134")
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
)
self.ExecOpCodeExpectOpPrereqError(op, msg)
def testAttachDiskRunningInstance(self):
- self.cfg.AddOrphanDisk(name="mock_disk_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, primary_node=self.master.uuid)
self.rpc.call_blockdev_assemble.return_value = \
self.RpcResultsBuilder() \
.CreateSuccessfulNodeResult(self.master,
op = self.CopyOpCode(self.running_op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
)
self.ExecOpCode(op)
self.assertFalse(self.rpc.call_blockdev_shutdown.called)
def testAttachDiskRunningInstanceNoWaitForSync(self):
- self.cfg.AddOrphanDisk(name="mock_disk_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, primary_node=self.master.uuid)
self.rpc.call_blockdev_assemble.return_value = \
self.RpcResultsBuilder() \
.CreateSuccessfulNodeResult(self.master,
op = self.CopyOpCode(self.running_op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
wait_for_sync=False)
self.ExecOpCode(op)
self.assertFalse(self.rpc.call_blockdev_shutdown.called)
def testAttachDiskDownInstance(self):
- self.cfg.AddOrphanDisk(name="mock_disk_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, primary_node=self.master.uuid)
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]])
self.ExecOpCode(op)
self.assertTrue(self.rpc.call_blockdev_shutdown.called)
def testAttachDiskDownInstanceNoWaitForSync(self):
- self.cfg.AddOrphanDisk(name="mock_disk_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name)
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
wait_for_sync=False)
self.ExecOpCodeExpectOpPrereqError(
" and --no-wait-for-sync given.")
def testHotAttachDisk(self):
- self.cfg.AddOrphanDisk(name="mock_disk_1134")
+ self.cfg.AddOrphanDisk(name=self.mocked_disk_name, primary_node=self.master.uuid)
self.rpc.call_blockdev_assemble.return_value = \
self.RpcResultsBuilder() \
.CreateSuccessfulNodeResult(self.master,
op = self.CopyOpCode(self.op,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
hotplug=True)
self.rpc.call_hotplug_supported.return_value = \
# well as the expected path where it will be moved.
mock_disk = self.cfg.CreateDisk(
name='mock_disk_1134', dev_type=constants.DT_FILE,
- logical_id=('loop', '/tmp/instance/disk'))
+ logical_id=('loop', '/tmp/instance/disk'), primary_node=self.master.uuid)
# Create a file-based instance
file_disk = self.cfg.CreateDisk(
inst = self.cfg.AddNewInstance(name='instance',
disk_template=constants.DT_FILE,
disks=[file_disk, mock_disk],
- )
+ )
# Detach the disk and assert that it has been moved to the upper directory
op = self.CopyOpCode(self.op,
instance_name=inst.name,
disks=[[constants.DDM_ATTACH, -1,
{
- constants.IDISK_NAME: "mock_disk_1134"
+ constants.IDISK_NAME: self.mocked_disk_name
}]],
)
self.ExecOpCode(op)
Also, check if the operations succeed both with name and uuid.
"""
- disk1 = self.cfg.CreateDisk(uuid="mock_uuid_1134")
- disk2 = self.cfg.CreateDisk(name="mock_name_1134")
+ disk1 = self.cfg.CreateDisk(uuid=self.mocked_disk_uuid,
+ primary_node=self.master.uuid)
+ disk2 = self.cfg.CreateDisk(name="mock_name_1134",
+ primary_node=self.master.uuid)
inst = self.cfg.AddNewInstance(disks=[disk1, disk2])
op = self.CopyOpCode(self.op,
instance_name=inst.name,
- disks=[[constants.DDM_DETACH, "mock_uuid_1134",
+ disks=[[constants.DDM_DETACH, self.mocked_disk_uuid,
{}]])
self.ExecOpCode(op)
self.assertEqual([disk2], self.cfg.GetInstanceDisks(inst.uuid))
instance_name=inst.name,
disks=[[constants.DDM_ATTACH, 0,
{
- 'uuid': "mock_uuid_1134"
+ 'uuid': self.mocked_disk_uuid
}]])
self.ExecOpCode(op)
self.assertEqual([disk1, disk2], self.cfg.GetInstanceDisks(inst.uuid))