1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with instances."""
23
24 import OpenSSL
25 import copy
26 import itertools
27 import logging
28 import operator
29 import os
30 import time
31
32 from ganeti import compat
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import ht
36 from ganeti import hypervisor
37 from ganeti import locking
38 from ganeti.masterd import iallocator
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import qlang
45 from ganeti import rpc
46 from ganeti import utils
47 from ganeti import query
48
49 from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
50 ResultWithJobs, Tasklet
51
52 from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
53 INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
54 _ShareAll, _GetDefaultIAllocator, _CheckInstanceNodeGroups, \
55 _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
56 _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
57 _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
58 _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
59 _CheckInstanceState, _ExpandNodeName
60 from ganeti.cmdlib.instance_storage import _CreateDisks, \
61 _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
62 _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
63 _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
64 _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
65 _AssembleInstanceDisks, _ExpandCheckDisks
66 from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
67 _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
68 _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
69 _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
70 _GetInstanceInfoText, _RemoveDisks
71
72 import ganeti.masterd.instance
73
74
75 #: Type description for changes as returned by L{ApplyContainerMods}'s
76 #: callbacks
77 _TApplyContModsCbChanges = \
78 ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
79 ht.TNonEmptyString,
80 ht.TAny,
81 ])))
82
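# For illustration only: a changes list accepted by the type description
# above is a list of (name, value) pairs; the concrete names below are made
# up, not taken from ApplyContainerMods itself.
#
#   example_changes = [
#     ("nic.0", "add"),
#     ("disk/1", {"size": 1024}),
#   ]
#   assert _TApplyContModsCbChanges(example_changes)
#   assert _TApplyContModsCbChanges(None)   # "maybe" also allows None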
83
84 def _CheckHostnameSane(lu, name):
85 """Ensures that a given hostname resolves to a 'sane' name.
86
87 The given name is required to be a prefix of the resolved hostname,
88 to prevent accidental mismatches.
89
90 @param lu: the logical unit on behalf of which we're checking
91 @param name: the name we should resolve and check
92 @return: the resolved hostname object
93
94 """
95 hostname = netutils.GetHostname(name=name)
96 if hostname.name != name:
97 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
98 if not utils.MatchNameComponent(name, [hostname.name]):
99 raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
100 " same as given hostname '%s'") %
101 (hostname.name, name), errors.ECODE_INVAL)
102 return hostname
103
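# Illustrative sketch: the check above reduces to utils.MatchNameComponent,
# so a short name is accepted when it matches the first component of the
# resolved FQDN. The hostnames below are invented examples.
#
#   utils.MatchNameComponent("web1", ["web1.example.com"])  # truthy -> ok
#   utils.MatchNameComponent("web1", ["db1.example.com"])   # falsy -> the
#                                                           # OpPrereqError
#                                                           # above is raised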
104
105 def _CheckOpportunisticLocking(op):
106 """Generate error if opportunistic locking is not possible.
107
108 """
109 if op.opportunistic_locking and not op.iallocator:
110 raise errors.OpPrereqError("Opportunistic locking is only available in"
111 " combination with an instance allocator",
112 errors.ECODE_INVAL)
113
114
115 def _CreateInstanceAllocRequest(op, disks, nics, beparams, node_whitelist):
116 """Wrapper around IAReqInstanceAlloc.
117
118 @param op: The instance opcode
119 @param disks: The computed disks
120 @param nics: The computed nics
121 @param beparams: The fully filled beparams
122 @param node_whitelist: List of nodes which should appear as online to the
123 allocator (unless the node is already marked offline)
124
125 @returns: A filled L{iallocator.IAReqInstanceAlloc}
126
127 """
128 spindle_use = beparams[constants.BE_SPINDLE_USE]
129 return iallocator.IAReqInstanceAlloc(name=op.instance_name,
130 disk_template=op.disk_template,
131 tags=op.tags,
132 os=op.os_type,
133 vcpus=beparams[constants.BE_VCPUS],
134 memory=beparams[constants.BE_MAXMEM],
135 spindle_use=spindle_use,
136 disks=disks,
137 nics=[n.ToDict() for n in nics],
138 hypervisor=op.hypervisor,
139 node_whitelist=node_whitelist)
140
141
142 def _ComputeFullBeParams(op, cluster):
143 """Computes the full beparams.
144
145 @param op: The instance opcode
146 @param cluster: The cluster config object
147
148 @return: The fully filled beparams
149
150 """
151 default_beparams = cluster.beparams[constants.PP_DEFAULT]
152 for param, value in op.beparams.iteritems():
153 if value == constants.VALUE_AUTO:
154 op.beparams[param] = default_beparams[param]
155 objects.UpgradeBeParams(op.beparams)
156 utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
157 return cluster.SimpleFillBE(op.beparams)
158
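# For illustration only (parameter names shortened to their string values,
# numbers invented): the VALUE_AUTO substitution above behaves like this
# plain-dict sketch, before UpgradeBeParams/SimpleFillBE are applied.
#
#   defaults = {"vcpus": 1, "maxmem": 512}
#   requested = {"vcpus": "auto", "maxmem": 2048}
#   for param, value in requested.items():
#     if value == "auto":
#       requested[param] = defaults[param]
#   # requested is now {"vcpus": 1, "maxmem": 2048}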
159
160 def _ComputeNics(op, cluster, default_ip, cfg, ec_id):
161 """Computes the nics.
162
163 @param op: The instance opcode
164 @param cluster: Cluster configuration object
165 @param default_ip: The default ip to assign
166 @param cfg: An instance of the configuration object
167 @param ec_id: Execution context ID
168
169 @returns: The built-up NICs
170
171 """
172 nics = []
173 for nic in op.nics:
174 nic_mode_req = nic.get(constants.INIC_MODE, None)
175 nic_mode = nic_mode_req
176 if nic_mode is None or nic_mode == constants.VALUE_AUTO:
177 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
178
179 net = nic.get(constants.INIC_NETWORK, None)
180 link = nic.get(constants.NIC_LINK, None)
181 ip = nic.get(constants.INIC_IP, None)
182
183 if net is None or net.lower() == constants.VALUE_NONE:
184 net = None
185 else:
186 if nic_mode_req is not None or link is not None:
187 raise errors.OpPrereqError("If network is given, no mode or link"
188 " is allowed to be passed",
189 errors.ECODE_INVAL)
190
191 # ip validity checks
192 if ip is None or ip.lower() == constants.VALUE_NONE:
193 nic_ip = None
194 elif ip.lower() == constants.VALUE_AUTO:
195 if not op.name_check:
196 raise errors.OpPrereqError("IP address set to auto but name checks"
197 " have been skipped",
198 errors.ECODE_INVAL)
199 nic_ip = default_ip
200 else:
201 # We defer pool operations until later, so that the iallocator has
202 # filled in the instance's node(s)
203 if ip.lower() == constants.NIC_IP_POOL:
204 if net is None:
205 raise errors.OpPrereqError("if ip=pool, parameter network"
206 " must be passed too",
207 errors.ECODE_INVAL)
208
209 elif not netutils.IPAddress.IsValid(ip):
210 raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
211 errors.ECODE_INVAL)
212
213 nic_ip = ip
214
215 # TODO: check the ip address for uniqueness
216 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
217 raise errors.OpPrereqError("Routed nic mode requires an ip address",
218 errors.ECODE_INVAL)
219
220 # MAC address verification
221 mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
222 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
223 mac = utils.NormalizeAndValidateMac(mac)
224
225 try:
226 # TODO: We need to factor this out
227 cfg.ReserveMAC(mac, ec_id)
228 except errors.ReservationError:
229 raise errors.OpPrereqError("MAC address %s already in use"
230 " in cluster" % mac,
231 errors.ECODE_NOTUNIQUE)
232
233 # Build nic parameters
234 nicparams = {}
235 if nic_mode_req:
236 nicparams[constants.NIC_MODE] = nic_mode
237 if link:
238 nicparams[constants.NIC_LINK] = link
239
240 check_params = cluster.SimpleFillNIC(nicparams)
241 objects.NIC.CheckParameterSyntax(check_params)
242 net_uuid = cfg.LookupNetwork(net)
243 name = nic.get(constants.INIC_NAME, None)
244 if name is not None and name.lower() == constants.VALUE_NONE:
245 name = None
246 nic_obj = objects.NIC(mac=mac, ip=nic_ip, name=name,
247 network=net_uuid, nicparams=nicparams)
248 nic_obj.uuid = cfg.GenerateUniqueID(ec_id)
249 nics.append(nic_obj)
250
251 return nics
252
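# For illustration only: each entry of op.nics processed above is a dict
# keyed by the INIC_* constants; written with their string keys, a request
# might look like this (all values invented).
#
#   op.nics = [
#     {"mode": "bridged", "link": "br0", "mac": "auto"},
#     {"network": "mynet", "ip": "pool"},
#   ]
#
# The first NIC carries an explicit mode/link and gets a generated MAC; the
# second one must not set mode/link (a network is given) and defers its IP
# to the network's address pool, resolved later once the node is known.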
253
254 def _CheckForConflictingIp(lu, ip, node):
255 """In case of conflicting IP address raise error.
256
257 @type ip: string
258 @param ip: IP address
259 @type node: string
260 @param node: node name
261
262 """
263 (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
264 if conf_net is not None:
265 raise errors.OpPrereqError(("The requested IP address (%s) belongs to"
266 " network %s, but the target NIC does not." %
267 (ip, conf_net)),
268 errors.ECODE_STATE)
269
270 return (None, None)
271
272
273 def _ComputeIPolicyInstanceSpecViolation(
274 ipolicy, instance_spec, disk_template,
275 _compute_fn=_ComputeIPolicySpecViolation):
276 """Compute if instance specs meets the specs of ipolicy.
277
278 @type ipolicy: dict
279 @param ipolicy: The ipolicy to verify against
281 @type instance_spec: dict
281 @param instance_spec: The instance spec to verify
282 @type disk_template: string
283 @param disk_template: the disk template of the instance
284 @param _compute_fn: The function to verify ipolicy (unittest only)
285 @see: L{_ComputeIPolicySpecViolation}
286
287 """
288 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
289 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
290 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
291 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
292 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
293 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
294
295 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
296 disk_sizes, spindle_use, disk_template)
297
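# For illustration only: instance_spec is a partial ISPEC_* dict of the same
# shape LUInstanceCreate builds later in this module (numbers invented,
# memory/disk sizes in MiB).
#
#   ispec = {
#     constants.ISPEC_MEM_SIZE: 2048,
#     constants.ISPEC_CPU_COUNT: 2,
#     constants.ISPEC_DISK_COUNT: 1,
#     constants.ISPEC_DISK_SIZE: [10240],
#     constants.ISPEC_NIC_COUNT: 1,
#     constants.ISPEC_SPINDLE_USE: 1,
#   }
#
# Keys that are missing simply fall back to the .get() defaults above.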
298
299 def _CheckOSVariant(os_obj, name):
300 """Check whether an OS name conforms to the os variants specification.
301
302 @type os_obj: L{objects.OS}
303 @param os_obj: OS object to check
304 @type name: string
305 @param name: OS name passed by the user, to check for validity
306
307 """
308 variant = objects.OS.GetVariant(name)
309 if not os_obj.supported_variants:
310 if variant:
311 raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
312 " passed)" % (os_obj.name, variant),
313 errors.ECODE_INVAL)
314 return
315 if not variant:
316 raise errors.OpPrereqError("OS name must include a variant",
317 errors.ECODE_INVAL)
318
319 if variant not in os_obj.supported_variants:
320 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
321
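# For illustration only (OS names invented): variants use the "name+variant"
# syntax, so the check above behaves roughly like this:
#
#   objects.OS.GetVariant("debootstrap+default")  # -> "default"
#   objects.OS.GetVariant("debootstrap")          # -> no variant (falsy)
#
# "debootstrap+default" is only accepted if "default" appears in the OS
# object's supported_variants list; a bare "debootstrap" is only accepted
# when the OS declares no variants at all.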
322
323 def _CheckNodeHasOS(lu, node, os_name, force_variant):
324 """Ensure that a node supports a given OS.
325
326 @param lu: the LU on behalf of which we make the check
327 @param node: the node to check
328 @param os_name: the OS to query about
329 @param force_variant: whether to ignore variant errors
330 @raise errors.OpPrereqError: if the node does not support the OS
331
332 """
333 result = lu.rpc.call_os_get(node, os_name)
334 result.Raise("OS '%s' not in supported OS list for node %s" %
335 (os_name, node),
336 prereq=True, ecode=errors.ECODE_INVAL)
337 if not force_variant:
338 _CheckOSVariant(result.payload, os_name)
339
340
341 def _CheckNicsBridgesExist(lu, target_nics, target_node):
342 """Check that the brigdes needed by a list of nics exist.
343
344 """
345 cluster = lu.cfg.GetClusterInfo()
346 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
347 brlist = [params[constants.NIC_LINK] for params in paramslist
348 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
349 if brlist:
350 result = lu.rpc.call_bridges_exist(target_node, brlist)
351 result.Raise("Error checking bridges on destination node '%s'" %
352 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
353
354
355 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
356 """Checks if a node has enough free memory.
357
358 This function checks if a given node has the needed amount of free
359 memory. In case the node has less memory or we cannot get the
360 information from the node, this function raises an OpPrereqError
361 exception.
362
363 @type lu: C{LogicalUnit}
364 @param lu: a logical unit from which we get configuration data
365 @type node: C{str}
366 @param node: the node to check
367 @type reason: C{str}
368 @param reason: string to use in the error message
369 @type requested: C{int}
370 @param requested: the amount of memory in MiB to check for
371 @type hypervisor_name: C{str}
372 @param hypervisor_name: the hypervisor to ask for memory stats
373 @rtype: integer
374 @return: node current free memory
375 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
376 we cannot check the node
377
378 """
379 nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
380 nodeinfo[node].Raise("Can't get data from node %s" % node,
381 prereq=True, ecode=errors.ECODE_ENVIRON)
382 (_, _, (hv_info, )) = nodeinfo[node].payload
383
384 free_mem = hv_info.get("memory_free", None)
385 if not isinstance(free_mem, int):
386 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
387 " was '%s'" % (node, free_mem),
388 errors.ECODE_ENVIRON)
389 if requested > free_mem:
390 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
391 " needed %s MiB, available %s MiB" %
392 (node, reason, requested, free_mem),
393 errors.ECODE_NORES)
394 return free_mem
395
396
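# For illustration only: the node_info payload unpacked in
# _CheckNodeFreeMemory above is a 3-tuple whose last element holds one dict
# per requested hypervisor; with a single hypervisor the relevant part looks
# roughly like this (numbers invented):
#
#   (_, _, (hv_info, )) = payload
#   # hv_info e.g. {"memory_free": 3970, ...}; "memory_free" is the only key
#   # the check relies on.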
397 class LUInstanceCreate(LogicalUnit):
398 """Create an instance.
399
400 """
401 HPATH = "instance-add"
402 HTYPE = constants.HTYPE_INSTANCE
403 REQ_BGL = False
404
405 def CheckArguments(self):
406 """Check arguments.
407
408 """
409 # do not require name_check to ease forward/backward compatibility
410 # for tools
411 if self.op.no_install and self.op.start:
412 self.LogInfo("No-installation mode selected, disabling startup")
413 self.op.start = False
414 # validate/normalize the instance name
415 self.op.instance_name = \
416 netutils.Hostname.GetNormalizedName(self.op.instance_name)
417
418 if self.op.ip_check and not self.op.name_check:
419 # TODO: make the ip check more flexible and not depend on the name check
420 raise errors.OpPrereqError("Cannot do IP address check without a name"
421 " check", errors.ECODE_INVAL)
422
423 # check nics' parameter names
424 for nic in self.op.nics:
425 utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES)
426 # check that NIC's parameters names are unique and valid
427 utils.ValidateDeviceNames("NIC", self.op.nics)
428
429 # check that disk's names are unique and valid
430 utils.ValidateDeviceNames("disk", self.op.disks)
431
432 cluster = self.cfg.GetClusterInfo()
433 if not self.op.disk_template in cluster.enabled_disk_templates:
434 raise errors.OpPrereqError("Cannot create an instance with disk template"
435 " '%s', because it is not enabled in the"
436 " cluster. Enabled disk templates are: %s." %
437 (self.op.disk_template,
438 ",".join(cluster.enabled_disk_templates)))
439
440 # check disks. parameter names and consistent adopt/no-adopt strategy
441 has_adopt = has_no_adopt = False
442 for disk in self.op.disks:
443 if self.op.disk_template != constants.DT_EXT:
444 utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
445 if constants.IDISK_ADOPT in disk:
446 has_adopt = True
447 else:
448 has_no_adopt = True
449 if has_adopt and has_no_adopt:
450 raise errors.OpPrereqError("Either all disks are adopted or none is",
451 errors.ECODE_INVAL)
452 if has_adopt:
453 if self.op.disk_template not in constants.DTS_MAY_ADOPT:
454 raise errors.OpPrereqError("Disk adoption is not supported for the"
455 " '%s' disk template" %
456 self.op.disk_template,
457 errors.ECODE_INVAL)
458 if self.op.iallocator is not None:
459 raise errors.OpPrereqError("Disk adoption not allowed with an"
460 " iallocator script", errors.ECODE_INVAL)
461 if self.op.mode == constants.INSTANCE_IMPORT:
462 raise errors.OpPrereqError("Disk adoption not allowed for"
463 " instance import", errors.ECODE_INVAL)
464 else:
465 if self.op.disk_template in constants.DTS_MUST_ADOPT:
466 raise errors.OpPrereqError("Disk template %s requires disk adoption,"
467 " but no 'adopt' parameter given" %
468 self.op.disk_template,
469 errors.ECODE_INVAL)
470
471 self.adopt_disks = has_adopt
472
473 # instance name verification
474 if self.op.name_check:
475 self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
476 self.op.instance_name = self.hostname1.name
477 # used in CheckPrereq for ip ping check
478 self.check_ip = self.hostname1.ip
479 else:
480 self.check_ip = None
481
482 # file storage checks
483 if (self.op.file_driver and
484 not self.op.file_driver in constants.FILE_DRIVER):
485 raise errors.OpPrereqError("Invalid file driver name '%s'" %
486 self.op.file_driver, errors.ECODE_INVAL)
487
488 if self.op.disk_template == constants.DT_FILE:
489 opcodes.RequireFileStorage()
490 elif self.op.disk_template == constants.DT_SHARED_FILE:
491 opcodes.RequireSharedFileStorage()
492
493 ### Node/iallocator related checks
494 _CheckIAllocatorOrNode(self, "iallocator", "pnode")
495
496 if self.op.pnode is not None:
497 if self.op.disk_template in constants.DTS_INT_MIRROR:
498 if self.op.snode is None:
499 raise errors.OpPrereqError("The networked disk templates need"
500 " a mirror node", errors.ECODE_INVAL)
501 elif self.op.snode:
502 self.LogWarning("Secondary node will be ignored on non-mirrored disk"
503 " template")
504 self.op.snode = None
505
506 _CheckOpportunisticLocking(self.op)
507
508 self._cds = _GetClusterDomainSecret()
509
510 if self.op.mode == constants.INSTANCE_IMPORT:
511 # On import force_variant must be True, because if we forced it at
512 # initial install, our only chance when importing it back is that it
513 # works again!
514 self.op.force_variant = True
515
516 if self.op.no_install:
517 self.LogInfo("No-installation mode has no effect during import")
518
519 elif self.op.mode == constants.INSTANCE_CREATE:
520 if self.op.os_type is None:
521 raise errors.OpPrereqError("No guest OS specified",
522 errors.ECODE_INVAL)
523 if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
524 raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
525 " installation" % self.op.os_type,
526 errors.ECODE_STATE)
527 if self.op.disk_template is None:
528 raise errors.OpPrereqError("No disk template specified",
529 errors.ECODE_INVAL)
530
531 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
532 # Check handshake to ensure both clusters have the same domain secret
533 src_handshake = self.op.source_handshake
534 if not src_handshake:
535 raise errors.OpPrereqError("Missing source handshake",
536 errors.ECODE_INVAL)
537
538 errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
539 src_handshake)
540 if errmsg:
541 raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
542 errors.ECODE_INVAL)
543
544 # Load and check source CA
545 self.source_x509_ca_pem = self.op.source_x509_ca
546 if not self.source_x509_ca_pem:
547 raise errors.OpPrereqError("Missing source X509 CA",
548 errors.ECODE_INVAL)
549
550 try:
551 (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
552 self._cds)
553 except OpenSSL.crypto.Error, err:
554 raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
555 (err, ), errors.ECODE_INVAL)
556
557 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
558 if errcode is not None:
559 raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
560 errors.ECODE_INVAL)
561
562 self.source_x509_ca = cert
563
564 src_instance_name = self.op.source_instance_name
565 if not src_instance_name:
566 raise errors.OpPrereqError("Missing source instance name",
567 errors.ECODE_INVAL)
568
569 self.source_instance_name = \
570 netutils.GetHostname(name=src_instance_name).name
571
572 else:
573 raise errors.OpPrereqError("Invalid instance creation mode %r" %
574 self.op.mode, errors.ECODE_INVAL)
575
576 def ExpandNames(self):
577 """ExpandNames for CreateInstance.
578
579 Figure out the right locks for instance creation.
580
581 """
582 self.needed_locks = {}
583
584 instance_name = self.op.instance_name
585 # this is just a preventive check, but someone might still add this
586 # instance in the meantime, and creation will fail at lock-add time
587 if instance_name in self.cfg.GetInstanceList():
588 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
589 instance_name, errors.ECODE_EXISTS)
590
591 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
592
593 if self.op.iallocator:
594 # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
595 # specifying a group on instance creation and then selecting nodes from
596 # that group
597 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
598 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
599
600 if self.op.opportunistic_locking:
601 self.opportunistic_locks[locking.LEVEL_NODE] = True
602 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
603 else:
604 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
605 nodelist = [self.op.pnode]
606 if self.op.snode is not None:
607 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
608 nodelist.append(self.op.snode)
609 self.needed_locks[locking.LEVEL_NODE] = nodelist
610
611 # in case of import lock the source node too
612 if self.op.mode == constants.INSTANCE_IMPORT:
613 src_node = self.op.src_node
614 src_path = self.op.src_path
615
616 if src_path is None:
617 self.op.src_path = src_path = self.op.instance_name
618
619 if src_node is None:
620 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
621 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
622 self.op.src_node = None
623 if os.path.isabs(src_path):
624 raise errors.OpPrereqError("Importing an instance from a path"
625 " requires a source node option",
626 errors.ECODE_INVAL)
627 else:
628 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
629 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
630 self.needed_locks[locking.LEVEL_NODE].append(src_node)
631 if not os.path.isabs(src_path):
632 self.op.src_path = src_path = \
633 utils.PathJoin(pathutils.EXPORT_DIR, src_path)
634
635 self.needed_locks[locking.LEVEL_NODE_RES] = \
636 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
637
638 def _RunAllocator(self):
639 """Run the allocator based on input opcode.
640
641 """
642 if self.op.opportunistic_locking:
643 # Only consider nodes for which a lock is held
644 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
645 else:
646 node_whitelist = None
647
648 #TODO Export network to iallocator so that it chooses a pnode
649 # in a nodegroup that has the desired network connected to
650 req = _CreateInstanceAllocRequest(self.op, self.disks,
651 self.nics, self.be_full,
652 node_whitelist)
653 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
654
655 ial.Run(self.op.iallocator)
656
657 if not ial.success:
658 # When opportunistic locks are used only a temporary failure is generated
659 if self.op.opportunistic_locking:
660 ecode = errors.ECODE_TEMP_NORES
661 else:
662 ecode = errors.ECODE_NORES
663
664 raise errors.OpPrereqError("Can't compute nodes using"
665 " iallocator '%s': %s" %
666 (self.op.iallocator, ial.info),
667 ecode)
668
669 self.op.pnode = ial.result[0]
670 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
671 self.op.instance_name, self.op.iallocator,
672 utils.CommaJoin(ial.result))
673
674 assert req.RequiredNodes() in (1, 2), "Wrong node count from iallocator"
675
676 if req.RequiredNodes() == 2:
677 self.op.snode = ial.result[1]
678
679 def BuildHooksEnv(self):
680 """Build hooks env.
681
682 This runs on master, primary and secondary nodes of the instance.
683
684 """
685 env = {
686 "ADD_MODE": self.op.mode,
687 }
688 if self.op.mode == constants.INSTANCE_IMPORT:
689 env["SRC_NODE"] = self.op.src_node
690 env["SRC_PATH"] = self.op.src_path
691 env["SRC_IMAGES"] = self.src_images
692
693 env.update(_BuildInstanceHookEnv(
694 name=self.op.instance_name,
695 primary_node=self.op.pnode,
696 secondary_nodes=self.secondaries,
697 status=self.op.start,
698 os_type=self.op.os_type,
699 minmem=self.be_full[constants.BE_MINMEM],
700 maxmem=self.be_full[constants.BE_MAXMEM],
701 vcpus=self.be_full[constants.BE_VCPUS],
702 nics=_NICListToTuple(self, self.nics),
703 disk_template=self.op.disk_template,
704 disks=[(d[constants.IDISK_NAME], d[constants.IDISK_SIZE],
705 d[constants.IDISK_MODE]) for d in self.disks],
706 bep=self.be_full,
707 hvp=self.hv_full,
708 hypervisor_name=self.op.hypervisor,
709 tags=self.op.tags,
710 ))
711
712 return env
713
714 def BuildHooksNodes(self):
715 """Build hooks nodes.
716
717 """
718 nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
719 return nl, nl
720
721 def _ReadExportInfo(self):
722 """Reads the export information from disk.
723
724 It will override the opcode source node and path with the actual
725 information, if these two were not specified before.
726
727 @return: the export information
728
729 """
730 assert self.op.mode == constants.INSTANCE_IMPORT
731
732 src_node = self.op.src_node
733 src_path = self.op.src_path
734
735 if src_node is None:
736 locked_nodes = self.owned_locks(locking.LEVEL_NODE)
737 exp_list = self.rpc.call_export_list(locked_nodes)
738 found = False
739 for node in exp_list:
740 if exp_list[node].fail_msg:
741 continue
742 if src_path in exp_list[node].payload:
743 found = True
744 self.op.src_node = src_node = node
745 self.op.src_path = src_path = utils.PathJoin(pathutils.EXPORT_DIR,
746 src_path)
747 break
748 if not found:
749 raise errors.OpPrereqError("No export found for relative path %s" %
750 src_path, errors.ECODE_INVAL)
751
752 _CheckNodeOnline(self, src_node)
753 result = self.rpc.call_export_info(src_node, src_path)
754 result.Raise("No export or invalid export found in dir %s" % src_path)
755
756 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
757 if not export_info.has_section(constants.INISECT_EXP):
758 raise errors.ProgrammerError("Corrupted export config",
759 errors.ECODE_ENVIRON)
760
761 ei_version = export_info.get(constants.INISECT_EXP, "version")
762 if (int(ei_version) != constants.EXPORT_VERSION):
763 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
764 (ei_version, constants.EXPORT_VERSION),
765 errors.ECODE_ENVIRON)
766 return export_info
767
768 def _ReadExportParams(self, einfo):
769 """Use export parameters as defaults.
770
771 If the opcode does not specify (i.e. override) some instance
772 parameters, try to take them from the export information, if it
773 declares them.
774
775 """
776 self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
777
778 if self.op.disk_template is None:
779 if einfo.has_option(constants.INISECT_INS, "disk_template"):
780 self.op.disk_template = einfo.get(constants.INISECT_INS,
781 "disk_template")
782 if self.op.disk_template not in constants.DISK_TEMPLATES:
783 raise errors.OpPrereqError("Disk template specified in configuration"
784 " file is not one of the allowed values:"
785 " %s" %
786 " ".join(constants.DISK_TEMPLATES),
787 errors.ECODE_INVAL)
788 else:
789 raise errors.OpPrereqError("No disk template specified and the export"
790 " is missing the disk_template information",
791 errors.ECODE_INVAL)
792
793 if not self.op.disks:
794 disks = []
795 # TODO: import the disk iv_name too
796 for idx in range(constants.MAX_DISKS):
797 if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
798 disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
799 disks.append({constants.IDISK_SIZE: disk_sz})
800 self.op.disks = disks
801 if not disks and self.op.disk_template != constants.DT_DISKLESS:
802 raise errors.OpPrereqError("No disk info specified and the export"
803 " is missing the disk information",
804 errors.ECODE_INVAL)
805
806 if not self.op.nics:
807 nics = []
808 for idx in range(constants.MAX_NICS):
809 if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
810 ndict = {}
811 for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
812 v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
813 ndict[name] = v
814 nics.append(ndict)
815 else:
816 break
817 self.op.nics = nics
818
819 if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
820 self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
821
822 if (self.op.hypervisor is None and
823 einfo.has_option(constants.INISECT_INS, "hypervisor")):
824 self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
825
826 if einfo.has_section(constants.INISECT_HYP):
827 # use the export parameters but do not override the ones
828 # specified by the user
829 for name, value in einfo.items(constants.INISECT_HYP):
830 if name not in self.op.hvparams:
831 self.op.hvparams[name] = value
832
833 if einfo.has_section(constants.INISECT_BEP):
834 # use the parameters, without overriding
835 for name, value in einfo.items(constants.INISECT_BEP):
836 if name not in self.op.beparams:
837 self.op.beparams[name] = value
838 # Compatibility for the old "memory" be param
839 if name == constants.BE_MEMORY:
840 if constants.BE_MAXMEM not in self.op.beparams:
841 self.op.beparams[constants.BE_MAXMEM] = value
842 if constants.BE_MINMEM not in self.op.beparams:
843 self.op.beparams[constants.BE_MINMEM] = value
844 else:
845 # try to read the parameters old style, from the main section
846 for name in constants.BES_PARAMETERS:
847 if (name not in self.op.beparams and
848 einfo.has_option(constants.INISECT_INS, name)):
849 self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
850
851 if einfo.has_section(constants.INISECT_OSP):
852 # use the parameters, without overriding
853 for name, value in einfo.items(constants.INISECT_OSP):
854 if name not in self.op.osparams:
855 self.op.osparams[name] = value
856
857 def _RevertToDefaults(self, cluster):
858 """Revert the instance parameters to the default values.
859
860 """
861 # hvparams
862 hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {})
863 for name in self.op.hvparams.keys():
864 if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
865 del self.op.hvparams[name]
866 # beparams
867 be_defs = cluster.SimpleFillBE({})
868 for name in self.op.beparams.keys():
869 if name in be_defs and be_defs[name] == self.op.beparams[name]:
870 del self.op.beparams[name]
871 # nic params
872 nic_defs = cluster.SimpleFillNIC({})
873 for nic in self.op.nics:
874 for name in constants.NICS_PARAMETERS:
875 if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
876 del nic[name]
877 # osparams
878 os_defs = cluster.SimpleFillOS(self.op.os_type, {})
879 for name in self.op.osparams.keys():
880 if name in os_defs and os_defs[name] == self.op.osparams[name]:
881 del self.op.osparams[name]
882
883 def _CalculateFileStorageDir(self):
884 """Calculate final instance file storage dir.
885
886 """
887 # file storage dir calculation/check
888 self.instance_file_storage_dir = None
889 if self.op.disk_template in constants.DTS_FILEBASED:
890 # build the full file storage dir path
891 joinargs = []
892
893 if self.op.disk_template == constants.DT_SHARED_FILE:
894 get_fsd_fn = self.cfg.GetSharedFileStorageDir
895 else:
896 get_fsd_fn = self.cfg.GetFileStorageDir
897
898 cfg_storagedir = get_fsd_fn()
899 if not cfg_storagedir:
900 raise errors.OpPrereqError("Cluster file storage dir not defined",
901 errors.ECODE_STATE)
902 joinargs.append(cfg_storagedir)
903
904 if self.op.file_storage_dir is not None:
905 joinargs.append(self.op.file_storage_dir)
906
907 joinargs.append(self.op.instance_name)
908
909 # pylint: disable=W0142
910 self.instance_file_storage_dir = utils.PathJoin(*joinargs)
911
912 def CheckPrereq(self): # pylint: disable=R0914
913 """Check prerequisites.
914
915 """
916 self._CalculateFileStorageDir()
917
918 if self.op.mode == constants.INSTANCE_IMPORT:
919 export_info = self._ReadExportInfo()
920 self._ReadExportParams(export_info)
921 self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
922 else:
923 self._old_instance_name = None
924
925 if (not self.cfg.GetVGName() and
926 self.op.disk_template not in constants.DTS_NOT_LVM):
927 raise errors.OpPrereqError("Cluster does not support lvm-based"
928 " instances", errors.ECODE_STATE)
929
930 if (self.op.hypervisor is None or
931 self.op.hypervisor == constants.VALUE_AUTO):
932 self.op.hypervisor = self.cfg.GetHypervisorType()
933
934 cluster = self.cfg.GetClusterInfo()
935 enabled_hvs = cluster.enabled_hypervisors
936 if self.op.hypervisor not in enabled_hvs:
937 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
938 " cluster (%s)" %
939 (self.op.hypervisor, ",".join(enabled_hvs)),
940 errors.ECODE_STATE)
941
942 # Check tag validity
943 for tag in self.op.tags:
944 objects.TaggableObject.ValidateTag(tag)
945
946 # check hypervisor parameter syntax (locally)
947 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
948 filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
949 self.op.hvparams)
950 hv_type = hypervisor.GetHypervisorClass(self.op.hypervisor)
951 hv_type.CheckParameterSyntax(filled_hvp)
952 self.hv_full = filled_hvp
953 # check that we don't specify global parameters on an instance
954 _CheckParamsNotGlobal(self.op.hvparams, constants.HVC_GLOBALS, "hypervisor",
955 "instance", "cluster")
956
957 # fill and remember the beparams dict
958 self.be_full = _ComputeFullBeParams(self.op, cluster)
959
960 # build os parameters
961 self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams)
962
963 # now that hvp/bep are in final format, let's reset to defaults,
964 # if told to do so
965 if self.op.identify_defaults:
966 self._RevertToDefaults(cluster)
967
968 # NIC buildup
969 self.nics = _ComputeNics(self.op, cluster, self.check_ip, self.cfg,
970 self.proc.GetECId())
971
972 # disk checks/pre-build
973 default_vg = self.cfg.GetVGName()
974 self.disks = _ComputeDisks(self.op, default_vg)
975
976 if self.op.mode == constants.INSTANCE_IMPORT:
977 disk_images = []
978 for idx in range(len(self.disks)):
979 option = "disk%d_dump" % idx
980 if export_info.has_option(constants.INISECT_INS, option):
981 # FIXME: are the old os-es, disk sizes, etc. useful?
982 export_name = export_info.get(constants.INISECT_INS, option)
983 image = utils.PathJoin(self.op.src_path, export_name)
984 disk_images.append(image)
985 else:
986 disk_images.append(False)
987
988 self.src_images = disk_images
989
990 if self.op.instance_name == self._old_instance_name:
991 for idx, nic in enumerate(self.nics):
992 if nic.mac == constants.VALUE_AUTO:
993 nic_mac_ini = "nic%d_mac" % idx
994 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
995
996 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
997
998 # ip ping checks (we use the same ip that was resolved in ExpandNames)
999 if self.op.ip_check:
1000 if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
1001 raise errors.OpPrereqError("IP %s of instance %s already in use" %
1002 (self.check_ip, self.op.instance_name),
1003 errors.ECODE_NOTUNIQUE)
1004
1005 #### mac address generation
1006 # By generating here the mac address both the allocator and the hooks get
1007 # the real final mac address rather than the 'auto' or 'generate' value.
1008 # There is a race condition between the generation and the instance object
1009 # creation, which means that we know the mac is valid now, but we're not
1010 # sure it will be when we actually add the instance. If things go bad
1011 # adding the instance will abort because of a duplicate mac, and the
1012 # creation job will fail.
1013 for nic in self.nics:
1014 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
1015 nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
1016
1017 #### allocator run
1018
1019 if self.op.iallocator is not None:
1020 self._RunAllocator()
1021
1022 # Release all unneeded node locks
1023 keep_locks = filter(None, [self.op.pnode, self.op.snode, self.op.src_node])
1024 _ReleaseLocks(self, locking.LEVEL_NODE, keep=keep_locks)
1025 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=keep_locks)
1026 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
1027
1028 assert (self.owned_locks(locking.LEVEL_NODE) ==
1029 self.owned_locks(locking.LEVEL_NODE_RES)), \
1030 "Node locks differ from node resource locks"
1031
1032 #### node related checks
1033
1034 # check primary node
1035 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
1036 assert self.pnode is not None, \
1037 "Cannot retrieve locked node %s" % self.op.pnode
1038 if pnode.offline:
1039 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
1040 pnode.name, errors.ECODE_STATE)
1041 if pnode.drained:
1042 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
1043 pnode.name, errors.ECODE_STATE)
1044 if not pnode.vm_capable:
1045 raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
1046 " '%s'" % pnode.name, errors.ECODE_STATE)
1047
1048 self.secondaries = []
1049
1050 # Fill in any IPs from IP pools. This must happen here, because we need to
1051 # know the nic's primary node, as specified by the iallocator
1052 for idx, nic in enumerate(self.nics):
1053 net_uuid = nic.network
1054 if net_uuid is not None:
1055 nobj = self.cfg.GetNetwork(net_uuid)
1056 netparams = self.cfg.GetGroupNetParams(net_uuid, self.pnode.name)
1057 if netparams is None:
1058 raise errors.OpPrereqError("No netparams found for network"
1059 " %s. Propably not connected to"
1060 " node's %s nodegroup" %
1061 (nobj.name, self.pnode.name),
1062 errors.ECODE_INVAL)
1063 self.LogInfo("NIC/%d inherits netparams %s" %
1064 (idx, netparams.values()))
1065 nic.nicparams = dict(netparams)
1066 if nic.ip is not None:
1067 if nic.ip.lower() == constants.NIC_IP_POOL:
1068 try:
1069 nic.ip = self.cfg.GenerateIp(net_uuid, self.proc.GetECId())
1070 except errors.ReservationError:
1071 raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
1072 " from the address pool" % idx,
1073 errors.ECODE_STATE)
1074 self.LogInfo("Chose IP %s from network %s", nic.ip, nobj.name)
1075 else:
1076 try:
1077 self.cfg.ReserveIp(net_uuid, nic.ip, self.proc.GetECId())
1078 except errors.ReservationError:
1079 raise errors.OpPrereqError("IP address %s already in use"
1080 " or does not belong to network %s" %
1081 (nic.ip, nobj.name),
1082 errors.ECODE_NOTUNIQUE)
1083
1084 # net is None, ip None or given
1085 elif self.op.conflicts_check:
1086 _CheckForConflictingIp(self, nic.ip, self.pnode.name)
1087
1088 # mirror node verification
1089 if self.op.disk_template in constants.DTS_INT_MIRROR:
1090 if self.op.snode == pnode.name:
1091 raise errors.OpPrereqError("The secondary node cannot be the"
1092 " primary node", errors.ECODE_INVAL)
1093 _CheckNodeOnline(self, self.op.snode)
1094 _CheckNodeNotDrained(self, self.op.snode)
1095 _CheckNodeVmCapable(self, self.op.snode)
1096 self.secondaries.append(self.op.snode)
1097
1098 snode = self.cfg.GetNodeInfo(self.op.snode)
1099 if pnode.group != snode.group:
1100 self.LogWarning("The primary and secondary nodes are in two"
1101 " different node groups; the disk parameters"
1102 " from the first disk's node group will be"
1103 " used")
1104
1105 if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
1106 nodes = [pnode]
1107 if self.op.disk_template in constants.DTS_INT_MIRROR:
1108 nodes.append(snode)
1109 has_es = lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n)
1110 if compat.any(map(has_es, nodes)):
1111 raise errors.OpPrereqError("Disk template %s not supported with"
1112 " exclusive storage" % self.op.disk_template,
1113 errors.ECODE_STATE)
1114
1115 nodenames = [pnode.name] + self.secondaries
1116
1117 if not self.adopt_disks:
1118 if self.op.disk_template == constants.DT_RBD:
1119 # _CheckRADOSFreeSpace() is just a placeholder.
1120 # Any function that checks prerequisites can be placed here.
1121 # Check if there is enough space on the RADOS cluster.
1122 _CheckRADOSFreeSpace()
1123 elif self.op.disk_template == constants.DT_EXT:
1124 # FIXME: Function that checks prereqs if needed
1125 pass
1126 else:
1127 # Check lv size requirements, if not adopting
1128 req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
1129 _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
1130
1131 elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
1132 all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
1133 disk[constants.IDISK_ADOPT])
1134 for disk in self.disks])
1135 if len(all_lvs) != len(self.disks):
1136 raise errors.OpPrereqError("Duplicate volume names given for adoption",
1137 errors.ECODE_INVAL)
1138 for lv_name in all_lvs:
1139 try:
1140 # FIXME: lv_name here is "vg/lv"; need to ensure that other calls
1141 # to ReserveLV use the same syntax
1142 self.cfg.ReserveLV(lv_name, self.proc.GetECId())
1143 except errors.ReservationError:
1144 raise errors.OpPrereqError("LV named %s used by another instance" %
1145 lv_name, errors.ECODE_NOTUNIQUE)
1146
1147 vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
1148 vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
1149
1150 node_lvs = self.rpc.call_lv_list([pnode.name],
1151 vg_names.payload.keys())[pnode.name]
1152 node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
1153 node_lvs = node_lvs.payload
1154
1155 delta = all_lvs.difference(node_lvs.keys())
1156 if delta:
1157 raise errors.OpPrereqError("Missing logical volume(s): %s" %
1158 utils.CommaJoin(delta),
1159 errors.ECODE_INVAL)
1160 online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
1161 if online_lvs:
1162 raise errors.OpPrereqError("Online logical volumes found, cannot"
1163 " adopt: %s" % utils.CommaJoin(online_lvs),
1164 errors.ECODE_STATE)
1165 # update the size of disk based on what is found
1166 for dsk in self.disks:
1167 dsk[constants.IDISK_SIZE] = \
1168 int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
1169 dsk[constants.IDISK_ADOPT])][0]))
1170
1171 elif self.op.disk_template == constants.DT_BLOCK:
1172 # Normalize and de-duplicate device paths
1173 all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
1174 for disk in self.disks])
1175 if len(all_disks) != len(self.disks):
1176 raise errors.OpPrereqError("Duplicate disk names given for adoption",
1177 errors.ECODE_INVAL)
1178 baddisks = [d for d in all_disks
1179 if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
1180 if baddisks:
1181 raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
1182 " cannot be adopted" %
1183 (utils.CommaJoin(baddisks),
1184 constants.ADOPTABLE_BLOCKDEV_ROOT),
1185 errors.ECODE_INVAL)
1186
1187 node_disks = self.rpc.call_bdev_sizes([pnode.name],
1188 list(all_disks))[pnode.name]
1189 node_disks.Raise("Cannot get block device information from node %s" %
1190 pnode.name)
1191 node_disks = node_disks.payload
1192 delta = all_disks.difference(node_disks.keys())
1193 if delta:
1194 raise errors.OpPrereqError("Missing block device(s): %s" %
1195 utils.CommaJoin(delta),
1196 errors.ECODE_INVAL)
1197 for dsk in self.disks:
1198 dsk[constants.IDISK_SIZE] = \
1199 int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
1200
1201 # Verify instance specs
1202 spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
1203 ispec = {
1204 constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
1205 constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
1206 constants.ISPEC_DISK_COUNT: len(self.disks),
1207 constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
1208 for disk in self.disks],
1209 constants.ISPEC_NIC_COUNT: len(self.nics),
1210 constants.ISPEC_SPINDLE_USE: spindle_use,
1211 }
1212
1213 group_info = self.cfg.GetNodeGroup(pnode.group)
1214 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1215 res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec,
1216 self.op.disk_template)
1217 if not self.op.ignore_ipolicy and res:
1218 msg = ("Instance allocation to group %s (%s) violates policy: %s" %
1219 (pnode.group, group_info.name, utils.CommaJoin(res)))
1220 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1221
1222 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
1223
1224 _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
1225 # check OS parameters (remotely)
1226 _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full)
1227
1228 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
1229
1230 #TODO: _CheckExtParams (remotely)
1231 # Check parameters for extstorage
1232
1233 # memory check on primary node
1234 #TODO(dynmem): use MINMEM for checking
1235 if self.op.start:
1236 _CheckNodeFreeMemory(self, self.pnode.name,
1237 "creating instance %s" % self.op.instance_name,
1238 self.be_full[constants.BE_MAXMEM],
1239 self.op.hypervisor)
1240
1241 self.dry_run_result = list(nodenames)
1242
1243 def Exec(self, feedback_fn):
1244 """Create and add the instance to the cluster.
1245
1246 """
1247 instance = self.op.instance_name
1248 pnode_name = self.pnode.name
1249
1250 assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
1251 self.owned_locks(locking.LEVEL_NODE)), \
1252 "Node locks differ from node resource locks"
1253 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1254
1255 ht_kind = self.op.hypervisor
1256 if ht_kind in constants.HTS_REQ_PORT:
1257 network_port = self.cfg.AllocatePort()
1258 else:
1259 network_port = None
1260
1261 # This is ugly, but we have a chicken-and-egg problem here
1262 # We can only take the group disk parameters, as the instance
1263 # has no disks yet (we are generating them right here).
1264 node = self.cfg.GetNodeInfo(pnode_name)
1265 nodegroup = self.cfg.GetNodeGroup(node.group)
1266 disks = _GenerateDiskTemplate(self,
1267 self.op.disk_template,
1268 instance, pnode_name,
1269 self.secondaries,
1270 self.disks,
1271 self.instance_file_storage_dir,
1272 self.op.file_driver,
1273 0,
1274 feedback_fn,
1275 self.cfg.GetGroupDiskParams(nodegroup))
1276
1277 iobj = objects.Instance(name=instance, os=self.op.os_type,
1278 primary_node=pnode_name,
1279 nics=self.nics, disks=disks,
1280 disk_template=self.op.disk_template,
1281 admin_state=constants.ADMINST_DOWN,
1282 network_port=network_port,
1283 beparams=self.op.beparams,
1284 hvparams=self.op.hvparams,
1285 hypervisor=self.op.hypervisor,
1286 osparams=self.op.osparams,
1287 )
1288
1289 if self.op.tags:
1290 for tag in self.op.tags:
1291 iobj.AddTag(tag)
1292
1293 if self.adopt_disks:
1294 if self.op.disk_template == constants.DT_PLAIN:
1295 # rename LVs to the newly-generated names; we need to construct
1296 # 'fake' LV disks with the old data, plus the new unique_id
1297 tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
1298 rename_to = []
1299 for t_dsk, a_dsk in zip(tmp_disks, self.disks):
1300 rename_to.append(t_dsk.logical_id)
1301 t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
1302 self.cfg.SetDiskID(t_dsk, pnode_name)
1303 result = self.rpc.call_blockdev_rename(pnode_name,
1304 zip(tmp_disks, rename_to))
1305 result.Raise("Failed to rename adoped LVs")
1306 else:
1307 feedback_fn("* creating instance disks...")
1308 try:
1309 _CreateDisks(self, iobj)
1310 except errors.OpExecError:
1311 self.LogWarning("Device creation failed")
1312 self.cfg.ReleaseDRBDMinors(instance)
1313 raise
1314
1315 feedback_fn("adding instance %s to cluster config" % instance)
1316
1317 self.cfg.AddInstance(iobj, self.proc.GetECId())
1318
1319 # Declare that we don't want to remove the instance lock anymore, as we've
1320 # added the instance to the config
1321 del self.remove_locks[locking.LEVEL_INSTANCE]
1322
1323 if self.op.mode == constants.INSTANCE_IMPORT:
1324 # Release unused nodes
1325 _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
1326 else:
1327 # Release all nodes
1328 _ReleaseLocks(self, locking.LEVEL_NODE)
1329
1330 disk_abort = False
1331 if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
1332 feedback_fn("* wiping instance disks...")
1333 try:
1334 _WipeDisks(self, iobj)
1335 except errors.OpExecError, err:
1336 logging.exception("Wiping disks failed")
1337 self.LogWarning("Wiping instance disks failed (%s)", err)
1338 disk_abort = True
1339
1340 if disk_abort:
1341 # Something is already wrong with the disks, don't do anything else
1342 pass
1343 elif self.op.wait_for_sync:
1344 disk_abort = not _WaitForSync(self, iobj)
1345 elif iobj.disk_template in constants.DTS_INT_MIRROR:
1346 # make sure the disks are not degraded (still sync-ing is ok)
1347 feedback_fn("* checking mirrors status")
1348 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
1349 else:
1350 disk_abort = False
1351
1352 if disk_abort:
1353 _RemoveDisks(self, iobj)
1354 self.cfg.RemoveInstance(iobj.name)
1355 # Make sure the instance lock gets removed
1356 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
1357 raise errors.OpExecError("There are some degraded disks for"
1358 " this instance")
1359
1360 # Release all node resource locks
1361 _ReleaseLocks(self, locking.LEVEL_NODE_RES)
1362
1363 if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
1364 # we need to set the disks' IDs to the primary node, since the
1365 # preceding code might or might not have done it, depending on
1366 # disk template and other options
1367 for disk in iobj.disks:
1368 self.cfg.SetDiskID(disk, pnode_name)
1369 if self.op.mode == constants.INSTANCE_CREATE:
1370 if not self.op.no_install:
1371 pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
1372 not self.op.wait_for_sync)
1373 if pause_sync:
1374 feedback_fn("* pausing disk sync to install instance OS")
1375 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1376 (iobj.disks,
1377 iobj), True)
1378 for idx, success in enumerate(result.payload):
1379 if not success:
1380 logging.warn("pause-sync of instance %s for disk %d failed",
1381 instance, idx)
1382
1383 feedback_fn("* running the instance OS create scripts...")
1384 # FIXME: pass debug option from opcode to backend
1385 os_add_result = \
1386 self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
1387 self.op.debug_level)
1388 if pause_sync:
1389 feedback_fn("* resuming disk sync")
1390 result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
1391 (iobj.disks,
1392 iobj), False)
1393 for idx, success in enumerate(result.payload):
1394 if not success:
1395 logging.warn("resume-sync of instance %s for disk %d failed",
1396 instance, idx)
1397
1398 os_add_result.Raise("Could not add os for instance %s"
1399 " on node %s" % (instance, pnode_name))
1400
1401 else:
1402 if self.op.mode == constants.INSTANCE_IMPORT:
1403 feedback_fn("* running the instance OS import scripts...")
1404
1405 transfers = []
1406
1407 for idx, image in enumerate(self.src_images):
1408 if not image:
1409 continue
1410
1411 # FIXME: pass debug option from opcode to backend
1412 dt = masterd.instance.DiskTransfer("disk/%s" % idx,
1413 constants.IEIO_FILE, (image, ),
1414 constants.IEIO_SCRIPT,
1415 (iobj.disks[idx], idx),
1416 None)
1417 transfers.append(dt)
1418
1419 import_result = \
1420 masterd.instance.TransferInstanceData(self, feedback_fn,
1421 self.op.src_node, pnode_name,
1422 self.pnode.secondary_ip,
1423 iobj, transfers)
1424 if not compat.all(import_result):
1425 self.LogWarning("Some disks for instance %s on node %s were not"
1426 " imported successfully" % (instance, pnode_name))
1427
1428 rename_from = self._old_instance_name
1429
1430 elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
1431 feedback_fn("* preparing remote import...")
1432 # The source cluster will stop the instance before attempting to make
1433 # a connection. In some cases stopping an instance can take a long
1434 # time, hence the shutdown timeout is added to the connection
1435 # timeout.
1436 connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
1437 self.op.source_shutdown_timeout)
1438 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1439
1440 assert iobj.primary_node == self.pnode.name
1441 disk_results = \
1442 masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
1443 self.source_x509_ca,
1444 self._cds, timeouts)
1445 if not compat.all(disk_results):
1446 # TODO: Should the instance still be started, even if some disks
1447 # failed to import (valid for local imports, too)?
1448 self.LogWarning("Some disks for instance %s on node %s were not"
1449 " imported successfully" % (instance, pnode_name))
1450
1451 rename_from = self.source_instance_name
1452
1453 else:
1454 # also checked in the prereq part
1455 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
1456 % self.op.mode)
1457
1458 # Run rename script on newly imported instance
1459 assert iobj.name == instance
1460 feedback_fn("Running rename script for %s" % instance)
1461 result = self.rpc.call_instance_run_rename(pnode_name, iobj,
1462 rename_from,
1463 self.op.debug_level)
1464 if result.fail_msg:
1465 self.LogWarning("Failed to run rename script for %s on node"
1466 " %s: %s" % (instance, pnode_name, result.fail_msg))
1467
1468 assert not self.owned_locks(locking.LEVEL_NODE_RES)
1469
1470 if self.op.start:
1471 iobj.admin_state = constants.ADMINST_UP
1472 self.cfg.Update(iobj, feedback_fn)
1473 logging.info("Starting instance %s on node %s", instance, pnode_name)
1474 feedback_fn("* starting instance...")
1475 result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
1476 False, self.op.reason)
1477 result.Raise("Could not start instance")
1478
1479 return list(iobj.all_nodes)
1480
1481
1482 class LUInstanceRename(LogicalUnit):
1483 """Rename an instance.
1484
1485 """
1486 HPATH = "instance-rename"
1487 HTYPE = constants.HTYPE_INSTANCE
1488
1489 def CheckArguments(self):
1490 """Check arguments.
1491
1492 """
1493 if self.op.ip_check and not self.op.name_check:
1494 # TODO: make the ip check more flexible and not depend on the name check
1495 raise errors.OpPrereqError("IP address check requires a name check",
1496 errors.ECODE_INVAL)
1497
1498 def BuildHooksEnv(self):
1499 """Build hooks env.
1500
1501 This runs on master, primary and secondary nodes of the instance.
1502
1503 """
1504 env = _BuildInstanceHookEnvByObject(self, self.instance)
1505 env["INSTANCE_NEW_NAME"] = self.op.new_name
1506 return env
1507
1508 def BuildHooksNodes(self):
1509 """Build hooks nodes.
1510
1511 """
1512 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1513 return (nl, nl)
1514
1515 def CheckPrereq(self):
1516 """Check prerequisites.
1517
1518 This checks that the instance is in the cluster and is not running.
1519
1520 """
1521 self.op.instance_name = _ExpandInstanceName(self.cfg,
1522 self.op.instance_name)
1523 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1524 assert instance is not None
1525 _CheckNodeOnline(self, instance.primary_node)
1526 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
1527 msg="cannot rename")
1528 self.instance = instance
1529
1530 new_name = self.op.new_name
1531 if self.op.name_check:
1532 hostname = _CheckHostnameSane(self, new_name)
1533 new_name = self.op.new_name = hostname.name
1534 if (self.op.ip_check and
1535 netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
1536 raise errors.OpPrereqError("IP %s of instance %s already in use" %
1537 (hostname.ip, new_name),
1538 errors.ECODE_NOTUNIQUE)
1539
1540 instance_list = self.cfg.GetInstanceList()
1541 if new_name in instance_list and new_name != instance.name:
1542 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
1543 new_name, errors.ECODE_EXISTS)
1544
1545 def Exec(self, feedback_fn):
1546 """Rename the instance.
1547
1548 """
1549 inst = self.instance
1550 old_name = inst.name
1551
1552 rename_file_storage = False
1553 if (inst.disk_template in constants.DTS_FILEBASED and
1554 self.op.new_name != inst.name):
1555 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1556 rename_file_storage = True
1557
1558 self.cfg.RenameInstance(inst.name, self.op.new_name)
1559 # Change the instance lock. This is definitely safe while we hold the BGL.
1560 # Otherwise the new lock would have to be added in acquired mode.
1561 assert self.REQ_BGL
1562 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
1563 self.glm.remove(locking.LEVEL_INSTANCE, old_name)
1564 self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
1565
1566 # re-read the instance from the configuration after rename
1567 inst = self.cfg.GetInstanceInfo(self.op.new_name)
1568
1569 if rename_file_storage:
1570 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
1571 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
1572 old_file_storage_dir,
1573 new_file_storage_dir)
1574 result.Raise("Could not rename on node %s directory '%s' to '%s'"
1575 " (but the instance has been renamed in Ganeti)" %
1576 (inst.primary_node, old_file_storage_dir,
1577 new_file_storage_dir))
1578
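# Temporarily activate the disks so that the disk metadata can be
# updated and the OS rename script can run; they are shut down again
# in the "finally" clause below.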
1579 _StartInstanceDisks(self, inst, None)
1580 # update info on disks
1581 info = _GetInstanceInfoText(inst)
1582 for (idx, disk) in enumerate(inst.disks):
1583 for node in inst.all_nodes:
1584 self.cfg.SetDiskID(disk, node)
1585 result = self.rpc.call_blockdev_setinfo(node, disk, info)
1586 if result.fail_msg:
1587 self.LogWarning("Error setting info on node %s for disk %s: %s",
1588 node, idx, result.fail_msg)
1589 try:
1590 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
1591 old_name, self.op.debug_level)
1592 msg = result.fail_msg
1593 if msg:
1594 msg = ("Could not run OS rename script for instance %s on node %s"
1595 " (but the instance has been renamed in Ganeti): %s" %
1596 (inst.name, inst.primary_node, msg))
1597 self.LogWarning(msg)
1598 finally:
1599 _ShutdownInstanceDisks(self, inst)
1600
1601 return inst.name
1602
1603
1604 class LUInstanceRemove(LogicalUnit):
1605 """Remove an instance.
1606
1607 """
1608 HPATH = "instance-remove"
1609 HTYPE = constants.HTYPE_INSTANCE
1610 REQ_BGL = False
1611
1612 def ExpandNames(self):
1613 self._ExpandAndLockInstance()
1614 self.needed_locks[locking.LEVEL_NODE] = []
1615 self.needed_locks[locking.LEVEL_NODE_RES] = []
1616 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1617
1618 def DeclareLocks(self, level):
1619 if level == locking.LEVEL_NODE:
1620 self._LockInstancesNodes()
1621 elif level == locking.LEVEL_NODE_RES:
1622 # Copy node locks
1623 self.needed_locks[locking.LEVEL_NODE_RES] = \
1624 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1625
1626 def BuildHooksEnv(self):
1627 """Build hooks env.
1628
1629 This runs on master, primary and secondary nodes of the instance.
1630
1631 """
1632 env = _BuildInstanceHookEnvByObject(self, self.instance)
1633 env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
1634 return env
1635
1636 def BuildHooksNodes(self):
1637 """Build hooks nodes.
1638
1639 """
1640 nl = [self.cfg.GetMasterNode()]
1641 nl_post = list(self.instance.all_nodes) + nl
1642 return (nl, nl_post)
1643
1644 def CheckPrereq(self):
1645 """Check prerequisites.
1646
1647 This checks that the instance is in the cluster.
1648
1649 """
1650 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651 assert self.instance is not None, \
1652 "Cannot retrieve locked instance %s" % self.op.instance_name
1653
1654 def Exec(self, feedback_fn):
1655 """Remove the instance.
1656
1657 """
1658 instance = self.instance
1659 logging.info("Shutting down instance %s on node %s",
1660 instance.name, instance.primary_node)
1661
1662 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
1663 self.op.shutdown_timeout,
1664 self.op.reason)
1665 msg = result.fail_msg
1666 if msg:
1667 if self.op.ignore_failures:
1668 feedback_fn("Warning: can't shutdown instance: %s" % msg)
1669 else:
1670 raise errors.OpExecError("Could not shutdown instance %s on"
1671 " node %s: %s" %
1672 (instance.name, instance.primary_node, msg))
1673
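# Sanity check: we must hold the node and node-resource locks for all
# nodes of the instance before its disks are removed.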
1674 assert (self.owned_locks(locking.LEVEL_NODE) ==
1675 self.owned_locks(locking.LEVEL_NODE_RES))
1676 assert not (set(instance.all_nodes) -
1677 self.owned_locks(locking.LEVEL_NODE)), \
1678 "Not owning correct locks"
1679
1680 _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
1681
1682
1683 def _CheckInstanceBridgesExist(lu, instance, node=None):
1684 """Check that the brigdes needed by an instance exist.
1685
1686 """
1687 if node is None:
1688 node = instance.primary_node
1689 _CheckNicsBridgesExist(lu, instance.nics, node)
1690
1691
1692 class LUInstanceMove(LogicalUnit):
1693 """Move an instance by data-copying.
1694
1695 """
1696 HPATH = "instance-move"
1697 HTYPE = constants.HTYPE_INSTANCE
1698 REQ_BGL = False
1699
1700 def ExpandNames(self):
1701 self._ExpandAndLockInstance()
1702 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
1703 self.op.target_node = target_node
1704 self.needed_locks[locking.LEVEL_NODE] = [target_node]
1705 self.needed_locks[locking.LEVEL_NODE_RES] = []
1706 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1707
1708 def DeclareLocks(self, level):
1709 if level == locking.LEVEL_NODE:
1710 self._LockInstancesNodes(primary_only=True)
1711 elif level == locking.LEVEL_NODE_RES:
1712 # Copy node locks
1713 self.needed_locks[locking.LEVEL_NODE_RES] = \
1714 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1715
1716 def BuildHooksEnv(self):
1717 """Build hooks env.
1718
1719 This runs on master, primary and secondary nodes of the instance.
1720
1721 """
1722 env = {
1723 "TARGET_NODE": self.op.target_node,
1724 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1725 }
1726 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1727 return env
1728
1729 def BuildHooksNodes(self):
1730 """Build hooks nodes.
1731
1732 """
1733 nl = [
1734 self.cfg.GetMasterNode(),
1735 self.instance.primary_node,
1736 self.op.target_node,
1737 ]
1738 return (nl, nl)
1739
1740 def CheckPrereq(self):
1741 """Check prerequisites.
1742
1743 This checks that the instance is in the cluster.
1744
1745 """
1746 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1747 assert self.instance is not None, \
1748 "Cannot retrieve locked instance %s" % self.op.instance_name
1749
1750 if instance.disk_template not in constants.DTS_COPYABLE:
1751 raise errors.OpPrereqError("Disk template %s not suitable for copying" %
1752 instance.disk_template, errors.ECODE_STATE)
1753
1754 node = self.cfg.GetNodeInfo(self.op.target_node)
1755 assert node is not None, \
1756 "Cannot retrieve locked node %s" % self.op.target_node
1757
1758 self.target_node = target_node = node.name
1759
1760 if target_node == instance.primary_node:
1761 raise errors.OpPrereqError("Instance %s is already on the node %s" %
1762 (instance.name, target_node),
1763 errors.ECODE_STATE)
1764
1765 bep = self.cfg.GetClusterInfo().FillBE(instance)
1766
1767 for idx, dsk in enumerate(instance.disks):
1768 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
1769 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
1770 " cannot copy" % idx, errors.ECODE_STATE)
1771
1772 _CheckNodeOnline(self, target_node)
1773 _CheckNodeNotDrained(self, target_node)
1774 _CheckNodeVmCapable(self, target_node)
1775 cluster = self.cfg.GetClusterInfo()
1776 group_info = self.cfg.GetNodeGroup(node.group)
1777 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
1778 _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
1779 ignore=self.op.ignore_ipolicy)
1780
1781 if instance.admin_state == constants.ADMINST_UP:
1782 # check memory requirements on the target node
1783 _CheckNodeFreeMemory(self, target_node,
1784 "moving instance %s" %
1785 instance.name, bep[constants.BE_MAXMEM],
1786 instance.hypervisor)
1787 else:
1788 self.LogInfo("Not checking memory on the secondary node as"
1789 " instance will not be started")
1790
1791 # check bridge existence
1792 _CheckInstanceBridgesExist(self, instance, node=target_node)
1793
1794 def Exec(self, feedback_fn):
1795 """Move an instance.
1796
1797 The move is done by shutting the instance down on its present node,
1798 copying the data over (slow) and starting it on the new node.
1799
1800 """
1801 instance = self.instance
1802
1803 source_node = instance.primary_node
1804 target_node = self.target_node
1805
1806 self.LogInfo("Shutting down instance %s on source node %s",
1807 instance.name, source_node)
1808
1809 assert (self.owned_locks(locking.LEVEL_NODE) ==
1810 self.owned_locks(locking.LEVEL_NODE_RES))
1811
1812 result = self.rpc.call_instance_shutdown(source_node, instance,
1813 self.op.shutdown_timeout,
1814 self.op.reason)
1815 msg = result.fail_msg
1816 if msg:
1817 if self.op.ignore_consistency:
1818 self.LogWarning("Could not shutdown instance %s on node %s."
1819 " Proceeding anyway. Please make sure node"
1820 " %s is down. Error details: %s",
1821 instance.name, source_node, source_node, msg)
1822 else:
1823 raise errors.OpExecError("Could not shutdown instance %s on"
1824 " node %s: %s" %
1825 (instance.name, source_node, msg))
1826
1827 # create the target disks
1828 try:
1829 _CreateDisks(self, instance, target_node=target_node)
1830 except errors.OpExecError:
1831 self.LogWarning("Device creation failed")
1832 self.cfg.ReleaseDRBDMinors(instance.name)
1833 raise
1834
1835 cluster_name = self.cfg.GetClusterInfo().cluster_name
1836
1837 errs = []
1838 # activate, get path, copy the data over
1839 for idx, disk in enumerate(instance.disks):
1840 self.LogInfo("Copying data for disk %d", idx)
1841 result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
1842 instance.name, True, idx)
1843 if result.fail_msg:
1844 self.LogWarning("Can't assemble newly created disk %d: %s",
1845 idx, result.fail_msg)
1846 errs.append(result.fail_msg)
1847 break
1848 dev_path = result.payload
1849 result = self.rpc.call_blockdev_export(source_node, (disk, instance),
1850 target_node, dev_path,
1851 cluster_name)
1852 if result.fail_msg:
1853 self.LogWarning("Can't copy data over for disk %d: %s",
1854 idx, result.fail_msg)
1855 errs.append(result.fail_msg)
1856 break
1857
1858 if errs:
1859 self.LogWarning("Some disks failed to copy, aborting")
1860 try:
1861 _RemoveDisks(self, instance, target_node=target_node)
1862 finally:
1863 self.cfg.ReleaseDRBDMinors(instance.name)
1864 raise errors.OpExecError("Errors during disk copy: %s" %
1865 (",".join(errs),))
1866
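# All disks were copied successfully; make the target node the new
# primary before removing the disks on the source node.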
1867 instance.primary_node = target_node
1868 self.cfg.Update(instance, feedback_fn)
1869
1870 self.LogInfo("Removing the disks on the original node")
1871 _RemoveDisks(self, instance, target_node=source_node)
1872
1873 # Only start the instance if it's marked as up
1874 if instance.admin_state == constants.ADMINST_UP:
1875 self.LogInfo("Starting instance %s on node %s",
1876 instance.name, target_node)
1877
1878 disks_ok, _ = _AssembleInstanceDisks(self, instance,
1879 ignore_secondaries=True)
1880 if not disks_ok:
1881 _ShutdownInstanceDisks(self, instance)
1882 raise errors.OpExecError("Can't activate the instance's disks")
1883
1884 result = self.rpc.call_instance_start(target_node,
1885 (instance, None, None), False,
1886 self.op.reason)
1887 msg = result.fail_msg
1888 if msg:
1889 _ShutdownInstanceDisks(self, instance)
1890 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
1891 (instance.name, target_node, msg))
1892
1893
1894 def _GetInstanceConsole(cluster, instance):
1895 """Returns console information for an instance.
1896
1897 @type cluster: L{objects.Cluster}
1898 @type instance: L{objects.Instance}
1899 @rtype: dict
1900
1901 """
1902 hyper = hypervisor.GetHypervisorClass(instance.hypervisor)
1903 # beparams and hvparams are passed separately, to avoid editing the
1904 # instance and then saving the defaults in the instance itself.
1905 hvparams = cluster.FillHV(instance)
1906 beparams = cluster.FillBE(instance)
1907 console = hyper.GetInstanceConsole(instance, hvparams, beparams)
1908
1909 assert console.instance == instance.name
1910 assert console.Validate()
1911
1912 return console.ToDict()
1913
1914
1915 class _InstanceQuery(_QueryBase):
1916 FIELDS = query.INSTANCE_FIELDS
1917
1918 def ExpandNames(self, lu):
1919 lu.needed_locks = {}
1920 lu.share_locks = _ShareAll()
1921
1922 if self.names:
1923 self.wanted = _GetWantedInstances(lu, self.names)
1924 else:
1925 self.wanted = locking.ALL_SET
1926
1927 self.do_locking = (self.use_locking and
1928 query.IQ_LIVE in self.requested_data)
1929 if self.do_locking:
1930 lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1931 lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1932 lu.needed_locks[locking.LEVEL_NODE] = []
1933 lu.needed_locks[locking.LEVEL_NETWORK] = []
1934 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1935
1936 self.do_grouplocks = (self.do_locking and
1937 query.IQ_NODES in self.requested_data)
1938
1939 def DeclareLocks(self, lu, level):
1940 if self.do_locking:
1941 if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1942 assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1943
1944 # Lock all groups used by instances optimistically; this requires going
1945 # via the node before it's locked, requiring verification later on
1946 lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1947 set(group_uuid
1948 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1949 for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1950 elif level == locking.LEVEL_NODE:
1951 lu._LockInstancesNodes() # pylint: disable=W0212
1952
1953 elif level == locking.LEVEL_NETWORK:
1954 lu.needed_locks[locking.LEVEL_NETWORK] = \
1955 frozenset(net_uuid
1956 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1957 for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1958
1959 @staticmethod
1960 def _CheckGroupLocks(lu):
1961 owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1962 owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1963
1964 # Check if node groups for locked instances are still correct
1965 for instance_name in owned_instances:
1966 _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1967
1968 def _GetQueryData(self, lu):
1969 """Computes the list of instances and their attributes.
1970
1971 """
1972 if self.do_grouplocks:
1973 self._CheckGroupLocks(lu)
1974
1975 cluster = lu.cfg.GetClusterInfo()
1976 all_info = lu.cfg.GetAllInstancesInfo()
1977
1978 instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1979
1980 instance_list = [all_info[name] for name in instance_names]
1981 nodes = frozenset(itertools.chain(*(inst.all_nodes
1982 for inst in instance_list)))
1983 hv_list = list(set([inst.hypervisor for inst in instance_list]))
1984 bad_nodes = []
1985 offline_nodes = []
1986 wrongnode_inst = set()
1987
1988 # Gather data as requested
1989 if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1990 live_data = {}
1991 node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
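# Nodes are classified as offline or bad, and instances reported by a
# node other than their configured primary are collected in
# wrongnode_inst.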
1992 for name in nodes:
1993 result = node_data[name]
1994 if result.offline:
1995 # offline nodes will be in both lists
1996 assert result.fail_msg
1997 offline_nodes.append(name)
1998 if result.fail_msg:
1999 bad_nodes.append(name)
2000 elif result.payload:
2001 for inst in result.payload:
2002 if inst in all_info:
2003 if all_info[inst].primary_node == name:
2004 live_data.update(result.payload)
2005 else:
2006 wrongnode_inst.add(inst)
2007 else:
2008 # orphan instance; we don't list it here as we don't
2009 # handle this case yet in the output of instance listing
2010 logging.warning("Orphan instance '%s' found on node %s",
2011 inst, name)
2012 # else no instance is alive
2013 else:
2014 live_data = {}
2015
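# Disk usage is computed from the configured disk sizes and template,
# not from live data.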
2016 if query.IQ_DISKUSAGE in self.requested_data:
2017 gmi = ganeti.masterd.instance
2018 disk_usage = dict((inst.name,
2019 gmi.ComputeDiskSize(inst.disk_template,
2020 [{constants.IDISK_SIZE: disk.size}
2021 for disk in inst.disks]))
2022 for inst in instance_list)
2023 else:
2024 disk_usage = None
2025
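# Console information is only available for instances that are
# currently running (i.e. present in live_data).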
2026 if query.IQ_CONSOLE in self.requested_data:
2027 consinfo = {}
2028 for inst in instance_list:
2029 if inst.name in live_data:
2030 # Instance is running
2031 consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
2032 else:
2033 consinfo[inst.name] = None
2034 assert set(consinfo.keys()) == set(instance_names)
2035 else:
2036 consinfo = None
2037
2038 if query.IQ_NODES in self.requested_data:
2039 node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
2040 instance_list)))
2041 nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
2042 groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
2043 for uuid in set(map(operator.attrgetter("group"),
2044 nodes.values())))
2045 else:
2046 nodes = None
2047 groups = None
2048
2049 if query.IQ_NETWORKS in self.requested_data:
2050 net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
2051 for i in instance_list))
2052 networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
2053 else:
2054 networks = None
2055
2056 return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
2057 disk_usage, offline_nodes, bad_nodes,
2058 live_data, wrongnode_inst, consinfo,
2059 nodes, groups, networks)
2060
2061
2062 class LUInstanceQuery(NoHooksLU):
2063 """Logical unit for querying instances.
2064
2065 """
2066 # pylint: disable=W0142
2067 REQ_BGL = False
2068
2069 def CheckArguments(self):
2070 self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2071 self.op.output_fields, self.op.use_locking)
2072
2073 def ExpandNames(self):
2074 self.iq.ExpandNames(self)
2075
2076 def DeclareLocks(self, level):
2077 self.iq.DeclareLocks(self, level)
2078
2079 def Exec(self, feedback_fn):
2080 return self.iq.OldStyleQuery(self)
2081
2082
2083 class LUInstanceQueryData(NoHooksLU):
2084 """Query runtime instance data.
2085
2086 """
2087 REQ_BGL = False
2088
2089 def ExpandNames(self):
2090 self.needed_locks = {}
2091
2092 # Use locking if requested or when non-static information is wanted
2093 if not (self.op.static or self.op.use_locking):
2094 self.LogWarning("Non-static data requested, locks need to be acquired")
2095 self.op.use_locking = True
2096
2097 if self.op.instances or not self.op.use_locking:
2098 # Expand instance names right here
2099 self.wanted_names = _GetWantedInstances(self, self.op.instances)
2100 else:
2101 # Will use acquired locks
2102 self.wanted_names = None
2103
2104 if self.op.use_locking:
2105 self.share_locks = _ShareAll()
2106
2107 if self.wanted_names is None:
2108 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2109 else:
2110 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
2111
2112 self.needed_locks[locking.LEVEL_NODEGROUP] = []
2113 self.needed_locks[locking.LEVEL_NODE] = []
2114 self.needed_locks[locking.LEVEL_NETWORK] = []
2115 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116
2117 def DeclareLocks(self, level):
2118 if self.op.use_locking:
2119 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
2120 if level == locking.LEVEL_NODEGROUP:
2121
2122 # Lock all groups used by instances optimistically; this requires going
2123 # via the node before it's locked, requiring verification later on
2124 self.needed_locks[locking.LEVEL_NODEGROUP] = \
2125 frozenset(group_uuid
2126 for instance_name in owned_instances
2127 for group_uuid in
2128 self.cfg.GetInstanceNodeGroups(instance_name))
2129
2130 elif level == locking.LEVEL_NODE:
2131 self._LockInstancesNodes()
2132
2133 elif level == locking.LEVEL_NETWORK:
2134 self.needed_locks[locking.LEVEL_NETWORK] = \
2135 frozenset(net_uuid
2136 for instance_name in owned_instances
2137 for net_uuid in
2138 self.cfg.GetInstanceNetworks(instance_name))
2139
2140 def CheckPrereq(self):
2141 """Check prerequisites.
2142
2143 This only checks the optional instance list against the existing names.
2144
2145 """
2146 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
2147 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
2148 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
2149 owned_networks = frozenset(self.owned_locks(locking.LEVEL_NETWORK))
2150
2151 if self.wanted_names is None:
2152 assert self.op.use_locking, "Locking was not used"
2153 self.wanted_names = owned_instances
2154
2155 instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
2156
2157 if self.op.use_locking:
2158 _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
2159 None)
2160 else:
2161 assert not (owned_instances or owned_groups or
2162 owned_nodes or owned_networks)
2163
2164 self.wanted_instances = instances.values()
2165
2166 def _ComputeBlockdevStatus(self, node, instance, dev):
2167 """Returns the status of a block device
2168
2169 """
2170 if self.op.static or not node:
2171 return None
2172
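# Set the node-specific disk ID before querying the device on that node.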
2173 self.cfg.SetDiskID(dev, node)
2174
2175 result = self.rpc.call_blockdev_find(node, dev)
2176 if result.offline:
2177 return None
2178
2179 result.Raise("Can't compute disk status for %s" % instance.name)
2180
2181 status = result.payload
2182 if status is None:
2183 return None
2184
2185 return (status.dev_path, status.major, status.minor,
2186 status.sync_percent, status.estimated_time,
2187 status.is_degraded, status.ldisk_status)
2188
2189 def _ComputeDiskStatus(self, instance, snode, dev):
2190 """Compute block device status.
2191
2192 """
2193 (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
2194
2195 return self._ComputeDiskStatusInner(instance, snode, anno_dev)
2196
2197 def _ComputeDiskStatusInner(self, instance, snode, dev):
2198 """Compute block device status.
2199
2200 @attention: The device has to be annotated already.
2201
2202 """
2203 if dev.dev_type in constants.LDS_DRBD:
2204 # for DRBD, derive the secondary node from the logical_id instead of the one passed in
2205 if dev.logical_id[0] == instance.primary_node:
2206 snode = dev.logical_id[1]
2207 else:
2208 snode = dev.logical_id[0]
2209
2210 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
2211 instance, dev)
2212 dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
2213
2214 if dev.children:
2215 dev_children = map(compat.partial(self._ComputeDiskStatusInner,
2216 instance, snode),
2217 dev.children)
2218 else:
2219 dev_children = []
2220
2221 return {
2222 "iv_name": dev.iv_name,
2223 "dev_type": dev.dev_type,
2224 "logical_id": dev.logical_id,
2225 "physical_id": dev.physical_id,
2226 "pstatus": dev_pstatus,
2227 "sstatus": dev_sstatus,
2228 "children": dev_children,
2229 "mode": dev.mode,
2230 "size": dev.size,
2231 "name": dev.name,
2232 "uuid": dev.uuid,
2233 }
2234
2235 def Exec(self, feedback_fn):
2236 """Gather and return data"""
2237 result = {}
2238
2239 cluster = self.cfg.GetClusterInfo()
2240
2241 node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
2242 nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
2243
2244 groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
2245 for node in nodes.values()))
2246
2247 group2name_fn = lambda uuid: groups[uuid].name
2248 for instance in self.wanted_instances:
2249 pnode = nodes[instance.primary_node]
2250
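# For static queries or offline primary nodes only configuration data
# is returned; otherwise the primary node is asked whether the
# instance is running.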
2251 if self.op.static or pnode.offline:
2252 remote_state = None
2253 if pnode.offline:
2254 self.LogWarning("Primary node %s is marked offline, returning static"
2255 " information only for instance %s" %
2256 (pnode.name, instance.name))
2257 else:
2258 remote_info = self.rpc.call_instance_info(instance.primary_node,
2259 instance.name,
2260 instance.hypervisor)
2261 remote_info.Raise("Error checking node %s" % instance.primary_node)
2262 remote_info = remote_info.payload
2263 if remote_info and "state" in remote_info:
2264 remote_state = "up"
2265 else:
2266 if instance.admin_state == constants.ADMINST_UP:
2267 remote_state = "down"
2268 else:
2269 remote_state = instance.admin_state
2270
2271 disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
2272 instance.disks)
2273
2274 snodes_group_uuids = [nodes[snode_name].group
2275 for snode_name in instance.secondary_nodes]
2276
2277 result[instance.name] = {
2278 "name": instance.name,
2279 "config_state": instance.admin_state,
2280 "run_state": remote_state,
2281 "pnode": instance.primary_node,
2282 "pnode_group_uuid": pnode.group,
2283 "pnode_group_name": group2name_fn(pnode.group),
2284 "snodes": instance.secondary_nodes,
2285 "snodes_group_uuids": snodes_group_uuids,
2286 "snodes_group_names": map(group2name_fn, snodes_group_uuids),
2287 "os": instance.os,
2288 # this happens to be the same format used for hooks
2289 "nics": _NICListToTuple(self, instance.nics),
2290 "disk_template": instance.disk_template,
2291 "disks": disks,
2292 "hypervisor": instance.hypervisor,
2293 "network_port": instance.network_port,
2294 "hv_instance": instance.hvparams,
2295 "hv_actual": cluster.FillHV(instance, skip_globals=True),
2296 "be_instance": instance.beparams,
2297 "be_actual": cluster.FillBE(instance),
2298 "os_instance": instance.osparams,
2299 "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
2300 "serial_no": instance.serial_no,
2301 "mtime": instance.mtime,
2302 "ctime": instance.ctime,
2303 "uuid": instance.uuid,
2304 }
2305
2306 return result
2307
2308
2309 class LUInstanceStartup(LogicalUnit):
2310 """Starts an instance.
2311
2312 """
2313 HPATH = "instance-start"
2314 HTYPE = constants.HTYPE_INSTANCE
2315 REQ_BGL = False
2316
2317 def CheckArguments(self):
2318 # extra beparams
2319 if self.op.beparams:
2320 # fill the beparams dict
2321 objects.UpgradeBeParams(self.op.beparams)
2322 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2323
2324 def ExpandNames(self):
2325 self._ExpandAndLockInstance()
2326 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2327
2328 def DeclareLocks(self, level):
2329 if level == locking.LEVEL_NODE_RES:
2330 self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
2331
2332 def BuildHooksEnv(self):
2333 """Build hooks env.
2334
2335 This runs on master, primary and secondary nodes of the instance.
2336
2337 """
2338 env = {
2339 "FORCE": self.op.force,
2340 }
2341
2342 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2343
2344 return env
2345
2346 def BuildHooksNodes(self):
2347 """Build hooks nodes.
2348
2349 """
2350 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2351 return (nl, nl)
2352
2353 def CheckPrereq(self):
2354 """Check prerequisites.
2355
2356 This checks that the instance is in the cluster.
2357
2358 """
2359 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2360 assert self.instance is not None, \
2361 "Cannot retrieve locked instance %s" % self.op.instance_name
2362
2363 # extra hvparams
2364 if self.op.hvparams:
2365 # check hypervisor parameter syntax (locally)
2366 cluster = self.cfg.GetClusterInfo()
2367 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
2368 filled_hvp = cluster.FillHV(instance)
2369 filled_hvp.update(self.op.hvparams)
2370 hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
2371 hv_type.CheckParameterSyntax(filled_hvp)
2372 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2373
2374 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2375
2376 self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
2377
2378 if self.primary_offline and self.op.ignore_offline_nodes:
2379 self.LogWarning("Ignoring offline primary node")
2380
2381 if self.op.hvparams or self.op.beparams:
2382 self.LogWarning("Overridden parameters are ignored")
2383 else:
2384 _CheckNodeOnline(self, instance.primary_node)
2385
2386 bep = self.cfg.GetClusterInfo().FillBE(instance)
2387 bep.update(self.op.beparams)
2388
2389 # check bridges existence
2390 _CheckInstanceBridgesExist(self, instance)
2391
2392 remote_info = self.rpc.call_instance_info(instance.primary_node,
2393 instance.name,
2394 instance.hypervisor)
2395 remote_info.Raise("Error checking node %s" % instance.primary_node,
2396 prereq=True, ecode=errors.ECODE_ENVIRON)
2397 if not remote_info.payload: # not running already
2398 _CheckNodeFreeMemory(self, instance.primary_node,
2399 "starting instance %s" % instance.name,
2400 bep[constants.BE_MINMEM], instance.hypervisor)
2401
2402 def Exec(self, feedback_fn):
2403 """Start the instance.
2404
2405 """
2406 instance = self.instance
2407 force = self.op.force
2408 reason = self.op.reason
2409
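# Record the new administrative state in the configuration first,
# unless explicitly told not to remember it.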
2410 if not self.op.no_remember:
2411 self.cfg.MarkInstanceUp(instance.name)
2412
2413 if self.primary_offline:
2414 assert self.op.ignore_offline_nodes
2415 self.LogInfo("Primary node offline, marked instance as started")
2416 else:
2417 node_current = instance.primary_node
2418
2419 _StartInstanceDisks(self, instance, force)
2420
2421 result = \
2422 self.rpc.call_instance_start(node_current,
2423 (instance, self.op.hvparams,
2424 self.op.beparams),
2425 self.op.startup_paused, reason)
2426 msg = result.fail_msg
2427 if msg:
2428 _ShutdownInstanceDisks(self, instance)
2429 raise errors.OpExecError("Could not start instance: %s" % msg)
2430
2431
2432 class LUInstanceShutdown(LogicalUnit):
2433 """Shutdown an instance.
2434
2435 """
2436 HPATH = "instance-stop"
2437 HTYPE = constants.HTYPE_INSTANCE
2438 REQ_BGL = False
2439
2440 def ExpandNames(self):
2441 self._ExpandAndLockInstance()
2442
2443 def BuildHooksEnv(self):
2444 """Build hooks env.
2445
2446 This runs on master, primary and secondary nodes of the instance.
2447
2448 """
2449 env = _BuildInstanceHookEnvByObject(self, self.instance)
2450 env["TIMEOUT"] = self.op.timeout
2451 return env
2452
2453 def BuildHooksNodes(self):
2454 """Build hooks nodes.
2455
2456 """
2457 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2458 return (nl, nl)
2459
2460 def CheckPrereq(self):
2461 """Check prerequisites.
2462
2463 This checks that the instance is in the cluster.
2464
2465 """
2466 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2467 assert self.instance is not None, \
2468 "Cannot retrieve locked instance %s" % self.op.instance_name
2469
2470 if not self.op.force:
2471 _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2472 else:
2473 self.LogWarning("Ignoring offline instance check")
2474
2475 self.primary_offline = \
2476 self.cfg.GetNodeInfo(self.instance.primary_node).offline
2477
2478 if self.primary_offline and self.op.ignore_offline_nodes:
2479 self.LogWarning("Ignoring offline primary node")
2480 else:
2481 _CheckNodeOnline(self, self.instance.primary_node)
2482
2483 def Exec(self, feedback_fn):
2484 """Shutdown the instance.
2485
2486 """
2487 instance = self.instance
2488 node_current = instance.primary_node
2489 timeout = self.op.timeout
2490 reason = self.op.reason
2491
2492 # If the instance is offline we shouldn't mark it as down, as that
2493 # resets the offline flag.
2494 if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2495 self.cfg.MarkInstanceDown(instance.name)
2496
2497 if self.primary_offline:
2498 assert self.op.ignore_offline_nodes
2499 self.LogInfo("Primary node offline, marked instance as stopped")
2500 else:
2501 result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2502 reason)
2503 msg = result.fail_msg
2504 if msg:
2505 self.LogWarning("Could not shutdown instance: %s", msg)
2506
2507 _ShutdownInstanceDisks(self, instance)
2508
2509
2510 class LUInstanceReinstall(LogicalUnit):
2511 """Reinstall an instance.
2512
2513 """
2514 HPATH = "instance-reinstall"
2515 HTYPE = constants.HTYPE_INSTANCE
2516 REQ_BGL = False
2517
2518 def ExpandNames(self):
2519 self._ExpandAndLockInstance()
2520
2521 def BuildHooksEnv(self):
2522 """Build hooks env.
2523
2524 This runs on master, primary and secondary nodes of the instance.
2525
2526 """
2527 return _BuildInstanceHookEnvByObject(self, self.instance)
2528
2529 def BuildHooksNodes(self):
2530 """Build hooks nodes.
2531
2532 """
2533 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2534 return (nl, nl)
2535
2536 def CheckPrereq(self):
2537 """Check prerequisites.
2538
2539 This checks that the instance is in the cluster and is not running.
2540
2541 """
2542 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2543 assert instance is not None, \
2544 "Cannot retrieve locked instance %s" % self.op.instance_name
2545 _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2546 " offline, cannot reinstall")
2547
2548 if instance.disk_template == constants.DT_DISKLESS:
2549 raise errors.OpPrereqError("Instance '%s' has no disks" %
2550 self.op.instance_name,
2551 errors.ECODE_INVAL)
2552 _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2553
2554 if self.op.os_type is not None:
2555 # OS verification
2556 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2557 _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2558 instance_os = self.op.os_type
2559 else:
2560 instance_os = instance.os
2561
2562 nodelist = list(instance.all_nodes)
2563
2564 if self.op.osparams:
2565 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2566 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2567 self.os_inst = i_osdict # the new dict (without defaults)
2568 else:
2569 self.os_inst = None
2570
2571 self.instance = instance
2572
2573 def Exec(self, feedback_fn):
2574 """Reinstall the instance.
2575
2576 """
2577 inst = self.instance
2578
2579 if self.op.os_type is not None:
2580 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2581 inst.os = self.op.os_type
2582 # Write to configuration
2583 self.cfg.Update(inst, feedback_fn)
2584
2585 _StartInstanceDisks(self, inst, None)
2586 try:
2587 feedback_fn("Running the instance OS create scripts...")
2588 # FIXME: pass debug option from opcode to backend
2589 result = self.rpc.call_instance_os_add(inst.primary_node,
2590 (inst, self.os_inst), True,
2591 self.op.debug_level)
2592 result.Raise("Could not install OS for instance %s on node %s" %
2593 (inst.name, inst.primary_node))
2594 finally:
2595 _ShutdownInstanceDisks(self, inst)
2596
2597
2598 class LUInstanceReboot(LogicalUnit):
2599 """Reboot an instance.
2600
2601 """
2602 HPATH = "instance-reboot"
2603 HTYPE = constants.HTYPE_INSTANCE
2604 REQ_BGL = False
2605
2606 def ExpandNames(self):
2607 self._ExpandAndLockInstance()
2608
2609 def BuildHooksEnv(self):
2610 """Build hooks env.
2611
2612 This runs on master, primary and secondary nodes of the instance.
2613
2614 """
2615 env = {
2616 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2617 "REBOOT_TYPE": self.op.reboot_type,
2618 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2619 }
2620
2621 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2622
2623 return env
2624
2625 def BuildHooksNodes(self):
2626 """Build hooks nodes.
2627
2628 """
2629 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2630 return (nl, nl)
2631
2632 def CheckPrereq(self):
2633 """Check prerequisites.
2634
2635 This checks that the instance is in the cluster.
2636
2637 """
2638 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2639 assert self.instance is not None, \
2640 "Cannot retrieve locked instance %s" % self.op.instance_name
2641 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
2642 _CheckNodeOnline(self, instance.primary_node)
2643
2644 # check bridges existence
2645 _CheckInstanceBridgesExist(self, instance)
2646
2647 def Exec(self, feedback_fn):
2648 """Reboot the instance.
2649
2650 """
2651 instance = self.instance
2652 ignore_secondaries = self.op.ignore_secondaries
2653 reboot_type = self.op.reboot_type
2654 reason = self.op.reason
2655
2656 remote_info = self.rpc.call_instance_info(instance.primary_node,
2657 instance.name,
2658 instance.hypervisor)
2659 remote_info.Raise("Error checking node %s" % instance.primary_node)
2660 instance_running = bool(remote_info.payload)
2661
2662 node_current = instance.primary_node
2663
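# Soft and hard reboots of a running instance are delegated to the
# hypervisor; a full reboot, or rebooting a stopped instance, is done
# as a shutdown followed by a start.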
2664 if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2665 constants.INSTANCE_REBOOT_HARD]:
2666 for disk in instance.disks:
2667 self.cfg.SetDiskID(disk, node_current)
2668 result = self.rpc.call_instance_reboot(node_current, instance,
2669 reboot_type,
2670 self.op.shutdown_timeout, reason)
2671 result.Raise("Could not reboot instance")
2672 else:
2673 if instance_running:
2674 result = self.rpc.call_instance_shutdown(node_current, instance,
2675 self.op.shutdown_timeout,
2676 reason)
2677 result.Raise("Could not shutdown instance for full reboot")
2678 _ShutdownInstanceDisks(self, instance)
2679 else:
2680 self.LogInfo("Instance %s was already stopped, starting now",
2681 instance.name)
2682 _StartInstanceDisks(self, instance, ignore_secondaries)
2683 result = self.rpc.call_instance_start(node_current,
2684 (instance, None, None), False,
2685 reason)
2686 msg = result.fail_msg
2687 if msg:
2688 _ShutdownInstanceDisks(self, instance)
2689 raise errors.OpExecError("Could not start instance for"
2690 " full reboot: %s" % msg)
2691
2692 self.cfg.MarkInstanceUp(instance.name)
2693
2694
2695 class LUInstanceConsole(NoHooksLU):
2696 """Connect to an instance's console.
2697
2698 This is somewhat special in that it returns the command line that
2699 you need to run on the master node in order to connect to the
2700 console.
2701
2702 """
2703 REQ_BGL = False
2704
2705 def ExpandNames(self):
2706 self.share_locks = _ShareAll()
2707 self._ExpandAndLockInstance()
2708
2709 def CheckPrereq(self):
2710 """Check prerequisites.
2711
2712 This checks that the instance is in the cluster.
2713
2714 """
2715 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2716 assert self.instance is not None, \
2717 "Cannot retrieve locked instance %s" % self.op.instance_name
2718 _CheckNodeOnline(self, self.instance.primary_node)
2719
2720 def Exec(self, feedback_fn):
2721 """Connect to the console of an instance
2722
2723 """
2724 instance = self.instance
2725 node = instance.primary_node
2726
2727 node_insts = self.rpc.call_instance_list([node],
2728 [instance.hypervisor])[node]
2729 node_insts.Raise("Can't get node information from %s" % node)
2730
2731 if instance.name not in node_insts.payload:
2732 if instance.admin_state == constants.ADMINST_UP:
2733 state = constants.INSTST_ERRORDOWN
2734 elif instance.admin_state == constants.ADMINST_DOWN:
2735 state = constants.INSTST_ADMINDOWN
2736 else:
2737 state = constants.INSTST_ADMINOFFLINE
2738 raise errors.OpExecError("Instance %s is not running (state %s)" %
2739 (instance.name, state))
2740
2741 logging.debug("Connecting to console of %s on %s", instance.name, node)
2742
2743 return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
2744
2745
2746 def _DeclareLocksForMigration(lu, level):
2747 """Declares locks for L{TLMigrateInstance}.
2748
2749 @type lu: L{LogicalUnit}
2750 @param level: Lock level
2751
2752 """
2753 if level == locking.LEVEL_NODE_ALLOC:
2754 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2755
2756 instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2757
2758 # Node locks are already declared here rather than at LEVEL_NODE as we need
2759 # the instance object anyway to declare the node allocation lock.
2760 if instance.disk_template in constants.DTS_EXT_MIRROR:
2761 if lu.op.target_node is None:
2762 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2763 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2764 else:
2765 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2766 lu.op.target_node]
2767 del lu.recalculate_locks[locking.LEVEL_NODE]
2768 else:
2769 lu._LockInstancesNodes() # pylint: disable=W0212
2770
2771 elif level == locking.LEVEL_NODE:
2772 # Node locks are declared together with the node allocation lock
2773 assert (lu.needed_locks[locking.LEVEL_NODE] or
2774 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2775
2776 elif level == locking.LEVEL_NODE_RES:
2777 # Copy node locks
2778 lu.needed_locks[locking.LEVEL_NODE_RES] = \
2779 _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2780
2781
2782 def _ExpandNamesForMigration(lu):
2783 """Expands names for use with L{TLMigrateInstance}.
2784
2785 @type lu: L{LogicalUnit}
2786
2787 """
2788 if lu.op.target_node is not None:
2789 lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2790
2791 lu.needed_locks[locking.LEVEL_NODE] = []
2792 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2793
2794 lu.needed_locks[locking.LEVEL_NODE_RES] = []
2795 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2796
2797 # The node allocation lock is actually only needed for externally replicated
2798 # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2799 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2800
2801
2802 class LUInstanceFailover(LogicalUnit):
2803 """Failover an instance.
2804
2805 """
2806 HPATH = "instance-failover"
2807 HTYPE = constants.HTYPE_INSTANCE
2808 REQ_BGL = False
2809
2810 def CheckArguments(self):
2811 """Check the arguments.
2812
2813 """
2814 self.iallocator = getattr(self.op, "iallocator", None)
2815 self.target_node = getattr(self.op, "target_node", None)
2816
2817 def ExpandNames(self):
2818 self._ExpandAndLockInstance()
2819 _ExpandNamesForMigration(self)
2820
2821 self._migrater = \
2822 TLMigrateInstance(self, self.op.instance_name, False, True, False,
2823 self.op.ignore_consistency, True,
2824 self.op.shutdown_timeout, self.op.ignore_ipolicy)
2825
2826 self.tasklets = [self._migrater]
2827
2828 def DeclareLocks(self, level):
2829 _DeclareLocksForMigration(self, level)
2830
2831 def BuildHooksEnv(self):
2832 """Build hooks env.
2833
2834 This runs on master, primary and secondary nodes of the instance.
2835
2836 """
2837 instance = self._migrater.instance
2838 source_node = instance.primary_node
2839 target_node = self.op.target_node
2840 env = {
2841 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2842 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2843 "OLD_PRIMARY": source_node,
2844 "NEW_PRIMARY": target_node,
2845 }
2846
2847 if instance.disk_template in constants.DTS_INT_MIRROR:
2848 env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2849 env["NEW_SECONDARY"] = source_node
2850 else:
2851 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2852
2853 env.update(_BuildInstanceHookEnvByObject(self, instance))
2854
2855 return env
2856
2857 def BuildHooksNodes(self):
2858 """Build hooks nodes.
2859
2860 """
2861 instance = self._migrater.instance
2862 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2863 return (nl, nl + [instance.primary_node])
2864
2865
2866 class LUInstanceMigrate(LogicalUnit):
2867 """Migrate an instance.
2868
2869 This moves the instance without shutting it down, as opposed to a
2870 failover, which is done with a shutdown.
2871
2872 """
2873 HPATH = "instance-migrate"
2874 HTYPE = constants.HTYPE_INSTANCE
2875 REQ_BGL = False
2876
2877 def ExpandNames(self):
2878 self._ExpandAndLockInstance()
2879 _ExpandNamesForMigration(self)
2880
2881 self._migrater = \
2882 TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2883 False, self.op.allow_failover, False,
2884 self.op.allow_runtime_changes,
2885 constants.DEFAULT_SHUTDOWN_TIMEOUT,
2886 self.op.ignore_ipolicy)
2887
2888 self.tasklets = [self._migrater]
2889
2890 def DeclareLocks(self, level):
2891 _DeclareLocksForMigration(self, level)
2892
2893 def BuildHooksEnv(self):
2894 """Build hooks env.
2895
2896 This runs on master, primary and secondary nodes of the instance.
2897
2898 """
2899 instance = self._migrater.instance
2900 source_node = instance.primary_node
2901 target_node = self.op.target_node
2902 env = _BuildInstanceHookEnvByObject(self, instance)
2903 env.update({
2904 "MIGRATE_LIVE": self._migrater.live,
2905 "MIGRATE_CLEANUP": self.op.cleanup,
2906 "OLD_PRIMARY": source_node,
2907 "NEW_PRIMARY": target_node,
2908 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2909 })
2910
2911 if instance.disk_template in constants.DTS_INT_MIRROR:
2912 env["OLD_SECONDARY"] = target_node
2913 env["NEW_SECONDARY"] = source_node
2914 else:
2915 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2916
2917 return env
2918
2919 def BuildHooksNodes(self):
2920 """Build hooks nodes.
2921
2922 """
2923 instance = self._migrater.instance
2924 snodes = list(instance.secondary_nodes)
2925 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2926 return (nl, nl)
2927
2928
2929 class LUInstanceMultiAlloc(NoHooksLU):
2930 """Allocates multiple instances at the same time.
2931
2932 """
2933 REQ_BGL = False
2934
2935 def CheckArguments(self):
2936 """Check arguments.
2937
2938 """
2939 nodes = []
2940 for inst in self.op.instances:
2941 if inst.iallocator is not None:
2942 raise errors.OpPrereqError("iallocator are not allowed to be set on"
2943 " instance objects", errors.ECODE_INVAL)
2944 nodes.append(bool(inst.pnode))
2945 if inst.disk_template in constants.DTS_INT_MIRROR:
2946 nodes.append(bool(inst.snode))
2947
2948 has_nodes = compat.any(nodes)
2949 if compat.all(nodes) ^ has_nodes:
2950 raise errors.OpPrereqError("There are instance objects providing"
2951 " pnode/snode while others do not",
2952 errors.ECODE_INVAL)
2953
2954 if self.op.iallocator is None:
2955 default_iallocator = self.cfg.GetDefaultIAllocator()
2956 if default_iallocator and has_nodes:
2957 self.op.iallocator = default_iallocator
2958 else:
2959 raise errors.OpPrereqError("No iallocator or nodes on the instances"
2960 " given and no cluster-wide default"
2961 " iallocator found; please specify either"
2962 " an iallocator or nodes on the instances"
2963 " or set a cluster-wide default iallocator",
2964 errors.ECODE_INVAL)
2965
2966 _CheckOpportunisticLocking(self.op)
2967
2968 dups = utils.FindDuplicates([op.instance_name for op in self.op.instances])
2969 if dups:
2970 raise errors.OpPrereqError("There are duplicate instance names: %s" %
2971 utils.CommaJoin(dups), errors.ECODE_INVAL)
2972
2973 def ExpandNames(self):
2974 """Calculate the locks.
2975
2976 """
2977 self.share_locks = _ShareAll()
2978 self.needed_locks = {
2979 # the iallocator will select nodes; even if no iallocator is used,
2980 # collisions with LUInstanceCreate should be avoided
2981 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
2982 }
2983
2984 if self.op.iallocator:
2985 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2986 self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
2987
2988 if self.op.opportunistic_locking:
2989 self.opportunistic_locks[locking.LEVEL_NODE] = True
2990 self.opportunistic_locks[locking.LEVEL_NODE_RES] = True
2991 else:
2992 nodeslist = []
2993 for inst in self.op.instances:
2994 inst.pnode = _ExpandNodeName(self.cfg, inst.pnode)
2995 nodeslist.append(inst.pnode)
2996 if inst.snode is not None:
2997 inst.snode = _ExpandNodeName(self.cfg, inst.snode)
2998 nodeslist.append(inst.snode)
2999
3000 self.needed_locks[locking.LEVEL_NODE] = nodeslist
3001 # Lock resources of instance's primary and secondary nodes (copy to
3002 # prevent accidental modification)
3003 self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist)
3004
3005 def CheckPrereq(self):
3006 """Check prerequisite.
3007
3008 """
3009 cluster = self.cfg.GetClusterInfo()
3010 default_vg = self.cfg.GetVGName()
3011 ec_id = self.proc.GetECId()
3012
3013 if self.op.opportunistic_locking:
3014 # Only consider nodes for which a lock is held
3015 node_whitelist = list(self.owned_locks(locking.LEVEL_NODE))
3016 else:
3017 node_whitelist = None
3018
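# Build one allocation request per instance and submit them to the
# iallocator as a single multi-instance allocation.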
3019 insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg),
3020 _ComputeNics(op, cluster, None,
3021 self.cfg, ec_id),
3022 _ComputeFullBeParams(op, cluster),
3023 node_whitelist)
3024 for op in self.op.instances]
3025
3026 req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
3027 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3028
3029 ial.Run(self.op.iallocator)
3030
3031 if not ial.success:
3032 raise errors.OpPrereqError("Can't compute nodes using"
3033 " iallocator '%s': %s" %
3034 (self.op.iallocator, ial.info),
3035 errors.ECODE_NORES)
3036
3037 self.ia_result = ial.result
3038
3039 if self.op.dry_run:
3040 self.dry_run_result = objects.FillDict(self._ConstructPartialResult(), {
3041 constants.JOB_IDS_KEY: [],
3042 })
3043
3044 def _ConstructPartialResult(self):
3045 """Contructs the partial result.
3046
3047 """
3048 (allocatable, failed) = self.ia_result
3049 return {
3050 opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY:
3051 map(compat.fst, allocatable),
3052 opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed,
3053 }
3054
3055 def Exec(self, feedback_fn):
3056 """Executes the opcode.
3057
3058 """
3059 op2inst = dict((op.instance_name, op) for op in self.op.instances)
3060 (allocatable, failed) = self.ia_result
3061
3062 jobs = []
3063 for (name, nodes) in allocatable:
3064 op = op2inst.pop(name)
3065
3066 if len(nodes) > 1: