Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Tool to merge two or more clusters together.
31
32 The clusters have to run the same version of Ganeti!
33
34 """
35
36 # pylint: disable=C0103
37 # C0103: Invalid name cluster-merge
38
39 import logging
40 import os
41 import optparse
42 import shutil
43 import sys
44 import tempfile
45
46 from ganeti import cli
47 from ganeti import config
48 from ganeti import constants
49 from ganeti import errors
50 from ganeti import ssh
51 from ganeti import utils
52 from ganeti import pathutils
53 from ganeti import compat
54
55
56 _GROUPS_MERGE = "merge"
57 _GROUPS_RENAME = "rename"
58 _CLUSTERMERGE_ECID = "clustermerge-ecid"
59 _RESTART_ALL = "all"
60 _RESTART_UP = "up"
61 _RESTART_NONE = "none"
62 _RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
63 _PARAMS_STRICT = "strict"
64 _PARAMS_WARN = "warn"
65 _PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
66
67
68 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
69                                   action="store", type="int",
70                                   dest="pause_period",
71                                   help=("Amount of time in seconds watcher"
72                                         " should be suspended from running"))
73 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
74                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
75                             dest="groups",
76                             help=("How to handle groups that have the"
77                                   " same name (One of: %s/%s)" %
78                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
79 PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
80                             metavar="STRATEGY",
81                             choices=_PARAMS_CHOICES,
82                             dest="params",
83                             help=("How to handle params that have"
84                                   " different values (One of: %s/%s)" %
85                                   _PARAMS_CHOICES))
86
87 RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
88                              metavar="STRATEGY",
89                              choices=_RESTART_CHOICES,
90                              dest="restart",
91                              help=("How to handle restarting instances"
92                                    " same name (One of: %s/%s/%s)" %
93                                    _RESTART_CHOICES))
94
95 SKIP_STOP_INSTANCES_OPT = \
96   cli.cli_option("--skip-stop-instances", default=True, action="store_false",
97                  dest="stop_instances",
98                  help=("Don't stop the instances on the clusters, just check "
99                        "that none is running"))
100
101
102 def Flatten(unflattened_list):
103   """Flattens a list.
104
105   @param unflattened_list: A list of unflattened list objects.
106   @return: A flattened list
107
108   """
109   flattened_list = []
110
111   for item in unflattened_list:
112     if isinstance(item, list):
113       flattened_list.extend(Flatten(item))
114     else:
115       flattened_list.append(item)
116   return flattened_list
117
118
119 class MergerData(object):
120   """Container class to hold data used for merger.
121
122   """
123   def __init__(self, cluster, key_path, nodes, instances, master_node,
124                config_path=None):
125     """Initialize the container.
126
127     @param cluster: The name of the cluster
128     @param key_path: Path to the ssh private key used for authentication
129     @param nodes: List of online nodes in the merging cluster
130     @param instances: List of instances running on merging cluster
131     @param master_node: Name of the master node
132     @param config_path: Path to the merging cluster config
133
134     """
135     self.cluster = cluster
136     self.key_path = key_path
137     self.nodes = nodes
138     self.instances = instances
139     self.master_node = master_node
140     self.config_path = config_path
141
142
143 class Merger(object):
144   """Handling the merge.
145
146   """
147   RUNNING_STATUSES = compat.UniqueFrozenset([
148     constants.INSTST_RUNNING,
149     constants.INSTST_ERRORUP,
150     ])
151
152   def __init__(self, clusters, pause_period, groups, restart, params,
153                stop_instances):
154     """Initialize object with sane defaults and infos required.
155
156     @param clusters: The list of clusters to merge in
157     @param pause_period: The time watcher shall be disabled for
158     @param groups: How to handle group conflicts
159     @param restart: How to handle instance restart
160     @param stop_instances: Indicates whether the instances must be stopped
161                            (True) or if the Merger must only check if no
162                            instances are running on the mergee clusters (False)
163
164     """
165     self.merger_data = []
166     self.clusters = clusters
167     self.pause_period = pause_period
168     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
169     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
170     self.ssh_runner = ssh.SshRunner(self.cluster_name)
171     self.groups = groups
172     self.restart = restart
173     self.params = params
174     self.stop_instances = stop_instances
175     if self.restart == _RESTART_UP:
176       raise NotImplementedError
177
178   def Setup(self):
179     """Sets up our end so we can do the merger.
180
181     This method is setting us up as a preparation for the merger.
182     It makes the initial contact and gathers information needed.
183
184     @raise errors.RemoteError: for errors in communication/grabbing
185
186     """
187     (remote_path, _, _) = ssh.GetUserFiles("root")
188
189     if self.cluster_name in self.clusters:
190       raise errors.CommandError("Cannot merge cluster %s with itself" %
191                                 self.cluster_name)
192
193     # Fetch remotes private key
194     for cluster in self.clusters:
195       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
196                             ask_key=False)
197       if result.failed:
198         raise errors.RemoteError("There was an error while grabbing ssh private"
199                                  " key from %s. Fail reason: %s; output: %s" %
200                                  (cluster, result.fail_reason, result.output))
201
202       key_path = utils.PathJoin(self.work_dir, cluster)
203       utils.WriteFile(key_path, mode=0600, data=result.stdout)
204
205       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
206                             " --no-headers --separator=,", private_key=key_path)
207       if result.failed:
208         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
209                                  " Fail reason: %s; output: %s" %
210                                  (cluster, result.fail_reason, result.output))
211       nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
212       nodes = [node_status[0] for node_status in nodes_statuses
213                if node_status[1] == "N"]
214
215       result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
216                             private_key=key_path)
217       if result.failed:
218         raise errors.RemoteError("Unable to retrieve list of instances from"
219                                  " %s. Fail reason: %s; output: %s" %
220                                  (cluster, result.fail_reason, result.output))
221       instances = result.stdout.splitlines()
222
223       path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
224                             constants.SS_MASTER_NODE)
225       result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
226       if result.failed:
227         raise errors.RemoteError("Unable to retrieve the master node name from"
228                                  " %s. Fail reason: %s; output: %s" %
229                                  (cluster, result.fail_reason, result.output))
230       master_node = result.stdout.strip()
231
232       self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
233                                          master_node))
234
235   def _PrepareAuthorizedKeys(self):
236     """Prepare the authorized_keys on every merging node.
237
238     This method add our public key to remotes authorized_key for further
239     communication.
240
241     """
242     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
243     pub_key = utils.ReadFile(pub_key_file)
244
245     for data in self.merger_data:
246       for node in data.nodes:
247         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
248                                      (auth_keys, pub_key)),
249                               private_key=data.key_path, max_attempts=3)
250
251         if result.failed:
252           raise errors.RemoteError("Unable to add our public key to %s in %s."
253                                    " Fail reason: %s; output: %s" %
254                                    (node, data.cluster, result.fail_reason,
255                                     result.output))
256
257   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
258               strict_host_check=False, private_key=None, batch=True,
259               ask_key=False, max_attempts=1):
260     """Wrapping SshRunner.Run with default parameters.
261
262     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
263
264     """
265     for _ in range(max_attempts):
266       result = self.ssh_runner.Run(hostname=hostname, command=command,
267                                    user=user, use_cluster_key=use_cluster_key,
268                                    strict_host_check=strict_host_check,
269                                    private_key=private_key, batch=batch,
270                                    ask_key=ask_key)
271       if not result.failed:
272         break
273
274     return result
275
276   def _CheckRunningInstances(self):
277     """Checks if on the clusters to be merged there are running instances
278
279     @rtype: boolean
280     @return: True if there are running instances, False otherwise
281
282     """
283     for cluster in self.clusters:
284       result = self._RunCmd(cluster, "gnt-instance list -o status")
285       if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
286         return True
287
288     return False
289
290   def _StopMergingInstances(self):
291     """Stop instances on merging clusters.
292
293     """
294     for cluster in self.clusters:
295       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
296                                      " --force-multiple")
297
298       if result.failed:
299         raise errors.RemoteError("Unable to stop instances on %s."
300                                  " Fail reason: %s; output: %s" %
301                                  (cluster, result.fail_reason, result.output))
302
303   def _DisableWatcher(self):
304     """Disable watch on all merging clusters, including ourself.
305
306     """
307     for cluster in ["localhost"] + self.clusters:
308       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
309                                      self.pause_period)
310
311       if result.failed:
312         raise errors.RemoteError("Unable to pause watcher on %s."
313                                  " Fail reason: %s; output: %s" %
314                                  (cluster, result.fail_reason, result.output))
315
316   def _RemoveMasterIps(self):
317     """Removes the master IPs from the master nodes of each cluster.
318
319     """
320     for data in self.merger_data:
321       result = self._RunCmd(data.master_node,
322                             "gnt-cluster deactivate-master-ip --yes")
323
324       if result.failed:
325         raise errors.RemoteError("Unable to remove master IP on %s."
326                                  " Fail reason: %s; output: %s" %
327                                  (data.master_node,
328                                   result.fail_reason,
329                                   result.output))
330
331   def _StopDaemons(self):
332     """Stop all daemons on merging nodes.
333
334     """
335     cmd = "%s stop-all" % pathutils.DAEMON_UTIL
336     for data in self.merger_data:
337       for node in data.nodes:
338         result = self._RunCmd(node, cmd, max_attempts=3)
339
340         if result.failed:
341           raise errors.RemoteError("Unable to stop daemons on %s."
342                                    " Fail reason: %s; output: %s." %
343                                    (node, result.fail_reason, result.output))
344
345   def _FetchRemoteConfig(self):
346     """Fetches and stores remote cluster config from the master.
347
348     This step is needed before we can merge the config.
349
350     """
351     for data in self.merger_data:
352       result = self._RunCmd(data.cluster, "cat %s" %
353                                           pathutils.CLUSTER_CONF_FILE)
354
355       if result.failed:
356         raise errors.RemoteError("Unable to retrieve remote config on %s."
357                                  " Fail reason: %s; output %s" %
358                                  (data.cluster, result.fail_reason,
359                                   result.output))
360
361       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
362                                         data.cluster)
363       utils.WriteFile(data.config_path, data=result.stdout)
364
365   # R0201: Method could be a function
366   def _KillMasterDaemon(self): # pylint: disable=R0201
367     """Kills the local master daemon.
368
369     @raise errors.CommandError: If unable to kill
370
371     """
372     result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
373     if result.failed:
374       raise errors.CommandError("Unable to stop master daemons."
375                                 " Fail reason: %s; output: %s" %
376                                 (result.fail_reason, result.output))
377
378   def _MergeConfig(self):
379     """Merges all foreign config into our own config.
380
381     """
382     my_config = config.ConfigWriter(offline=True)
383     fake_ec_id = 0 # Needs to be uniq over the whole config merge
384
385     for data in self.merger_data:
386       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
387       self._MergeClusterConfigs(my_config, other_config)
388       self._MergeNodeGroups(my_config, other_config)
389
390       for node in other_config.GetNodeList():
391         node_info = other_config.GetNodeInfo(node)
392         # Offline the node, it will be reonlined later at node readd
393         node_info.master_candidate = False
394         node_info.drained = False
395         node_info.offline = True
396         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
397         fake_ec_id += 1
398
399       for instance in other_config.GetInstanceList():
400         instance_info = other_config.GetInstanceInfo(instance)
401
402         # Update the DRBD port assignments
403         # This is a little bit hackish
404         for dsk in instance_info.disks:
405           if dsk.dev_type in constants.DTS_DRBD:
406             port = my_config.AllocatePort()
407
408             logical_id = list(dsk.logical_id)
409             logical_id[2] = port
410             dsk.logical_id = tuple(logical_id)
411
412         my_config.AddInstance(instance_info,
413                               _CLUSTERMERGE_ECID + str(fake_ec_id))
414         fake_ec_id += 1
415
416   def _MergeClusterConfigs(self, my_config, other_config):
417     """Checks that all relevant cluster parameters are compatible
418
419     """
420     my_cluster = my_config.GetClusterInfo()
421     other_cluster = other_config.GetClusterInfo()
422     err_count = 0
423
424     #
425     # Generic checks
426     #
427     check_params = [
428       "beparams",
429       "default_iallocator",
430       "drbd_usermode_helper",
431       "hidden_os",
432       "maintain_node_health",
433       "master_netdev",
434       "ndparams",
435       "nicparams",
436       "primary_ip_family",
437       "tags",
438       "uid_pool",
439       ]
440     check_params_strict = [
441       "volume_group_name",
442     ]
443     if my_cluster.IsFileStorageEnabled() or \
444         other_cluster.IsFileStorageEnabled():
445       check_params_strict.append("file_storage_dir")
446     if my_cluster.IsSharedFileStorageEnabled() or \
447         other_cluster.IsSharedFileStorageEnabled():
448       check_params_strict.append("shared_file_storage_dir")
449     check_params.extend(check_params_strict)
450
451     params_strict = (self.params == _PARAMS_STRICT)
452
453     for param_name in check_params:
454       my_param = getattr(my_cluster, param_name)
455       other_param = getattr(other_cluster, param_name)
456       if my_param != other_param:
457         logging.error("The value (%s) of the cluster parameter %s on %s"
458                       " differs to this cluster's value (%s)",
459                       other_param, param_name, other_cluster.cluster_name,
460                       my_param)
461         if params_strict or param_name in check_params_strict:
462           err_count += 1
463
464     #
465     # Custom checks
466     #
467
468     # Check default hypervisor
469     my_defhyp = my_cluster.enabled_hypervisors[0]
470     other_defhyp = other_cluster.enabled_hypervisors[0]
471     if my_defhyp != other_defhyp:
472       logging.warning("The default hypervisor (%s) differs on %s, new"
473                       " instances will be created with this cluster's"
474                       " default hypervisor (%s)", other_defhyp,
475                       other_cluster.cluster_name, my_defhyp)
476
477     if (set(my_cluster.enabled_hypervisors) !=
478         set(other_cluster.enabled_hypervisors)):
479       logging.error("The set of enabled hypervisors (%s) on %s differs to"
480                     " this cluster's set (%s)",
481                     other_cluster.enabled_hypervisors,
482                     other_cluster.cluster_name, my_cluster.enabled_hypervisors)
483       err_count += 1
484
485     # Check hypervisor params for hypervisors we care about
486     for hyp in my_cluster.enabled_hypervisors:
487       for param in my_cluster.hvparams[hyp]:
488         my_value = my_cluster.hvparams[hyp][param]
489         other_value = other_cluster.hvparams[hyp][param]
490         if my_value != other_value:
491           logging.error("The value (%s) of the %s parameter of the %s"
492                         " hypervisor on %s differs to this cluster's parameter"
493                         " (%s)",
494                         other_value, param, hyp, other_cluster.cluster_name,
495                         my_value)
496           if params_strict:
497             err_count += 1
498
499     # Check os hypervisor params for hypervisors we care about
500     for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
501       for hyp in my_cluster.enabled_hypervisors:
502         my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
503         other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
504         if my_os_hvp != other_os_hvp:
505           logging.error("The OS parameters (%s) for the %s OS for the %s"
506                         " hypervisor on %s differs to this cluster's parameters"
507                         " (%s)",
508                         other_os_hvp, os_name, hyp, other_cluster.cluster_name,
509                         my_os_hvp)
510           if params_strict:
511             err_count += 1
512
513     #
514     # Warnings
515     #
516     if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
517       logging.warning("The modify_etc_hosts value (%s) differs on %s,"
518                       " this cluster's value (%s) will take precedence",
519                       other_cluster.modify_etc_hosts,
520                       other_cluster.cluster_name,
521                       my_cluster.modify_etc_hosts)
522
523     if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
524       logging.warning("The modify_ssh_setup value (%s) differs on %s,"
525                       " this cluster's value (%s) will take precedence",
526                       other_cluster.modify_ssh_setup,
527                       other_cluster.cluster_name,
528                       my_cluster.modify_ssh_setup)
529
530     #
531     # Actual merging
532     #
533     my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
534                                        other_cluster.reserved_lvs))
535
536     if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
537       logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
538                       " cluster's value (%s). The least permissive value (%s)"
539                       " will be used", other_cluster.prealloc_wipe_disks,
540                       other_cluster.cluster_name,
541                       my_cluster.prealloc_wipe_disks, True)
542       my_cluster.prealloc_wipe_disks = True
543
544     for os_, osparams in other_cluster.osparams.items():
545       if os_ not in my_cluster.osparams:
546         my_cluster.osparams[os_] = osparams
547       elif my_cluster.osparams[os_] != osparams:
548         logging.error("The OS parameters (%s) for the %s OS on %s differs to"
549                       " this cluster's parameters (%s)",
550                       osparams, os_, other_cluster.cluster_name,
551                       my_cluster.osparams[os_])
552         if params_strict:
553           err_count += 1
554
555     if err_count:
556       raise errors.ConfigurationError("Cluster config for %s has incompatible"
557                                       " values, please fix and re-run" %
558                                       other_cluster.cluster_name)
559
560   # R0201: Method could be a function
561   def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
562     if os_name in cluster.os_hvp:
563       return cluster.os_hvp[os_name].get(hyp, None)
564     else:
565       return None
566
567   # R0201: Method could be a function
568   def _MergeNodeGroups(self, my_config, other_config):
569     """Adds foreign node groups
570
571     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
572     """
573     # pylint: disable=R0201
574     logging.info("Node group conflict strategy: %s", self.groups)
575
576     my_grps = my_config.GetAllNodeGroupsInfo().values()
577     other_grps = other_config.GetAllNodeGroupsInfo().values()
578
579     # Check for node group naming conflicts:
580     conflicts = []
581     for other_grp in other_grps:
582       for my_grp in my_grps:
583         if other_grp.name == my_grp.name:
584           conflicts.append(other_grp)
585
586     if conflicts:
587       conflict_names = utils.CommaJoin([g.name for g in conflicts])
588       logging.info("Node groups in both local and remote cluster: %s",
589                    conflict_names)
590
591       # User hasn't specified how to handle conflicts
592       if not self.groups:
593         raise errors.CommandError("The following node group(s) are in both"
594                                   " clusters, and no merge strategy has been"
595                                   " supplied (see the --groups option): %s" %
596                                   conflict_names)
597
598       # User wants to rename conflicts
599       elif self.groups == _GROUPS_RENAME:
600         for grp in conflicts:
601           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
602           logging.info("Renaming remote node group from %s to %s"
603                        " to resolve conflict", grp.name, new_name)
604           grp.name = new_name
605
606       # User wants to merge conflicting groups
607       elif self.groups == _GROUPS_MERGE:
608         for other_grp in conflicts:
609           logging.info("Merging local and remote '%s' groups", other_grp.name)
610           for node_name in other_grp.members[:]:
611             node = other_config.GetNodeInfo(node_name)
612             # Access to a protected member of a client class
613             # pylint: disable=W0212
614             other_config._UnlockedRemoveNodeFromGroup(node)
615
616             # Access to a protected member of a client class
617             # pylint: disable=W0212
618             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
619
620             # Access to a protected member of a client class
621             # pylint: disable=W0212
622             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
623             node.group = my_grp_uuid
624           # Remove from list of groups to add
625           other_grps.remove(other_grp)
626
627     for grp in other_grps:
628       #TODO: handle node group conflicts
629       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
630
631   # R0201: Method could be a function
632   def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
633     """Starts the local master daemon.
634
635     @param no_vote: Should the masterd started without voting? default: False
636     @raise errors.CommandError: If unable to start daemon.
637
638     """
639     env = {}
640     if no_vote:
641       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
642
643     result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
644     if result.failed:
645       raise errors.CommandError("Couldn't start ganeti master."
646                                 " Fail reason: %s; output: %s" %
647                                 (result.fail_reason, result.output))
648
649   def _ReaddMergedNodesAndRedist(self):
650     """Readds all merging nodes and make sure their config is up-to-date.
651
652     @raise errors.CommandError: If anything fails.
653
654     """
655     for data in self.merger_data:
656       for node in data.nodes:
657         logging.info("Readding node %s", node)
658         result = utils.RunCmd(["gnt-node", "add", "--readd",
659                                "--no-ssh-key-check", node])
660         if result.failed:
661           logging.error("%s failed to be readded. Reason: %s, output: %s",
662                          node, result.fail_reason, result.output)
663
664     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
665     if result.failed:
666       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
667                                 " output: %s" % (result.fail_reason,
668                                                  result.output))
669
670   # R0201: Method could be a function
671   def _StartupAllInstances(self): # pylint: disable=R0201
672     """Starts up all instances (locally).
673
674     @raise errors.CommandError: If unable to start clusters
675
676     """
677     result = utils.RunCmd(["gnt-instance", "startup", "--all",
678                            "--force-multiple"])
679     if result.failed:
680       raise errors.CommandError("Unable to start all instances."
681                                 " Fail reason: %s; output: %s" %
682                                 (result.fail_reason, result.output))
683
684   # R0201: Method could be a function
685   # TODO: make this overridable, for some verify errors
686   def _VerifyCluster(self): # pylint: disable=R0201
687     """Runs gnt-cluster verify to verify the health.
688
689     @raise errors.ProgrammError: If cluster fails on verification
690
691     """
692     result = utils.RunCmd(["gnt-cluster", "verify"])
693     if result.failed:
694       raise errors.CommandError("Verification of cluster failed."
695                                 " Fail reason: %s; output: %s" %
696                                 (result.fail_reason, result.output))
697
698   def Merge(self):
699     """Does the actual merge.
700
701     It runs all the steps in the right order and updates the user about steps
702     taken. Also it keeps track of rollback_steps to undo everything.
703
704     """
705     rbsteps = []
706     try:
707       logging.info("Pre cluster verification")
708       self._VerifyCluster()
709
710       logging.info("Prepare authorized_keys")
711       rbsteps.append("Remove our key from authorized_keys on nodes:"
712                      " %(nodes)s")
713       self._PrepareAuthorizedKeys()
714
715       rbsteps.append("Start all instances again on the merging"
716                      " clusters: %(clusters)s")
717       if self.stop_instances:
718         logging.info("Stopping merging instances (takes a while)")
719         self._StopMergingInstances()
720       logging.info("Checking that no instances are running on the mergees")
721       instances_running = self._CheckRunningInstances()
722       if instances_running:
723         raise errors.CommandError("Some instances are still running on the"
724                                   " mergees")
725       logging.info("Disable watcher")
726       self._DisableWatcher()
727       logging.info("Merging config")
728       self._FetchRemoteConfig()
729       logging.info("Removing master IPs on mergee master nodes")
730       self._RemoveMasterIps()
731       logging.info("Stop daemons on merging nodes")
732       self._StopDaemons()
733
734       logging.info("Stopping master daemon")
735       self._KillMasterDaemon()
736
737       rbsteps.append("Restore %s from another master candidate"
738                      " and restart master daemon" %
739                      pathutils.CLUSTER_CONF_FILE)
740       self._MergeConfig()
741       self._StartMasterDaemon(no_vote=True)
742
743       # Point of no return, delete rbsteps
744       del rbsteps[:]
745
746       logging.warning("We are at the point of no return. Merge can not easily"
747                       " be undone after this point.")
748       logging.info("Readd nodes")
749       self._ReaddMergedNodesAndRedist()
750
751       logging.info("Merge done, restart master daemon normally")
752       self._KillMasterDaemon()
753       self._StartMasterDaemon()
754
755       if self.restart == _RESTART_ALL:
756         logging.info("Starting instances again")
757         self._StartupAllInstances()
758       else:
759         logging.info("Not starting instances again")
760       logging.info("Post cluster verification")
761       self._VerifyCluster()
762     except errors.GenericError, e:
763       logging.exception(e)
764
765       if rbsteps:
766         nodes = Flatten([data.nodes for data in self.merger_data])
767         info = {
768           "clusters": self.clusters,
769           "nodes": nodes,
770           }
771         logging.critical("In order to rollback do the following:")
772         for step in rbsteps:
773           logging.critical("  * %s", step % info)
774       else:
775         logging.critical("Nothing to rollback.")
776
777       # TODO: Keep track of steps done for a flawless resume?
778
779   def Cleanup(self):
780     """Clean up our environment.
781
782     This cleans up remote private keys and configs and after that
783     deletes the temporary directory.
784
785     """
786     shutil.rmtree(self.work_dir)
787
788
789 def main():
790   """Main routine.
791
792   """
793   program = os.path.basename(sys.argv[0])
794
795   parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
796                                  prog=program)
797   parser.add_option(cli.DEBUG_OPT)
798   parser.add_option(cli.VERBOSE_OPT)
799   parser.add_option(PAUSE_PERIOD_OPT)
800   parser.add_option(GROUPS_OPT)
801   parser.add_option(RESTART_OPT)
802   parser.add_option(PARAMS_OPT)
803   parser.add_option(SKIP_STOP_INSTANCES_OPT)
804
805   (options, args) = parser.parse_args()
806
807   utils.SetupToolLogging(
808       options.debug, options.verbose,
809       toolname=os.path.splitext(os.path.basename(__file__))[0])
810
811   if not args:
812     parser.error("No clusters specified")
813
814   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
815                           options.groups, options.restart, options.params,
816                           options.stop_instances)
817   try:
818     try:
819       cluster_merger.Setup()
820       cluster_merger.Merge()
821     except errors.GenericError, e:
822       logging.exception(e)
823       return constants.EXIT_FAILURE
824   finally:
825     cluster_merger.Cleanup()
826
827   return constants.EXIT_SUCCESS
828
829
830 if __name__ == "__main__":
831   sys.exit(main())