# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 """Ganeti node daemon"""
33 # pylint: disable=C0103,W0142
35 # C0103: Functions in this module need to have a given name structure,
36 # and the name of the daemon doesn't match
38 # W0142: Used * or ** magic, since we do use it extensively in this

import os
import sys
import logging
import signal
import codecs

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611


queue_lock = None


def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality
  that handled the request, together with the reason and a timestamp.

  """
  assert trail is not None
  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  trail.append((trail_source, reason, utils.EpochNano()))
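
# Illustrative sketch (hedged, not part of the daemon's control flow): for an
# empty trail and source "shutdown",
#   _extendReasonTrail(trail, "shutdown")
# appends a tuple of the form
#   ("<OPCODE_REASON_SRC_NODED>:shutdown", "", <utils.EpochNano()>)
# i.e. (source, reason, nanosecond timestamp); the placeholder values are
# assumptions for illustration only.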


def _PrepareQueueLock():
  """Try to prepare the queue lock.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    return err


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
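
# Usage sketch (hedged): the job queue handlers below are wrapped as
#
#   @_RequireJobQueueLock
#   def perspective_jobqueue_update(params):
#     ...
#
# so that queue file operations from concurrent children serialize on the
# exclusive queue lock (blocking, 10 second timeout).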


def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  return ieioargs
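
# Decoding sketch (assumed wire format, for illustration): IEIO_RAW_DISK
# arrives as a one-element list holding a serialized Disk and decodes to
# (disk, ); IEIO_SCRIPT arrives as [disk_dict, script_index] and decodes to
# (disk, script_index). Any other mode is passed through unchanged.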


def _DefaultAlternative(value, default):
  """Returns value or, if evaluating to False, a default value.

  Returns the given value, unless it evaluates to False. In the latter case the
  default value is returned.

  @param value: Value to return if it doesn't evaluate to False
  @param default: Default value
  @return: Given value or the default

  """
  if value:
    return value

  return default
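
# Example (sketch): _DefaultAlternative("", "fallback") returns "fallback",
# while _DefaultAlternative("value", "fallback") returns "value"; it is a
# readable spelling of (value or default).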


class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)


class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))
    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
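
  # Dispatch sketch (illustrative, not executed): a POST to "/version" with
  # body "[]" is routed to perspective_version([]) and serialized back as
  # (True, constants.PROTOCOL_VERSION); a backend.RPCFail raised by any
  # handler becomes (False, <error message>) rather than an HTTP error.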

  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
                                  excl_stor)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_image(params):
    """Image a block device.

    """
    bdev_s, image, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevImage(bdev, image, size)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, idict, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    instance = objects.Instance.FromDict(idict)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, instance, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params[0]]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    excl_stor = params[4]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getdimensions(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetdimensions(disks)

  @staticmethod
  def perspective_blockdev_setinfo(params):
    """Sets metadata information on the given block device.

    """
    (disk, info) = params
    disk = objects.Disk.FromDict(disk)
    return backend.BlockdevSetInfo(disk, info)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdDisconnectNet(disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdAttachNet(disks, instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdWaitSync(disks)

  @staticmethod
  def perspective_drbd_needs_activation(params):
    """Checks if the drbd devices need activation.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    (disks,) = params
    disks = [objects.Disk.FromDict(disk) for disk in disks]
    return backend.DrbdNeedsActivation(disks)

  @staticmethod
  def perspective_drbd_helper(_):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------

  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices.

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return container.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return container.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return container.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    trail = params[2]
    _extendReasonTrail(trail, "shutdown")
    return backend.InstanceShutdown(instance, timeout, trail)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    (instance_name, startup_paused, trail) = params
    instance = objects.Instance.FromDict(instance_name)
    _extendReasonTrail(trail, "start")
    return backend.StartInstance(instance, startup_paused, trail)

  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs device to a running instance.

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
    return backend.HotplugDevice(instance, action, dev_type, device, extra,
                                 seq)

  @staticmethod
  def perspective_hotplug_supported(params):
    """Checks if hotplug is supported.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.HotplugSupported(instance)

  @staticmethod
  def perspective_instance_metadata_modify(params):
    """Modify instance metadata.

    """
    instance = params[0]
    return backend.ModifyInstanceMetadata(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_instance_finalize_migration_dst(params):
    """Finalize the instance migration on the destination node.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationDst(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    cluster_name, instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(cluster_name, instance, target, live)

  @staticmethod
  def perspective_instance_finalize_migration_src(params):
    """Finalize the instance migration on the source node.

    """
    instance, success, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationSource(instance, success, live)

  @staticmethod
  def perspective_instance_get_migration_status(params):
    """Reports migration status.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetMigrationStatus(instance).ToDict()

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    trail = params[3]
    _extendReasonTrail(trail, "reboot")
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
                                  trail)

  @staticmethod
  def perspective_instance_balloon_memory(params):
    """Modify instance runtime memory.

    """
    instance_dict, memory = params
    instance = objects.Instance.FromDict(instance_dict)
    return backend.InstanceBalloonMemory(instance, memory)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    (instance_name, hypervisor_name, hvparams) = params
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    (hypervisor_list, all_hvparams) = params
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)

  @staticmethod
  def perspective_instance_console_info(params):
    """Query information on how to get console access to instances.

    """
    return backend.GetInstanceConsoleInfo(params)

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    (hypervisor_list, hvparams) = params
    return backend.GetInstanceList(hypervisor_list, hvparams)

  # node --------------------------

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given IP address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (storage_units, hv_specs) = params
    return backend.GetNodeInfo(storage_units, hv_specs)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
    return backend.VerifyNode(what, cluster_name, hvparams,
                              node_groups, groups_cfg)

  @classmethod
  def perspective_node_verify_light(cls, params):
    """Run a light verify sequence on this node.

    This call is meant to perform a less strict verification of the node in
    certain situations. Right now, it is invoked only when a node is just about
    to be added to a cluster, and even then, it performs the same checks as
    L{perspective_node_verify}.

    """
    return cls.perspective_node_verify(params)

  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    return backend.StartMasterDaemons(params[0])

  @staticmethod
  def perspective_node_activate_master_ip(params):
    """Activate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.ActivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_deactivate_master_ip(params):
    """Deactivate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.DeactivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    (hypervisor_type, hvparams) = params
    return backend.PowercycleNode(hypervisor_type, hvparams)

  @staticmethod
  def perspective_node_configure_ovs(params):
    """Sets up OpenvSwitch on the node.

    """
    (ovs_name, ovs_link) = params
    return backend.ConfigureOVS(ovs_name, ovs_link)

  @staticmethod
  def perspective_node_crypto_tokens(params):
    """Gets the node's public crypto tokens.

    """
    token_requests = params[0]
    return backend.GetCryptoTokens(token_requests)

  @staticmethod
  def perspective_node_ensure_daemon(params):
    """Ensure daemon is running.

    """
    (daemon_name, run) = params
    return backend.EnsureDaemon(daemon_name, run)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_upload_file_single(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_node_name(params):
    """Returns the master node name.

    """
    return backend.GetMasterNodeName()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_restricted_command(params):
    """Runs a restricted command.

    """
    (cmd, ) = params

    return backend.RunRestrictedCmd(cmd)

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return ssconf.WriteSsconfFiles(values)

  @staticmethod
  def perspective_get_watcher_pause(params):
    """Get watcher pause end.

    """
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)

  @staticmethod
  def perspective_set_watcher_pause(params):
    """Set watcher pause.

    """
    (until, ) = params
    return backend.SetWatcherPause(until)

  @staticmethod
  def perspective_get_file_info(params):
    """Get info on whether a file exists and its properties.

    """
    (path, ) = params
    return backend.GetFileInfo(path)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params, force_variant = params
    return backend.ValidateOS(required, name, checks, params, force_variant)

  @staticmethod
  def perspective_os_export(params):
    """Export an OS definition into an instance specific package.

    """
    instance = objects.Instance.FromDict(params[0])
    override_env = params[1]
    return backend.ExportOS(instance, override_env)

  # extstorage -----------------------

  @staticmethod
  def perspective_extstorage_diagnose(params):
    """Query detailed information about existing extstorage providers.

    """
    return backend.DiagnoseExtStorage()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata, ial_params_dict = params
    ial_params = []
    for ial_param in ial_params_dict.items():
      ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata, ial_params)
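
  # Example (hedged sketch): an ial_params_dict such as {"max-cpu": "2"} (a
  # hypothetical key, for illustration) is flattened into the command-line
  # arguments ["--max-cpu=2"] before the iallocator script is run.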

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set job queue's drain flag.

    """
    (flag, ) = params
    return jstore.SetDrainFlag(flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes an X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)


def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613
  _BOOTSTRAP = "bootstrap"
  sstore = ssconf.SimpleStore()
  try:
    candidate_certs = sstore.GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None
  if not candidate_certs:
    candidate_certs = {
      _BOOTSTRAP: utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CERT_FILE)}
  return cert.digest("sha1") in candidate_certs.values()
  # pylint: enable=W0613
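
# Verification sketch (summarizing the callback above): in normal operation
# the peer certificate's SHA1 digest must appear in the candidate certificate
# map from ssconf; during bootstrap/upgrade the map is seeded with only the
# local server certificate digest, so only peers presenting this node's own
# server certificate are accepted.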


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class,
                           ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  """
  (mainloop, server) = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()
1338 """Main function for the node daemon.
1341 parser
= OptionParser(description
="Ganeti node daemon",
1342 usage
=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1344 version
="%%prog (ganeti) %s" %
1345 constants
.RELEASE_VERSION
)
1346 parser
.add_option("--no-mlock", dest
="mlock",
1347 help="Do not mlock the node memory in ram",
1348 default
=True, action
="store_false")
1350 daemon
.GenericMain(constants
.NODED
, parser
, CheckNoded
, PrepNoded
, ExecNoded
,
1351 default_ssl_cert
=pathutils
.NODED_CERT_FILE
,
1352 default_ssl_key
=pathutils
.NODED_CERT_FILE
,
1353 console_logging
=True,