#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


31 """Ganeti node daemon"""
32
33 # pylint: disable=C0103
34
35 # C0103: Functions in this module need to have a given name structure,
36 # and the name of the daemon doesn't match
37
38 import os
39 import sys
40 import logging
41 import signal
42 import codecs
43
44 from optparse import OptionParser
45
46 from ganeti import backend
47 from ganeti import constants
48 from ganeti import objects
49 from ganeti import errors
50 from ganeti import jstore
51 from ganeti import daemon
52 from ganeti import http
53 from ganeti import utils
54 from ganeti.storage import container
55 from ganeti import serializer
56 from ganeti import netutils
57 from ganeti import pathutils
58 from ganeti import ssconf
59
60 import ganeti.http.server # pylint: disable=W0611
61
62
63 queue_lock = None
64
65
66 def _extendReasonTrail(trail, source, reason=""):
67 """Extend the reason trail with noded information
68
69 The trail is extended by appending the name of the noded functionality
70 """
71 assert trail is not None
72 trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
73 trail.append((trail_source, reason, utils.EpochNano()))
74
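# Illustration only (values hypothetical): a call such as
#   _extendReasonTrail(trail, "shutdown", "maintenance")
# appends one tuple of the form
#   ("%s:shutdown" % constants.OPCODE_REASON_SRC_NODED, "maintenance",
#    <current time in nanoseconds since the epoch>)
# i.e. the trail records which noded functionality acted, why, and when.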

def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper

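# A minimal usage sketch (the decorated function name is hypothetical): the
# wrapped callable only runs once the queue lock is held exclusively, and
# the lock is released even if the callable raises.
#
#   @_RequireJobQueueLock
#   def _TouchQueueFile(file_name):
#     ...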

def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  return ieioargs

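# Shapes inferred from the code above: for IEIO_RAW_DISK the single
# serialized disk dict is decoded into a 1-tuple holding a Disk object; for
# IEIO_SCRIPT the pair (disk dict, second argument) becomes (Disk object,
# second argument); any other I/O type is passed through unchanged.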

def _DefaultAlternative(value, default):
  """Returns value or, if evaluating to False, a default value.

  Returns the given value, unless it evaluates to False. In the latter case
  the default value is returned.

  @param value: Value to return if it doesn't evaluate to False
  @param default: Default value
  @return: Given value or the default

  """
  if value:
    return value

  return default

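# For example (hypothetical values):
#   _DefaultAlternative("eth0", "xen-br0")  returns "eth0"
#   _DefaultAlternative(None, "xen-br0")    returns "xen-br0"
#   _DefaultAlternative("", "xen-br0")      returns "xen-br0"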

class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)


class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """

    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err: # pylint: disable=W0703
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)

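  # To illustrate the protocol handled above (request shape hypothetical):
  # each RPC is an HTTP POST whose path selects a perspective_* method and
  # whose body is a JSON-encoded parameter list; the reply is the JSON pair
  # (success, payload). For example, a POST to /version with body "[]" would
  # return "[true, <protocol version>]".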
  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
                                  excl_stor)

  @staticmethod
  def perspective_blockdev_convert(params):
    """Copy data from source block device to target.

    """
    disk_src, disk_dest = params
    bdev_src = objects.Disk.FromDict(disk_src)
    bdev_dest = objects.Disk.FromDict(disk_dest)
    return backend.BlockdevConvert(bdev_src, bdev_dest)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_image(params):
    """Image a block device.

    """
    bdev_s, image, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevImage(bdev, image, size)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, idict, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    instance = objects.Instance.FromDict(idict)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, instance, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params[0]]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM and ExtStorage disks; if we get
    passed something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    (disk, snap_name, snap_size) = params
    cfbd = objects.Disk.FromDict(disk)
    return backend.BlockdevSnapshot(cfbd, snap_name, snap_size)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    excl_stor = params[4]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_open(params):
    """Opens the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    exclusive = params[2]
    return backend.BlockdevOpen(params[0], disks, exclusive)

  @staticmethod
  def perspective_blockdev_getdimensions(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetdimensions(disks)

  @staticmethod
  def perspective_blockdev_setinfo(params):
    """Sets metadata information on the given block device.

    """
    (disk, info) = params
    disk = objects.Disk.FromDict(disk)
    return backend.BlockdevSetInfo(disk, info)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdDisconnectNet(disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    disks, multimaster = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdAttachNet(disks, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdWaitSync(disks)

  @staticmethod
  def perspective_drbd_needs_activation(params):
    """Checks if the drbd devices need activation.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdNeedsActivation(disks)

  @staticmethod
  def perspective_drbd_helper(_):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------

  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices.

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return container.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return container.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return container.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    trail = params[2]
    _extendReasonTrail(trail, "shutdown")
    return backend.InstanceShutdown(instance, timeout, trail)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    (instance_name, startup_paused, trail) = params
    instance = objects.Instance.FromDict(instance_name)
    _extendReasonTrail(trail, "start")
    return backend.StartInstance(instance, startup_paused, trail)

  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs a device to a running instance.

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)

  @staticmethod
  def perspective_hotplug_supported(params):
    """Checks if hotplug is supported.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.HotplugSupported(instance)

  @staticmethod
  def perspective_instance_metadata_modify(params):
    """Modify instance metadata.

    """
    instance = params[0]
    return backend.ModifyInstanceMetadata(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_instance_finalize_migration_dst(params):
    """Finalize the instance migration on the destination node.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationDst(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    cluster_name, instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(cluster_name, instance, target, live)

  @staticmethod
  def perspective_instance_finalize_migration_src(params):
    """Finalize the instance migration on the source node.

    """
    instance, success, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationSource(instance, success, live)

  @staticmethod
  def perspective_instance_get_migration_status(params):
    """Reports migration status.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetMigrationStatus(instance).ToDict()

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    trail = params[3]
    _extendReasonTrail(trail, "reboot")
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
                                  trail)

  @staticmethod
  def perspective_instance_balloon_memory(params):
    """Modify instance runtime memory.

    """
    instance_dict, memory = params
    instance = objects.Instance.FromDict(instance_dict)
    return backend.InstanceBalloonMemory(instance, memory)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    (instance_name, hypervisor_name, hvparams) = params
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    (hypervisor_list, all_hvparams) = params
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)

  @staticmethod
  def perspective_instance_console_info(params):
    """Query information on how to get console access to instances.

    """
    return backend.GetInstanceConsoleInfo(params)

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    (hypervisor_list, hvparams) = params
    return backend.GetInstanceList(hypervisor_list, hvparams)

  # node --------------------------

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given IP address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (storage_units, hv_specs) = params
    return backend.GetNodeInfo(storage_units, hv_specs)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    (what, cluster_name, hvparams) = params
    return backend.VerifyNode(what, cluster_name, hvparams)

  @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    This call is meant to perform a less strict verification of the node in
    certain situations. Right now, it is invoked only when a node is just
    about to be added to a cluster, and even then, it performs the same
    checks as L{perspective_node_verify}.

    """
    return cls.perspective_node_verify(params)

  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    return backend.StartMasterDaemons(params[0])

  @staticmethod
  def perspective_node_activate_master_ip(params):
    """Activate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.ActivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_deactivate_master_ip(params):
    """Deactivate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.DeactivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    (hypervisor_type, hvparams) = params
    return backend.PowercycleNode(hypervisor_type, hvparams)

  @staticmethod
  def perspective_node_configure_ovs(params):
    """Sets up OpenvSwitch on the node.

    """
    (ovs_name, ovs_link) = params
    return backend.ConfigureOVS(ovs_name, ovs_link)

  @staticmethod
  def perspective_node_crypto_tokens(params):
    """Gets the node's public crypto tokens.

    """
    token_requests = params[0]
    return backend.GetCryptoTokens(token_requests)

  @staticmethod
  def perspective_node_ensure_daemon(params):
    """Ensure daemon is running.

    """
    (daemon_name, run) = params
    return backend.EnsureDaemon(daemon_name, run)

  @staticmethod
  def perspective_node_ssh_key_add(params):
    """Distributes a new node's SSH key if authorized.

    """
    (node_uuid, node_name, potential_master_candidates,
     to_authorized_keys, to_public_keys, get_public_keys,
     debug, verbose) = params
    return backend.AddNodeSshKey(node_uuid, node_name,
                                 potential_master_candidates,
                                 to_authorized_keys=to_authorized_keys,
                                 to_public_keys=to_public_keys,
                                 get_public_keys=get_public_keys,
                                 ssh_update_debug=debug,
                                 ssh_update_verbose=verbose)

  @staticmethod
  def perspective_node_ssh_keys_renew(params):
    """Generates a new root SSH key pair on the node.

    """
    (node_uuids, node_names, master_candidate_uuids,
     potential_master_candidates, old_key_type, new_key_type,
     new_key_bits, debug, verbose) = params
    return backend.RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
                                potential_master_candidates, old_key_type,
                                new_key_type, new_key_bits,
                                ssh_update_debug=debug,
                                ssh_update_verbose=verbose)

  @staticmethod
  def perspective_node_ssh_key_remove(params):
    """Removes a node's SSH key from the other nodes' SSH files.

    """
    (node_uuid, node_name,
     master_candidate_uuids, potential_master_candidates,
     from_authorized_keys, from_public_keys, clear_authorized_keys,
     clear_public_keys, readd, debug, verbose) = params
    return backend.RemoveNodeSshKey(node_uuid, node_name,
                                    master_candidate_uuids,
                                    potential_master_candidates,
                                    from_authorized_keys=from_authorized_keys,
                                    from_public_keys=from_public_keys,
                                    clear_authorized_keys=clear_authorized_keys,
                                    clear_public_keys=clear_public_keys,
                                    readd=readd,
                                    ssh_update_debug=debug,
                                    ssh_update_verbose=verbose)

  @staticmethod
  def perspective_node_ssh_key_remove_light(params):
    """Removes a node's SSH key from the master's public key file.

    """
    (node_name, ) = params
    return backend.RemoveSshKeyFromPublicKeyFile(node_name)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_upload_file_single(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_node_name(params):
    """Returns the master node name.

    """
    return backend.GetMasterNodeName()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_restricted_command(params):
    """Runs a restricted command.

    """
    (cmd, ) = params

    return backend.RunConstrainedCmd(
      cmd,
      lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
      path=pathutils.RESTRICTED_COMMANDS_DIR)

  @staticmethod
  def perspective_repair_command(params):
    """Run a repair command.

    """
    (cmd, inp, ) = params

    return backend.RunConstrainedCmd(
      cmd,
      lock_file=pathutils.REPAIR_COMMANDS_LOCK_FILE,
      path=pathutils.REPAIR_COMMANDS_DIR,
      inp=inp)

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return ssconf.WriteSsconfFiles(values)

  @staticmethod
  def perspective_get_watcher_pause(params):
    """Get watcher pause end.

    """
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)

  @staticmethod
  def perspective_set_watcher_pause(params):
    """Set watcher pause.

    """
    (until, ) = params
    return backend.SetWatcherPause(until)

  @staticmethod
  def perspective_get_file_info(params):
    """Get info on whether a file exists and its properties.

    """
    (path, ) = params
    return backend.GetFileInfo(path)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params, force_variant = params
    return backend.ValidateOS(required, name, checks, params, force_variant)

  @staticmethod
  def perspective_os_export(params):
    """Export an OS definition into an instance specific package.

    """
    instance = objects.Instance.FromDict(params[0])
    override_env = params[1]
    return backend.ExportOS(instance, override_env)

  # extstorage -----------------------

  @staticmethod
  def perspective_extstorage_diagnose(params):
    """Query detailed information about existing extstorage providers.

    """
    return backend.DiagnoseExtStorage()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata, ial_params_dict = params
    ial_params = []
    for ial_param in ial_params_dict.items():
      if ial_param[1] is not None:
        ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
      else:
        ial_params.append("--" + ial_param[0])
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata, ial_params)

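  # Example of the conversion in perspective_iallocator_runner above (flag
  # names hypothetical): an ial_params_dict of
  # {"restrict-to-nodes": "node1.example.com", "no-capacity-checks": None}
  # becomes ["--restrict-to-nodes=node1.example.com", "--no-capacity-checks"].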
  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set job queue's drain flag.

    """
    (flag, ) = params

    return jstore.SetDrainFlag(flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes an X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])


def CheckNoded(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  if options.max_clients < 1:
    print >> sys.stderr, ("%s --max-clients argument must be >= 1" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  try:
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)

def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate
  @type errdepth: integer
  @param errdepth: number of the step in the certificate chain starting at 0
                   for the actual client certificate.

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613

  # If we receive a certificate from the certificate chain that is higher
  # than the lowest element of the chain, we have to check it against the
  # server certificate.
  if errdepth > 0:
    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    match = cert.digest("sha1") == server_digest
    if not match:
      logging.debug("Received certificate from the certificate chain, which"
                    " does not match the server certificate. Digest of the"
                    " received certificate: %s. Digest of the server"
                    " certificate: %s.", cert.digest("sha1"), server_digest)
    return match
  elif errdepth == 0:
    sstore = ssconf.SimpleStore()
    try:
      candidate_certs = sstore.GetMasterCandidatesCertMap()
    except errors.ConfigurationError:
      logging.info("No candidate certificates found. Switching to "
                   "bootstrap/update mode.")
      candidate_certs = None
    if not candidate_certs:
      candidate_certs = {
        constants.CRYPTO_BOOTSTRAP: utils.GetCertificateDigest(
          cert_filename=pathutils.NODED_CERT_FILE)}
    match = cert.digest("sha1") in candidate_certs.values()
    if not match:
      logging.debug("Received certificate which is not a certificate of a"
                    " master candidate. Certificate digest: %s. List of master"
                    " candidate certificate digests: %s.", cert.digest("sha1"),
                    str(candidate_certs))
    return match
  else:
    logging.error("Invalid errdepth value: %s.", errdepth)
    return False

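# To summarize the decision logic above: a chain certificate (errdepth > 0)
# is accepted only if its SHA1 digest equals that of the local server
# certificate; the peer certificate itself (errdepth == 0) is accepted only
# if its digest appears in the master candidate map (falling back to the
# server certificate digest while bootstrapping); any other errdepth value
# is rejected.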

def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = http.server.HttpServer(
    mainloop, options.bind_address, options.port, options.max_clients,
    handler, ssl_params=ssl_params, ssl_verify_peer=True,
    request_executor_class=request_executor_class,
    ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")
  parser.add_option("--max-clients", dest="max_clients",
                    default=20, type="int",
                    help="Number of simultaneous connections accepted"
                    " by noded")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True,
                     warn_breach=True)