e43bd5c23a7774610541760905ff2bdfc403cbd3
[ganeti-github.git] / tools / move-instance
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Tool to move instances from one cluster to another.
31
32 """
33
34 # pylint: disable=C0103
35 # C0103: Invalid name move-instance
36
37 import os
38 import sys
39 import time
40 import logging
41 import optparse
42 import random
43 import threading
44
45 from ganeti import cli
46 from ganeti import constants
47 from ganeti import utils
48 from ganeti import workerpool
49 from ganeti import objects
50 from ganeti import compat
51 from ganeti import rapi
52 from ganeti import errors
53
54 from ganeti.rapi.client import UsesRapiClient
55
56
57 SRC_RAPI_PORT_OPT = \
58   cli.cli_option("--src-rapi-port", action="store", type="int",
59                  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
60                  help=("Source cluster RAPI port (defaults to %s)" %
61                        constants.DEFAULT_RAPI_PORT))
62
63 SRC_CA_FILE_OPT = \
64   cli.cli_option("--src-ca-file", action="store", type="string",
65                  dest="src_ca_file",
66                  help=("File containing source cluster Certificate"
67                        " Authority (CA) in PEM format"))
68
69 SRC_USERNAME_OPT = \
70   cli.cli_option("--src-username", action="store", type="string",
71                  dest="src_username", default=None,
72                  help="Source cluster username")
73
74 SRC_PASSWORD_FILE_OPT = \
75   cli.cli_option("--src-password-file", action="store", type="string",
76                  dest="src_password_file",
77                  help="File containing source cluster password")
78
79 DEST_RAPI_PORT_OPT = \
80   cli.cli_option("--dest-rapi-port", action="store", type="int",
81                  dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
82                  help=("Destination cluster RAPI port (defaults to source"
83                        " cluster RAPI port)"))
84
85 DEST_CA_FILE_OPT = \
86   cli.cli_option("--dest-ca-file", action="store", type="string",
87                  dest="dest_ca_file",
88                  help=("File containing destination cluster Certificate"
89                        " Authority (CA) in PEM format (defaults to source"
90                        " cluster CA)"))
91
92 DEST_USERNAME_OPT = \
93   cli.cli_option("--dest-username", action="store", type="string",
94                  dest="dest_username", default=None,
95                  help=("Destination cluster username (defaults to"
96                        " source cluster username)"))
97
98 DEST_PASSWORD_FILE_OPT = \
99   cli.cli_option("--dest-password-file", action="store", type="string",
100                  dest="dest_password_file",
101                  help=("File containing destination cluster password"
102                        " (defaults to source cluster password)"))
103
104 DEST_INSTANCE_NAME_OPT = \
105   cli.cli_option("--dest-instance-name", action="store", type="string",
106                  dest="dest_instance_name",
107                  help=("Instance name on destination cluster (only"
108                        " when moving exactly one instance)"))
109
110 DEST_PRIMARY_NODE_OPT = \
111   cli.cli_option("--dest-primary-node", action="store", type="string",
112                  dest="dest_primary_node",
113                  help=("Primary node on destination cluster (only"
114                        " when moving exactly one instance)"))
115
116 DEST_SECONDARY_NODE_OPT = \
117   cli.cli_option("--dest-secondary-node", action="store", type="string",
118                  dest="dest_secondary_node",
119                  help=("Secondary node on destination cluster (only"
120                        " when moving exactly one instance)"))
121
122 DEST_DISK_TEMPLATE_OPT = \
123   cli.cli_option("--dest-disk-template", action="store", type="string",
124                  dest="dest_disk_template", default=None,
125                  help="Disk template to use on destination cluster")
126
127 COMPRESS_OPT = \
128   cli.cli_option("--compress", action="store", type="string",
129                  dest="compress", default="none",
130                  help="Compression mode to use during the move (this mode has"
131                       " to be supported by both clusters)")
132
133 PARALLEL_OPT = \
134   cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
135                  dest="parallel", metavar="<number>",
136                  help="Number of instances to be moved simultaneously")
137
138 OPPORTUNISTIC_TRIES_OPT = \
139   cli.cli_option("--opportunistic-tries", action="store", type="int",
140                  dest="opportunistic_tries", metavar="<number>",
141                  help="Number of opportunistic instance creation attempts"
142                       " before a normal creation is performed. An opportunistic"
143                       " attempt will use the iallocator with all the nodes"
144                       " currently unlocked, failing if not enough nodes are"
145                       " available. Even though it will succeed (or fail) more"
146                       " quickly, it can result in suboptimal instance"
147                       " placement")
148
149 OPPORTUNISTIC_DELAY_OPT = \
150   cli.cli_option("--opportunistic-delay", action="store", type="int",
151                  dest="opportunistic_delay", metavar="<number>",
152                  help="The delay between successive opportunistic instance"
153                       " creation attempts, in seconds")
154
155
156 class Error(Exception):
157   """Generic error.
158
159   """
160
161
162 class Abort(Error):
163   """Special exception for aborting import/export.
164
165   """
166
167
168 class RapiClientFactory(object):
169   """Factory class for creating RAPI clients.
170
171   @ivar src_cluster_name: Source cluster name
172   @ivar dest_cluster_name: Destination cluster name
173   @ivar GetSourceClient: Callable returning new client for source cluster
174   @ivar GetDestClient: Callable returning new client for destination cluster
175
176   """
177   def __init__(self, options, src_cluster_name, dest_cluster_name):
178     """Initializes this class.
179
180     @param options: Program options
181     @type src_cluster_name: string
182     @param src_cluster_name: Source cluster name
183     @type dest_cluster_name: string
184     @param dest_cluster_name: Destination cluster name
185
186     """
187     self.src_cluster_name = src_cluster_name
188     self.dest_cluster_name = dest_cluster_name
189
190     # TODO: Implement timeouts for RAPI connections
191     # TODO: Support for using system default paths for verifying SSL certificate
192     logging.debug("Using '%s' as source CA", options.src_ca_file)
193     src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
194
195     if options.dest_ca_file:
196       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
197       dest_curl_config = \
198         rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
199     else:
200       logging.debug("Using source CA for destination")
201       dest_curl_config = src_curl_config
202
203     logging.debug("Source RAPI server is %s:%s",
204                   src_cluster_name, options.src_rapi_port)
205     logging.debug("Source username is '%s'", options.src_username)
206
207     if options.src_username is None:
208       src_username = ""
209     else:
210       src_username = options.src_username
211
212     if options.src_password_file:
213       logging.debug("Reading '%s' for source password",
214                     options.src_password_file)
215       src_password = utils.ReadOneLineFile(options.src_password_file,
216                                            strict=True)
217     else:
218       logging.debug("Source has no password")
219       src_password = None
220
221     self.GetSourceClient = lambda: \
222       rapi.client.GanetiRapiClient(src_cluster_name,
223                                    port=options.src_rapi_port,
224                                    curl_config_fn=src_curl_config,
225                                    username=src_username,
226                                    password=src_password)
227
228     if options.dest_rapi_port:
229       dest_rapi_port = options.dest_rapi_port
230     else:
231       dest_rapi_port = options.src_rapi_port
232
233     if options.dest_username is None:
234       dest_username = src_username
235     else:
236       dest_username = options.dest_username
237
238     logging.debug("Destination RAPI server is %s:%s",
239                   dest_cluster_name, dest_rapi_port)
240     logging.debug("Destination username is '%s'", dest_username)
241
242     if options.dest_password_file:
243       logging.debug("Reading '%s' for destination password",
244                     options.dest_password_file)
245       dest_password = utils.ReadOneLineFile(options.dest_password_file,
246                                             strict=True)
247     else:
248       logging.debug("Using source password for destination")
249       dest_password = src_password
250
251     self.GetDestClient = lambda: \
252       rapi.client.GanetiRapiClient(dest_cluster_name,
253                                    port=dest_rapi_port,
254                                    curl_config_fn=dest_curl_config,
255                                    username=dest_username,
256                                    password=dest_password)
257
258
259 class MoveJobPollReportCb(cli.JobPollReportCbBase):
260   def __init__(self, abort_check_fn, remote_import_fn):
261     """Initializes this class.
262
263     @type abort_check_fn: callable
264     @param abort_check_fn: Function to check whether move is aborted
265     @type remote_import_fn: callable or None
266     @param remote_import_fn: Callback for reporting received remote import
267                              information
268
269     """
270     cli.JobPollReportCbBase.__init__(self)
271     self._abort_check_fn = abort_check_fn
272     self._remote_import_fn = remote_import_fn
273
274   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
275     """Handles a log message.
276
277     """
278     if log_type == constants.ELOG_REMOTE_IMPORT:
279       logging.debug("Received remote import information")
280
281       if not self._remote_import_fn:
282         raise RuntimeError("Received unexpected remote import information")
283
284       assert "x509_ca" in log_msg
285       assert "disks" in log_msg
286
287       self._remote_import_fn(log_msg)
288
289       return
290
291     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
292                  cli.FormatLogMessage(log_type, log_msg))
293
294   def ReportNotChanged(self, job_id, status):
295     """Called if a job hasn't changed in a while.
296
297     """
298     try:
299       # Check whether we were told to abort by the other thread
300       self._abort_check_fn()
301     except Abort:
302       logging.warning("Aborting despite job %s still running", job_id)
303       raise
304
305
306 class InstanceMove(object):
307   """Status class for instance moves.
308
309   """
310   def __init__(self, src_instance_name, dest_instance_name,
311                dest_pnode, dest_snode, compress, dest_iallocator,
312                dest_disk_template, hvparams,
313                beparams, osparams, nics, opportunistic_tries,
314                opportunistic_delay):
315     """Initializes this class.
316
317     @type src_instance_name: string
318     @param src_instance_name: Instance name on source cluster
319     @type dest_instance_name: string
320     @param dest_instance_name: Instance name on destination cluster
321     @type dest_pnode: string or None
322     @param dest_pnode: Name of primary node on destination cluster
323     @type dest_snode: string or None
324     @param dest_snode: Name of secondary node on destination cluster
325     @type compress; string
326     @param compress: Compression mode to use (has to be supported on both
327                      clusters)
328     @type dest_iallocator: string or None
329     @param dest_iallocator: Name of iallocator to use
330     @type dest_disk_template: string or None
331     @param dest_disk_template: Disk template to use instead of the original one
332     @type hvparams: dict or None
333     @param hvparams: Hypervisor parameters to override
334     @type beparams: dict or None
335     @param beparams: Backend parameters to override
336     @type osparams: dict or None
337     @param osparams: OS parameters to override
338     @type nics: dict or None
339     @param nics: NICs to override
340     @type opportunistic_tries: int or None
341     @param opportunistic_tries: Number of opportunistic creation attempts to
342                                 perform
343     @type opportunistic_delay: int or None
344     @param opportunistic_delay: Delay between successive creation attempts, in
345                                 seconds
346
347     """
348     self.src_instance_name = src_instance_name
349     self.dest_instance_name = dest_instance_name
350     self.dest_pnode = dest_pnode
351     self.dest_snode = dest_snode
352     self.compress = compress
353     self.dest_iallocator = dest_iallocator
354     self.dest_disk_template = dest_disk_template
355     self.hvparams = hvparams
356     self.beparams = beparams
357     self.osparams = osparams
358     self.nics = nics
359
360     if opportunistic_tries is not None:
361       self.opportunistic_tries = opportunistic_tries
362     else:
363       self.opportunistic_tries = 0
364
365     if opportunistic_delay is not None:
366       self.opportunistic_delay = opportunistic_delay
367     else:
368       self.opportunistic_delay = constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
369
370     self.error_message = None
371
372
373 class MoveRuntime(object):
374   """Class to keep track of instance move.
375
376   """
377   def __init__(self, move):
378     """Initializes this class.
379
380     @type move: L{InstanceMove}
381
382     """
383     self.move = move
384
385     # Thread synchronization
386     self.lock = threading.Lock()
387     self.source_to_dest = threading.Condition(self.lock)
388     self.dest_to_source = threading.Condition(self.lock)
389
390     # Source information
391     self.src_error_message = None
392     self.src_expinfo = None
393     self.src_instinfo = None
394
395     # Destination information
396     self.dest_error_message = None
397     self.dest_impinfo = None
398
399   def HandleErrors(self, prefix, fn, *args):
400     """Wrapper to catch errors and abort threads.
401
402     @type prefix: string
403     @param prefix: Variable name prefix ("src" or "dest")
404     @type fn: callable
405     @param fn: Function
406
407     """
408     assert prefix in ("dest", "src")
409
410     try:
411       # Call inner function
412       fn(*args)
413
414       errmsg = None
415     except Abort:
416       errmsg = "Aborted"
417     except Exception, err:  # pylint: disable=W0703
418       logging.exception("Caught unhandled exception")
419       errmsg = str(err)
420
421     setattr(self, "%s_error_message" % prefix, errmsg)
422
423     self.lock.acquire()
424     try:
425       self.source_to_dest.notifyAll()
426       self.dest_to_source.notifyAll()
427     finally:
428       self.lock.release()
429
430   def CheckAbort(self):
431     """Check whether thread should be aborted.
432
433     @raise Abort: When thread should be aborted
434
435     """
436     if not (self.src_error_message is None and
437             self.dest_error_message is None):
438       logging.info("Aborting")
439       raise Abort()
440
441   def Wait(self, cond, check_fn):
442     """Waits for a condition to become true.
443
444     @type cond: threading.Condition
445     @param cond: Threading condition
446     @type check_fn: callable
447     @param check_fn: Function to check whether condition is true
448
449     """
450     cond.acquire()
451     try:
452       while check_fn(self):
453         self.CheckAbort()
454         cond.wait()
455     finally:
456       cond.release()
457
458   def PollJob(self, cl, job_id, remote_import_fn=None):
459     """Wrapper for polling a job.
460
461     @type cl: L{rapi.client.GanetiRapiClient}
462     @param cl: RAPI client
463     @type job_id: string
464     @param job_id: Job ID
465     @type remote_import_fn: callable or None
466     @param remote_import_fn: Callback for reporting received remote import
467                              information
468
469     @return: opreturn of the move job
470     @raise errors.JobLost: If job can't be found
471     @raise errors.OpExecError: If job didn't succeed
472
473     @see: L{ganeti.rapi.client_utils.PollJob}
474
475     """
476     return rapi.client_utils.PollJob(cl, job_id,
477                                      MoveJobPollReportCb(self.CheckAbort,
478                                                          remote_import_fn))
479
480
481 class MoveDestExecutor(object):
482   def __init__(self, dest_client, mrt):
483     """Destination side of an instance move.
484
485     @type dest_client: L{rapi.client.GanetiRapiClient}
486     @param dest_client: RAPI client
487     @type mrt: L{MoveRuntime}
488     @param mrt: Instance move runtime information
489
490     """
491     logging.debug("Waiting for instance information to become available")
492     mrt.Wait(mrt.source_to_dest,
493              lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
494
495     logging.info("Creating instance %s in remote-import mode",
496                  mrt.move.dest_instance_name)
497
498     # Depending on whether opportunistic tries are enabled, we may have to
499     # make multiple creation attempts
500     creation_attempts = [True] * mrt.move.opportunistic_tries
501
502     # But the last one is never opportunistic, and will block until completion
503     # or failure
504     creation_attempts.append(False)
505
506     # Initiate the RNG for the variations
507     random.seed()
508
509     for is_attempt_opportunistic in creation_attempts:
510       job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
511                                     mrt.move.dest_pnode, mrt.move.dest_snode,
512                                     mrt.move.compress,
513                                     mrt.move.dest_iallocator,
514                                     mrt.move.dest_disk_template,
515                                     mrt.src_instinfo, mrt.src_expinfo,
516                                     mrt.move.hvparams, mrt.move.beparams,
517                                     mrt.move.osparams, mrt.move.nics,
518                                     is_attempt_opportunistic
519                                     )
520
521       try:
522         # The completion of this block signifies that the import has been
523         # completed successfullly
524         mrt.PollJob(dest_client, job_id,
525                     remote_import_fn=compat.partial(self._SetImportInfo, mrt))
526         logging.info("Import successful")
527         return
528       except errors.OpPrereqError, err:
529         # Any exception in the non-opportunistic creation is to be passed on,
530         # as well as exceptions apart from resources temporarily unavailable
531         if not is_attempt_opportunistic or \
532            err.args[1] != rapi.client.ECODE_TEMP_NORES:
533           raise
534
535       delay_to_use = MoveDestExecutor._VaryDelay(mrt)
536       logging.info("Opportunistic attempt unsuccessful, waiting %.2f seconds"
537                    " before another creation attempt is made",
538                    delay_to_use)
539       time.sleep(delay_to_use)
540
541   @staticmethod
542   def _VaryDelay(mrt):
543     """ Varies the opportunistic delay by a small amount.
544
545     """
546     MAX_VARIATION = 0.15
547     variation_factor = (1.0 + random.uniform(-MAX_VARIATION, MAX_VARIATION))
548     return mrt.move.opportunistic_delay * variation_factor
549
550   @staticmethod
551   def _SetImportInfo(mrt, impinfo):
552     """Sets the remote import information and notifies source thread.
553
554     @type mrt: L{MoveRuntime}
555     @param mrt: Instance move runtime information
556     @param impinfo: Remote import information
557
558     """
559     mrt.dest_to_source.acquire()
560     try:
561       mrt.dest_impinfo = impinfo
562       mrt.dest_to_source.notifyAll()
563     finally:
564       mrt.dest_to_source.release()
565
566   @staticmethod
567   def _GetDisks(instance):
568     disks = []
569     for idisk in instance["disks"]:
570       odisk = {
571         constants.IDISK_SIZE: idisk["size"],
572         constants.IDISK_MODE: idisk["mode"],
573         constants.IDISK_NAME: str(idisk.get("name")),
574         }
575       spindles = idisk.get("spindles")
576       if spindles is not None:
577         odisk[constants.IDISK_SPINDLES] = spindles
578       disks.append(odisk)
579     return disks
580
581   @staticmethod
582   def _GetNics(instance, override_nics):
583     try:
584       nics = [{
585         constants.INIC_IP: ip,
586         constants.INIC_MAC: mac,
587         constants.INIC_MODE: mode,
588         constants.INIC_LINK: link,
589         constants.INIC_VLAN: vlan,
590         constants.INIC_NETWORK: network,
591         constants.INIC_NAME: nic_name
592         } for nic_name, _, ip, mac, mode, link, vlan, network, _
593           in instance["nics"]]
594     except ValueError:
595       raise Error("Received NIC information does not match expected format; "
596                   "Do the versions of this tool and the source cluster match?")
597
598     if len(override_nics) > len(nics):
599       raise Error("Can not create new NICs")
600
601     if override_nics:
602       assert len(override_nics) <= len(nics)
603       for idx, (nic, override) in enumerate(zip(nics, override_nics)):
604         nics[idx] = objects.FillDict(nic, override)
605
606     return nics
607
608   @staticmethod
609   def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
610                       dest_disk_template, instance, expinfo, override_hvparams,
611                       override_beparams, override_osparams, override_nics,
612                       is_attempt_opportunistic):
613     """Starts the instance creation in remote import mode.
614
615     @type cl: L{rapi.client.GanetiRapiClient}
616     @param cl: RAPI client
617     @type name: string
618     @param name: Instance name
619     @type pnode: string or None
620     @param pnode: Name of primary node on destination cluster
621     @type snode: string or None
622     @param snode: Name of secondary node on destination cluster
623     @type compress: string
624     @param compress: Compression mode to use
625     @type iallocator: string or None
626     @param iallocator: Name of iallocator to use
627     @type dest_disk_template: string or None
628     @param dest_disk_template: Disk template to use instead of the original one
629     @type instance: dict
630     @param instance: Instance details from source cluster
631     @type expinfo: dict
632     @param expinfo: Prepared export information from source cluster
633     @type override_hvparams: dict or None
634     @param override_hvparams: Hypervisor parameters to override
635     @type override_beparams: dict or None
636     @param override_beparams: Backend parameters to override
637     @type override_osparams: dict or None
638     @param override_osparams: OS parameters to override
639     @type override_nics: dict or None
640     @param override_nics: NICs to override
641     @type is_attempt_opportunistic: bool
642     @param is_attempt_opportunistic: Whether to use opportunistic locking or not
643     @return: Job ID
644
645     """
646     if dest_disk_template:
647       disk_template = dest_disk_template
648     else:
649       disk_template = instance["disk_template"]
650
651     disks = MoveDestExecutor._GetDisks(instance)
652     nics = MoveDestExecutor._GetNics(instance, override_nics)
653     os_type = instance.get("os", None)
654
655     # TODO: Should this be the actual up/down status? (run_state)
656     start = (instance["config_state"] == "up")
657
658     assert len(disks) == len(instance["disks"])
659     assert len(nics) == len(instance["nics"])
660
661     inst_beparams = instance.get("be_instance", {})
662     inst_hvparams = instance.get("hv_instance", {})
663     inst_osparams = instance.get("os_instance", {})
664
665     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
666                              name, disk_template, disks, nics,
667                              os=os_type,
668                              pnode=pnode,
669                              snode=snode,
670                              start=start,
671                              ip_check=False,
672                              iallocator=iallocator,
673                              hypervisor=instance["hypervisor"],
674                              source_handshake=expinfo["handshake"],
675                              source_x509_ca=expinfo["x509_ca"],
676                              compress=compress,
677                              source_instance_name=instance["name"],
678                              beparams=objects.FillDict(inst_beparams,
679                                                        override_beparams),
680                              hvparams=objects.FillDict(inst_hvparams,
681                                                        override_hvparams),
682                              osparams=objects.FillDict(inst_osparams,
683                                                        override_osparams),
684                              opportunistic_locking=is_attempt_opportunistic
685                              )
686
687
688 class MoveSourceExecutor(object):
689   def __init__(self, src_client, mrt):
690     """Source side of an instance move.
691
692     @type src_client: L{rapi.client.GanetiRapiClient}
693     @param src_client: RAPI client
694     @type mrt: L{MoveRuntime}
695     @param mrt: Instance move runtime information
696
697     """
698     logging.info("Checking whether instance exists")
699     self._CheckInstance(src_client, mrt.move.src_instance_name)
700
701     logging.info("Retrieving instance information from source cluster")
702     instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
703                                      mrt.move.src_instance_name)
704
705     logging.info("Preparing export on source cluster")
706     expinfo = self._PrepareExport(src_client, mrt.PollJob,
707                                   mrt.move.src_instance_name)
708     assert "handshake" in expinfo
709     assert "x509_key_name" in expinfo
710     assert "x509_ca" in expinfo
711
712     # Hand information to destination thread
713     mrt.source_to_dest.acquire()
714     try:
715       mrt.src_instinfo = instinfo
716       mrt.src_expinfo = expinfo
717       mrt.source_to_dest.notifyAll()
718     finally:
719       mrt.source_to_dest.release()
720
721     logging.info("Waiting for destination information to become available")
722     mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
723
724     logging.info("Starting remote export on source cluster")
725     self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
726                          expinfo["x509_key_name"], mrt.move.compress,
727                          mrt.dest_impinfo)
728
729     logging.info("Export successful")
730
731   @staticmethod
732   def _CheckInstance(cl, name):
733     """Checks whether the instance exists on the source cluster.
734
735     @type cl: L{rapi.client.GanetiRapiClient}
736     @param cl: RAPI client
737     @type name: string
738     @param name: Instance name
739
740     """
741     try:
742       cl.GetInstance(name)
743     except rapi.client.GanetiApiError, err:
744       if err.code == rapi.client.HTTP_NOT_FOUND:
745         raise Error("Instance %s not found (%s)" % (name, str(err)))
746       raise
747
748   @staticmethod
749   def _GetInstanceInfo(cl, poll_job_fn, name):
750     """Retrieves detailed instance information from source cluster.
751
752     @type cl: L{rapi.client.GanetiRapiClient}
753     @param cl: RAPI client
754     @type poll_job_fn: callable
755     @param poll_job_fn: Function to poll for job result
756     @type name: string
757     @param name: Instance name
758
759     """
760     job_id = cl.GetInstanceInfo(name, static=True)
761     result = poll_job_fn(cl, job_id)
762     assert len(result[0].keys()) == 1
763     return result[0][result[0].keys()[0]]
764
765   @staticmethod
766   def _PrepareExport(cl, poll_job_fn, name):
767     """Prepares export on source cluster.
768
769     @type cl: L{rapi.client.GanetiRapiClient}
770     @param cl: RAPI client
771     @type poll_job_fn: callable
772     @param poll_job_fn: Function to poll for job result
773     @type name: string
774     @param name: Instance name
775
776     """
777     job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
778     return poll_job_fn(cl, job_id)[0]
779
780   @staticmethod
781   def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
782     """Exports instance from source cluster.
783
784     @type cl: L{rapi.client.GanetiRapiClient}
785     @param cl: RAPI client
786     @type poll_job_fn: callable
787     @param poll_job_fn: Function to poll for job result
788     @type name: string
789     @param name: Instance name
790     @param x509_key_name: Source X509 key
791     @type compress: string
792     @param compress: Compression mode to use
793     @param impinfo: Import information from destination cluster
794
795     """
796     job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
797                                impinfo["disks"], shutdown=True,
798                                remove_instance=True,
799                                x509_key_name=x509_key_name,
800                                destination_x509_ca=impinfo["x509_ca"],
801                                compress=compress)
802     (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
803
804     if not (fin_resu and compat.all(dresults)):
805       raise Error("Export failed for disks %s" %
806                   utils.CommaJoin(str(idx) for idx, result
807                                   in enumerate(dresults) if not result))
808
809
810 class MoveSourceWorker(workerpool.BaseWorker):
811   def RunTask(self, rapi_factory, move): # pylint: disable=W0221
812     """Executes an instance move.
813
814     @type rapi_factory: L{RapiClientFactory}
815     @param rapi_factory: RAPI client factory
816     @type move: L{InstanceMove}
817     @param move: Instance move information
818
819     """
820     try:
821       logging.info("Preparing to move %s from cluster %s to %s as %s",
822                    move.src_instance_name, rapi_factory.src_cluster_name,
823                    rapi_factory.dest_cluster_name, move.dest_instance_name)
824
825       mrt = MoveRuntime(move)
826
827       logging.debug("Starting destination thread")
828       dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
829                                      target=mrt.HandleErrors,
830                                      args=("dest", MoveDestExecutor,
831                                            rapi_factory.GetDestClient(),
832                                            mrt, ))
833       dest_thread.start()
834       try:
835         mrt.HandleErrors("src", MoveSourceExecutor,
836                          rapi_factory.GetSourceClient(), mrt)
837       finally:
838         dest_thread.join()
839
840       if mrt.src_error_message or mrt.dest_error_message:
841         move.error_message = ("Source error: %s, destination error: %s" %
842                               (mrt.src_error_message, mrt.dest_error_message))
843       else:
844         move.error_message = None
845     except Exception, err: # pylint: disable=W0703
846       logging.exception("Caught unhandled exception")
847       move.error_message = str(err)
848
849
850 def CheckRapiSetup(rapi_factory):
851   """Checks the RAPI setup by retrieving the version.
852
853   @type rapi_factory: L{RapiClientFactory}
854   @param rapi_factory: RAPI client factory
855
856   """
857   src_client = rapi_factory.GetSourceClient()
858   logging.info("Connecting to source RAPI server")
859   logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
860
861   dest_client = rapi_factory.GetDestClient()
862   logging.info("Connecting to destination RAPI server")
863   logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())
864
865
866 def ParseOptions():
867   """Parses options passed to program.
868
869   """
870   program = os.path.basename(sys.argv[0])
871
872   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
873                                         " <source-cluster> <dest-cluster>"
874                                         " <instance...>"),
875                                  prog=program)
876   parser.add_option(cli.DEBUG_OPT)
877   parser.add_option(cli.VERBOSE_OPT)
878   parser.add_option(cli.IALLOCATOR_OPT)
879   parser.add_option(cli.BACKEND_OPT)
880   parser.add_option(cli.HVOPTS_OPT)
881   parser.add_option(cli.OSPARAMS_OPT)
882   parser.add_option(cli.NET_OPT)
883   parser.add_option(SRC_RAPI_PORT_OPT)
884   parser.add_option(SRC_CA_FILE_OPT)
885   parser.add_option(SRC_USERNAME_OPT)
886   parser.add_option(SRC_PASSWORD_FILE_OPT)
887   parser.add_option(DEST_RAPI_PORT_OPT)
888   parser.add_option(DEST_CA_FILE_OPT)
889   parser.add_option(DEST_USERNAME_OPT)
890   parser.add_option(DEST_PASSWORD_FILE_OPT)
891   parser.add_option(DEST_INSTANCE_NAME_OPT)
892   parser.add_option(DEST_PRIMARY_NODE_OPT)
893   parser.add_option(DEST_SECONDARY_NODE_OPT)
894   parser.add_option(DEST_DISK_TEMPLATE_OPT)
895   parser.add_option(COMPRESS_OPT)
896   parser.add_option(PARALLEL_OPT)
897   parser.add_option(OPPORTUNISTIC_TRIES_OPT)
898   parser.add_option(OPPORTUNISTIC_DELAY_OPT)
899
900   (options, args) = parser.parse_args()
901
902   return (parser, options, args)
903
904
905 def _CheckAllocatorOptions(parser, options):
906   if (bool(options.iallocator) and
907       bool(options.dest_primary_node or options.dest_secondary_node)):
908     parser.error("Destination node and iallocator options exclude each other")
909
910   if not options.iallocator and (options.opportunistic_tries > 0):
911     parser.error("Opportunistic instance creation can only be used with an"
912                  " iallocator")
913
914
915 def _CheckOpportunisticLockingOptions(parser, options):
916   tries_specified = options.opportunistic_tries is not None
917   delay_specified = options.opportunistic_delay is not None
918   if tries_specified:
919     if options.opportunistic_tries < 0:
920       parser.error("Number of opportunistic creation attempts must be >= 0")
921     if delay_specified:
922       if options.opportunistic_delay <= 0:
923         parser.error("The delay between two successive creation attempts must"
924                      " be greater than zero")
925   elif delay_specified:
926     parser.error("Opportunistic delay can only be specified when opportunistic"
927                  " tries are used")
928   else:
929     # The default values will be provided later
930     pass
931
932
933 def _CheckInstanceOptions(parser, options, instance_names):
934   if len(instance_names) == 1:
935     # Moving one instance only
936     if options.hvparams:
937       utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
938
939     if options.beparams:
940       utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
941
942     if options.nics:
943       options.nics = cli.ParseNicOption(options.nics)
944   else:
945     # Moving more than one instance
946     if compat.any(options.dest_instance_name, options.dest_primary_node,
947                   options.dest_secondary_node, options.hvparams,
948                   options.beparams, options.osparams, options.nics):
949       parser.error("The options --dest-instance-name, --dest-primary-node,"
950                    " --dest-secondary-node, --hypervisor-parameters,"
951                    " --backend-parameters, --os-parameters and --net can"
952                    " only be used when moving exactly one instance")
953
954
955 def CheckOptions(parser, options, args):
956   """Checks options and arguments for validity.
957
958   """
959   if len(args) < 3:
960     parser.error("Not enough arguments")
961
962   src_cluster_name = args.pop(0)
963   dest_cluster_name = args.pop(0)
964   instance_names = args
965
966   assert len(instance_names) > 0
967
968   # TODO: Remove once using system default paths for SSL certificate
969   # verification is implemented
970   if not options.src_ca_file:
971     parser.error("Missing source cluster CA file")
972
973   if options.parallel < 1:
974     parser.error("Number of simultaneous moves must be >= 1")
975
976   _CheckAllocatorOptions(parser, options)
977   _CheckOpportunisticLockingOptions(parser, options)
978   _CheckInstanceOptions(parser, options, instance_names)
979
980   return (src_cluster_name, dest_cluster_name, instance_names)
981
982
983 def DestClusterHasDefaultIAllocator(rapi_factory):
984   """Determines if a given cluster has a default iallocator.
985
986   """
987   result = rapi_factory.GetDestClient().GetInfo()
988   ia_name = "default_iallocator"
989   return ia_name in result and result[ia_name]
990
991
992 def ExitWithError(message):
993   """Exits after an error and shows a message.
994
995   """
996   sys.stderr.write("move-instance: error: " + message + "\n")
997   sys.exit(constants.EXIT_FAILURE)
998
999
1000 def _PrepareListOfInstanceMoves(options, instance_names):
1001   moves = []
1002   for src_instance_name in instance_names:
1003     if options.dest_instance_name:
1004       assert len(instance_names) == 1
1005       # Rename instance
1006       dest_instance_name = options.dest_instance_name
1007     else:
1008       dest_instance_name = src_instance_name
1009
1010     moves.append(InstanceMove(src_instance_name, dest_instance_name,
1011                               options.dest_primary_node,
1012                               options.dest_secondary_node,
1013                               options.compress,
1014                               options.iallocator,
1015                               options.dest_disk_template,
1016                               options.hvparams,
1017                               options.beparams,
1018                               options.osparams,
1019                               options.nics,
1020                               options.opportunistic_tries,
1021                               options.opportunistic_delay))
1022
1023   assert len(moves) == len(instance_names)
1024   return moves
1025
1026
1027 @UsesRapiClient
1028 def main():
1029   """Main routine.
1030
1031   """
1032   (parser, options, args) = ParseOptions()
1033
1034   utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
1035
1036   (src_cluster_name, dest_cluster_name, instance_names) = \
1037     CheckOptions(parser, options, args)
1038
1039   logging.info("Source cluster: %s", src_cluster_name)
1040   logging.info("Destination cluster: %s", dest_cluster_name)
1041   logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
1042
1043   rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
1044
1045   CheckRapiSetup(rapi_factory)
1046
1047   has_iallocator = options.iallocator or \
1048                    DestClusterHasDefaultIAllocator(rapi_factory)
1049
1050   if len(instance_names) > 1 and not has_iallocator:
1051     ExitWithError("When moving multiple nodes, an iallocator must be used. "
1052                   "None was provided and the target cluster does not have "
1053                   "a default iallocator.")
1054   if (len(instance_names) == 1 and not (has_iallocator or
1055       options.dest_primary_node or options.dest_secondary_node)):
1056     ExitWithError("Target cluster does not have a default iallocator, "
1057                   "please specify either destination nodes or an iallocator.")
1058
1059   moves = _PrepareListOfInstanceMoves(options, instance_names)
1060
1061   # Start workerpool
1062   wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
1063   try:
1064     # Add instance moves to workerpool
1065     for move in moves:
1066       wp.AddTask((rapi_factory, move))
1067
1068     # Wait for all moves to finish
1069     wp.Quiesce()
1070
1071   finally:
1072     wp.TerminateWorkers()
1073
1074   # There should be no threads running at this point, hence not using locks
1075   # anymore
1076
1077   logging.info("Instance move results:")
1078
1079   for move in moves:
1080     if move.dest_instance_name == move.src_instance_name:
1081       name = move.src_instance_name
1082     else:
1083       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
1084
1085     if move.error_message:
1086       msg = "Failed (%s)" % move.error_message
1087     else:
1088       msg = "Success"
1089
1090     logging.info("%s: %s", name, msg)
1091
1092   if compat.any(move.error_message for move in moves):
1093     sys.exit(constants.EXIT_FAILURE)
1094
1095   sys.exit(constants.EXIT_SUCCESS)
1096
1097
1098 if __name__ == "__main__":
1099   main()