cmdlib: Extract storage related functionality
[ganeti-github.git] / lib / cmdlib / backup.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with backup operations."""
23
24 import OpenSSL
25 import logging
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import locking
31 from ganeti import masterd
32 from ganeti import qlang
33 from ganeti import query
34 from ganeti import utils
35
36 from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
37 from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
38 _CheckNodeOnline, _ExpandNodeName
39 from ganeti.cmdlib.instance_storage import _StartInstanceDisks, \
40 _ShutdownInstanceDisks
41 from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
42 _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _RemoveInstance
43
44
class _ExportQuery(_QueryBase):
  FIELDS = query.EXPORT_FIELDS

  #: The node name is not a unique key for this query
  SORT_FIELD = "node"

  def ExpandNames(self, lu):
    """Compute the wanted nodes and the locks (if any) for the query."""
    lu.needed_locks = {}

    # These attributes are consumed later by _QueryBase._GetNames
    if not self.names:
      self.wanted = locking.ALL_SET
    else:
      self.wanted = _GetWantedNodes(lu, self.names)

    self.do_locking = self.use_locking

    if self.do_locking:
      lu.share_locks = _ShareAll()
      lu.needed_locks = {locking.LEVEL_NODE: self.wanted}
      if not self.names:
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    """No per-level lock declarations are needed for this query."""
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # This query is expected to run without any locks held
    # TODO
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    node_names = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)

    export_list = []
    for (node_name, node_result) in \
          lu.rpc.call_export_list(node_names).items():
      if node_result.fail_msg:
        # A failed node query is recorded with a None export name
        export_list.append((node_name, None))
      else:
        for export_name in node_result.payload:
          export_list.append((node_name, export_name))

    return export_list
96
97
class LUBackupQuery(NoHooksLU):
  """Query the exports list

  """
  REQ_BGL = False

  def CheckArguments(self):
    """Build the export query helper from the opcode's arguments."""
    node_filter = qlang.MakeSimpleFilter("node", self.op.nodes)
    self.expq = _ExportQuery(node_filter, ["node", "export"],
                             self.op.use_locking)

  def ExpandNames(self):
    self.expq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.expq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    """Return a node-name-to-export-list mapping (False on node failure)."""
    exports_by_node = {}

    for (node, export_name) in self.expq.OldStyleQuery(self):
      if export_name is None:
        # The export list RPC failed for this node; flag it with False
        exports_by_node[node] = False
      else:
        exports_by_node.setdefault(node, []).append(export_name)

    return exports_by_node
124
125
class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    The instance must be present in the configuration (it was locked by
    L{ExpandNames}) and its primary node must be online.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, self.instance.primary_node)

    self._cds = _GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    """
    instance = self.instance

    if self.op.mode != constants.EXPORT_MODE_REMOTE:
      # Local exports need no preparation data
      return None

    salt = utils.GenerateSecret(8)

    feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
    result = self.rpc.call_x509_cert_create(instance.primary_node,
                                            constants.RIE_CERT_VALIDITY)
    result.Raise("Can't create X509 key and certificate on %s" % result.node)

    (name, cert_pem) = result.payload

    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                cert_pem)

    # The handshake, the HMAC-signed key name and the signed CA are what
    # the destination cluster needs to accept the remote export
    return {
      "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
      "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
                        salt),
      "x509_ca": utils.SignX509Certificate(x509_cert, self._cds, salt),
      }
175
176
class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      # Remote exports are encrypted, hence the X509 key name and the
      # destination CA certificate are both mandatory in this mode
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have to lock all nodes, as we don't know
      # where the previous export might be, and in this LU we search for it
      # and remove it from its current node. In the future we could fix this
      # by:
      # - making a tasklet to search (share-lock all), then create the
      #   new one, then one to remove, after
      # - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(_BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      # Local exports also run hooks on the destination node
      nl.append(self.op.target_node)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    For remote exports it additionally verifies the HMAC-signed X509 key
    name, the signed destination CA and the per-disk target information
    supplied by the destination cluster.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    # Removing a running instance without shutting it down first would lose
    # its current state, so refuse that combination early
    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Can not remove instance without shutting it"
                                 " down before", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
      assert self.dst_node is not None

      _CheckNodeOnline(self, self.dst_node.name)
      _CheckNodeNotDrained(self, self.dst_node.name)

      # Remote-export-only attributes are left unset for local exports
      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      # NOTE: for remote exports self.op.target_node is a list of per-disk
      # destination entries, not a node name
      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node), instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)

      cds = _GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError, err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(self.dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        # best-effort cleanup: nodes we could not query are skipped silently
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    @return: tuple of (finalization status, per-disk export results)

    """
    assert self.op.mode in constants.EXPORT_MODES

    instance = self.instance
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    # Disks need to be activated first when exporting a stopped instance
    activate_disks = (instance.admin_state != constants.ADMINST_UP)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     instance)

      helper.CreateSnapshots()
      try:
        # Once the snapshots are taken, the instance can be restarted while
        # the actual export proceeds from the snapshots
        if (self.op.shutdown and
            instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert not activate_disks
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node,
                                                (instance, None, None), False,
                                                self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point, the export was successful, we can cleanup/finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % instance.name)
      _RemoveInstance(self, feedback_fn, instance,
                      self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults
480
481
class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # We need all nodes to be locked in order for RemoveExport to work, but
      # we don't need to lock the instance itself, as nothing will happen to
      # it (and we can remove exports also for a removed instance)
      locking.LEVEL_NODE: locking.ALL_SET,

      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but it
    # doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # When expansion fails, fall back to the name given by the user; that
    # can only match an export if the user supplied an FQDN
    fqdn_warn = not instance_name
    if fqdn_warn:
      instance_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    export_lists = self.rpc.call_export_list(locked_nodes)

    found = False
    for (node, node_result) in export_lists.items():
      query_msg = node_result.fail_msg
      if query_msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        node, query_msg)
      elif instance_name in node_result.payload:
        found = True
        remove_msg = self.rpc.call_export_remove(node, instance_name).fail_msg
        if remove_msg:
          # Removal is best-effort; failures are only logged
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, remove_msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")