cmdlib: Cleanup public/private functions
[ganeti-github.git] / lib / cmdlib / instance_migration.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with instance migration an failover."""
23
24 import logging
25 import time
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti.masterd import iallocator
31 from ganeti import utils
32 from ganeti.cmdlib.base import LogicalUnit, Tasklet
33 from ganeti.cmdlib.common import ExpandInstanceName, \
34 CheckIAllocatorOrNode, ExpandNodeName
35 from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
36 ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
37 from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
38 CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
39 CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
40
41 import ganeti.masterd.instance
42
43
44 def _ExpandNamesForMigration(lu):
45 """Expands names for use with L{TLMigrateInstance}.
46
47 @type lu: L{LogicalUnit}
48
49 """
50 if lu.op.target_node is not None:
51 lu.op.target_node = ExpandNodeName(lu.cfg, lu.op.target_node)
52
53 lu.needed_locks[locking.LEVEL_NODE] = []
54 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
55
56 lu.needed_locks[locking.LEVEL_NODE_RES] = []
57 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
58
59 # The node allocation lock is actually only needed for externally replicated
60 # instances (e.g. sharedfile or RBD) and if an iallocator is used.
61 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
62
63
64 def _DeclareLocksForMigration(lu, level):
65 """Declares locks for L{TLMigrateInstance}.
66
67 @type lu: L{LogicalUnit}
68 @param level: Lock level
69
70 """
71 if level == locking.LEVEL_NODE_ALLOC:
72 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
73
74 instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
75
76 # Node locks are already declared here rather than at LEVEL_NODE as we need
77 # the instance object anyway to declare the node allocation lock.
78 if instance.disk_template in constants.DTS_EXT_MIRROR:
79 if lu.op.target_node is None:
80 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
81 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
82 else:
83 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
84 lu.op.target_node]
85 del lu.recalculate_locks[locking.LEVEL_NODE]
86 else:
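# Internally mirrored instances can only move between their own nodes, so
# derive the node locks from the instance's primary and secondary nodes.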
87 lu._LockInstancesNodes() # pylint: disable=W0212
88
89 elif level == locking.LEVEL_NODE:
90 # Node locks are declared together with the node allocation lock
91 assert (lu.needed_locks[locking.LEVEL_NODE] or
92 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
93
94 elif level == locking.LEVEL_NODE_RES:
95 # Copy node locks
96 lu.needed_locks[locking.LEVEL_NODE_RES] = \
97 CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
98
99
100 class LUInstanceFailover(LogicalUnit):
101 """Failover an instance.
102
103 """
104 HPATH = "instance-failover"
105 HTYPE = constants.HTYPE_INSTANCE
106 REQ_BGL = False
107
108 def CheckArguments(self):
109 """Check the arguments.
110
111 """
112 self.iallocator = getattr(self.op, "iallocator", None)
113 self.target_node = getattr(self.op, "target_node", None)
114
115 def ExpandNames(self):
116 self._ExpandAndLockInstance()
117 _ExpandNamesForMigration(self)
118
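# Positional arguments below map to: cleanup=False, failover=True,
# fallback=False, ignore_consistency=self.op.ignore_consistency,
# allow_runtime_changes=True (see TLMigrateInstance.__init__).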
119 self._migrater = \
120 TLMigrateInstance(self, self.op.instance_name, False, True, False,
121 self.op.ignore_consistency, True,
122 self.op.shutdown_timeout, self.op.ignore_ipolicy)
123
124 self.tasklets = [self._migrater]
125
126 def DeclareLocks(self, level):
127 _DeclareLocksForMigration(self, level)
128
129 def BuildHooksEnv(self):
130 """Build hooks env.
131
132 This runs on master, primary and secondary nodes of the instance.
133
134 """
135 instance = self._migrater.instance
136 source_node = instance.primary_node
137 target_node = self.op.target_node
138 env = {
139 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
140 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
141 "OLD_PRIMARY": source_node,
142 "NEW_PRIMARY": target_node,
143 }
144
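# For internally mirrored templates (e.g. DRBD) the failover target is the
# current secondary, so the old primary becomes the new secondary; other
# templates have no secondary node to report.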
145 if instance.disk_template in constants.DTS_INT_MIRROR:
146 env["OLD_SECONDARY"] = instance.secondary_nodes[0]
147 env["NEW_SECONDARY"] = source_node
148 else:
149 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
150
151 env.update(BuildInstanceHookEnvByObject(self, instance))
152
153 return env
154
155 def BuildHooksNodes(self):
156 """Build hooks nodes.
157
158 """
159 instance = self._migrater.instance
160 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
161 return (nl, nl + [instance.primary_node])
162
163
164 class LUInstanceMigrate(LogicalUnit):
165 """Migrate an instance.
166
167 This is migration without shutting down the instance, as opposed to
168 failover, which requires the instance to be shut down.
169
170 """
171 HPATH = "instance-migrate"
172 HTYPE = constants.HTYPE_INSTANCE
173 REQ_BGL = False
174
175 def ExpandNames(self):
176 self._ExpandAndLockInstance()
177 _ExpandNamesForMigration(self)
178
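# Positional arguments below map to: cleanup=self.op.cleanup, failover=False,
# fallback=self.op.allow_failover, ignore_consistency=False,
# allow_runtime_changes=self.op.allow_runtime_changes
# (see TLMigrateInstance.__init__).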
179 self._migrater = \
180 TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
181 False, self.op.allow_failover, False,
182 self.op.allow_runtime_changes,
183 constants.DEFAULT_SHUTDOWN_TIMEOUT,
184 self.op.ignore_ipolicy)
185
186 self.tasklets = [self._migrater]
187
188 def DeclareLocks(self, level):
189 _DeclareLocksForMigration(self, level)
190
191 def BuildHooksEnv(self):
192 """Build hooks env.
193
194 This runs on master, primary and secondary nodes of the instance.
195
196 """
197 instance = self._migrater.instance
198 source_node = instance.primary_node
199 target_node = self.op.target_node
200 env = BuildInstanceHookEnvByObject(self, instance)
201 env.update({
202 "MIGRATE_LIVE": self._migrater.live,
203 "MIGRATE_CLEANUP": self.op.cleanup,
204 "OLD_PRIMARY": source_node,
205 "NEW_PRIMARY": target_node,
206 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
207 })
208
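# For internally mirrored templates the migration target is the current
# secondary, so the two nodes simply swap roles: the old primary becomes
# the new secondary.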
209 if instance.disk_template in constants.DTS_INT_MIRROR:
210 env["OLD_SECONDARY"] = target_node
211 env["NEW_SECONDARY"] = source_node
212 else:
213 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
214
215 return env
216
217 def BuildHooksNodes(self):
218 """Build hooks nodes.
219
220 """
221 instance = self._migrater.instance
222 snodes = list(instance.secondary_nodes)
223 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
224 return (nl, nl)
225
226
227 class TLMigrateInstance(Tasklet):
228 """Tasklet class for instance migration.
229
230 @type live: boolean
231 @ivar live: whether the migration will be done live or non-live;
232 this variable is initialized only after CheckPrereq has run
233 @type cleanup: boolean
234 @ivar cleanup: Whether we clean up after a failed migration
235 @type iallocator: string
236 @ivar iallocator: The iallocator used to determine target_node
237 @type target_node: string
238 @ivar target_node: If given, the target_node to reallocate the instance to
239 @type failover: boolean
240 @ivar failover: Whether operation results in failover or migration
241 @type fallback: boolean
242 @ivar fallback: Whether fallback to failover is allowed if migration not
243 possible
244 @type ignore_consistency: boolean
245 @ivar ignore_consistency: Whether we should ignore consistency between source
246 and target node
247 @type shutdown_timeout: int
248 @ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
249 @type ignore_ipolicy: bool
250 @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
251
252 """
253
254 # Constants
255 _MIGRATION_POLL_INTERVAL = 1 # seconds
256 _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
257
258 def __init__(self, lu, instance_name, cleanup, failover, fallback,
259 ignore_consistency, allow_runtime_changes, shutdown_timeout,
260 ignore_ipolicy):
261 """Initializes this class.
262
263 """
264 Tasklet.__init__(self, lu)
265
266 # Parameters
267 self.instance_name = instance_name
268 self.cleanup = cleanup
269 self.live = False # will be overridden later
270 self.failover = failover
271 self.fallback = fallback
272 self.ignore_consistency = ignore_consistency
273 self.shutdown_timeout = shutdown_timeout
274 self.ignore_ipolicy = ignore_ipolicy
275 self.allow_runtime_changes = allow_runtime_changes
276
277 def CheckPrereq(self):
278 """Check prerequisites.
279
280 This checks that the instance is in the cluster.
281
282 """
283 instance_name = ExpandInstanceName(self.lu.cfg, self.instance_name)
284 instance = self.cfg.GetInstanceInfo(instance_name)
285 assert instance is not None
286 self.instance = instance
287 cluster = self.cfg.GetClusterInfo()
288
289 if (not self.cleanup and
290 not instance.admin_state == constants.ADMINST_UP and
291 not self.failover and self.fallback):
292 self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
293 " switching to failover")
294 self.failover = True
295
296 if instance.disk_template not in constants.DTS_MIRRORED:
297 if self.failover:
298 text = "failovers"
299 else:
300 text = "migrations"
301 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
302 " %s" % (instance.disk_template, text),
303 errors.ECODE_STATE)
304
305 if instance.disk_template in constants.DTS_EXT_MIRROR:
306 CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
307
308 if self.lu.op.iallocator:
309 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
310 self._RunAllocator()
311 else:
312 # We set self.target_node here as it is required by
313 # BuildHooksEnv
314 self.target_node = self.lu.op.target_node
315
316 # Check that the target node is correct in terms of instance policy
317 nodeinfo = self.cfg.GetNodeInfo(self.target_node)
318 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
319 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
320 group_info)
321 CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
322 ignore=self.ignore_ipolicy)
323
324 # self.target_node is already populated, either directly or by the
325 # iallocator run
326 target_node = self.target_node
327 if self.target_node == instance.primary_node:
328 raise errors.OpPrereqError("Cannot migrate instance %s"
329 " to its primary (%s)" %
330 (instance.name, instance.primary_node),
331 errors.ECODE_STATE)
332
333 if len(self.lu.tasklets) == 1:
334 # It is safe to release locks only when we're the only tasklet
335 # in the LU
336 ReleaseLocks(self.lu, locking.LEVEL_NODE,
337 keep=[instance.primary_node, self.target_node])
338 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
339
340 else:
341 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
342
343 secondary_nodes = instance.secondary_nodes
344 if not secondary_nodes:
345 raise errors.ConfigurationError("No secondary node but using"
346 " %s disk template" %
347 instance.disk_template)
348 target_node = secondary_nodes[0]
349 if self.lu.op.iallocator or (self.lu.op.target_node and
350 self.lu.op.target_node != target_node):
351 if self.failover:
352 text = "failed over"
353 else:
354 text = "migrated"
355 raise errors.OpPrereqError("Instances with disk template %s cannot"
356 " be %s to arbitrary nodes"
357 " (neither an iallocator nor a target"
358 " node can be passed)" %
359 (instance.disk_template, text),
360 errors.ECODE_INVAL)
361 nodeinfo = self.cfg.GetNodeInfo(target_node)
362 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
363 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
364 group_info)
365 CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
366 ignore=self.ignore_ipolicy)
367
368 i_be = cluster.FillBE(instance)
369
370 # check memory requirements on the secondary node
371 if (not self.cleanup and
372 (not self.failover or instance.admin_state == constants.ADMINST_UP)):
373 self.tgt_free_mem = CheckNodeFreeMemory(self.lu, target_node,
374 "migrating instance %s" %
375 instance.name,
376 i_be[constants.BE_MINMEM],
377 instance.hypervisor)
378 else:
379 self.lu.LogInfo("Not checking memory on the secondary node as"
380 " instance will not be started")
381
382 # check if failover must be forced instead of migration
383 if (not self.cleanup and not self.failover and
384 i_be[constants.BE_ALWAYS_FAILOVER]):
385 self.lu.LogInfo("Instance configured to always failover; fallback"
386 " to failover")
387 self.failover = True
388
389 # check bridge existence
390 CheckInstanceBridgesExist(self.lu, instance, node=target_node)
391
392 if not self.cleanup:
393 CheckNodeNotDrained(self.lu, target_node)
394 if not self.failover:
395 result = self.rpc.call_instance_migratable(instance.primary_node,
396 instance)
397 if result.fail_msg and self.fallback:
398 self.lu.LogInfo("Can't migrate, instance offline, fallback to"
399 " failover")
400 self.failover = True
401 else:
402 result.Raise("Can't migrate, please use failover",
403 prereq=True, ecode=errors.ECODE_STATE)
404
405 assert not (self.failover and self.cleanup)
406
407 if not self.failover:
408 if self.lu.op.live is not None and self.lu.op.mode is not None:
409 raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
410 " parameters are accepted",
411 errors.ECODE_INVAL)
412 if self.lu.op.live is not None:
413 if self.lu.op.live:
414 self.lu.op.mode = constants.HT_MIGRATION_LIVE
415 else:
416 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
417 # reset the 'live' parameter to None so that repeated
418 # invocations of CheckPrereq do not raise an exception
419 self.lu.op.live = None
420 elif self.lu.op.mode is None:
421 # read the default value from the hypervisor
422 i_hv = cluster.FillHV(self.instance, skip_globals=False)
423 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
424
425 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
426 else:
427 # Failover is never live
428 self.live = False
429
430 if not (self.failover or self.cleanup):
431 remote_info = self.rpc.call_instance_info(instance.primary_node,
432 instance.name,
433 instance.hypervisor)
434 remote_info.Raise("Error checking instance on node %s" %
435 instance.primary_node)
436 instance_running = bool(remote_info.payload)
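# Remember the current runtime memory so that _ExecMigration can compare
# it against the free memory on the target node and balloon the instance
# down if needed.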
437 if instance_running:
438 self.current_mem = int(remote_info.payload["memory"])
439
440 def _RunAllocator(self):
441 """Run the allocator based on input opcode.
442
443 """
444 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
445
446 # FIXME: add a self.ignore_ipolicy option
447 req = iallocator.IAReqRelocate(name=self.instance_name,
448 relocate_from=[self.instance.primary_node])
449 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
450
451 ial.Run(self.lu.op.iallocator)
452
453 if not ial.success:
454 raise errors.OpPrereqError("Can't compute nodes using"
455 " iallocator '%s': %s" %
456 (self.lu.op.iallocator, ial.info),
457 errors.ECODE_NORES)
458 self.target_node = ial.result[0]
459 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
460 self.instance_name, self.lu.op.iallocator,
461 utils.CommaJoin(ial.result))
462
463 def _WaitUntilSync(self):
464 """Poll with custom rpc for disk sync.
465
466 This uses our own step-based rpc call.
467
468 """
469 self.feedback_fn("* wait until resync is done")
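# call_drbd_wait_sync returns a (done, sync_percent) pair per node; keep
# polling until every node reports done, showing the slowest node's progress.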
470 all_done = False
471 while not all_done:
472 all_done = True
473 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
474 self.nodes_ip,
475 (self.instance.disks,
476 self.instance))
477 min_percent = 100
478 for node, nres in result.items():
479 nres.Raise("Cannot resync disks on node %s" % node)
480 node_done, node_percent = nres.payload
481 all_done = all_done and node_done
482 if node_percent is not None:
483 min_percent = min(min_percent, node_percent)
484 if not all_done:
485 if min_percent < 100:
486 self.feedback_fn(" - progress: %.1f%%" % min_percent)
487 time.sleep(2)
488
489 def _EnsureSecondary(self, node):
490 """Demote a node to secondary.
491
492 """
493 self.feedback_fn("* switching node %s to secondary mode" % node)
494
495 for dev in self.instance.disks:
496 self.cfg.SetDiskID(dev, node)
497
498 result = self.rpc.call_blockdev_close(node, self.instance.name,
499 self.instance.disks)
500 result.Raise("Cannot change disk to secondary on node %s" % node)
501
502 def _GoStandalone(self):
503 """Disconnect from the network.
504
505 """
506 self.feedback_fn("* changing into standalone mode")
507 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
508 self.instance.disks)
509 for node, nres in result.items():
510 nres.Raise("Cannot disconnect disks on node %s" % node)
511
512 def _GoReconnect(self, multimaster):
513 """Reconnect to the network.
514
515 """
516 if multimaster:
517 msg = "dual-master"
518 else:
519 msg = "single-master"
520 self.feedback_fn("* changing disks into %s mode" % msg)
521 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
522 (self.instance.disks, self.instance),
523 self.instance.name, multimaster)
524 for node, nres in result.items():
525 nres.Raise("Cannot change disks config on node %s" % node)
526
527 def _ExecCleanup(self):
528 """Try to cleanup after a failed migration.
529
530 The cleanup is done by:
531 - check that the instance is running only on one node
532 (and update the config if needed)
533 - change disks on its secondary node to secondary
534 - wait until disks are fully synchronized
535 - disconnect from the network
536 - change disks into single-master mode
537 - wait again until disks are fully synchronized
538
539 """
540 instance = self.instance
541 target_node = self.target_node
542 source_node = self.source_node
543
544 # check running on only one node
545 self.feedback_fn("* checking where the instance actually runs"
546 " (if this hangs, the hypervisor might be in"
547 " a bad state)")
548 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
549 for node, result in ins_l.items():
550 result.Raise("Can't contact node %s" % node)
551
552 runningon_source = instance.name in ins_l[source_node].payload
553 runningon_target = instance.name in ins_l[target_node].payload
554
555 if runningon_source and runningon_target:
556 raise errors.OpExecError("Instance seems to be running on two nodes,"
557 " or the hypervisor is confused; you will have"
558 " to ensure manually that it runs only on one"
559 " and restart this operation")
560
561 if not (runningon_source or runningon_target):
562 raise errors.OpExecError("Instance does not seem to be running at all;"
563 " in this case it's safer to repair by"
564 " running 'gnt-instance stop' to ensure disk"
565 " shutdown, and then restarting it")
566
567 if runningon_target:
568 # the migration has actually succeeded, we need to update the config
569 self.feedback_fn("* instance running on secondary node (%s),"
570 " updating config" % target_node)
571 instance.primary_node = target_node
572 self.cfg.Update(instance, self.feedback_fn)
573 demoted_node = source_node
574 else:
575 self.feedback_fn("* instance confirmed to be running on its"
576 " primary node (%s)" % source_node)
577 demoted_node = target_node
578
579 if instance.disk_template in constants.DTS_INT_MIRROR:
580 self._EnsureSecondary(demoted_node)
581 try:
582 self._WaitUntilSync()
583 except errors.OpExecError:
584 # we ignore errors here, since if the device is standalone, it
585 # won't be able to sync
586 pass
587 self._GoStandalone()
588 self._GoReconnect(False)
589 self._WaitUntilSync()
590
591 self.feedback_fn("* done")
592
593 def _RevertDiskStatus(self):
594 """Try to revert the disk status after a failed migration.
595
596 """
597 target_node = self.target_node
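# Externally mirrored templates have no DRBD configuration to revert, so
# nothing needs to be done for them.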
598 if self.instance.disk_template in constants.DTS_EXT_MIRROR:
599 return
600
601 try:
602 self._EnsureSecondary(target_node)
603 self._GoStandalone()
604 self._GoReconnect(False)
605 self._WaitUntilSync()
606 except errors.OpExecError, err:
607 self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
608 " please try to recover the instance manually;"
609 " error '%s'" % str(err))
610
611 def _AbortMigration(self):
612 """Call the hypervisor code to abort a started migration.
613
614 """
615 instance = self.instance
616 target_node = self.target_node
617 source_node = self.source_node
618 migration_info = self.migration_info
619
620 abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
621 instance,
622 migration_info,
623 False)
624 abort_msg = abort_result.fail_msg
625 if abort_msg:
626 logging.error("Aborting migration failed on target node %s: %s",
627 target_node, abort_msg)
628 # Don't raise an exception here, as we still have to try to revert the
629 # disk status, even if this step failed.
630
631 abort_result = self.rpc.call_instance_finalize_migration_src(
632 source_node, instance, False, self.live)
633 abort_msg = abort_result.fail_msg
634 if abort_msg:
635 logging.error("Aborting migration failed on source node %s: %s",
636 source_node, abort_msg)
637
638 def _ExecMigration(self):
639 """Migrate an instance.
640
641 The migrate is done by:
642 - change the disks into dual-master mode
643 - wait until disks are fully synchronized again
644 - migrate the instance
645 - change disks on the new secondary node (the old primary) to secondary
646 - wait until disks are fully synchronized
647 - change disks into single-master mode
648
649 """
650 instance = self.instance
651 target_node = self.target_node
652 source_node = self.source_node
653
654 # Check for hypervisor version mismatch and warn the user.
655 nodeinfo = self.rpc.call_node_info([source_node, target_node],
656 None, [self.instance.hypervisor], False)
657 for ninfo in nodeinfo.values():
658 ninfo.Raise("Unable to retrieve node information from node '%s'" %
659 ninfo.node)
660 (_, _, (src_info, )) = nodeinfo[source_node].payload
661 (_, _, (dst_info, )) = nodeinfo[target_node].payload
662
663 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
664 (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
665 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
666 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
667 if src_version != dst_version:
668 self.feedback_fn("* warning: hypervisor version mismatch between"
669 " source (%s) and target (%s) node" %
670 (src_version, dst_version))
671
672 self.feedback_fn("* checking disk consistency between source and target")
673 for (idx, dev) in enumerate(instance.disks):
674 if not CheckDiskConsistency(self.lu, instance, dev, target_node, False):
675 raise errors.OpExecError("Disk %s is degraded or not fully"
676 " synchronized on target node,"
677 " aborting migration" % idx)
678
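# If the instance's current runtime memory does not fit into the free
# memory on the target node, balloon the instance down first (only allowed
# when runtime changes are permitted).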
679 if self.current_mem > self.tgt_free_mem:
680 if not self.allow_runtime_changes:
681 raise errors.OpExecError("Memory ballooning not allowed and not enough"
682 " free memory to fit instance %s on target"
683 " node %s (have %dMB, need %dMB)" %
684 (instance.name, target_node,
685 self.tgt_free_mem, self.current_mem))
686 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
687 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
688 instance,
689 self.tgt_free_mem)
690 rpcres.Raise("Cannot modify instance runtime memory")
691
692 # First get the migration information from the remote node
693 result = self.rpc.call_migration_info(source_node, instance)
694 msg = result.fail_msg
695 if msg:
696 log_err = ("Failed fetching source migration information from %s: %s" %
697 (source_node, msg))
698 logging.error(log_err)
699 raise errors.OpExecError(log_err)
700
701 self.migration_info = migration_info = result.payload
702
703 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
704 # Then switch the disks to master/master mode
705 self._EnsureSecondary(target_node)
706 self._GoStandalone()
707 self._GoReconnect(True)
708 self._WaitUntilSync()
709
710 self.feedback_fn("* preparing %s to accept the instance" % target_node)
711 result = self.rpc.call_accept_instance(target_node,
712 instance,
713 migration_info,
714 self.nodes_ip[target_node])
715
716 msg = result.fail_msg
717 if msg:
718 logging.error("Instance pre-migration failed, trying to revert"
719 " disk status: %s", msg)
720 self.feedback_fn("Pre-migration failed, aborting")
721 self._AbortMigration()
722 self._RevertDiskStatus()
723 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
724 (instance.name, msg))
725
726 self.feedback_fn("* migrating instance to %s" % target_node)
727 result = self.rpc.call_instance_migrate(source_node, instance,
728 self.nodes_ip[target_node],
729 self.live)
730 msg = result.fail_msg
731 if msg:
732 logging.error("Instance migration failed, trying to revert"
733 " disk status: %s", msg)
734 self.feedback_fn("Migration failed, aborting")
735 self._AbortMigration()
736 self._RevertDiskStatus()
737 raise errors.OpExecError("Could not migrate instance %s: %s" %
738 (instance.name, msg))
739
740 self.feedback_fn("* starting memory transfer")
741 last_feedback = time.time()
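# Poll the migration status every _MIGRATION_POLL_INTERVAL seconds and
# report transfer progress about every _MIGRATION_FEEDBACK_INTERVAL seconds.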
742 while True:
743 result = self.rpc.call_instance_get_migration_status(source_node,
744 instance)
745 msg = result.fail_msg
746 ms = result.payload # MigrationStatus instance
747 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
748 logging.error("Instance migration failed, trying to revert"
749 " disk status: %s", msg)
750 self.feedback_fn("Migration failed, aborting")
751 self._AbortMigration()
752 self._RevertDiskStatus()
753 if not msg:
754 msg = "hypervisor returned failure"
755 raise errors.OpExecError("Could not migrate instance %s: %s" %
756 (instance.name, msg))
757
758 if result.payload.status != constants.HV_MIGRATION_ACTIVE:
759 self.feedback_fn("* memory transfer complete")
760 break
761
762 if (utils.TimeoutExpired(last_feedback,
763 self._MIGRATION_FEEDBACK_INTERVAL) and
764 ms.transferred_ram is not None):
765 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
766 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
767 last_feedback = time.time()
768
769 time.sleep(self._MIGRATION_POLL_INTERVAL)
770
771 result = self.rpc.call_instance_finalize_migration_src(source_node,
772 instance,
773 True,
774 self.live)
775 msg = result.fail_msg
776 if msg:
777 logging.error("Instance migration succeeded, but finalization failed"
778 " on the source node: %s", msg)
779 raise errors.OpExecError("Could not finalize instance migration: %s" %
780 msg)
781
782 instance.primary_node = target_node
783
784 # distribute new instance config to the other nodes
785 self.cfg.Update(instance, self.feedback_fn)
786
787 result = self.rpc.call_instance_finalize_migration_dst(target_node,
788 instance,
789 migration_info,
790 True)
791 msg = result.fail_msg
792 if msg:
793 logging.error("Instance migration succeeded, but finalization failed"
794 " on the target node: %s", msg)
795 raise errors.OpExecError("Could not finalize instance migration: %s" %
796 msg)
797
798 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
799 self._EnsureSecondary(source_node)
800 self._WaitUntilSync()
801 self._GoStandalone()
802 self._GoReconnect(False)
803 self._WaitUntilSync()
804
805 # If the instance's disk template is `rbd' or `ext' and there was a
806 # successful migration, unmap the device from the source node.
807 if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
808 disks = ExpandCheckDisks(instance, instance.disks)
809 self.feedback_fn("* unmapping instance's disks from %s" % source_node)
810 for disk in disks:
811 result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
812 msg = result.fail_msg
813 if msg:
814 logging.error("Migration was successful, but couldn't unmap the"
815 " block device %s on source node %s: %s",
816 disk.iv_name, source_node, msg)
817 logging.error("You need to unmap the device %s manually on %s",
818 disk.iv_name, source_node)
819
820 self.feedback_fn("* done")
821
822 def _ExecFailover(self):
823 """Failover an instance.
824
825 The failover is done by shutting the instance down on its present node
826 and starting it on the secondary node.
827
828 """
829 instance = self.instance
830 primary_node = self.cfg.GetNodeInfo(instance.primary_node)
831
832 source_node = instance.primary_node
833 target_node = self.target_node
834
835 if instance.admin_state == constants.ADMINST_UP:
836 self.feedback_fn("* checking disk consistency between source and target")
837 for (idx, dev) in enumerate(instance.disks):
838 # for drbd, these are drbd over lvm
839 if not CheckDiskConsistency(self.lu, instance, dev, target_node,
840 False):
841 if primary_node.offline:
842 self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
843 " target node %s" %
844 (primary_node.name, idx, target_node))
845 elif not self.ignore_consistency:
846 raise errors.OpExecError("Disk %s is degraded on target node,"
847 " aborting failover" % idx)
848 else:
849 self.feedback_fn("* not checking disk consistency as instance is not"
850 " running")
851
852 self.feedback_fn("* shutting down instance on source node")
853 logging.info("Shutting down instance %s on node %s",
854 instance.name, source_node)
855
856 result = self.rpc.call_instance_shutdown(source_node, instance,
857 self.shutdown_timeout,
858 self.lu.op.reason)
859 msg = result.fail_msg
860 if msg:
861 if self.ignore_consistency or primary_node.offline:
862 self.lu.LogWarning("Could not shutdown instance %s on node %s,"
863 " proceeding anyway; please make sure node"
864 " %s is down; error details: %s",
865 instance.name, source_node, source_node, msg)
866 else:
867 raise errors.OpExecError("Could not shutdown instance %s on"
868 " node %s: %s" %
869 (instance.name, source_node, msg))
870
871 self.feedback_fn("* deactivating the instance's disks on source node")
872 if not ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
873 raise errors.OpExecError("Can't shut down the instance's disks")
874
875 instance.primary_node = target_node
876 # distribute new instance config to the other nodes
877 self.cfg.Update(instance, self.feedback_fn)
878
879 # Only start the instance if it's marked as up
880 if instance.admin_state == constants.ADMINST_UP:
881 self.feedback_fn("* activating the instance's disks on target node %s" %
882 target_node)
883 logging.info("Starting instance %s on node %s",
884 instance.name, target_node)
885
886 disks_ok, _ = AssembleInstanceDisks(self.lu, instance,
887 ignore_secondaries=True)
888 if not disks_ok:
889 ShutdownInstanceDisks(self.lu, instance)
890 raise errors.OpExecError("Can't activate the instance's disks")
891
892 self.feedback_fn("* starting the instance on the target node %s" %
893 target_node)
894 result = self.rpc.call_instance_start(target_node, (instance, None, None),
895 False, self.lu.op.reason)
896 msg = result.fail_msg
897 if msg:
898 ShutdownInstanceDisks(self.lu, instance)
899 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
900 (instance.name, target_node, msg))
901
902 def Exec(self, feedback_fn):
903 """Perform the migration.
904
905 """
906 self.feedback_fn = feedback_fn
907 self.source_node = self.instance.primary_node
908
909 # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
910 if self.instance.disk_template in constants.DTS_INT_MIRROR:
911 self.target_node = self.instance.secondary_nodes[0]
912 # Otherwise self.target_node has been populated either
913 # directly, or through an iallocator.
914
915 self.all_nodes = [self.source_node, self.target_node]
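# Map each involved node to its secondary (replication) IP; the DRBD and
# migration RPCs below address the nodes through these IPs.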
916 self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
917 in self.cfg.GetMultiNodeInfo(self.all_nodes))
918
919 if self.failover:
920 feedback_fn("Failover instance %s" % self.instance.name)
921 self._ExecFailover()
922 else:
923 feedback_fn("Migrating instance %s" % self.instance.name)
924
925 if self.cleanup:
926 return self._ExecCleanup()
927 else:
928 return self._ExecMigration()