Noded: make "bootstrap" a constant
[ganeti-github.git] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Ganeti node daemon"""
32
33 # pylint: disable=C0103,W0142
34
35 # C0103: Functions in this module need to have a given name structure,
36 # and the name of the daemon doesn't match
37
38 # W0142: Used * or ** magic, since we do use it extensively in this
39 # module
40
import os
import sys
import logging
import signal
import codecs
import functools
46
47 from optparse import OptionParser
48
49 from ganeti import backend
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import errors
53 from ganeti import jstore
54 from ganeti import daemon
55 from ganeti import http
56 from ganeti import utils
57 from ganeti.storage import container
58 from ganeti import serializer
59 from ganeti import netutils
60 from ganeti import pathutils
61 from ganeti import ssconf
62
63 import ganeti.http.server # pylint: disable=W0611
64
65
# Global job queue lock; lazily initialized by _PrepareQueueLock()
queue_lock = None
67
68
def _extendReasonTrail(trail, source, reason=""):
  """Append a noded-specific entry to an opcode reason trail.

  The trail is extended in place with the name of the noded functionality.

  @param trail: the reason trail (a list) to extend
  @param source: name of the noded functionality to record
  @param reason: optional free-form reason text

  """
  assert trail is not None
  entry_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  trail.append((entry_source, reason, utils.EpochNano()))
77
78
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Lazily initializes the module-global C{queue_lock} on first call.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    # Already initialized by a previous call
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
  except EnvironmentError as err:
    return err
  return None
96
97
98 def _RequireJobQueueLock(fn):
99 """Decorator for job queue manipulating functions.
100
101 """
102 QUEUE_LOCK_TIMEOUT = 10
103
104 def wrapper(*args, **kwargs):
105 # Locking in exclusive, blocking mode because there could be several
106 # children running at the same time. Waiting up to 10 seconds.
107 if _PrepareQueueLock() is not None:
108 raise errors.JobQueueError("Job queue failed initialization,"
109 " cannot update jobs")
110 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
111 try:
112 return fn(*args, **kwargs)
113 finally:
114 queue_lock.Unlock()
115
116 return wrapper
117
118
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: serialized arguments for that I/O type
  @return: decoded argument tuple (disk objects unserialized where needed)

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    return ieioargs
132
133
134 def _DefaultAlternative(value, default):
135 """Returns value or, if evaluating to False, a default value.
136
137 Returns the given value, unless it evaluates to False. In the latter case the
138 default value is returned.
139
140 @param value: Value to return if it doesn't evaluate to False
141 @param default: Default value
142 @return: Given value or the default
143
144 """
145 if value:
146 return value
147
148 return default
149
150
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock all of the process' pages into memory so the request executor
    # cannot be swapped out (keeps the node daemon responsive under memory
    # pressure)
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
159
160
161 class NodeRequestHandler(http.server.HttpServerHandler):
162 """The server implementation.
163
164 This class holds all methods exposed over the RPC interface.
165
166 """
167 # too many public methods, and unused args - all methods get params
168 # due to the API
169 # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember the daemon's PID so request handlers can signal the daemon
    # itself (e.g. SIGTERM on QuitGanetiException in HandleRequest)
    self.noded_pid = os.getpid()
173
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches the HTTP POST request to the matching C{perspective_*}
    method, based on the request path, and JSON-serializes the result.

    @param req: the incoming HTTP request object
    @return: JSON string, normally encoding a C{(success, payload)} tuple
    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no handler matches the request path

    """

    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # The URL path (sans leading slash) names the RPC procedure
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # catch-all so a handler bug never kills the daemon; logged with
      # traceback and reported to the caller as a failure
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
211
212 # the new block devices --------------------------
213
214 @staticmethod
215 def perspective_blockdev_create(params):
216 """Create a block device.
217
218 """
219 (bdev_s, size, owner, on_primary, info, excl_stor) = params
220 bdev = objects.Disk.FromDict(bdev_s)
221 if bdev is None:
222 raise ValueError("can't unserialize data!")
223 return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
224 excl_stor)
225
226 @staticmethod
227 def perspective_blockdev_pause_resume_sync(params):
228 """Pause/resume sync of a block device.
229
230 """
231 disks_s, pause = params
232 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
233 return backend.BlockdevPauseResumeSync(disks, pause)
234
235 @staticmethod
236 def perspective_blockdev_image(params):
237 """Image a block device.
238
239 """
240 bdev_s, image, size = params
241 bdev = objects.Disk.FromDict(bdev_s)
242 return backend.BlockdevImage(bdev, image, size)
243
244 @staticmethod
245 def perspective_blockdev_wipe(params):
246 """Wipe a block device.
247
248 """
249 bdev_s, offset, size = params
250 bdev = objects.Disk.FromDict(bdev_s)
251 return backend.BlockdevWipe(bdev, offset, size)
252
253 @staticmethod
254 def perspective_blockdev_remove(params):
255 """Remove a block device.
256
257 """
258 bdev_s = params[0]
259 bdev = objects.Disk.FromDict(bdev_s)
260 return backend.BlockdevRemove(bdev)
261
  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    # params[0] is a list of (serialized disk, new unique_id) pairs
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
269
270 @staticmethod
271 def perspective_blockdev_assemble(params):
272 """Assemble a block device.
273
274 """
275 bdev_s, idict, on_primary, idx = params
276 bdev = objects.Disk.FromDict(bdev_s)
277 instance = objects.Instance.FromDict(idict)
278 if bdev is None:
279 raise ValueError("can't unserialize data!")
280 return backend.BlockdevAssemble(bdev, instance, on_primary, idx)
281
282 @staticmethod
283 def perspective_blockdev_shutdown(params):
284 """Shutdown a block device.
285
286 """
287 bdev_s = params[0]
288 bdev = objects.Disk.FromDict(bdev_s)
289 if bdev is None:
290 raise ValueError("can't unserialize data!")
291 return backend.BlockdevShutdown(bdev)
292
293 @staticmethod
294 def perspective_blockdev_addchildren(params):
295 """Add a child to a mirror device.
296
297 Note: this is only valid for mirror devices. It's the caller's duty
298 to send a correct disk, otherwise we raise an error.
299
300 """
301 bdev_s, ndev_s = params
302 bdev = objects.Disk.FromDict(bdev_s)
303 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
304 if bdev is None or ndevs.count(None) > 0:
305 raise ValueError("can't unserialize data!")
306 return backend.BlockdevAddchildren(bdev, ndevs)
307
308 @staticmethod
309 def perspective_blockdev_removechildren(params):
310 """Remove a child from a mirror device.
311
312 This is only valid for mirror devices, of course. It's the callers
313 duty to send a correct disk, otherwise we raise an error.
314
315 """
316 bdev_s, ndev_s = params
317 bdev = objects.Disk.FromDict(bdev_s)
318 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
319 if bdev is None or ndevs.count(None) > 0:
320 raise ValueError("can't unserialize data!")
321 return backend.BlockdevRemovechildren(bdev, ndevs)
322
323 @staticmethod
324 def perspective_blockdev_getmirrorstatus(params):
325 """Return the mirror status for a list of disks.
326
327 """
328 disks = [objects.Disk.FromDict(dsk_s)
329 for dsk_s in params[0]]
330 return [status.ToDict()
331 for status in backend.BlockdevGetmirrorstatus(disks)]
332
333 @staticmethod
334 def perspective_blockdev_getmirrorstatus_multi(params):
335 """Return the mirror status for a list of disks.
336
337 """
338 (node_disks, ) = params
339
340 disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
341
342 result = []
343
344 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
345 if success:
346 result.append((success, status.ToDict()))
347 else:
348 result.append((success, status))
349
350 return result
351
352 @staticmethod
353 def perspective_blockdev_find(params):
354 """Expose the FindBlockDevice functionality for a disk.
355
356 This will try to find but not activate a disk.
357
358 """
359 disk = objects.Disk.FromDict(params[0])
360
361 result = backend.BlockdevFind(disk)
362 if result is None:
363 return None
364
365 return result.ToDict()
366
367 @staticmethod
368 def perspective_blockdev_snapshot(params):
369 """Create a snapshot device.
370
371 Note that this is only valid for LVM disks, if we get passed
372 something else we raise an exception. The snapshot device can be
373 remove by calling the generic block device remove call.
374
375 """
376 cfbd = objects.Disk.FromDict(params[0])
377 return backend.BlockdevSnapshot(cfbd)
378
379 @staticmethod
380 def perspective_blockdev_grow(params):
381 """Grow a stack of devices.
382
383 """
384 if len(params) < 5:
385 raise ValueError("Received only %s parameters in blockdev_grow,"
386 " old master?" % len(params))
387 cfbd = objects.Disk.FromDict(params[0])
388 amount = params[1]
389 dryrun = params[2]
390 backingstore = params[3]
391 excl_stor = params[4]
392 return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
393
394 @staticmethod
395 def perspective_blockdev_close(params):
396 """Closes the given block devices.
397
398 """
399 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
400 return backend.BlockdevClose(params[0], disks)
401
402 @staticmethod
403 def perspective_blockdev_getdimensions(params):
404 """Compute the sizes of the given block devices.
405
406 """
407 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
408 return backend.BlockdevGetdimensions(disks)
409
410 @staticmethod
411 def perspective_blockdev_setinfo(params):
412 """Sets metadata information on the given block device.
413
414 """
415 (disk, info) = params
416 disk = objects.Disk.FromDict(disk)
417 return backend.BlockdevSetInfo(disk, info)
418
419 # blockdev/drbd specific methods ----------
420
421 @staticmethod
422 def perspective_drbd_disconnect_net(params):
423 """Disconnects the network connection of drbd disks.
424
425 Note that this is only valid for drbd disks, so the members of the
426 disk list must all be drbd devices.
427
428 """
429 (disks,) = params
430 disks = [objects.Disk.FromDict(disk) for disk in disks]
431 return backend.DrbdDisconnectNet(disks)
432
433 @staticmethod
434 def perspective_drbd_attach_net(params):
435 """Attaches the network connection of drbd disks.
436
437 Note that this is only valid for drbd disks, so the members of the
438 disk list must all be drbd devices.
439
440 """
441 disks, instance_name, multimaster = params
442 disks = [objects.Disk.FromDict(disk) for disk in disks]
443 return backend.DrbdAttachNet(disks, instance_name, multimaster)
444
445 @staticmethod
446 def perspective_drbd_wait_sync(params):
447 """Wait until DRBD disks are synched.
448
449 Note that this is only valid for drbd disks, so the members of the
450 disk list must all be drbd devices.
451
452 """
453 (disks,) = params
454 disks = [objects.Disk.FromDict(disk) for disk in disks]
455 return backend.DrbdWaitSync(disks)
456
457 @staticmethod
458 def perspective_drbd_needs_activation(params):
459 """Checks if the drbd devices need activation
460
461 Note that this is only valid for drbd disks, so the members of the
462 disk list must all be drbd devices.
463
464 """
465 (disks,) = params
466 disks = [objects.Disk.FromDict(disk) for disk in disks]
467 return backend.DrbdNeedsActivation(disks)
468
  @staticmethod
  def perspective_drbd_helper(_):
    """Query drbd helper.

    @return: the DRBD usermode helper, as reported by the backend

    """
    return backend.GetDrbdUsermodeHelper()
475
476 # export/import --------------------------
477
478 @staticmethod
479 def perspective_finalize_export(params):
480 """Expose the finalize export functionality.
481
482 """
483 instance = objects.Instance.FromDict(params[0])
484
485 snap_disks = []
486 for disk in params[1]:
487 if isinstance(disk, bool):
488 snap_disks.append(disk)
489 else:
490 snap_disks.append(objects.Disk.FromDict(disk))
491
492 return backend.FinalizeExport(instance, snap_disks)
493
494 @staticmethod
495 def perspective_export_info(params):
496 """Query information about an existing export on this node.
497
498 The given path may not contain an export, in which case we return
499 None.
500
501 """
502 path = params[0]
503 return backend.ExportInfo(path)
504
  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    @return: list of export names, as reported by the backend

    """
    return backend.ListExports()
515
516 @staticmethod
517 def perspective_export_remove(params):
518 """Remove an export.
519
520 """
521 export = params[0]
522 return backend.RemoveExport(export)
523
524 # block device ---------------------
525 @staticmethod
526 def perspective_bdev_sizes(params):
527 """Query the list of block devices
528
529 """
530 devices = params[0]
531 return backend.GetBlockDevSizes(devices)
532
533 # volume --------------------------
534
535 @staticmethod
536 def perspective_lv_list(params):
537 """Query the list of logical volumes in a given volume group.
538
539 """
540 vgname = params[0]
541 return backend.GetVolumeList(vgname)
542
  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    @return: volume group information, as reported by the backend

    """
    return backend.ListVolumeGroups()
549
550 # Storage --------------------------
551
552 @staticmethod
553 def perspective_storage_list(params):
554 """Get list of storage units.
555
556 """
557 (su_name, su_args, name, fields) = params
558 return container.GetStorage(su_name, *su_args).List(name, fields)
559
560 @staticmethod
561 def perspective_storage_modify(params):
562 """Modify a storage unit.
563
564 """
565 (su_name, su_args, name, changes) = params
566 return container.GetStorage(su_name, *su_args).Modify(name, changes)
567
568 @staticmethod
569 def perspective_storage_execute(params):
570 """Execute an operation on a storage unit.
571
572 """
573 (su_name, su_args, name, op) = params
574 return container.GetStorage(su_name, *su_args).Execute(name, op)
575
576 # bridge --------------------------
577
578 @staticmethod
579 def perspective_bridges_exist(params):
580 """Check if all bridges given exist on this node.
581
582 """
583 bridges_list = params[0]
584 return backend.BridgesExist(bridges_list)
585
586 # instance --------------------------
587
588 @staticmethod
589 def perspective_instance_os_add(params):
590 """Install an OS on a given instance.
591
592 """
593 inst_s = params[0]
594 inst = objects.Instance.FromDict(inst_s)
595 reinstall = params[1]
596 debug = params[2]
597 return backend.InstanceOsAdd(inst, reinstall, debug)
598
599 @staticmethod
600 def perspective_instance_run_rename(params):
601 """Runs the OS rename script for an instance.
602
603 """
604 inst_s, old_name, debug = params
605 inst = objects.Instance.FromDict(inst_s)
606 return backend.RunRenameInstance(inst, old_name, debug)
607
  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    trail = params[2]
    # Record this noded operation in the opcode reason trail (in place)
    _extendReasonTrail(trail, "shutdown")
    return backend.InstanceShutdown(instance, timeout, trail)
618
  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    # NOTE(review): despite its name, instance_name holds a serialized
    # instance dict, as witnessed by the FromDict call below
    (instance_name, startup_paused, trail) = params
    instance = objects.Instance.FromDict(instance_name)
    # Record this noded operation in the opcode reason trail (in place)
    _extendReasonTrail(trail, "start")
    return backend.StartInstance(instance, startup_paused, trail)
628
  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs device to a running instance.

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      # NOTE(review): if HOTPLUG_ALL_TARGETS ever contains more than DISK
      # and NIC, this assert passes but "device" is left unbound, causing a
      # NameError on the return below — confirm the constant's contents
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
643
644 @staticmethod
645 def perspective_hotplug_supported(params):
646 """Checks if hotplug is supported.
647
648 """
649 instance = objects.Instance.FromDict(params[0])
650 return backend.HotplugSupported(instance)
651
652 @staticmethod
653 def perspective_instance_metadata_modify(params):
654 """Modify instance metadata.
655
656 """
657 instance = params[0]
658 return backend.ModifyInstanceMetadata(instance)
659
660 @staticmethod
661 def perspective_migration_info(params):
662 """Gather information about an instance to be migrated.
663
664 """
665 instance = objects.Instance.FromDict(params[0])
666 return backend.MigrationInfo(instance)
667
668 @staticmethod
669 def perspective_accept_instance(params):
670 """Prepare the node to accept an instance.
671
672 """
673 instance, info, target = params
674 instance = objects.Instance.FromDict(instance)
675 return backend.AcceptInstance(instance, info, target)
676
677 @staticmethod
678 def perspective_instance_finalize_migration_dst(params):
679 """Finalize the instance migration on the destination node.
680
681 """
682 instance, info, success = params
683 instance = objects.Instance.FromDict(instance)
684 return backend.FinalizeMigrationDst(instance, info, success)
685
686 @staticmethod
687 def perspective_instance_migrate(params):
688 """Migrates an instance.
689
690 """
691 cluster_name, instance, target, live = params
692 instance = objects.Instance.FromDict(instance)
693 return backend.MigrateInstance(cluster_name, instance, target, live)
694
695 @staticmethod
696 def perspective_instance_finalize_migration_src(params):
697 """Finalize the instance migration on the source node.
698
699 """
700 instance, success, live = params
701 instance = objects.Instance.FromDict(instance)
702 return backend.FinalizeMigrationSource(instance, success, live)
703
704 @staticmethod
705 def perspective_instance_get_migration_status(params):
706 """Reports migration status.
707
708 """
709 instance = objects.Instance.FromDict(params[0])
710 return backend.GetMigrationStatus(instance).ToDict()
711
  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    trail = params[3]
    # Record this noded operation in the opcode reason trail (in place)
    _extendReasonTrail(trail, "reboot")
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
                                  trail)
724
725 @staticmethod
726 def perspective_instance_balloon_memory(params):
727 """Modify instance runtime memory.
728
729 """
730 instance_dict, memory = params
731 instance = objects.Instance.FromDict(instance_dict)
732 return backend.InstanceBalloonMemory(instance, memory)
733
  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    @return: instance information, as reported by the backend

    """
    (instance_name, hypervisor_name, hvparams) = params
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
741
742 @staticmethod
743 def perspective_instance_migratable(params):
744 """Query whether the specified instance can be migrated.
745
746 """
747 instance = objects.Instance.FromDict(params[0])
748 return backend.GetInstanceMigratable(instance)
749
  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    @return: per-instance information, as reported by the backend

    """
    (hypervisor_list, all_hvparams) = params
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
757
  @staticmethod
  def perspective_instance_console_info(params):
    """Query information on how to get console access to instances

    """
    # The whole parameter list is forwarded to the backend unchanged
    return backend.GetInstanceConsoleInfo(params)
764
  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    @return: instance list, as reported by the backend

    """
    (hypervisor_list, hvparams) = params
    return backend.GetInstanceList(hypervisor_list, hvparams)
772
773 # node --------------------------
774
  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    @return: True if the address is owned by this node

    """
    return netutils.IPAddress.Own(params[0])
781
  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    @return: node information, as reported by the backend

    """
    (storage_units, hv_specs) = params
    return backend.GetNodeInfo(storage_units, hv_specs)
789
790 @staticmethod
791 def perspective_etc_hosts_modify(params):
792 """Modify a node entry in /etc/hosts.
793
794 """
795 backend.EtcHostsModify(params[0], params[1], params[2])
796
797 return True
798
  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    @return: verification results, as reported by the backend

    """
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
    return backend.VerifyNode(what, cluster_name, hvparams,
                              node_groups, groups_cfg)
807
  @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    This call is meant to perform a less strict verification of the node in
    certain situations. Right now, it is invoked only when a node is just about
    to be added to a cluster, and even then, it performs the same checks as
    L{perspective_node_verify}.
    """
    # Currently a plain alias for the full verification
    return cls.perspective_node_verify(params)
818
  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    # params[0]: whether to start the daemons even without the master IP
    return backend.StartMasterDaemons(params[0])
825
826 @staticmethod
827 def perspective_node_activate_master_ip(params):
828 """Activate the master IP on this node.
829
830 """
831 master_params = objects.MasterNetworkParameters.FromDict(params[0])
832 return backend.ActivateMasterIp(master_params, params[1])
833
834 @staticmethod
835 def perspective_node_deactivate_master_ip(params):
836 """Deactivate the master IP on this node.
837
838 """
839 master_params = objects.MasterNetworkParameters.FromDict(params[0])
840 return backend.DeactivateMasterIp(master_params, params[1])
841
  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()
848
849 @staticmethod
850 def perspective_node_change_master_netmask(params):
851 """Change the master IP netmask.
852
853 """
854 return backend.ChangeMasterNetmask(params[0], params[1], params[2],
855 params[3])
856
  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    # params[0]: whether Ganeti modifications (e.g. to /etc/hosts) should
    # be removed as well — presumably; confirm against backend.LeaveCluster
    return backend.LeaveCluster(params[0])
863
  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    @return: volume information, as reported by the backend

    """
    return backend.NodeVolumes()
870
  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()
877
  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    (hypervisor_type, hvparams) = params
    return backend.PowercycleNode(hypervisor_type, hvparams)
885
  @staticmethod
  def perspective_node_configure_ovs(params):
    """Sets up OpenvSwitch on the node.

    """
    (ovs_name, ovs_link) = params
    return backend.ConfigureOVS(ovs_name, ovs_link)
893
894 @staticmethod
895 def perspective_node_crypto_tokens(params):
896 """Gets the node's public crypto tokens.
897
898 """
899 token_requests = params[0]
900 return backend.GetCryptoTokens(token_requests)
901
  @staticmethod
  def perspective_node_ensure_daemon(params):
    """Ensure daemon is running.

    """
    # run: whether the daemon should be running (True) or stopped (False)
    (daemon_name, run) = params
    return backend.EnsureDaemon(daemon_name, run)
909
910 # cluster --------------------------
911
  @staticmethod
  def perspective_version(params):
    """Query version information.

    @return: the node daemon's protocol version constant

    """
    return constants.PROTOCOL_VERSION
918
  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    # params[0] is itself the argument list for UploadFile
    return backend.UploadFile(*(params[0]))
928
  @staticmethod
  def perspective_upload_file_single(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    # Unlike perspective_upload_file, here params is the argument list itself
    return backend.UploadFile(*params)
938
  @staticmethod
  def perspective_master_node_name(params):
    """Returns the master node name.

    """
    return backend.GetMasterNodeName()
945
946 @staticmethod
947 def perspective_run_oob(params):
948 """Runs oob on node.
949
950 """
951 output = backend.RunOob(params[0], params[1], params[2], params[3])
952 if output:
953 result = serializer.LoadJson(output)
954 else:
955 result = None
956 return result
957
  @staticmethod
  def perspective_restricted_command(params):
    """Runs a restricted command.

    """
    (cmd, ) = params

    return backend.RunRestrictedCmd(cmd)
966
  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return ssconf.WriteSsconfFiles(values)
974
  @staticmethod
  def perspective_get_watcher_pause(params):
    """Get watcher pause end.

    @return: the watcher pause end time read from the pause file

    """
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
981
  @staticmethod
  def perspective_set_watcher_pause(params):
    """Set watcher pause.

    """
    # until: end-of-pause timestamp
    (until, ) = params
    return backend.SetWatcherPause(until)
989
  @staticmethod
  def perspective_get_file_info(params):
    """Get info on whether a file exists and its properties.

    """
    (path, ) = params
    return backend.GetFileInfo(path)
997
998 # os -----------------------
999
  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    @return: OS diagnosis data, as reported by the backend

    """
    return backend.DiagnoseOS()
1006
1007 @staticmethod
1008 def perspective_os_validate(params):
1009 """Run a given OS' validation routine.
1010
1011 """
1012 required, name, checks, params, force_variant = params
1013 return backend.ValidateOS(required, name, checks, params, force_variant)
1014
1015 @staticmethod
1016 def perspective_os_export(params):
1017 """Export an OS definition into an instance specific package.
1018
1019 """
1020 instance = objects.Instance.FromDict(params[0])
1021 override_env = params[1]
1022 return backend.ExportOS(instance, override_env)
1023
1024 # extstorage -----------------------
1025
  @staticmethod
  def perspective_extstorage_diagnose(params):
    """Query detailed information about existing extstorage providers.

    @return: extstorage diagnosis data, as reported by the backend

    """
    return backend.DiagnoseExtStorage()
1032
1033 # hooks -----------------------
1034
1035 @staticmethod
1036 def perspective_hooks_runner(params):
1037 """Run hook scripts.
1038
1039 """
1040 hpath, phase, env = params
1041 hr = backend.HooksRunner()
1042 return hr.RunHooks(hpath, phase, env)
1043
1044 # iallocator -----------------
1045
1046 @staticmethod
1047 def perspective_iallocator_runner(params):
1048 """Run an iallocator script.
1049
1050 """
1051 name, idata, ial_params_dict = params
1052 ial_params = []
1053 for ial_param in ial_params_dict.items():
1054 ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
1055 iar = backend.IAllocatorRunner()
1056 return iar.Run(name, idata, ial_params)
1057
1058 # test -----------------------
1059
1060 @staticmethod
1061 def perspective_test_delay(params):
1062 """Run test delay.
1063
1064 """
1065 duration = params[0]
1066 status, rval = utils.TestDelay(duration)
1067 if not status:
1068 raise backend.RPCFail(rval)
1069 return rval
1070
1071 # file storage ---------------
1072
1073 @staticmethod
1074 def perspective_file_storage_dir_create(params):
1075 """Create the file storage directory.
1076
1077 """
1078 file_storage_dir = params[0]
1079 return backend.CreateFileStorageDir(file_storage_dir)
1080
1081 @staticmethod
1082 def perspective_file_storage_dir_remove(params):
1083 """Remove the file storage directory.
1084
1085 """
1086 file_storage_dir = params[0]
1087 return backend.RemoveFileStorageDir(file_storage_dir)
1088
1089 @staticmethod
1090 def perspective_file_storage_dir_rename(params):
1091 """Rename the file storage directory.
1092
1093 """
1094 old_file_storage_dir = params[0]
1095 new_file_storage_dir = params[1]
1096 return backend.RenameFileStorageDir(old_file_storage_dir,
1097 new_file_storage_dir)
1098
1099 # jobs ------------------------
1100
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)
1109
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()
1117
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    # params[0] is a list of (old, new) name pairs
    return [backend.JobQueueRename(old, new) for old, new in params[0]]
1126
1127 @staticmethod
1128 @_RequireJobQueueLock
1129 def perspective_jobqueue_set_drain_flag(params):
1130 """Set job queue's drain flag.
1131
1132 """
1133 (flag, ) = params
1134
1135 return jstore.SetDrainFlag(flag)
1136
1137 # hypervisor ---------------
1138
1139 @staticmethod
1140 def perspective_hypervisor_validate_params(params):
1141 """Validate the hypervisor parameters.
1142
1143 """
1144 (hvname, hvparams) = params
1145 return backend.ValidateHVParams(hvname, hvparams)
1146
1147 # Crypto
1148
1149 @staticmethod
1150 def perspective_x509_cert_create(params):
1151 """Creates a new X509 certificate for SSL/TLS.
1152
1153 """
1154 (validity, ) = params
1155 return backend.CreateX509Certificate(validity)
1156
1157 @staticmethod
1158 def perspective_x509_cert_remove(params):
1159 """Removes a X509 certificate.
1160
1161 """
1162 (name, ) = params
1163 return backend.RemoveX509Certificate(name)
1164
1165 # Import and export
1166
1167 @staticmethod
1168 def perspective_import_start(params):
1169 """Starts an import daemon.
1170
1171 """
1172 (opts_s, instance, component, (dest, dest_args)) = params
1173
1174 opts = objects.ImportExportOptions.FromDict(opts_s)
1175
1176 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1177 None, None,
1178 objects.Instance.FromDict(instance),
1179 component, dest,
1180 _DecodeImportExportIO(dest,
1181 dest_args))
1182
1183 @staticmethod
1184 def perspective_export_start(params):
1185 """Starts an export daemon.
1186
1187 """
1188 (opts_s, host, port, instance, component, (source, source_args)) = params
1189
1190 opts = objects.ImportExportOptions.FromDict(opts_s)
1191
1192 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1193 host, port,
1194 objects.Instance.FromDict(instance),
1195 component, source,
1196 _DecodeImportExportIO(source,
1197 source_args))
1198
1199 @staticmethod
1200 def perspective_impexp_status(params):
1201 """Retrieves the status of an import or export daemon.
1202
1203 """
1204 return backend.GetImportExportStatus(params[0])
1205
1206 @staticmethod
1207 def perspective_impexp_abort(params):
1208 """Aborts an import or export.
1209
1210 """
1211 return backend.AbortImportExport(params[0])
1212
1213 @staticmethod
1214 def perspective_impexp_cleanup(params):
1215 """Cleans up after an import or export.
1216
1217 """
1218 return backend.CleanupImportExport(params[0])
1219
1220
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with C{constants.EXIT_FAILURE} if positional arguments were given
  (noded takes none) or if the Python installation lacks the
  ``string-escape`` codec that the daemon relies on.

  @param _: unused options argument (part of the check-function API)
  @param args: leftover positional command-line arguments

  """
  if args: # noded doesn't take any arguments
    # keep this usage string in sync with the OptionParser usage in Main()
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]"
                     " [-i INTERFACE]\n" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    sys.stderr.write("Can't load the string-escape code which is part"
                     " of the Python installation. Is your installation"
                     " complete/correct? Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)
1236
1237
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  The peer's certificate digest is normally checked against the master
  candidate certificate map from the cluster configuration.  There is a
  chicken-and-egg problem during cluster init and upgrade, though: RPCs
  reach the master node before the candidate certificate list and the
  cluster configuration have been written, so the map cannot be consulted
  yet.  An initialized 2.11 or higher cluster always has at least the
  master node in the map; an empty or missing map therefore means we are
  still in the bootstrap/upgrade phase, and we fall back to comparing the
  peer against the digest of our own server certificate.

  As a consequence, an upgraded Ganeti system keeps operating with server
  certificates only until the client certificates are generated with
  ``gnt-cluster renew-crypto --new-node-certificates``; from then on RPC
  verification is based on client certificates and the server-certificate
  fallback no longer applies.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613
  try:
    candidate_certs = ssconf.SimpleStore().GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None

  if not candidate_certs:
    # bootstrap/upgrade phase: trust only our own server certificate
    server_digest = utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CERT_FILE)
    candidate_certs = {constants.CRYPTO_BOOTSTRAP: server_digest}

  return cert.digest("sha1") in candidate_certs.values()
  # pylint: enable=W0613
1282
1283
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @param options: parsed command-line options
  @return: (mainloop, server) tuple, later consumed by L{ExecNoded}

  """
  # Select the request executor; memory locking is only attempted when
  # requested and silently degraded when ctypes is unavailable.
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  queue_error = _PrepareQueueLock()
  if queue_error is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_error)

  handler = NodeRequestHandler()
  mainloop = daemon.Mainloop()
  server = http.server.HttpServer(
      mainloop, options.bind_address, options.port, handler,
      ssl_params=ssl_params, ssl_verify_peer=True,
      request_executor_class=request_executor_class,
      ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)
1323
1324
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: the (mainloop, server) tuple returned by L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # always shut the server down, even if the main loop raised
    server.Stop()
1334
1335
def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                               " [-i INTERFACE]"),
                        description="Ganeti node daemon",
                        version=("%%prog (ganeti) %s" %
                                 constants.RELEASE_VERSION))
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  # the generic daemon entry point handles forking, logging, the PID file
  # and the check/prepare/exec callbacks defined above
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True, warn_breach=True)