Merge branch 'stable-2.15' into stable-2.16
authorKlaus Aehlig <aehlig@google.com>
Mon, 1 Feb 2016 11:59:37 +0000 (12:59 +0100)
committerKlaus Aehlig <aehlig@google.com>
Mon, 1 Feb 2016 12:28:13 +0000 (13:28 +0100)
* stable-2.15
  Do not add a new Inotify watchers on timer
  Mock InitDrbdHelper's output in unittests
  Optimise codegen for Python OpCode classes

Signed-off-by: Klaus Aehlig <aehlig@google.com>
Reviewed-by: Hrvoje Ribicic <riba@google.com>

164 files changed:
Makefile.am
NEWS
README
autotools/build-bash-completion
cabal/ganeti.template.cabal
configure.ac
daemons/daemon-util.in
devel/build_chroot
doc/conf.py
doc/design-2.16.rst [copied from doc/design-2.15.rst with 68% similarity]
doc/design-draft.rst
doc/design-location.rst
doc/design-plain-redundancy.rst [new file with mode: 0644]
doc/design-repaird.rst [new file with mode: 0644]
doc/design-scsi-kvm.rst [new file with mode: 0644]
doc/design-shared-storage-redundancy.rst
doc/examples/systemd/ganeti-metad.service.in
doc/hooks.rst
doc/iallocator.rst
doc/index.rst
doc/security.rst
doc/users/groupmemberships.in
doc/users/groups.in
doc/users/users.in
doc/virtual-cluster.rst
lib/backend.py
lib/bootstrap.py
lib/cli_opts.py
lib/client/gnt_cluster.py
lib/client/gnt_debug.py
lib/client/gnt_instance.py
lib/client/gnt_node.py
lib/cmdlib/__init__.py
lib/cmdlib/base.py
lib/cmdlib/cluster/__init__.py
lib/cmdlib/cluster/verify.py
lib/cmdlib/common.py
lib/cmdlib/instance_create.py
lib/cmdlib/instance_set_params.py
lib/cmdlib/node.py
lib/cmdlib/test.py
lib/config/__init__.py
lib/daemon.py
lib/ht.py
lib/hypervisor/hv_kvm/__init__.py
lib/hypervisor/hv_kvm/monitor.py
lib/jqueue/__init__.py
lib/jqueue/exec.py
lib/masterd/iallocator.py
lib/mcpu.py
lib/objects.py
lib/rapi/client.py
lib/rpc/transport.py
lib/rpc_defs.py
lib/server/masterd.py
lib/server/noded.py
lib/ssconf.py
lib/ssh.py
lib/tools/cfgupgrade.py
lib/tools/common.py
lib/tools/prepare_node_join.py
lib/tools/ssh_update.py
lib/utils/__init__.py
lib/utils/tags.py [copied from test/py/cmdlib/testsupport/pathutils_mock.py with 68% similarity]
lib/wconfd.py
man/gnt-cluster.rst
man/gnt-debug.rst
man/gnt-instance.rst
man/gnt-node.rst
man/hail.rst
man/harep.rst
man/hbal.rst
man/hspace.rst
qa/ganeti-qa.py
qa/qa_cluster.py
qa/qa_instance.py
qa/qa_utils.py
src/Ganeti/ConstantUtils.hs
src/Ganeti/Constants.hs
src/Ganeti/HTools/AlgorithmParams.hs
src/Ganeti/HTools/Backend/IAlloc.hs
src/Ganeti/HTools/CLI.hs
src/Ganeti/HTools/Cluster.hs
src/Ganeti/HTools/Cluster/AllocatePrimitives.hs [new file with mode: 0644]
src/Ganeti/HTools/Cluster/AllocateSecondary.hs [new file with mode: 0644]
src/Ganeti/HTools/Cluster/AllocationSolution.hs [new file with mode: 0644]
src/Ganeti/HTools/Cluster/Metrics.hs
src/Ganeti/HTools/Cluster/Moves.hs
src/Ganeti/HTools/Dedicated.hs
src/Ganeti/HTools/GlobalN1.hs
src/Ganeti/HTools/Instance.hs
src/Ganeti/HTools/Loader.hs
src/Ganeti/HTools/Node.hs
src/Ganeti/HTools/Program/Hail.hs
src/Ganeti/HTools/Program/Harep.hs
src/Ganeti/HTools/Program/Hspace.hs
src/Ganeti/HTools/Program/Hsqueeze.hs
src/Ganeti/HTools/Tags.hs
src/Ganeti/HTools/Tags/Constants.hs [copied from src/Ganeti/HTools/Tags.hs with 53% similarity]
src/Ganeti/Hs2Py/OpDoc.hs
src/Ganeti/JQScheduler.hs
src/Ganeti/JQueue.hs
src/Ganeti/Jobs.hs
src/Ganeti/Objects.hs
src/Ganeti/Objects/Disk.hs
src/Ganeti/Objects/Lens.hs
src/Ganeti/OpCodes.hs
src/Ganeti/OpParams.hs
src/Ganeti/Query/Exec.hs
src/Ganeti/Query/Server.hs
src/Ganeti/Rpc.hs
src/Ganeti/Ssconf.hs
src/Ganeti/THH.hs
src/Ganeti/Types.hs
src/Ganeti/UDSServer.hs
src/Ganeti/Utils/Statistics.hs
src/Ganeti/WConfd/ConfigModifications.hs
src/Ganeti/WConfd/Monad.hs
test/data/cluster_config_2.15.json [copied from test/data/cluster_config_2.14.json with 99% similarity]
test/data/htools/hail-alloc-desired-location.json [copied from test/data/htools/hail-alloc-nlocation.json with 77% similarity]
test/data/htools/hail-alloc-drbd-restricted.json [new file with mode: 0644]
test/data/htools/hail-alloc-ext.json [new file with mode: 0644]
test/data/htools/hail-alloc-secondary.json [new file with mode: 0644]
test/data/htools/hbal-desiredlocation-1.data [new file with mode: 0644]
test/data/htools/hbal-desiredlocation-2.data [new file with mode: 0644]
test/data/htools/hbal-desiredlocation-3.data [new file with mode: 0644]
test/data/htools/hbal-desiredlocation-4.data [new file with mode: 0644]
test/data/htools/hbal-location-exclusion.data [new file with mode: 0644]
test/data/htools/hbal-soft-errors2.data [new file with mode: 0644]
test/data/htools/hspace-bad-group.data [new file with mode: 0644]
test/data/htools/hspace-existing.data [copied from test/data/htools/empty-cluster.data with 50% similarity]
test/data/htools/partly-used.data [new file with mode: 0644]
test/data/htools/plain-n1-restriction.data [new file with mode: 0644]
test/data/htools/shared-n1-restriction.data [copied from test/data/htools/shared-n1-failure.data with 85% similarity]
test/data/kvm_runtime.json
test/hs/Test/Ganeti/HTools/Backend/Text.hs
test/hs/Test/Ganeti/HTools/Cluster.hs
test/hs/Test/Ganeti/HTools/Instance.hs
test/hs/Test/Ganeti/Objects.hs
test/hs/Test/Ganeti/OpCodes.hs
test/hs/Test/Ganeti/Query/Network.hs
test/hs/Test/Ganeti/Utils/Statistics.hs
test/hs/Test/Ganeti/WConfd/Ssconf.hs [copied from test/hs/Test/Ganeti/Errors.hs with 66% similarity]
test/hs/htest.hs
test/hs/shelltests/htools-balancing.test
test/hs/shelltests/htools-hail.test
test/hs/shelltests/htools-hbal.test
test/hs/shelltests/htools-hspace.test
test/py/cfgupgrade_unittest.py
test/py/cmdlib/instance_unittest.py
test/py/cmdlib/testsupport/cmdlib_testcase.py
test/py/docs_unittest.py
test/py/ganeti.backend_unittest.py
test/py/ganeti.client.gnt_cluster_unittest.py
test/py/ganeti.config_unittest.py
test/py/ganeti.daemon_unittest.py
test/py/ganeti.hooks_unittest.py
test/py/ganeti.hypervisor.hv_kvm_unittest.py
test/py/ganeti.jqueue_unittest.py
test/py/ganeti.masterd.iallocator_unittest.py
test/py/ganeti.mcpu_unittest.py
test/py/ganeti.ssh_unittest.py
test/py/ganeti.tools.prepare_node_join_unittest.py
test/py/testutils/config_mock.py

index 8910b08..792527e 100644 (file)
@@ -144,6 +144,7 @@ HS_DIRS = \
        src/Ganeti/HTools/Backend \
        src/Ganeti/HTools/Cluster \
        src/Ganeti/HTools/Program \
+       src/Ganeti/HTools/Tags \
        src/Ganeti/Hypervisor \
        src/Ganeti/Hypervisor/Xen \
        src/Ganeti/JQScheduler \
@@ -630,6 +631,7 @@ utils_PYTHON = \
        lib/utils/security.py \
        lib/utils/storage.py \
        lib/utils/text.py \
+       lib/utils/tags.py \
        lib/utils/version.py \
        lib/utils/wrapper.py \
        lib/utils/x509.py \
@@ -657,6 +659,7 @@ docinput = \
        doc/design-2.13.rst \
        doc/design-2.14.rst \
        doc/design-2.15.rst \
+       doc/design-2.16.rst \
        doc/design-allocation-efficiency.rst \
        doc/design-autorepair.rst \
        doc/design-bulk-create.rst \
@@ -704,14 +707,17 @@ docinput = \
        doc/design-os.rst \
        doc/design-ovf-support.rst \
        doc/design-partitioned.rst \
+       doc/design-plain-redundancy.rst \
        doc/design-performance-tests.rst \
        doc/design-query-splitting.rst \
        doc/design-query2.rst \
        doc/design-query-splitting.rst \
        doc/design-reason-trail.rst \
+       doc/design-repaird.rst \
        doc/design-reservations.rst \
        doc/design-resource-model.rst \
        doc/design-restricted-commands.rst \
+       doc/design-scsi-kvm.rst \
        doc/design-shared-storage.rst \
        doc/design-shared-storage-redundancy.rst \
        doc/design-ssh-ports.rst \
@@ -908,6 +914,9 @@ HS_LIB_SRCS = \
        src/Ganeti/HTools/Backend/Text.hs \
        src/Ganeti/HTools/CLI.hs \
        src/Ganeti/HTools/Cluster.hs \
+       src/Ganeti/HTools/Cluster/AllocatePrimitives.hs \
+       src/Ganeti/HTools/Cluster/AllocateSecondary.hs \
+       src/Ganeti/HTools/Cluster/AllocationSolution.hs \
        src/Ganeti/HTools/Cluster/Evacuate.hs \
        src/Ganeti/HTools/Cluster/Metrics.hs \
        src/Ganeti/HTools/Cluster/Moves.hs \
@@ -934,6 +943,7 @@ HS_LIB_SRCS = \
        src/Ganeti/HTools/Program/Hroller.hs \
        src/Ganeti/HTools/Program/Main.hs \
        src/Ganeti/HTools/Tags.hs \
+       src/Ganeti/HTools/Tags/Constants.hs \
        src/Ganeti/HTools/Types.hs \
        src/Ganeti/Hypervisor/Xen.hs \
        src/Ganeti/Hypervisor/Xen/XmParser.hs \
@@ -1123,6 +1133,7 @@ HS_TEST_SRCS = \
        test/hs/Test/Ganeti/Utils.hs \
        test/hs/Test/Ganeti/Utils/MultiMap.hs \
        test/hs/Test/Ganeti/Utils/Statistics.hs \
+       test/hs/Test/Ganeti/WConfd/Ssconf.hs \
        test/hs/Test/Ganeti/WConfd/TempRes.hs
 
 
@@ -1687,12 +1698,16 @@ TEST_FILES = \
        test/data/htools/common-suffix.data \
        test/data/htools/empty-cluster.data \
        test/data/htools/hail-alloc-dedicated-1.json \
+       test/data/htools/hail-alloc-desired-location.json \
        test/data/htools/hail-alloc-drbd.json \
+       test/data/htools/hail-alloc-drbd-restricted.json \
+       test/data/htools/hail-alloc-ext.json \
        test/data/htools/hail-alloc-invalid-network.json \
        test/data/htools/hail-alloc-invalid-twodisks.json \
        test/data/htools/hail-alloc-restricted-network.json \
        test/data/htools/hail-alloc-nlocation.json \
        test/data/htools/hail-alloc-plain-tags.json \
+       test/data/htools/hail-alloc-secondary.json \
        test/data/htools/hail-alloc-spindles.json \
        test/data/htools/hail-alloc-twodisks.json \
        test/data/htools/hail-change-group.json \
@@ -1701,18 +1716,26 @@ TEST_FILES = \
        test/data/htools/hail-reloc-drbd.json \
        test/data/htools/hail-reloc-drbd-crowded.json \
        test/data/htools/hbal-cpu-speed.data \
+       test/data/htools/hbal-desiredlocation-1.data \
+       test/data/htools/hbal-desiredlocation-2.data \
+       test/data/htools/hbal-desiredlocation-3.data \
+       test/data/htools/hbal-desiredlocation-4.data \
        test/data/htools/hbal-dyn.data \
        test/data/htools/hbal-evac.data \
        test/data/htools/hbal-excl-tags.data \
        test/data/htools/hbal-forth.data \
        test/data/htools/hbal-location-1.data \
+       test/data/htools/hbal-location-exclusion.data \
        test/data/htools/hbal-location-2.data \
        test/data/htools/hbal-migration-1.data \
        test/data/htools/hbal-migration-2.data \
        test/data/htools/hbal-migration-3.data \
        test/data/htools/hail-multialloc-dedicated.json \
        test/data/htools/hbal-soft-errors.data \
+       test/data/htools/hbal-soft-errors2.data \
        test/data/htools/hbal-split-insts.data \
+       test/data/htools/hspace-bad-group.data \
+       test/data/htools/hspace-existing.data \
        test/data/htools/hspace-groups-one.data \
        test/data/htools/hspace-groups-two.data \
        test/data/htools/hspace-tiered-dualspec-exclusive.data \
@@ -1728,6 +1751,7 @@ TEST_FILES = \
        test/data/htools/multiple-master.data \
        test/data/htools/multiple-tags.data \
        test/data/htools/n1-failure.data \
+       test/data/htools/partly-used.data \
        test/data/htools/rapi/groups.json \
        test/data/htools/rapi/info.json \
        test/data/htools/rapi/instances.json \
@@ -1739,7 +1763,9 @@ TEST_FILES = \
        test/data/htools/hsqueeze-mixed-instances.data \
        test/data/htools/hsqueeze-overutilized.data \
        test/data/htools/hsqueeze-underutilized.data \
+       test/data/htools/plain-n1-restriction.data \
        test/data/htools/shared-n1-failure.data \
+       test/data/htools/shared-n1-restriction.data \
        test/data/htools/unique-reboot-order.data \
        test/data/mond-data.txt \
        test/hs/shelltests/htools-balancing.test \
@@ -1792,6 +1818,7 @@ TEST_FILES = \
        test/data/cluster_config_2.12.json \
        test/data/cluster_config_2.13.json \
        test/data/cluster_config_2.14.json \
+       test/data/cluster_config_2.15.json \
        test/data/instance-minor-pairing.txt \
        test/data/instance-disks.txt \
        test/data/ip-addr-show-dummy0.txt \
@@ -2364,6 +2391,8 @@ src/AutoConf.hs: Makefile src/AutoConf.hs.in $(PRINT_PY_CONSTANTS) \
            -DNODED_GROUP="$(NODED_GROUP)" \
            -DMOND_USER="$(MOND_USER)" \
            -DMOND_GROUP="$(MOND_GROUP)" \
+           -DMETAD_USER="$(METAD_USER)" \
+           -DMETAD_GROUP="$(METAD_GROUP)" \
            -DDISK_SEPARATOR="$(DISK_SEPARATOR)" \
            -DQEMUIMG_PATH="$(QEMUIMG_PATH)" \
            -DXEN_CMD="$(XEN_CMD)" \
@@ -2457,6 +2486,7 @@ $(REPLACE_VARS_SED): $(SHELL_ENV_INIT) Makefile stamp-directories
          echo 's#@''GNTLUXIDUSER@#$(LUXID_USER)#g'; \
          echo 's#@''GNTNODEDUSER@#$(NODED_USER)#g'; \
          echo 's#@''GNTMONDUSER@#$(MOND_USER)#g'; \
+         echo 's#@''GNTMETADUSER@#$(METAD_USER)#g'; \
          echo 's#@''GNTRAPIGROUP@#$(RAPI_GROUP)#g'; \
          echo 's#@''GNTADMINGROUP@#$(ADMIN_GROUP)#g'; \
          echo 's#@''GNTCONFDGROUP@#$(CONFD_GROUP)#g'; \
@@ -2465,6 +2495,7 @@ $(REPLACE_VARS_SED): $(SHELL_ENV_INIT) Makefile stamp-directories
          echo 's#@''GNTLUXIDGROUP@#$(LUXID_GROUP)#g'; \
          echo 's#@''GNTMASTERDGROUP@#$(MASTERD_GROUP)#g'; \
          echo 's#@''GNTMONDGROUP@#$(MOND_GROUP)#g'; \
+         echo 's#@''GNTMETADGROUP@#$(METAD_GROUP)#g'; \
          echo 's#@''GNTDAEMONSGROUP@#$(DAEMONS_GROUP)#g'; \
          echo 's#@''CUSTOM_ENABLE_MOND@#$(ENABLE_MOND)#g'; \
          echo 's#@''MODULES@#$(strip $(lint_python_code))#g'; \
@@ -2721,6 +2752,7 @@ hlint: $(HS_BUILT_SRCS) src/lint-hints.hs
          --ignore "Use first" \
          --ignore "Use &&&" \
          --ignore "Use &&" \
+         --ignore "Use ||" \
          --ignore "Use void" \
          --ignore "Reduce duplication" \
          --ignore "Use import/export shortcut" \
diff --git a/NEWS b/NEWS
index f212ca2..92e97cc 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,143 @@ News
 ====
 
 
+Version 2.16.0 beta2
+--------------------
+
+*(Released Thu, 28 Jan 2016)*
+
+Incompatible/important changes
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+- The IAllocator protocol has been extended by a new ``allocate-secondary``
+  request type. Currently, this new request type is only used when in disk
+  conversion to DRBD no secondary node is specified. As long as this new
+  feature is not used, a third-party IAllocator not aware of this extension can
+  be continued to be used.
+- ``htools`` now also take into account N+1 redundancy for plain and shared
+  storage. To obtain the old behavior, add the ``--no-capacity-checks`` option.
+- ``hail`` now tries to keep the overall cluster balanced; in particular it
+  now prefers more empty groups over groups that are internally more balanced.
+- The option ``--no-node-setup`` of ``gnt-node add`` is disabled.
+  Instead, the cluster configuration parameter ``modify_ssh_setup`` is
+  used to determine whether or not to manipulate the SSH setup of a new
+  node.
+- Timeouts for communication with luxid have been increased. As a consequence,
+  Ganeti tools communicating (directly or indirectly) with luxid also time out
+  later. Please increase all timeouts for higher level tools interacting with
+  Ganeti accordingly.
+
+New features
+~~~~~~~~~~~~
+
+- ``hbal`` can now be made aware of common causes of failures (for
+  nodes). Look at ``hbal`` man page's LOCATION TAGS section for more details.
+- ``hbal`` can now be made aware of desired location for instances. Look
+  at ``hbal`` man page's DESIRED LOCATION TAGS section for more details.
+- Secret parameters are now readacted in job files
+
+New dependencies
+~~~~~~~~~~~~~~~~
+
+- Using the metadata daemon now requires the presence of the 'setcap' utility.
+  On Debian-based systems, it is available as a part of the 'libcap2-bin'
+  package.
+
+Changes since beta1
+~~~~~~~~~~~~~~~~~~~
+- Set block buffering for UDSServer
+- Fix failover in case the source node is offline
+- Add a parameter to ignore groups in capacity checks
+- Make hspace correctly handle --independent-groups
+- Accept BoringSSL as a known good ssl library
+- Make CommitTemporaryIPs call out to WConfD
+- Fix requested instance desired location tags in IAllocator
+- monitor: Use hvinfo in QMP methods
+- KVM: Work around QEMU commit 48f364dd
+- KVM: Introduce scsi_controller_type and kvm_pci_reservations hvparams
+- Improvements in SSH key handling
+- Do not generate the ganeti_pub_keys file with --no-ssh-init
+- Support force option for deactivate disks on RAPI
+- Add a --dry-run option to htools
+- Extended logging to improve traceability
+- Many documentation improvements and cleanups
+- Performance optimizations on larger clusters
+- Various QA and testing improvements
+
+Fixes inherited from 2.15 branch:
+
+- Metad: ignore instances that have no communication NIC
+- For queries, take the correct base address of an IP block
+- Fix computation in network blocks
+- Use bulk-adding of keys in renew-crypto
+- Introduce bulk-adding of SSH keys
+- Handle SSH key distribution on auto promotion
+- Do not remove authorized key of node itself
+- Support force option for deactivate disks on RAPI
+- renew-crypto: use bulk-removal of SSH keys
+- Bulk-removal of SSH keys
+- Catch IOError of SSH files when removing node
+- Fix renew-crypto on one-node-cluster
+- Increase timeout of RPC adding/removing keys
+- After TestNodeModify, fix the pool of master candidates
+
+Fixes inherited from 2.14 branch:
+
+- bdev: Allow userspace-only disk templates
+- Export disk's userspace URI to OS scripts
+- Fix instance failover in case of DTS_EXT_MIRROR
+- Set node tags in iallocator htools backend
+- Fix faulty iallocator type check
+- Allow disk attachment to diskless instances
+- Allow disk attachment with external storage
+
+Fixes inherited from 2.13 branch:
+
+- Improve xl socat migrations
+- Renew-crypto: stop daemons on master node first
+- Extend timeout for gnt-cluster renew-crypto
+
+Fixes inherited from 2.12 branch:
+
+- Accept timeout errors when luxi down
+- Fix disabling of user shutdown reporting
+- gnt-node add: password auth is only one method
+- Fix inconsistency in python and haskell objects
+- Increase default disk size of burnin to 1G
+- Only search for Python-2 interpreters
+- Handle Xen 4.3 states better
+- Return the correct error code in the post-upgrade script
+- Make openssl refrain from DH altogether
+- Fix upgrades of instances with missing creation time
+- Check for healthy majority on master failover with voting
+- Pass arguments to correct daemons during master-failover
+
+Fixes inherited from 2.11 branch:
+
+- At IAlloc backend guess state from admin state
+- Fix default for --default-iallocator-params
+
+Fixes inherited from 2.10 branch:
+
+- Make htools tolerate missing "dtotal" and "dfree" on luxi
+- KVM: explicitly configure routed NICs late
+
+Fixes inherited from the 2.9 branch:
+
+- Security patch for CVE-2015-7944 RAPI Vulnerable to DoS via SSL renegotiation
+- Security patch for CVE-2015-7945 Leak DRBD secret via RAPI
+- replace-disks: fix --ignore-ipolicy
+
+
+Version 2.16.0 beta1
+--------------------
+
+*(Released Tue, 28 Jul 2015)*
+
+This was the first beta release of the 2.16 series. All important changes
+are listed in the latest 2.16 entry.
+
+
 Version 2.15.2
 --------------
 
diff --git a/README b/README
index 1c78531..4327d89 100644 (file)
--- a/README
+++ b/README
@@ -1,4 +1,4 @@
-Ganeti 2.15
+Ganeti 2.16
 ===========
 
 For installation instructions, read the INSTALL and the doc/install.rst
index 3e35ee1..6a0f79f 100755 (executable)
@@ -720,6 +720,9 @@ def HaskellOptToOptParse(opts, kind):
   elif kind == "onenode":
     return cli.cli_option(*opts, type="string",
                           completion_suggest=cli.OPT_COMPL_ONE_NODE)
+  elif kind == "manynodes":
+    # FIXME: no support for many nodes
+    return cli.cli_option(*opts, type="string")
   elif kind == "manyinstances":
     # FIXME: no support for many instances
     return cli.cli_option(*opts, type="string")
index 3540949..0e6dd6a 100644 (file)
@@ -1,5 +1,5 @@
 name:                ganeti
-version:             2.15
+version:             2.16
 homepage:            http://www.ganeti.org
 license:             BSD2
 license-file:        COPYING
index 9b5d06f..9befd2f 100644 (file)
@@ -1,8 +1,8 @@
 # Configure script for Ganeti
 m4_define([gnt_version_major], [2])
-m4_define([gnt_version_minor], [15])
-m4_define([gnt_version_revision], [2])
-m4_define([gnt_version_suffix], [])
+m4_define([gnt_version_minor], [16])
+m4_define([gnt_version_revision], [0])
+m4_define([gnt_version_suffix], [~beta2])
 m4_define([gnt_version_full],
           m4_format([%d.%d.%d%s],
                     gnt_version_major, gnt_version_minor,
@@ -337,7 +337,7 @@ AC_ARG_WITH([user-prefix],
     [ to change the default)]
   )],
   [user_masterd="${withval}masterd";
-   user_metad="$user_default";
+   user_metad="${withval}metad";
    user_rapi="${withval}rapi";
    user_confd="${withval}confd";
    user_wconfd="${withval}masterd";
@@ -363,6 +363,7 @@ AC_SUBST(KVMD_USER, $user_kvmd)
 AC_SUBST(LUXID_USER, $user_luxid)
 AC_SUBST(NODED_USER, $user_noded)
 AC_SUBST(MOND_USER, $user_mond)
+AC_SUBST(METAD_USER, $user_metad)
 
 # --with-group-prefix=...
 AC_ARG_WITH([group-prefix],
@@ -378,7 +379,7 @@ AC_ARG_WITH([group-prefix],
    group_kvmd="$group_default";
    group_luxid="${withval}luxid";
    group_masterd="${withval}masterd";
-   group_metad="$group_default";
+   group_metad="${withval}metad";
    group_noded="$group_default";
    group_daemons="${withval}daemons";
    group_mond="$group_default"],
@@ -404,6 +405,7 @@ AC_SUBST(METAD_GROUP, $group_metad)
 AC_SUBST(NODED_GROUP, $group_noded)
 AC_SUBST(DAEMONS_GROUP, $group_daemons)
 AC_SUBST(MOND_GROUP, $group_mond)
+AC_SUBST(METAD_GROUP, $group_metad)
 
 # Print the config to the user
 AC_MSG_NOTICE([Running ganeti-masterd as $group_masterd:$group_masterd])
index 7636fc9..6af85c2 100644 (file)
@@ -108,12 +108,27 @@ _daemon_usergroup() {
     mond)
       echo "@GNTMONDUSER@:@GNTMONDGROUP@"
       ;;
+    metad)
+      echo "@GNTMETADUSER@:@GNTMETADGROUP@"
+      ;;
     *)
       echo "root:@GNTDAEMONSGROUP@"
       ;;
   esac
 }
 
+# Specifies the additional capabilities needed by individual daemons
+_daemon_caps() {
+  case "$1" in
+    metad)
+      echo "cap_net_bind_service=+ep"
+      ;;
+    *)
+      echo ""
+      ;;
+  esac
+}
+
 # Checks whether the local machine is part of a cluster
 check_config() {
   local server_pem=$DATA_DIR/server.pem
@@ -280,6 +295,17 @@ start() {
 
   @PKGLIBDIR@/ensure-dirs
 
+  # Grant capabilities to daemons that need them
+  local daemoncaps=$(_daemon_caps $plain_name)
+  if [[ "$daemoncaps" != "" ]]; then
+    if type -p setcap >/dev/null; then
+      setcap $daemoncaps $(readlink -f $daemonexec)
+    else
+      echo "setcap missing, could not set capabilities for $name." >&2
+      return 1
+    fi
+  fi
+
   if type -p start-stop-daemon >/dev/null; then
     start-stop-daemon --start --quiet --oknodo \
       --pidfile $pidfile \
index a73a4be..b6a6379 100755 (executable)
@@ -13,6 +13,7 @@
 #Configuration
 : ${ARCH:=amd64}
 : ${DIST_RELEASE:=wheezy}
+: ${VARIANT:=}
 : ${CONF_DIR:=/etc/schroot/chroot.d}
 : ${CHROOT_DIR:=/srv/chroot}
 : ${ALTERNATIVE_EDITOR:=/usr/bin/vim.basic}
 # DATA_DIR
 # CHROOT_EXTRA_DEBIAN_PACKAGES
 
+# make the appended variant name more readable
+[ -n "$VARIANT" ] && VARIANT="-${VARIANT#-}"
+
 #Automatically generated variables
-CHROOTNAME=$DIST_RELEASE-$ARCH
+CHROOTNAME=$DIST_RELEASE-$ARCH$VARIANT
 CHNAME=building_$CHROOTNAME
 TEMP_CHROOT_CONF=$CONF_DIR/$CHNAME.conf
 FINAL_CHROOT_CONF=$CHROOTNAME.conf
@@ -35,16 +39,17 @@ COMP_FILEPATH=$ROOT/$COMP_FILENAME
 TEMP_DATA_DIR=`mktemp -d`
 ACTUAL_DATA_DIR=$DATA_DIR
 ACTUAL_DATA_DIR=${ACTUAL_DATA_DIR:-$TEMP_DATA_DIR}
-GHC_VERSION="7.6.3"
-CABAL_INSTALL_VERSION="1.18.0.2"
 SHA1_LIST='
 cabal-install-1.18.0.2.tar.gz 2d1f7a48d17b1e02a1e67584a889b2ff4176a773
+cabal-install-1.22.4.0.tar.gz b98eea96d321cdeed83a201c192dac116e786ec2
 ghc-7.6.3-i386-unknown-linux.tar.bz2 f042b4171a2d4745137f2e425e6949c185f8ea14
 ghc-7.6.3-x86_64-unknown-linux.tar.bz2 46ec3f3352ff57fba0dcbc8d9c20f7bcb6924b77
+ghc-7.8.4-i386-unknown-linux-deb7.tar.bz2 4f523f854c37a43b738359506a89a37a9fa9fc5f
+ghc-7.8.4-x86_64-unknown-linux-deb7.tar.bz2 3f68321b064e5c1ffcb05838b85bcc00aa2315b4
 '
 
 # export all variables needed in the schroot
-export ARCH GHC_VERSION CABAL_INSTALL_VERSION SHA1_LIST
+export ARCH SHA1_LIST
 
 # Use gzip --rsyncable if available, to speed up transfers of generated files
 # The environment variable GZIP is read automatically by 'gzip',
@@ -95,7 +100,7 @@ if [ ! -f $ACTUAL_DATA_DIR/temp.schroot.conf.in ]
 then
   cat <<END >$ACTUAL_DATA_DIR/temp.schroot.conf.in
 [${CHNAME}]
-description=Debian ${DIST_RELEASE} ${ARCH}
+description=Debian ${DIST_RELEASE}${VARIANT} ${ARCH}
 directory=${CHDIR}
 groups=${GROUP}
 users=root
@@ -183,10 +188,10 @@ function install_ghc {
   [ -n "$TDIR" ]
   if [ "$ARCH" == "amd64" ] ; then
     download "$TDIR"/ghc.tar.bz2 \
-      http://www.haskell.org/ghc/dist/${GHC_VERSION}/ghc-${GHC_VERSION}-x86_64-unknown-linux.tar.bz2
+      http://www.haskell.org/ghc/dist/${GHC_VERSION}/ghc-${GHC_VERSION}-x86_64-unknown-linux${GHC_VARIANT}.tar.bz2
   elif [ "$ARCH" == "i386" ] ; then
     download "$TDIR"/ghc.tar.bz2 \
-      http://www.haskell.org/ghc/dist/${GHC_VERSION}/ghc-${GHC_VERSION}-i386-unknown-linux.tar.bz2
+      http://www.haskell.org/ghc/dist/${GHC_VERSION}/ghc-${GHC_VERSION}-i386-unknown-linux${GHC_VARIANT}.tar.bz2
   else
     echo "Don't know what GHC to download for architecture $ARCH" >&2
     return 1
@@ -215,10 +220,16 @@ function install_cabal {
 }
 
 
-case $DIST_RELEASE in
+case ${DIST_RELEASE}${VARIANT} in
 
   squeeze)
 
+    GHC_VERSION="7.6.3"
+    GHC_VARIANT=""
+    CABAL_INSTALL_VERSION="1.18.0.2"
+    CABAL_LIB_VERSION=">=1.18.0 && <1.19"
+    export GHC_VERSION GHC_VARIANT CABAL_INSTALL_VERSION
+
     # do not install libghc6-network-dev, since it's too old, and just
     # confuses the dependencies
     in_chroot -- \
@@ -245,6 +256,7 @@ case $DIST_RELEASE in
 
     in_chroot -- \
       easy_install \
+        unittest2==0.5.1 \
         logilab-astng==0.24.1 \
         logilab-common==0.58.3 \
         mock==1.0.1 \
@@ -300,6 +312,7 @@ case $DIST_RELEASE in
         snap-server-0.9.4.0 \
         PSQueue-1.1 \
         \
+        "Cabal $CABAL_LIB_VERSION" \
         cabal-file-th-0.2.3 \
         shelltestrunner
 
@@ -308,7 +321,8 @@ case $DIST_RELEASE in
       $APT_INSTALL -t squeeze-backports \
         git \
         git-email \
-        vim
+        vim \
+        exuberant-ctags
 
 ;;
 
@@ -329,7 +343,7 @@ case $DIST_RELEASE in
       python-setuptools python-sphinx python-epydoc graphviz python-pyparsing \
       python-simplejson python-pycurl python-paramiko \
       python-bitarray python-ipaddr python-yaml qemu-utils python-coverage pep8 \
-      shelltestrunner python-dev openssh-client vim git git-email
+      shelltestrunner python-dev openssh-client vim git git-email exuberant-ctags
 
     # We need version 0.9.4 of pyinotify because the packaged version, 0.9.3, is
     # incompatibile with the packaged version of python-epydoc 3.0.1.
@@ -363,7 +377,7 @@ case $DIST_RELEASE in
         'hlint>=1.9.12'
 ;;
 
-  testing)
+  jessie)
 
     in_chroot -- \
       $APT_INSTALL \
@@ -382,7 +396,8 @@ case $DIST_RELEASE in
       python-setuptools python-sphinx python-epydoc graphviz python-pyparsing \
       python-simplejson python-pycurl python-pyinotify python-paramiko \
       python-bitarray python-ipaddr python-yaml qemu-utils python-coverage pep8 \
-      shelltestrunner python-dev pylint openssh-client vim git git-email
+      shelltestrunner python-dev pylint openssh-client \
+      vim git git-email exuberant-ctags
 
     in_chroot -- \
       cabal update
@@ -392,6 +407,102 @@ case $DIST_RELEASE in
        'hlint>=1.9.12'
 ;;
 
+  jessie-ghc78)
+
+    GHC_VERSION="7.8.4"
+    GHC_VARIANT="-deb7"
+    CABAL_INSTALL_VERSION="1.22.4.0"
+    # the version of the Cabal library below must match the version used by
+    # CABAL_INSTALL_VERSION, see the dependencies of cabal-install
+    CABAL_LIB_VERSION=">=1.22.2 && <1.23"
+    export GHC_VERSION GHC_VARIANT CABAL_INSTALL_VERSION
+
+    in_chroot -- \
+      $APT_INSTALL \
+        autoconf automake \
+        zlib1g-dev \
+        libgmp3-dev \
+        libcurl4-openssl-dev \
+        libpcre3-dev \
+        happy \
+        hlint hscolour pandoc \
+        graphviz qemu-utils \
+        python-docutils \
+        python-simplejson \
+        python-pyparsing \
+        python-pyinotify \
+        python-pycurl \
+        python-ipaddr \
+        python-yaml \
+        python-paramiko \
+        git \
+        git-email \
+        vim
+
+    in_chroot -- \
+      $APT_INSTALL python-setuptools python-dev build-essential
+
+    in_chroot -- \
+      easy_install \
+        logilab-astng==0.24.1 \
+        logilab-common==0.58.3 \
+        mock==1.0.1 \
+        pylint==0.26.0
+
+    in_chroot -- \
+      easy_install \
+        sphinx==1.1.3 \
+        pep8==1.3.3 \
+        coverage==3.4 \
+        bitarray==0.8.0
+
+    install_ghc
+
+    install_cabal
+
+    in_chroot -- \
+      cabal update
+
+    # since we're using Cabal >=1.16, we can use the parallel install option
+    in_chroot -- \
+      cabal install --global -j --enable-library-profiling \
+        attoparsec==0.12.1.6 \
+        base64-bytestring==1.0.0.1 \
+        blaze-builder==0.4.0.1 \
+        case-insensitive==1.2.0.4 \
+        Crypto==4.2.5.1 \
+        curl==1.3.8 \
+        happy==1.19.5 \
+        hashable==1.2.3.2 \
+        hinotify==0.3.7 \
+        hscolour==1.23 \
+        hslogger==1.2.8 \
+        json==0.9.1 \
+        lifted-base==0.2.3.6 \
+        lens==4.9.1 \
+        MonadCatchIO-transformers==0.3.1.3 \
+        network==2.6.0.2 \
+        parallel==3.2.0.6 \
+        parsec==3.1.7 \
+        regex-pcre==0.94.4 \
+        temporary==1.2.0.3 \
+        vector==0.10.12.3 \
+        zlib==0.5.4.2 \
+        \
+        hlint==1.9.20 \
+        HUnit==1.2.5.2 \
+        QuickCheck==2.8.1 \
+        test-framework==0.8.1.1 \
+        test-framework-hunit==0.3.0.1 \
+        test-framework-quickcheck2==0.3.0.3 \
+        \
+        snap-server==0.9.5.1 \
+        \
+        "Cabal $CABAL_LIB_VERSION" \
+        cabal-file-th==0.2.3 \
+        shelltestrunner==1.3.5
+;;
+
   precise)
     # ghc, git-email and other dependencies are hosted in the universe
     # repository, which is not enabled by default.
@@ -417,7 +528,7 @@ EOF
       python-setuptools python-sphinx python-epydoc graphviz python-pyparsing \
       python-simplejson python-pyinotify python-pycurl python-paramiko \
       python-bitarray python-ipaddr python-yaml qemu-utils python-coverage pep8 \
-      python-dev pylint openssh-client vim git git-email \
+      python-dev pylint openssh-client vim git git-email exuberant-ctags \
       build-essential
 
     in_chroot -- \
@@ -470,7 +581,8 @@ EOF
       python-setuptools python-sphinx python-epydoc graphviz python-pyparsing \
       python-simplejson python-pyinotify python-pycurl python-paramiko \
       python-bitarray python-ipaddr python-yaml qemu-utils python-coverage pep8 \
-      shelltestrunner python-dev pylint openssh-client vim git git-email \
+      shelltestrunner python-dev pylint openssh-client \
+      vim git git-email exuberant-ctags \
       build-essential
 
     in_chroot -- \
index 15b848a..c9b5a15 100644 (file)
@@ -48,7 +48,7 @@ master_doc = "index"
 
 # General information about the project.
 project = u"Ganeti"
-copyright = u"%s Google Inc." % ", ".join(map(str, range(2006, 2013 + 1)))
+copyright = u"%s Google Inc." % ", ".join(map(str, range(2006, 2015 + 1)))
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
similarity index 68%
copy from doc/design-2.15.rst
copy to doc/design-2.16.rst
index 6da41b4..9731b17 100644 (file)
@@ -1,13 +1,14 @@
 ==================
-Ganeti 2.15 design
+Ganeti 2.16 design
 ==================
 
-The following designs have been partially implemented in Ganeti 2.15.
+The following designs have been partially implemented in Ganeti 2.16.
 
 - :doc:`design-configlock`
-- :doc:`design-shared-storage-redundancy`
+- :doc:`design-os`
 
-The following designs' implementations were completed in Ganeti 2.15.
+The following designs' implementations were completed in Ganeti 2.16.
 
-- :doc:`design-allocation-efficiency`
-- :doc:`design-dedicated-allocation`
+- :doc:`design-location`
+- :doc:`design-plain-redundancy`
+- :doc:`design-shared-storage-redundancy`
index 09cf2ba..b2ce6a2 100644 (file)
@@ -2,7 +2,7 @@
 Design document drafts
 ======================
 
-.. Last updated for Ganeti 2.15
+.. Last updated for Ganeti 2.16
 
 .. toctree::
    :maxdepth: 2
@@ -19,13 +19,13 @@ Design document drafts
    design-move-instance-improvements.rst
    design-node-security.rst
    design-ifdown.rst
-   design-location.rst
    design-reservations.rst
    design-sync-rate-throttling.rst
    design-network2.rst
    design-configlock.rst
    design-multi-storage-htools.rst
-   design-shared-storage-redundancy.rst
+   design-repaird.rst
+   design-scsi-kvm.rst
    design-disks.rst
 
 .. vim: set textwidth=72 :
index 9d0f7aa..64236c0 100644 (file)
@@ -109,3 +109,28 @@ Advise only
 These tags are of advisory nature only. That is, all ``htools`` will strictly
 obey the restrictions imposed by those tags, but Ganeti will not prevent users
 from manually instructing other migrations.
+
+
+Instance pinning
+================
+
+Sometimes, administrators want specific instances located in a particular,
+typically geographic, location. To support these kind of requests, instances
+can be assigned tags of the form *htools:desiredlocation:x* where *x* is a
+failure tag. Those tags indicate the the instance wants to be placed on a
+node tagged *x*. To make ``htools`` honor those desires, the metric is extended,
+appropriately weighted, by the following component.
+
+- Sum of dissatisfied desired locations number among all cluster instances.
+  An instance desired location is dissatisfied when the instance is assigned
+  a desired-location tag *x* where the node is not tagged with the location
+  tag *x*.
+
+Such metric extension allows to specify multiple desired locations for each
+instance. These desired locations may be contradictive as well. Contradictive
+desired locations mean that we don't care which one of desired locations will
+be satisfied.
+
+Again, instance pinning is just heuristics, not a hard enforced requirement;
+it will only be achieved by the cluster metrics favouring such placements.
+
diff --git a/doc/design-plain-redundancy.rst b/doc/design-plain-redundancy.rst
new file mode 100644 (file)
index 0000000..0233bcd
--- /dev/null
@@ -0,0 +1,61 @@
+======================================
+Redundancy for the plain disk template
+======================================
+
+.. contents:: :depth: 4
+
+This document describes how N+1 redundancy is achieved
+for instanes using the plain disk template.
+
+
+Current state and shortcomings
+==============================
+
+Ganeti has long considered N+1 redundancy for DRBD, making sure that
+on the secondary nodes enough memory is reserved to host the instances,
+should one node fail. Recently, ``htools`` have been extended to
+also take :doc:`design-shared-storage-redundancy` into account.
+
+For plain instances, there is no direct notion of redundancy: if the
+node the instance is running on dies, the instance is lost. However,
+if the instance can be reinstalled (e.g, because it is providing a
+stateless service), it does make sense to ask if the remaining nodes
+have enough free capacity for the instances to be recreated. This
+form of capacity planning is currently not addressed by current
+Ganeti.
+
+
+Proposed changes
+================
+
+The basic considerations follow those of :doc:`design-shared-storage-redundancy`.
+Also, the changes to the tools follow the same pattern.
+
+Definition of N+1 redundancy in the presence of shared and plain storage
+------------------------------------------------------------------------
+
+A cluster is considered N+1 redundant, if, for every node, the following
+steps can be carried out. First all DRBD instances are migrated out. Then,
+all shared-storage instances of that node are relocated to another node in
+the same node group. Finally, all plain instances of that node are reinstalled
+on a different node in the same node group; in the search for a new nodes for
+the plain instances, they will be recreated in order of decreasing memory
+size.
+
+Note that the first two setps are those in the definition of N+1 redundancy
+for shared storage. In particular, this notion of redundancy strictly extends
+the one for shared storage. Again, checking this notion of redundancy is
+computationally expensive and the non-DRBD part is mainly a capacity property
+in the sense that we expect the majority of instance moves that are fine
+from a DRBD point of view will not lead from a redundant to a non-redundant
+situation.
+
+Modifications to existing tools
+-------------------------------
+
+The changes to the exisiting tools are literally the same as
+for :doc:`design-shared-storage-redundancy` with the above definition of
+N+1 redundancy substituted in for that of redundancy for shared storage.
+In particular, ``gnt-cluster verify`` will not be changed and ``hbal``
+will use N+1 redundancy as a final filter step to disallow moves
+that lead from a redundant to a non-redundant situation.
diff --git a/doc/design-repaird.rst b/doc/design-repaird.rst
new file mode 100644 (file)
index 0000000..6dad3e7
--- /dev/null
@@ -0,0 +1,273 @@
+=========================
+Ganeti Maintenance Daemon
+=========================
+
+.. contents:: :depth: 4
+
+This design document outlines the implementation of a new Ganeti
+daemon coordinating all maintenance operations on a cluster
+(rebalancing, activate disks, ERROR_down handling, node repairs
+actions).
+
+
+Current state and shortcomings
+==============================
+
+With ``harep``, Ganeti has a basic mechanism for repairs of instances
+in a cluster. The ``harep`` tool can fix a broken DRBD status, migrate,
+failover, and reinstall instances. It is intended to be run regularly,
+e.g., via a cron job. It will submit appropriate Ganeti jobs to take
+action within the range allowed by instance tags and keep track
+of them by recoding the job ids in appropriate tags.
+
+Besides ``harep``, Ganeti offers no further support for repair automation.
+While useful, this setup can be insufficient in some situations.
+
+Failures in actual hardware, e.g., a physical disk, currently requires
+coordination around Ganeti: the hardware failure is detected on the node,
+Ganeti needs to be told to evacuate the node, and, once this is done, some
+other entity needs to coordinate the actual physical repair. Currently there
+is no support by Ganeti to automatically prepare everything for a hardware
+swap.
+
+
+Proposed changes
+================
+
+We propose the addition of an additional daemon, called ``maintd``
+that will coordinate cluster balance actions, instance repair actions,
+and work for hardware repair needs of individual nodes. The information
+about the work to be done will be obtained from a dedicated data collector
+via the :doc:`design-monitoring-agent`.
+
+Self-diagnose data collector
+----------------------------
+
+The monitoring daemon will get one additional dedicated data collector for
+node health. The collector will call an external command supposed to do
+any hardware-specific diagnose for the node it is running on. That command
+is configurable, but needs to be white-listed ahead of time by the node.
+For convenience, the empty string will stand for a build-in diagnose that
+always reports that everything is OK; this will also be the default value
+for this collector.
+
+Note that the self-diagnose data collector itself can, and usually will,
+call separate diagnose tools for separate subsystems. However, it always
+has to provide a consolidated description of the overall health state
+of the node.
+
+Protocol
+~~~~~~~~
+
+The collector script takes no arguments and is supposed to output the string
+representation of a single JSON object where the individual fields have the
+following meaning. Note that, if several things are broken on that node, the
+self-diagnose collector script has to merge them into a single repair action.
+
+status
+......
+
+This is a JSON string where the value is one of ``Ok``, ``live-repair``,
+``evacuate``, ``evacuate-failover``. This indicates the overall need for
+repair and Ganeti actions to be taken. The meaning of these states are
+no action needed, some action is needed that can be taken while instances
+continue to run on that node, it is necessary to evacuate and offline
+the node, and it is necessary to evacuate and offline the node without
+attempting live migrations, respectively.
+
+command
+.......
+
+If the status is ``live-repair``, a repair command can be specified.
+This command will be executed as repair action following the
+:doc:`design-restricted-commands`, however extended to read information
+on ``stdin``. The whole diagnose JSON object will be provided as ``stdin``
+to those commands.
+
+details
+.......
+
+An opaque JSON value that the repair daemon will just pass through and
+export. It is intended to contain information about the type of repair
+that needs to be done after the respective Ganeti action is finished.
+E.g., it might contain information which piece of hardware is to be
+swapped, once the node is fully evacuated and offlined.
+
+As two failures are considered different, if the output of the script
+encodes a different JSON object, the collector script should ensure
+that as long as the hardware status does not change, the output of the
+script is stable; otherwise this would cause various events reported for
+the same failure.
+
+Security considerations
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Command execution
+.................
+
+Obviously, running arbitrary commands that are part of the configuration
+poses a security risk. Note that an underlying design goal of Ganeti is
+that even with RAPI credentials known to the attacker, he still cannot
+obtain data from within the instances. As monitoring, however, is configurable
+via RAPI, we require the node to white-list the command using a mechanism
+similar to the :doc:`design-restricted-commands`; in our case, the white-listing
+directory will be ``/etc/ganeti/node-diagnose-commands``.
+
+For the repair-commands, as mentioned, we extend the
+:doc:`design-restricted-commands` by allowing input on ``stdin``. All other
+restrictions, in particular the white-listing requirement, remain. The
+white-listing directory will be ``/etc/ganeti/node-repair-commands``.
+
+Result forging
+..............
+
+As the repair daemon will take real Ganeti actions based on the diagnose
+reported by the self-diagnose script through the monitoring daemon, we
+need to verify integrity of such reports to avoid denial-of-service by
+fraudaulent error reports. Therefore, the monitoring daemon will sign
+the result by an hmac signature with the cluster hmac key, in the same
+way as it is done in the ``confd`` wire protocol (see :doc:`design-2.1`).
+
+Repair-event life cycle
+-----------------------
+
+Once a repair event is detected, a unique identifier is assigned to it.
+As long as the node-health collector returns the same output (as JSON
+object), this is still considered the same event.
+This identifier can be used to cancel an observed event at any time; for
+this an appropriate command-line and RAPI endpoint will be provided. Cancelling
+an event tells the repair daemon not to take any actions (despite them
+being requested) for this event and forget about it, as soon as it is
+no longer observed.
+
+Corresponding Ganeti actions will be initiated and success or failure of
+these Ganeti jobs monitored. All jobs submitted by the repair daemon
+will have the string ``gnt:daemon:maintd`` and the event identifier
+in the reason trail, so that :doc:`design-optables` is possible.
+Once a job fails, no further jobs will be submitted for this event
+to avoid further damage; the repair action is considered failed in this case.
+
+Once all requested actions succeeded, or one failed, the node where the
+event as observed will be tagged by a tag starting with ``maintd:repairready:``
+or ``maintd:repairfailed:``, respectively, where the event identifier is
+encoded in the rest of the tag. On the one hand, it can be used as an
+additional verification whether a node is ready for a specific repair.
+However, the main purpose is to provide a simple and uniform interface
+to acknowledge an event. Once a ``maintd:repairready`` tag is removed,
+the maintenance daemon will forget about this event, as soon as it is no
+longer observed by any monitoring daemon. Removing a ``maintd:repairfailed:``
+tag will make the maintenance daemon to unconditionally forget the event;
+note that, if the underlying problem is not fixed yet, this provides an
+easy way of restarting a repair flow.
+
+
+Repair daemon
+-------------
+
+The new daemon ``maintd`` will be running on the master node only. It will
+verify the master status of its node by popular vote in the same way as all the
+other master-only daemons. If started on a non-master node, it will exit
+immediately with exit code ``exitNotmaster``, i.e., 11.
+
+External Reporting Protocol
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Upon successful start, the daemon will bind to a port overridable at
+command-line, by default 1816, on the master network device. There it will
+serve the current repair state via HTTP. All queries will be HTTP GET
+requests and all answers will be encoded in JSON format. Initially, the
+following requests will be supported.
+
+``/``
+.....
+
+Returns the list of supported protocol versions, initially just ``[1]``.
+
+``/1/status``
+.............
+
+Returns a list of all non-cleared incidents. Each incident is reported
+as a JSON object with at least the following information.
+
+- ``id`` The unique identifier assigned to the event.
+
+- ``node`` The UUID of the node on which the even was observed.
+
+- ``original`` The very JSON object reported by self-diagnose data collector.
+
+- ``repair-status`` A string describing the progress made on this event so
+  far. It is one of the following.
+
+  + ``noted`` The event has been observed, but no action has been taken yet
+
+  + ``pending`` At least one job has been submitted in reaction to the event
+    and none of the submitted jobs has failed so far.
+
+  + ``canceled`` The event has been canceled, i.e., ordered to be ignored, but
+    is still observed.
+
+  + ``failed`` At least one of the submitted jobs has failed. To avoid further
+    damage, the repair daemon will not take any further action for this event.
+
+  + ``completed`` All Ganeti actions associated with this event have been
+    completed successfully, including tagging the node.
+
+- ``jobs`` The list of the numbers of ganeti jobs submitted in response to
+  this event.
+
+- ``tag`` A string that is the tag that either has been added to the node, or,
+  if the repair event is not yet finalized, will be added in case of success.
+
+State
+~~~~~
+
+As repairs, especially those involving physically swapping hardware, can take
+a long time, the repair daemon needs to store its state persistently. As we
+cannot exclude master-failovers during a repair cycle, it does so by storing
+it as part of the Ganeti configuration.
+
+This will be done by adding a new top-level entry to the Ganeti configuration.
+The SSConf will not be changed.
+
+Superseeding ``harep`` and implicit balancing
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To have a single point coordinating all repair actions, the new repair daemon
+will also have the ability to take over the work currently done by ``harep``.
+To allow a smooth transition, ``maintd`` when carrying out ``harep``'s duties
+will add tags in precisely the same way as ``harep`` does.
+As the new daemon will have to move instances, it will also have the ability
+to balance the cluster in a way coordinated with the necessary evacuation
+options; dynamic load information can be taken into account.
+
+The question on whether to do ``harep``'s work and whether to balance the
+cluster and if so using which strategy (e.g., taking dynamic load information
+into account or not, allowing disk moves or not) are configurable via the Ganeti
+configuration. The default will be to do neither of those tasks. ``harep`` will
+continue to exist unchanged as part of the ``htools``.
+
+Mode of operation
+~~~~~~~~~~~~~~~~~
+
+The repair daemon will poll the monitoring daemons for
+the value of the self-diagnose data collector at the same (configurable)
+rate the monitoring daemon collects this collector; if load-based balancing is
+enabled, it will also collect for the the load data needed.
+
+Repair events will be exposed on the web status page as soon as observed.
+The Ganeti jobs doing the actual maintenance will be submitted in rounds.
+A new round will be started if all jobs of the old round have finished, and
+there is an unhandled repair event or the cluster is unbalanced enough (provided
+that autobalancing is enabled).
+
+In each round, ``maintd`` will first determine the most invasive action for
+each node; despite the self-diagnose collector summing observations in a single
+action recommendation, a new, more invasive recommendation can be issued before
+the handling of the first recommendation is finished. For all nodes to be
+evacuated, the first evacuation task is scheduled, in a way that these tasks do
+not conflict with each other. Then, for all instances on a non-affected node,
+that need ``harep``-style repair (if enabled) those jobs are scheduled to the
+extend of not conflicting with each other. Then on the remaining nodes that
+are not part of a failed repair event either, the jobs
+of the first balancing step are scheduled. All those jobs of a round are
+submitted at once. As they do not conflict they will be able to run in parallel.
diff --git a/doc/design-scsi-kvm.rst b/doc/design-scsi-kvm.rst
new file mode 100644 (file)
index 0000000..2339ade
--- /dev/null
@@ -0,0 +1,247 @@
+==========
+KVM + SCSI
+==========
+
+.. contents:: :depth: 4
+
+This is a design document detailing the refactoring of device
+handling in the KVM Hypervisor. More specifically, it will use
+the latest QEMU device model and modify the hotplug implementation
+so that both PCI and SCSI devices can be managed.
+
+
+Current state and shortcomings
+==============================
+
+Ganeti currently supports SCSI virtual devices in the KVM hypervisor by
+setting the `disk_type` hvparam to `scsi`. Ganeti will eventually
+instruct QEMU to use the deprecated device model (i.e. -drive if=scsi),
+which will expose the backing store as an emulated SCSI device. This
+means that currently SCSI pass-through is not supported.
+
+On the other hand, the current hotplug implementation
+:doc:`design-hotplug` uses the latest QEMU
+device model (via the -device option) and is tailored to paravirtual
+devices, which leads to buggy behavior: if we hotplug a disk to an
+instance that is configured with disk_type=scsi hvparam, the
+disk which will get hot-plugged eventually will be a VirtIO device
+(i.e., virtio-blk-pci) on the PCI bus.
+
+The current implementation of creating the QEMU command line is
+error-prone, since an instance might not be able to boot due to PCI slot
+congestion.
+
+
+Proposed changes
+================
+
+We change the way that the KVM hypervisor handles block devices by
+introducing latest QEMU device model for SCSI devices as well, so that
+scsi-cd, scsi-hd, scsi-block, and scsi-generic device drivers are
+supported too. Additionally we refactor the hotplug implementation in
+order to support hotplugging of SCSI devices too. Finally, we change the
+way we keep track of device info inside runtime files, and the way we
+place each device upon instance startup.
+
+Design decisions
+================
+
+How to identify each device?
+
+Currently KVM does not support arbitrary IDs for devices; supported are
+only names starting with a letter, with max 32 chars length, and only
+including the '.', '_', '-' special chars. Currently we generate an ID
+with the following format: <device type>-<part of uuid>-pci-<slot>.
+This assumes that the device will be plugged in a certain slot on the
+PCI bus. Since we want to support devices on a SCSI bus too and adding
+the PCI slot to the ID is redundant, we dump the last two parts of the
+existing ID. Additionally we get rid of the 'hot' prefix of device type,
+and we add the next two parts of the UUID so the chance of collitions
+is reduced significantly. So, as an example, the device ID of a disk
+with UUID '9e7c85f6-b6e5-4243-b27d-680b78c6d203' would be now
+'disk-9e7c85f6-b6e5-4243'.
+
+
+Which buses does the guest eventually see?
+
+By default QEMU starts with a single PCI bus named "pci.0". In case a
+SCSI controller is added on this bus, a SCSI bus is created with
+the corresponding name: "scsi.0".
+Any SCSI disks will be attached on this SCSI bus. Currently Ganeti does
+not explicitly use a SCSI controller via a command line option, but lets
+QEMU add one automatically if needed. Here, in case we have a SCSI disk,
+a SCSI controller is explicitly added via the -device option. For the
+SCSI controller, we do not specify the PCI slot to use, but let QEMU find
+the first available (see below).
+
+
+What type of SCSI controller to use?
+
+QEMU uses the `lsi` controller by default. To make this configurable we
+add a new hvparam, `scsi_controller_type`. The available types will be
+`lsi`, `megasas`, and `virtio-scsi-pci`.
+
+
+Where to place the devices upon instance startup?
+
+The default QEMU machine type, `pc`, adds a `i440FX-pcihost`
+controller on the root bus that creates a PCI bus with `pci.0` alias.
+By default the first three slots of this bus are occupied: slot 0
+for Host bridge, slot 1 for ISA bridge, and slot 2 for VGA controller.
+Thereafter, the slots depend on the QEMU options passed in the command
+line.
+
+The main reason that we want to be fully aware of the configuration of a
+running instance (machine type, PCI and SCSI bus state, devices, etc.)
+is that in case of migration a QEMU process with the exact same
+configuration should be created on the target node. The configuration is
+kept in the runtime file created just before starting the instance.
+Since hotplug has been introduced, the only thing that can change after
+starting an instance is the configuration related to NICs and Disks.
+
+Before implementing hotplug, Ganeti did not specify PCI slots
+explicitly, but let QEMU decide how to place the devices on the
+corresponding bus. This does not work if we want to have hotplug-able
+devices and migrate-able VMs. Currently, upon runtime file creation, we
+try to reserve PCI slots based on the hvparams, the disks, and the NICs
+of the instance. This has three major shortcomings: first, we have to be
+aware which options modify the PCI bus which is practically impossible
+due to the huge amount of QEMU options, second, QEMU may change the
+default PCI configuration from version to version, and third, we cannot
+know if the extra options passed by the user via the `kvm_extra` hvparam
+modify the PCI bus.
+
+All the above makes the current implementation error prone: an instance
+might not be able to boot if we explicitly add a NIC/Disk on a specific
+PCI slot that QEMU has already used for another device while parsing
+its command line options. Besides that, now, we want to use the SCSI bus
+as well so the above mechanism is insufficient. Here, we decide to put
+only disks and NICs on specific slots on the corresponding bus, and let
+QEMU put everything else automatically. To this end, we decide to let
+the first 12 PCI slots be managed by QEMU, and we start adding PCI
+devices (VirtIO block and network devices) from the 13th slot onwards.
+As far as the SCSI bus is concerned, we decide to put each SCSI
+disk on a different scsi-id (which corresponds to a different target
+number in SCSI terminology). The SCSI bus will not have any default
+reservations.
+
+
+How to support the theoretical maximum of devices, 16 disks and 8 NICs?
+
+By default, one could add up to 20 devices on the PCI bus; that is the
+32 slots of the PCI bus, minus the starting 12 slots that Ganeti
+allows QEMU to manage on its own. In order to by able to add
+more PCI devices, we add the new `kvm_pci_reservations` hvparam to
+denote how many PCI slots QEMU will handle implicitly. The rest will be
+available for disk and NICs inserted explicitly by Ganeti. By default
+the default PCI reservations will be 12 as explained above.
+
+
+How to keep track of the bus state of a running instance?
+
+To be able to hotplug a device, we need to know which slot is
+available on the desired bus. Until now, we were using the ``query-pci``
+QMP command that returns the state of the PCI buses (i.e., which devices
+occupy which slots). Unfortunately, there is no equivalent for the SCSI
+buses. We could use the ``info qtree`` HMP command that practically
+dumps in plain text the whole device tree. This makes it really hard to
+parse. So we decide to generate the bus state of a running instance
+through our local runtime files.
+
+
+What info should be kept in runtime files?
+
+Runtime files are used for instance migration (to run a QEMU process on
+the target node with the same configuration) and for hotplug actions (to
+update the configuration of a running instance so that it can be
+migrated). Until now we were using devices only on the PCI bus, so only
+each device's PCI slot should be kept in the runtime file. This is
+obviously not enough. We decide to replace the `pci` slot of Disk and
+NIC configuration objects, with an `hvinfo` dict. It will contain all
+necessary info for constructing the appropriate -device QEMU option.
+Specifically the `driver`, `id`, and `bus` parameters will be present to
+all kind of devices. PCI devices will have the `addr` parameter, SCSI
+devices will have `channel`, `scsi-id`, and `lun`. NICs and Disks will
+have the extra `netdev` and `drive` parameters correspondingly.
+
+
+How to deal with existing instances?
+
+Only existing instances with paravirtual devices (configured via the
+disk_type and nic_type hvparam) use the latest QEMU device model. Only
+these have the `pci` slot filled. We will use the existing
+_UpgradeSerializedRuntime() method to migrate the old runtime format
+with `pci` slot in Disk and NIC configuration objects to the new one
+with `hvinfo` instead. The new hvinfo will contain the old driver
+(either virtio-blk-pci or virtio-net-pci), the old id
+(hotdisk-123456-pci-4), the default PCI bus (pci.0), and the old PCI
+slot (addr=4). This way those devices will still be hotplug-able, and
+the instance will still be migrate-able. When those instances are
+rebooted, the hvinfo will be re-generated.
+
+
+How to support downgrades?
+
+There are two possible ways, both not very pretty. The first one is to
+use _UpgradeSerializedRuntime() to remove the hvinfo slot. This would
+require the patching of all Ganeti versions down to 2.10 which is practically
+imposible. Another way is to ssh to all nodes and remove this slot upon
+a cluster downgrade. This ugly hack would go away on 2.17 since we support
+downgrades only to the previous minor version.
+
+
+Configuration changes
+---------------------
+
+The ``NIC`` and ``Disk`` objects get one extra slot: ``hvinfo``. It is
+hypervisor-specific and will never reach config.data. In case of the KVM
+Hypervisor it will contain all necessary info for constructing the -device
+QEMU option. Existing entries in runtime files that had a `pci` slot
+will be upgraded to have the corresponding `hvinfo` (see above).
+
+The new `scsi_controller_type` hvparam is added to denote what type of
+SCSI controller should be added to PCI bus if we have a SCSI disk.
+Allowed values will be `lsi`, `virtio-scsi-pci`, and `megasas`.
+We decide to use `lsi` by default since this is the one that QEMU
+adds automatically if not specified explicitly by an option.
+
+
+Hypervisor changes
+------------------
+
+The current implementation verifies if a hotplug action has succeeded
+by scanning the PCI bus and searching for a specific device ID. This
+will change, and we will use the ``query-block`` along with the
+``query-pci`` QMP command to find block devices that are attached to the
+SCSI bus as well.
+
+Up until now, if `disk_type` hvparam was set to `scsi`, QEMU would use the
+deprecated device model and end up using SCSI emulation, e.g.:
+
+  ::
+
+    -drive file=/var/run/ganeti/instance-disks/test:0,if=scsi,format=raw
+
+Now the equivalent, which will also enable hotplugging, will be to set
+disk_type to `scsi-hd`. The QEMU command line will include:
+
+  ::
+
+    -drive file=/var/run/ganeti/instance-disks/test:0,if=none,format=raw,id=disk-9e7c85f6-b6e5-4243
+    -device scsi-hd,id=disk-9e7c85f6-b6e5-4243,drive=disk-9e7c85f6-b6e5-4243,bus=scsi.0,channel=0,scsi-id=0,lun=0
+
+
+User interface
+--------------
+
+The `disk_type` hvparam will additionally support the `scsi-hd`,
+`scsi-block`, and `scsi-generic` values. The first one is equivalent to
+the existing `scsi` value and will make QEMU emulate a SCSI device,
+while the last two will add support for SCSI pass-through and will
+require a real SCSI device on the host.
+
+.. vim: set textwidth=72 :
+.. Local Variables:
+.. mode: rst
+.. fill-column: 72
+.. End:
index 14e8bc1..0347ef2 100644 (file)
@@ -65,9 +65,36 @@ Modifications to existing tools
   on the next move, it will filter out those moves that lead from a
   shared storage N+1 redundant configuration into one that isn't.
 
-- ``hspace`` computing the capacity for DRBD instances will be unchanged.
-  For shared storage instances, however, it will first evacuate one node
-  and then compute capacity as normal pretending that node was offline.
-  While this technically deviates from interatively doing what hail does,
-  it should still give a reasonable estimate of the cluster capacity without
-  significantly increasing the algorithmic complexity.
+- ``hspace`` computing the capacity for DRBD instances will be unchanged;
+  In particular, the options ``--accept-exisiting`` and ``--independent-groups``
+  will continue to work. For shared storage instances, however, will strictly
+  iterate over the same allocation step as hail does.
+
+
+Other modifications related to opportunistic locking
+----------------------------------------------------
+
+To allow parallel instance creation, instance creation jobs can be instructed
+to run with just whatever node locks currently available. In this case, an
+allocation has to be chosen from that restricted set of nodes. Currently, this
+is achieved by sending ``hail`` a cluster description where all other nodes
+are marked offline; that works as long as only local properties are considered.
+With global properties, however, the capacity of the cluster is materially
+underestimated, causing spurious global N+1 failures.
+
+Therefore, we conservatively extend the request format of ``hail`` by an
+optional parameter ``restrict-to-nodes``. If that parameter is given, only
+allocations on those nodes will be considered. This will be an additional
+restriction to ones currently considered (e.g., node must be online, a
+particular group might have been requested). If opportunistic locking is
+enabled, calls to the IAllocator will use this extension to signal which
+nodes to restrict to, instead of marking other nodes offline.
+
+It should be noted that this change brings a race. Two concurrent allocations
+might bring the cluster over the global N+1 capacity limit. As, however, the
+reason for opportunistic locking is an urgent need for instances, this seems
+acceptable; Ganeti generally follows the guideline that current problems are
+more important than future ones. Also, even with that change allocation is
+more careful than the current approach of completely ignoring N+1 redundancy
+for shared storage.
+
index 30950dc..6dcd886 100644 (file)
@@ -6,9 +6,12 @@ PartOf = ganeti-noded.target
 
 [Service]
 Type = simple
-Group = @GNTDAEMONSGROUP@
+User = @GNTMETADUSER@
+Group = @GNTMETADGROUP@
 ExecStart = @SBINDIR@/ganeti-metad -f
 Restart = on-failure
+CapabilityBoundingSet=CAP_NET_BIND_SERVICE
+Capabilities=cap_net_bind_service+=ep
 
 # ganeti-metad is started on-demand by noded, so there must be no Install
 # section.
index dfd20a0..de794bb 100644 (file)
@@ -1,7 +1,7 @@
 Ganeti customisation using hooks
 ================================
 
-Documents Ganeti version 2.15
+Documents Ganeti version 2.16
 
 .. contents::
 
index 13319e8..406f52a 100644 (file)
@@ -1,7 +1,7 @@
 Ganeti automatic instance allocation
 ====================================
 
-Documents Ganeti version 2.15
+Documents Ganeti version 2.16
 
 .. contents::
 
index 16d0301..225c88f 100644 (file)
@@ -81,6 +81,7 @@ and draft versions (which are either incomplete or not implemented).
    design-2.13.rst
    design-2.14.rst
    design-2.15.rst
+   design-2.16.rst
 
 Draft designs
 -------------
@@ -114,6 +115,7 @@ Draft designs
    design-hotplug.rst
    design-internal-shutdown.rst
    design-kvmd.rst
+   design-location.rst
    design-linuxha.rst
    design-lu-generated-jobs.rst
    design-monitoring-agent.rst
@@ -131,11 +133,13 @@ Draft designs
    design-ovf-support.rst
    design-partitioned
    design-performance-tests.rst
+   design-plain-redundancy.rst
    design-query2.rst
    design-query-splitting.rst
    design-reason-trail.rst
    design-restricted-commands.rst
    design-shared-storage.rst
+   design-shared-storage-redundancy.rst
    design-ssh-ports.rst
    design-storagetypes.rst
    design-systemd.rst
index 7521bab..ea86a8a 100644 (file)
@@ -1,7 +1,7 @@
 Security in Ganeti
 ==================
 
-Documents Ganeti version 2.15
+Documents Ganeti version 2.16
 
 Ganeti was developed to run on internal, trusted systems. As such, the
 security model is all-or-nothing.
index d3bd703..e6d74b0 100644 (file)
@@ -4,6 +4,7 @@
 @GNTLUXIDUSER@ @GNTDAEMONSGROUP@
 @GNTRAPIUSER@ @GNTDAEMONSGROUP@
 @GNTMONDUSER@ @GNTDAEMONSGROUP@
+@GNTMETADUSER@ @GNTDAEMONSGROUP@
 @GNTMASTERUSER@ @GNTADMINGROUP@
 @GNTRAPIUSER@ @GNTADMINGROUP@
 @GNTMASTERUSER@ @GNTCONFDGROUP@
index d10df70..dc01511 100644 (file)
@@ -1,9 +1,9 @@
 @GNTDAEMONSGROUP@
 @GNTADMINGROUP@
-@GNTMASTERUSER@
-@GNTRAPIUSER@
-@GNTCONFDUSER@
-@GNTWCONFDUSER@
-@GNTLUXIDUSER@
+@GNTMASTERDGROUP@
+@GNTRAPIGROUP@
+@GNTCONFDGROUP@
+@GNTWCONFDGROUP@
 @GNTLUXIDGROUP@
-@GNTMONDUSER@
+@GNTMONDGROUP@
+@GNTMETADGROUP@
index f4db420..7d2ee99 100644 (file)
@@ -4,4 +4,5 @@
 @GNTWCONFDUSER@ @GNTWCONFDGROUP@
 @GNTLUXIDUSER@ @GNTLUXIDGROUP@
 @GNTMONDUSER@ @GNTMONDGROUP@
+@GNTMETADUSER@ @GNTMETADGROUP@
 @GNTNODEDUSER@
index c49f9a6..7213a80 100644 (file)
@@ -1,7 +1,7 @@
 Virtual cluster support
 =======================
 
-Documents Ganeti version 2.15
+Documents Ganeti version 2.16
 
 .. contents::
 
index 4b523b6..0e5747a 100644 (file)
@@ -970,8 +970,8 @@ def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
   return (None, utils.GetCertificateDigest(cert_filename=cert_file))
 
 
-def _VerifySshSetup(node_status_list, my_name,
-                    pub_key_file=pathutils.SSH_PUB_KEYS):
+def _VerifySshSetup(node_status_list, my_name, ssh_key_type,
+                    ganeti_pub_keys_file=pathutils.SSH_PUB_KEYS):
   """Verifies the state of the SSH key files.
 
   @type node_status_list: list of tuples
@@ -980,8 +980,10 @@ def _VerifySshSetup(node_status_list, my_name,
     is_potential_master_candidate, online)
   @type my_name: str
   @param my_name: name of this node
-  @type pub_key_file: str
-  @param pub_key_file: filename of the public key file
+  @type ssh_key_type: one of L{constants.SSHK_ALL}
+  @param ssh_key_type: type of key used on nodes
+  @type ganeti_pub_keys_file: str
+  @param ganeti_pub_keys_file: filename of the public keys file
 
   """
   if node_status_list is None:
@@ -997,16 +999,16 @@ def _VerifySshSetup(node_status_list, my_name,
 
   result = []
 
-  if not os.path.exists(pub_key_file):
+  if not os.path.exists(ganeti_pub_keys_file):
     result.append("The public key file '%s' does not exist. Consider running"
                   " 'gnt-cluster renew-crypto --new-ssh-keys"
-                  " [--no-ssh-key-check]' to fix this." % pub_key_file)
+                  " [--no-ssh-key-check]' to fix this." % ganeti_pub_keys_file)
     return result
 
   pot_mc_uuids = [uuid for (uuid, _, _, _, _) in node_status_list]
   offline_nodes = [uuid for (uuid, _, _, _, online) in node_status_list
                    if not online]
-  pub_keys = ssh.QueryPubKeyFile(None)
+  pub_keys = ssh.QueryPubKeyFile(None, key_file=ganeti_pub_keys_file)
 
   if potential_master_candidate:
     # Check that the set of potential master candidates matches the
@@ -1029,14 +1031,14 @@ def _VerifySshSetup(node_status_list, my_name,
 
     (_, key_files) = \
       ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
-    (_, dsa_pub_key_filename) = key_files[constants.SSHK_DSA]
+    (_, node_pub_key_file) = key_files[ssh_key_type]
 
     my_keys = pub_keys[my_uuid]
 
-    dsa_pub_key = utils.ReadFile(dsa_pub_key_filename)
-    if dsa_pub_key.strip() not in my_keys:
+    node_pub_key = utils.ReadFile(node_pub_key_file)
+    if node_pub_key.strip() not in my_keys:
       result.append("The dsa key of node %s does not match this node's key"
-                    " in the pub key file." % (my_name))
+                    " in the pub key file." % my_name)
     if len(my_keys) != 1:
       result.append("There is more than one key for node %s in the public key"
                     " file." % my_name)
@@ -1103,7 +1105,7 @@ def _VerifySshClutter(node_status_list, my_name):
   return result
 
 
-def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
+def VerifyNode(what, cluster_name, all_hvparams):
   """Verify the status of the local node.
 
   Based on the input L{what} parameter, various checks are done on the
@@ -1131,11 +1133,6 @@ def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
   @param cluster_name: the cluster's name
   @type all_hvparams: dict of dict of strings
   @param all_hvparams: a dictionary mapping hypervisor names to hvparams
-  @type node_groups: a dict of strings
-  @param node_groups: node _names_ mapped to their group uuids (it's enough to
-      have only those nodes that are in `what["nodelist"]`)
-  @type groups_cfg: a dict of dict of strings
-  @param groups_cfg: a dictionary mapping group uuids to their configuration
   @rtype: dict
   @return: a dictionary with the same keys as the input dict, and
       values representing the result of the checks
@@ -1160,8 +1157,9 @@ def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
     result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()
 
   if constants.NV_SSH_SETUP in what:
+    node_status_list, key_type = what[constants.NV_SSH_SETUP]
     result[constants.NV_SSH_SETUP] = \
-      _VerifySshSetup(what[constants.NV_SSH_SETUP], my_name)
+      _VerifySshSetup(node_status_list, my_name, key_type)
     if constants.NV_SSH_CLUTTER in what:
       result[constants.NV_SSH_CLUTTER] = \
         _VerifySshClutter(what[constants.NV_SSH_SETUP], my_name)
@@ -1180,19 +1178,15 @@ def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
 
     # Try to contact all nodes
     val = {}
+    ssh_port_map = ssconf.SimpleStore().GetSshPortMap()
     for node in nodes:
-      params = groups_cfg.get(node_groups.get(node))
-      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
-      logging.debug("Ssh port %s (None = default) for node %s",
-                    str(ssh_port), node)
-
       # We only test if master candidates can communicate to other nodes.
       # We cannot test if normal nodes cannot communicate with other nodes,
       # because the administrator might have installed additional SSH keys,
       # over which Ganeti has no power.
       if my_name in mcs:
         success, message = _GetSshRunner(cluster_name). \
-                              VerifyNodeHostname(node, ssh_port)
+                              VerifyNodeHostname(node, ssh_port_map[node])
         if not success:
           val[node] = message
 
@@ -1959,8 +1953,8 @@ def RemoveNodeSshKeyBulk(node_list,
   return result_msgs
 
 
-def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
-                        pub_key_file=pathutils.SSH_PUB_KEYS,
+def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map, ssh_key_type,
+                        ssh_key_bits, pub_key_file=pathutils.SSH_PUB_KEYS,
                         ssconf_store=None,
                         noded_cert_file=pathutils.NODED_CERT_FILE,
                         run_cmd_fn=ssh.RunSshCmdWithStdin,
@@ -1973,6 +1967,10 @@ def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
   @param node_name: name of the node whose key is remove
   @type ssh_port_map: dict of str to int
   @param ssh_port_map: mapping of node names to their SSH port
+  @type ssh_key_type: One of L{constants.SSHK_ALL}
+  @param ssh_key_type: the type of SSH key to be generated
+  @type ssh_key_bits: int
+  @param ssh_key_bits: the length of the key to be generated
 
   """
   if not ssconf_store:
@@ -1987,7 +1985,7 @@ def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
   data = {}
   _InitSshUpdateData(data, noded_cert_file, ssconf_store)
   cluster_name = data[constants.SSHS_CLUSTER_NAME]
-  data[constants.SSHS_GENERATE] = {constants.SSHS_SUFFIX: suffix}
+  data[constants.SSHS_GENERATE] = (ssh_key_type, ssh_key_bits, suffix)
 
   run_cmd_fn(cluster_name, node_name, pathutils.SSH_UPDATE,
              ssh_port_map.get(node_name), data,
@@ -2062,8 +2060,9 @@ def _ReplaceMasterKeyOnMaster(root_keyfiles):
 
 
 def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
-                 potential_master_candidates,
-                 pub_key_file=pathutils.SSH_PUB_KEYS,
+                 potential_master_candidates, old_key_type, new_key_type,
+                 new_key_bits,
+                 ganeti_pub_keys_file=pathutils.SSH_PUB_KEYS,
                  ssconf_store=None,
                  noded_cert_file=pathutils.NODED_CERT_FILE,
                  run_cmd_fn=ssh.RunSshCmdWithStdin):
@@ -2077,8 +2076,14 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
   @type master_candidate_uuids: list of str
   @param master_candidate_uuids: list of UUIDs of master candidates or
     master node
-  @type pub_key_file: str
-  @param pub_key_file: file path of the the public key file
+  @type old_key_type: One of L{constants.SSHK_ALL}
+  @param old_key_type: the type of SSH key already present on nodes
+  @type new_key_type: One of L{constants.SSHK_ALL}
+  @param new_key_type: the type of SSH key to be generated
+  @type new_key_bits: int
+  @param new_key_bits: the length of the key to be generated
+  @type ganeti_pub_keys_file: str
+  @param ganeti_pub_keys_file: file path of the the public key file
   @type noded_cert_file: str
   @param noded_cert_file: path of the noded SSL certificate file
   @type run_cmd_fn: function
@@ -2100,8 +2105,9 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
 
   (_, root_keyfiles) = \
     ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
-  (_, dsa_pub_keyfile) = root_keyfiles[constants.SSHK_DSA]
-  old_master_key = utils.ReadFile(dsa_pub_keyfile)
+  (_, old_pub_keyfile) = root_keyfiles[old_key_type]
+  (_, new_pub_keyfile) = root_keyfiles[new_key_type]
+  old_master_key = utils.ReadFile(old_pub_keyfile)
 
   node_uuid_name_map = zip(node_uuids, node_names)
 
@@ -2132,7 +2138,8 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
     node_list.append((node_uuid, node_name, master_candidate,
                       potential_master_candidate))
 
-    keys_by_uuid = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
+    keys_by_uuid = ssh.QueryPubKeyFile([node_uuid],
+                                       key_file=ganeti_pub_keys_file)
     if not keys_by_uuid:
       raise errors.SshUpdateError("No public key of node %s (UUID %s) found,"
                                   " not generating a new key."
@@ -2140,7 +2147,7 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
 
     if master_candidate:
       logging.debug("Fetching old SSH key from node '%s'.", node_name)
-      old_pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
+      old_pub_key = ssh.ReadRemoteSshPubKeys(old_pub_keyfile,
                                              node_name, cluster_name,
                                              ssh_port_map[node_name],
                                              False, # ask_key
@@ -2176,15 +2183,15 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
       in node_list:
 
     logging.debug("Generating new SSH key for node '%s'.", node_name)
-    _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
-                        pub_key_file=pub_key_file,
+    _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map, new_key_type,
+                        new_key_bits, pub_key_file=ganeti_pub_keys_file,
                         ssconf_store=ssconf_store,
                         noded_cert_file=noded_cert_file,
                         run_cmd_fn=run_cmd_fn)
 
     try:
       logging.debug("Fetching newly created SSH key from node '%s'.", node_name)
-      pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
+      pub_key = ssh.ReadRemoteSshPubKeys(new_pub_keyfile,
                                          node_name, cluster_name,
                                          ssh_port_map[node_name],
                                          False, # ask_key
@@ -2194,10 +2201,9 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
                                   " (UUID %s)" % (node_name, node_uuid))
 
     if potential_master_candidate:
-      ssh.RemovePublicKey(node_uuid, key_file=pub_key_file)
-      ssh.AddPublicKey(node_uuid, pub_key, key_file=pub_key_file)
+      ssh.RemovePublicKey(node_uuid, key_file=ganeti_pub_keys_file)
+      ssh.AddPublicKey(node_uuid, pub_key, key_file=ganeti_pub_keys_file)
 
-    logging.debug("Add ssh key of node '%s'.", node_name)
     node_info = SshAddNodeInfo(name=node_name,
                                uuid=node_uuid,
                                to_authorized_keys=master_candidate,
@@ -2207,7 +2213,7 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
 
   node_errors = AddNodeSshKeyBulk(
       node_keys_to_add, potential_master_candidates,
-      pub_key_file=pub_key_file, ssconf_store=ssconf_store,
+      pub_key_file=ganeti_pub_keys_file, ssconf_store=ssconf_store,
       noded_cert_file=noded_cert_file,
       run_cmd_fn=run_cmd_fn)
   if node_errors:
@@ -2216,12 +2222,14 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
   # Renewing the master node's key
 
   # Preserve the old keys for now
-  old_master_keys_by_uuid = _GetOldMasterKeys(master_node_uuid, pub_key_file)
+  old_master_keys_by_uuid = _GetOldMasterKeys(master_node_uuid,
+                                              ganeti_pub_keys_file)
 
   # Generate a new master key with a suffix, don't touch the old one for now
   logging.debug("Generate new ssh key of master.")
   _GenerateNodeSshKey(master_node_uuid, master_node_name, ssh_port_map,
-                      pub_key_file=pub_key_file,
+                      new_key_type, new_key_bits,
+                      pub_key_file=ganeti_pub_keys_file,
                       ssconf_store=ssconf_store,
                       noded_cert_file=noded_cert_file,
                       run_cmd_fn=run_cmd_fn,
@@ -2230,16 +2238,16 @@ def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
   new_master_key_dict = _GetNewMasterKey(root_keyfiles, master_node_uuid)
 
   # Replace master key in the master nodes' public key file
-  ssh.RemovePublicKey(master_node_uuid, key_file=pub_key_file)
+  ssh.RemovePublicKey(master_node_uuid, key_file=ganeti_pub_keys_file)
   for pub_key in new_master_key_dict[master_node_uuid]:
-    ssh.AddPublicKey(master_node_uuid, pub_key, key_file=pub_key_file)
+    ssh.AddPublicKey(master_node_uuid, pub_key, key_file=ganeti_pub_keys_file)
 
   # Add new master key to all node's public and authorized keys
   logging.debug("Add new master key to all nodes.")
   node_errors = AddNodeSshKey(
       master_node_uuid, master_node_name, potential_master_candidates,
       to_authorized_keys=True, to_public_keys=True,
-      get_public_keys=False, pub_key_file=pub_key_file,
+      get_public_keys=False, pub_key_file=ganeti_pub_keys_file,
       ssconf_store=ssconf_store, noded_cert_file=noded_cert_file,
       run_cmd_fn=run_cmd_fn)
   if node_errors:
index 7b6fbfe..a824977 100644 (file)
@@ -485,16 +485,17 @@ def _InitCheckDrbdHelper(drbd_helper, drbd_enabled):
 def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
                 master_netmask, master_netdev, file_storage_dir,
                 shared_file_storage_dir, gluster_storage_dir,
-                candidate_pool_size, secondary_ip=None,
-                vg_name=None, beparams=None, nicparams=None, ndparams=None,
-                hvparams=None, diskparams=None, enabled_hypervisors=None,
-                modify_etc_hosts=True, modify_ssh_setup=True,
-                maintain_node_health=False, drbd_helper=None, uid_pool=None,
-                default_iallocator=None, default_iallocator_params=None,
-                primary_ip_version=None, ipolicy=None,
-                prealloc_wipe_disks=False, use_external_mip_script=False,
-                hv_state=None, disk_state=None, enabled_disk_templates=None,
-                install_image=None, zeroing_image=None, compression_tools=None,
+                candidate_pool_size, ssh_key_type, ssh_key_bits,
+                secondary_ip=None, vg_name=None, beparams=None, nicparams=None,
+                ndparams=None, hvparams=None, diskparams=None,
+                enabled_hypervisors=None, modify_etc_hosts=True,
+                modify_ssh_setup=True, maintain_node_health=False,
+                drbd_helper=None, uid_pool=None, default_iallocator=None,
+                default_iallocator_params=None, primary_ip_version=None,
+                ipolicy=None, prealloc_wipe_disks=False,
+                use_external_mip_script=False, hv_state=None, disk_state=None,
+                enabled_disk_templates=None, install_image=None,
+                zeroing_image=None, compression_tools=None,
                 enabled_user_shutdown=False):
   """Initialise the cluster.
 
@@ -713,7 +714,7 @@ def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
     utils.AddHostToEtcHosts(hostname.name, hostname.ip)
 
   if modify_ssh_setup:
-    ssh.InitSSHSetup()
+    ssh.InitSSHSetup(ssh_key_type, ssh_key_bits)
 
   if default_iallocator is not None:
     alloc_script = utils.FindFile(default_iallocator,
@@ -797,6 +798,8 @@ def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
     zeroing_image=zeroing_image,
     compression_tools=compression_tools,
     enabled_user_shutdown=enabled_user_shutdown,
+    ssh_key_type=ssh_key_type,
+    ssh_key_bits=ssh_key_bits,
     )
   master_node_config = objects.Node(name=hostname.name,
                                     primary_ip=hostname.ip,
@@ -814,7 +817,7 @@ def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
 
   master_uuid = cfg.GetMasterNode()
   if modify_ssh_setup:
-    ssh.InitPubKeyFile(master_uuid)
+    ssh.InitPubKeyFile(master_uuid, ssh_key_type)
   # set up the inter-node password and certificate
   _InitGanetiServerSetup(hostname.name, cfg)
 
index ae58ede..9f4d530 100644 (file)
@@ -238,6 +238,8 @@ __all__ = [
   "SPLIT_ISPECS_OPTS",
   "SRC_DIR_OPT",
   "SRC_NODE_OPT",
+  "SSH_KEY_BITS_OPT",
+  "SSH_KEY_TYPE_OPT",
   "STARTUP_PAUSED_OPT",
   "STATIC_OPT",
   "SUBMIT_OPT",
@@ -1594,6 +1596,17 @@ LONG_SLEEP_OPT = cli_option(
     "--long-sleep", default=False, dest="long_sleep",
     help="Allow long shutdowns when backing up instances", action="store_true")
 
+SSH_KEY_TYPE_OPT = \
+    cli_option("--ssh-key-type", default=None,
+               choices=list(constants.SSHK_ALL), dest="ssh_key_type",
+               help="Type of SSH key deployed by Ganeti for cluster actions")
+
+SSH_KEY_BITS_OPT = \
+    cli_option("--ssh-key-bits", default=None,
+               type="int", dest="ssh_key_bits",
+               help="Length of SSH keys generated by Ganeti, in bits")
+
+
 #: Options provided by all commands
 COMMON_OPTS = [DEBUG_OPT, REASON_OPT]
 
index 40792e2..4acb02e 100644 (file)
@@ -299,6 +299,14 @@ def InitCluster(opts, args):
   else:
     enabled_user_shutdown = False
 
+  if opts.ssh_key_type:
+    ssh_key_type = opts.ssh_key_type
+  else:
+    ssh_key_type = constants.SSH_DEFAULT_KEY_TYPE
+
+  ssh_key_bits = ssh.DetermineKeyBits(ssh_key_type, opts.ssh_key_bits, None,
+                                      None)
+
   bootstrap.InitCluster(cluster_name=args[0],
                         secondary_ip=opts.secondary_ip,
                         vg_name=vg_name,
@@ -333,6 +341,8 @@ def InitCluster(opts, args):
                         zeroing_image=zeroing_image,
                         compression_tools=compression_tools,
                         enabled_user_shutdown=enabled_user_shutdown,
+                        ssh_key_type=ssh_key_type,
+                        ssh_key_bits=ssh_key_bits,
                         )
   op = opcodes.OpClusterPostInit()
   SubmitOpCode(op, opts=opts)
@@ -612,6 +622,9 @@ def ShowClusterConfig(opts, args):
       ("zeroing image", result["zeroing_image"]),
       ("compression tools", result["compression_tools"]),
       ("enabled user shutdown", result["enabled_user_shutdown"]),
+      ("modify ssh setup", result["modify_ssh_setup"]),
+      ("ssh_key_type", result["ssh_key_type"]),
+      ("ssh_key_bits", result["ssh_key_bits"]),
       ]),
 
     ("Default node parameters",
@@ -787,7 +800,7 @@ def VerifyDisks(opts, args):
   """
   cl = GetClient()
 
-  op = opcodes.OpClusterVerifyDisks()
+  op = opcodes.OpClusterVerifyDisks(group_name=opts.nodegroup)
 
   result = SubmitOpCode(op, cl=cl, opts=opts)
 
@@ -972,11 +985,12 @@ def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
   return pem
 
 
+# pylint: disable=R0913
 def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                  rapi_cert_filename, new_spice_cert, spice_cert_filename,
                  spice_cacert_filename, new_confd_hmac_key, new_cds,
                  cds_filename, force, new_node_cert, new_ssh_keys,
-                 verbose, debug):
+                 ssh_key_type, ssh_key_bits, verbose, debug):
   """Renews cluster certificates, keys and secrets.
 
   @type new_cluster_cert: bool
@@ -1004,10 +1018,14 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
   @param new_node_cert: Whether to generate new node certificates
   @type new_ssh_keys: bool
   @param new_ssh_keys: Whether to generate new node SSH keys
+  @type ssh_key_type: One of L{constants.SSHK_ALL}
+  @param ssh_key_type: The type of SSH key to be generated
+  @type ssh_key_bits: int
+  @param ssh_key_bits: The length of the key to be generated
   @type verbose: boolean
-  @param verbose: show verbose output
+  @param verbose: Show verbose output
   @type debug: boolean
-  @param debug: show debug output
+  @param debug: Show debug output
 
   """
   ToStdout("Updating certificates now. Running \"gnt-cluster verify\" "
@@ -1188,7 +1206,9 @@ def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
     cl = GetClient()
     renew_op = opcodes.OpClusterRenewCrypto(
         node_certificates=new_node_cert or new_cluster_cert,
-        ssh_keys=new_ssh_keys)
+        renew_ssh_keys=new_ssh_keys,
+        ssh_key_type=ssh_key_type,
+        ssh_key_bits=ssh_key_bits)
     SubmitOpCode(renew_op, cl=cl)
 
   ToStdout("All requested certificates and keys have been replaced."
@@ -1205,18 +1225,25 @@ def _BuildGanetiPubKeys(options, pub_key_file=pathutils.SSH_PUB_KEYS, cl=None,
   """Recreates the 'ganeti_pub_key' file by polling all nodes.
 
   """
+
+  if not cl:
+    cl = GetClient()
+
+  (cluster_name, master_node, modify_ssh_setup, ssh_key_type) = \
+    cl.QueryConfigValues(["cluster_name", "master_node", "modify_ssh_setup",
+                          "ssh_key_type"])
+
+  # In case Ganeti is not supposed to modify the SSH setup, simply exit and do
+  # not update this file.
+  if not modify_ssh_setup:
+    return
+
   if os.path.exists(pub_key_file):
     utils.CreateBackup(pub_key_file)
     utils.RemoveFile(pub_key_file)
 
   ssh.ClearPubKeyFile(pub_key_file)
 
-  if not cl:
-    cl = GetClient()
-
-  (cluster_name, master_node) = \
-    cl.QueryConfigValues(["cluster_name", "master_node"])
-
   online_nodes = get_online_nodes_fn([], cl=cl)
   ssh_ports = get_nodes_ssh_ports_fn(online_nodes + [master_node], cl)
   ssh_port_map = dict(zip(online_nodes + [master_node], ssh_ports))
@@ -1229,7 +1256,7 @@ def _BuildGanetiPubKeys(options, pub_key_file=pathutils.SSH_PUB_KEYS, cl=None,
 
   _, pub_key_filename, _ = \
     ssh.GetUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False,
-                     kind=constants.SSHK_DSA, _homedir_fn=homedir_fn)
+                     kind=ssh_key_type, _homedir_fn=homedir_fn)
 
   # get the key file of the master node
   pub_key = utils.ReadFile(pub_key_filename)
@@ -1263,6 +1290,8 @@ def RenewCrypto(opts, args):
                       opts.force,
                       opts.new_node_cert,
                       opts.new_ssh_keys,
+                      opts.ssh_key_type,
+                      opts.ssh_key_bits,
                       opts.verbose,
                       opts.debug > 0)
 
@@ -2415,7 +2444,7 @@ commands = {
      HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
      IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
      ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
-     ENABLED_USER_SHUTDOWN_OPT,
+     ENABLED_USER_SHUTDOWN_OPT, SSH_KEY_BITS_OPT, SSH_KEY_TYPE_OPT,
      ]
      + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
     "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
@@ -2439,7 +2468,7 @@ commands = {
      VERIFY_CLUTTER_OPT],
     "", "Does a check on the cluster configuration"),
   "verify-disks": (
-    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
+    VerifyDisks, ARGS_NONE, [PRIORITY_OPT, NODEGROUP_OPT],
     "", "Does a check on the cluster disk status"),
   "repair-disk-sizes": (
     RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
@@ -2515,7 +2544,7 @@ commands = {
      NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
      NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
      NEW_NODE_CERT_OPT, NEW_SSH_KEY_OPT, NOSSH_KEYCHECK_OPT,
-     VERBOSE_OPT],
+     VERBOSE_OPT, SSH_KEY_BITS_OPT, SSH_KEY_TYPE_OPT],
     "[opts...]",
     "Renews cluster certificates, keys and secrets"),
   "epo": (
index d05fbc2..48a557f 100644 (file)
@@ -583,6 +583,16 @@ def TestJobqueue(opts, _):
   return 0
 
 
+def TestOsParams(opts, _):
+  """Set secret os parameters.
+
+  """
+  op = opcodes.OpTestOsParams(osparams_secret=opts.osparams_secret)
+  SubmitOrSend(op, opts)
+
+  return 0
+
+
 def ListLocks(opts, args): # pylint: disable=W0613
   """List all locks.
 
@@ -785,6 +795,10 @@ commands = {
   "test-jobqueue": (
     TestJobqueue, ARGS_NONE, [PRIORITY_OPT],
     "", "Test a few aspects of the job queue"),
+  "test-osparams": (
+    TestOsParams, ARGS_NONE, [OSPARAMS_SECRET_OPT] + SUBMIT_OPTS,
+    "[--os-parameters-secret <params>]",
+    "Test secret os parameter transmission"),
   "locks": (
     ListLocks, ARGS_NONE,
     [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT, VERBOSE_OPT],
index 52da28e..cc5b851 100644 (file)
@@ -1398,12 +1398,7 @@ def SetInstanceParams(opts, args):
 
   # verify the user provided parameters for disk template conversions
   if opts.disk_template:
-    if (not opts.node and
-        opts.disk_template in constants.DTS_INT_MIRROR):
-      ToStderr("Changing the disk template to a mirrored one requires"
-               " specifying a secondary node")
-      return 1
-    elif (opts.ext_params and
+    if (opts.ext_params and
           opts.disk_template != constants.DT_EXT):
       ToStderr("Specifying ExtStorage parameters requires converting"
                " to the '%s' disk template" % constants.DT_EXT)
@@ -1442,6 +1437,7 @@ def SetInstanceParams(opts, args):
                                    file_driver=opts.file_driver,
                                    file_storage_dir=opts.file_storage_dir,
                                    remote_node=opts.node,
+                                   iallocator=opts.iallocator,
                                    pnode=opts.new_primary_node,
                                    hvparams=opts.hvparams,
                                    beparams=opts.beparams,
@@ -1658,7 +1654,8 @@ commands = {
   "modify": (
     SetInstanceParams, ARGS_ONE_INSTANCE,
     [BACKEND_OPT, DISK_OPT, FORCE_OPT, HVOPTS_OPT, NET_OPT] + SUBMIT_OPTS +
-    [DISK_TEMPLATE_OPT, SINGLE_NODE_OPT, OS_OPT, FORCE_VARIANT_OPT,
+    [DISK_TEMPLATE_OPT, SINGLE_NODE_OPT, IALLOCATOR_OPT,
+     OS_OPT, FORCE_VARIANT_OPT,
      OSPARAMS_OPT, OSPARAMS_PRIVATE_OPT, DRY_RUN_OPT, PRIORITY_OPT, NWSYNC_OPT,
      OFFLINE_INST_OPT, ONLINE_INST_OPT, IGNORE_IPOLICY_OPT, RUNTIME_MEM_OPT,
      NOCONFLICTSCHECK_OPT, NEW_PRIMARY_OPT, HOTPLUG_OPT,
index 87f3d19..972376d 100644 (file)
@@ -230,12 +230,17 @@ def _SetupSSH(options, cluster_name, node, ssh_port, cl):
   (_, cert_pem) = \
     utils.ExtractX509Certificate(utils.ReadFile(pathutils.NODED_CERT_FILE))
 
+  (ssh_key_type, ssh_key_bits) = \
+    cl.QueryConfigValues(["ssh_key_type", "ssh_key_bits"])
+
   data = {
     constants.SSHS_CLUSTER_NAME: cluster_name,
     constants.SSHS_NODE_DAEMON_CERTIFICATE: cert_pem,
     constants.SSHS_SSH_HOST_KEY: host_keys,
     constants.SSHS_SSH_ROOT_KEY: root_keys,
     constants.SSHS_SSH_AUTHORIZED_KEYS: candidate_keys,
+    constants.SSHS_SSH_KEY_TYPE: ssh_key_type,
+    constants.SSHS_SSH_KEY_BITS: ssh_key_bits,
     }
 
   ssh.RunSshCmdWithStdin(cluster_name, node, pathutils.PREPARE_NODE_JOIN,
@@ -244,9 +249,9 @@ def _SetupSSH(options, cluster_name, node, ssh_port, cl):
                          use_cluster_key=False, ask_key=options.ssh_key_check,
                          strict_host_check=options.ssh_key_check)
 
-  (_, dsa_pub_keyfile) = root_keyfiles[constants.SSHK_DSA]
-  pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile, node, cluster_name,
-                                     ssh_port, options.ssh_key_check,
+  (_, pub_keyfile) = root_keyfiles[ssh_key_type]
+  pub_key = ssh.ReadRemoteSshPubKeys(pub_keyfile, node, cluster_name, ssh_port,
+                                     options.ssh_key_check,
                                      options.ssh_key_check)
   # Unfortunately, we have to add the key with the node name rather than
   # the node's UUID here, because at this point, we do not have a UUID yet.
@@ -312,7 +317,17 @@ def AddNode(opts, args):
   # read the cluster name from the master
   (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
 
-  if opts.node_setup:
+  if not opts.node_setup:
+    ToStdout("-- WARNING -- \n"
+             "The option --no-node-setup is disabled. Whether or not the\n"
+             "SSH setup is manipulated while adding a node is determined\n"
+             "by the 'modify_ssh_setup' value in the cluster-wide\n"
+             "configuration instead.\n")
+
+  (modify_ssh_setup, ) = \
+    cl.QueryConfigValues(["modify_ssh_setup"])
+
+  if modify_ssh_setup:
     ToStderr("-- WARNING -- \n"
              "Performing this operation is going to perform the following\n"
              "changes to the target machine (%s) and the current cluster\n"
@@ -324,7 +339,7 @@ def AddNode(opts, args):
              "  generated public SSH key will be distributed to all other\n"
              "  cluster nodes.\n", node)
 
-  if opts.node_setup:
+  if modify_ssh_setup:
     _SetupSSH(opts, cluster_name, node, ssh_port, cl)
 
   bootstrap.SetupNodeDaemon(opts, cluster_name, node, ssh_port)
@@ -342,7 +357,7 @@ def AddNode(opts, args):
                          master_capable=opts.master_capable,
                          disk_state=disk_state,
                          hv_state=hv_state,
-                         node_setup=opts.node_setup)
+                         node_setup=modify_ssh_setup)
   SubmitOpCode(op, opts=opts)
 
 
index ee02417..5fd9b8d 100644 (file)
@@ -128,6 +128,7 @@ from ganeti.cmdlib.misc import \
   LUExtStorageDiagnose, \
   LURestrictedCommand
 from ganeti.cmdlib.test import \
+  LUTestOsParams, \
   LUTestDelay, \
   LUTestJqueue, \
   LUTestAllocator
index 6307bb1..57eb8d5 100644 (file)
@@ -115,7 +115,7 @@ class LogicalUnit(object): # pylint: disable=R0902
   HTYPE = None
   REQ_BGL = True
 
-  def __init__(self, processor, op, context, cfg,
+  def __init__(self, processor, op, cfg,
                rpc_runner, wconfdcontext, wconfd):
     """Constructor for LogicalUnit.
 
@@ -135,7 +135,6 @@ class LogicalUnit(object): # pylint: disable=R0902
     self.cfg = cfg
     self.wconfdlocks = []
     self.wconfdcontext = wconfdcontext
-    self.context = context
     self.rpc = rpc_runner
     self.wconfd = wconfd # wconfd module to use, for testing
 
index cfe5feb..43df844 100644 (file)
@@ -90,13 +90,19 @@ class LUClusterRenewCrypto(NoHooksLU):
   def CheckPrereq(self):
     """Check prerequisites.
 
-    This checks whether the cluster is empty.
-
-    Any errors are signaled by raising errors.OpPrereqError.
+    Notably the compatibility of specified key bits and key type.
 
     """
-    self._ssh_renewal_suppressed = \
-      not self.cfg.GetClusterInfo().modify_ssh_setup and self.op.ssh_keys
+    cluster_info = self.cfg.GetClusterInfo()
+
+    self.ssh_key_type = self.op.ssh_key_type
+    if self.ssh_key_type is None:
+      self.ssh_key_type = cluster_info.ssh_key_type
+
+    self.ssh_key_bits = ssh.DetermineKeyBits(self.ssh_key_type,
+                                             self.op.ssh_key_bits,
+                                             cluster_info.ssh_key_type,
+                                             cluster_info.ssh_key_bits)
 
   def _RenewNodeSslCertificates(self, feedback_fn):
     """Renews the nodes' SSL certificates.
@@ -159,9 +165,12 @@ class LUClusterRenewCrypto(NoHooksLU):
 
     self.cfg.SetCandidateCerts(digest_map)
 
-  def _RenewSshKeys(self):
+  def _RenewSshKeys(self, feedback_fn):
     """Renew all nodes' SSH keys.
 
+    @type feedback_fn: function
+    @param feedback_fn: logging function, see L{ganeti.cmdlist.base.LogicalUnit}
+
     """
     master_uuid = self.cfg.GetMasterNode()
 
@@ -172,23 +181,37 @@ class LUClusterRenewCrypto(NoHooksLU):
     node_uuids = [uuid for (uuid, _) in nodes_uuid_names]
     potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
     master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
+
+    cluster_info = self.cfg.GetClusterInfo()
+
     result = self.rpc.call_node_ssh_keys_renew(
       [master_uuid],
       node_uuids, node_names,
       master_candidate_uuids,
-      potential_master_candidates)
+      potential_master_candidates,
+      cluster_info.ssh_key_type, # Old key type
+      self.ssh_key_type,         # New key type
+      self.ssh_key_bits)         # New key bits
     result[master_uuid].Raise("Could not renew the SSH keys of all nodes")
 
+    # After the keys have been successfully swapped, time to commit the change
+    # in key type
+    cluster_info.ssh_key_type = self.ssh_key_type
+    cluster_info.ssh_key_bits = self.ssh_key_bits
+    self.cfg.Update(cluster_info, feedback_fn)
+
   def Exec(self, feedback_fn):
     if self.op.node_certificates:
       feedback_fn("Renewing Node SSL certificates")
       self._RenewNodeSslCertificates(feedback_fn)
-    if self.op.ssh_keys and not self._ssh_renewal_suppressed:
-      feedback_fn("Renewing SSH keys")
-      self._RenewSshKeys()
-    elif self._ssh_renewal_suppressed:
-      feedback_fn("Cannot renew SSH keys if the cluster is configured to not"
-                  " modify the SSH setup.")
+
+    if self.op.renew_ssh_keys:
+      if self.cfg.GetClusterInfo().modify_ssh_setup:
+        feedback_fn("Renewing SSH keys")
+        self._RenewSshKeys(feedback_fn)
+      else:
+        feedback_fn("Cannot renew SSH keys if the cluster is configured to not"
+                    " modify the SSH setup.")
 
 
 class LUClusterActivateMasterIp(NoHooksLU):
index dfa1294..772ea9a 100644 (file)
@@ -193,9 +193,14 @@ class LUClusterVerifyDisks(NoHooksLU):
 
   def ExpandNames(self):
     self.share_locks = ShareAll()
-    self.needed_locks = {
-      locking.LEVEL_NODEGROUP: locking.ALL_SET,
-      }
+    if self.op.group_name:
+      self.needed_locks = {
+        locking.LEVEL_NODEGROUP: [self.cfg.LookupNodeGroup(self.op.group_name)]
+        }
+    else:
+      self.needed_locks = {
+        locking.LEVEL_NODEGROUP: locking.ALL_SET,
+        }
 
   def Exec(self, feedback_fn):
     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
@@ -1766,6 +1771,36 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     if n_drained:
       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
 
+  def _VerifyExclusionTags(self, nodename, pinst, ctags):
+    """Verify that all instances have different exclusion tags.
+
+    @type nodename: string
+    @param nodename: the name of the node for which the check is done
+    @type pinst: list of string
+    @param pinst: list of UUIDs of those instances having the given node
+        as primary node
+    @type ctags: list of string
+    @param ctags: tags of the cluster
+
+    """
+    exclusion_prefixes = utils.GetExclusionPrefixes(ctags)
+    tags_seen = set([])
+    conflicting_tags = set([])
+    for iuuid in pinst:
+      allitags = self.my_inst_info[iuuid].tags
+      if allitags is None:
+        allitags = []
+      itags = set([tag for tag in allitags
+                   if utils.IsGoodTag(exclusion_prefixes, tag)])
+      conflicts = itags.intersection(tags_seen)
+      if len(conflicts) > 0:
+        conflicting_tags = conflicting_tags.union(conflicts)
+      tags_seen = tags_seen.union(itags)
+
+    self._ErrorIf(len(conflicting_tags) > 0, constants.CV_EEXTAGS, nodename,
+                  "Tags where there is more than one instance: %s",
+                  list(conflicting_tags), code=constants.CV_WARNING)
+
   def Exec(self, feedback_fn): # pylint: disable=R0915
     """Verify integrity of the node group, performing various test on nodes.
 
@@ -1838,7 +1873,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       }
 
     if self.cfg.GetClusterInfo().modify_ssh_setup:
-      node_verify_param[constants.NV_SSH_SETUP] = self._PrepareSshSetupCheck()
+      node_verify_param[constants.NV_SSH_SETUP] = \
+        (self._PrepareSshSetupCheck(), self.cfg.GetClusterInfo().ssh_key_type)
       if self.op.verify_clutter:
         node_verify_param[constants.NV_SSH_CLUTTER] = True
 
@@ -1928,10 +1964,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     if self._exclusive_storage:
       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
 
-    node_group_uuids = dict(map(lambda n: (n.name, n.group),
-                                self.cfg.GetAllNodesInfo().values()))
-    groups_config = self.cfg.GetAllNodeGroupsInfoDict()
-
     # At this point, we have the in-memory data structures complete,
     # except for the runtime information, which we'll gather next
 
@@ -1965,9 +1997,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                              node_verify_param,
                                              cluster_name,
-                                             hvparams,
-                                             node_group_uuids,
-                                             groups_config)
+                                             hvparams)
       nvinfo_endtime = time.time()
 
       if self.extra_lv_nodes and vg_name is not None:
@@ -1977,9 +2007,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
             self.rpc.call_node_verify(self.extra_lv_nodes,
                                       {constants.NV_LVLIST: vg_name},
                                       self.cfg.GetClusterName(),
-                                      self.cfg.GetClusterInfo().hvparams,
-                                      node_group_uuids,
-                                      groups_config)
+                                      self.cfg.GetClusterInfo().hvparams)
       else:
         extra_lv_nvinfo = {}
 
@@ -2007,9 +2035,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         feedback_fn("* Gathering information about the master node")
         vf_nvinfo.update(self.rpc.call_node_verify(
            additional_node_uuids, {key: node_verify_param[key]},
-           self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
-           node_group_uuids,
-           groups_config))
+           self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
       else:
         vf_nvinfo = all_nvinfo
         vf_node_info = self.my_node_info.values()
@@ -2105,6 +2131,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                         "node is running unknown instance %s", inst_uuid)
 
+        self._VerifyExclusionTags(node_i.name, nimg.pinst, cluster.tags)
+
     self._VerifyGroupDRBDVersion(all_nvinfo)
     self._VerifyGroupLVM(node_image, vg_name)
 
index 1d79a3e..638abd7 100644 (file)
@@ -511,7 +511,6 @@ def AdjustCandidatePool(
     lu.LogInfo("Promoted nodes to master candidate role: %s",
                utils.CommaJoin(node.name for node in mod_list))
     for node in mod_list:
-      lu.context.ReaddNode(node)
       AddNodeCertToCandidateCerts(lu, lu.cfg, node.uuid)
       if modify_ssh_setup:
         AddMasterCandidateSshKey(
index aec9d9f..4b2c6ee 100644 (file)
@@ -448,6 +448,7 @@ class LUInstanceCreate(LogicalUnit):
       node_name_whitelist = self.cfg.GetNodeNames(
         set(self.owned_locks(locking.LEVEL_NODE)) &
         set(self.owned_locks(locking.LEVEL_NODE_RES)))
+      logging.debug("Trying to allocate on nodes %s", node_name_whitelist)
     else:
       node_name_whitelist = None
 
@@ -893,7 +894,9 @@ class LUInstanceCreate(LogicalUnit):
 
     assert (self.owned_locks(locking.LEVEL_NODE) ==
             self.owned_locks(locking.LEVEL_NODE_RES)), \
-      "Node locks differ from node resource locks"
+      ("Node locks differ from node resource locks (%s vs %s)"
+       % (self.owned_locks(locking.LEVEL_NODE),
+          self.owned_locks(locking.LEVEL_NODE_RES)))
 
     #### node related checks
 
index a35e95c..486c14e 100644 (file)
@@ -39,6 +39,7 @@ from ganeti import errors
 from ganeti import ht
 from ganeti import hypervisor
 from ganeti import locking
+from ganeti.masterd import iallocator
 from ganeti import netutils
 from ganeti import objects
 from ganeti import utils
@@ -51,7 +52,8 @@ from ganeti.cmdlib.common import INSTANCE_DOWN, \
   CheckParamsNotGlobal, \
   IsExclusiveStorageEnabledNode, CheckHVParams, CheckOSParams, \
   GetUpdatedParams, CheckInstanceState, ExpandNodeUuidAndName, \
-  IsValidDiskAccessModeCombination, AnnotateDiskParams
+  IsValidDiskAccessModeCombination, AnnotateDiskParams, \
+  CheckIAllocatorOrNode
 from ganeti.cmdlib.instance_storage import CalculateFileStorageDir, \
   CheckDiskExtProvider, CheckNodesFreeDiskPerVG, CheckRADOSFreeSpace, \
   CheckSpindlesExclusiveStorage, ComputeDiskSizePerVG, ComputeDisksInfo, \
@@ -357,10 +359,7 @@ class LUInstanceSetParams(LogicalUnit):
 
       # mirrored template node checks
       if self.op.disk_template in constants.DTS_INT_MIRROR:
-        if not self.op.remote_node:
-          raise errors.OpPrereqError("Changing the disk template to a mirrored"
-                                     " one requires specifying a secondary"
-                                     " node", errors.ECODE_INVAL)
+        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
       elif self.op.remote_node:
         self.LogWarning("Changing the disk template to a non-mirrored one,"
                         " the secondary node will be ignored")
@@ -444,6 +443,12 @@ class LUInstanceSetParams(LogicalUnit):
           ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                                 self.op.remote_node)
         self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node_uuid)
+      elif self.op.disk_template in constants.DTS_INT_MIRROR:
+        # If we have to find the secondary node for a conversion to DRBD,
+        # close node locks to the whole node group.
+        self.needed_locks[locking.LEVEL_NODE] = \
+          list(self.cfg.GetNodeGroupMembersByNodes(
+            self.needed_locks[locking.LEVEL_NODE]))
     elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
@@ -685,7 +690,8 @@ class LUInstanceSetParams(LogicalUnit):
                                        default_vg, self.op.ext_params)
 
     # mirror node verification
-    if self.op.disk_template in constants.DTS_INT_MIRROR:
+    if self.op.disk_template in constants.DTS_INT_MIRROR \
+        and self.op.remote_node_uuid:
       if self.op.remote_node_uuid == pnode_uuid:
         raise errors.OpPrereqError("Given new secondary node %s is the same"
                                    " as the primary node of the instance" %
@@ -720,7 +726,8 @@ class LUInstanceSetParams(LogicalUnit):
     if not self.op.disk_template in constants.DTS_EXCL_STORAGE:
       # Make sure none of the nodes require exclusive storage
       nodes = [pnode_info]
-      if self.op.disk_template in constants.DTS_INT_MIRROR:
+      if self.op.disk_template in constants.DTS_INT_MIRROR \
+          and self.op.remote_node_uuid:
         assert snode_info
         nodes.append(snode_info)
       has_es = lambda n: IsExclusiveStorageEnabledNode(self.cfg, n)
@@ -742,8 +749,9 @@ class LUInstanceSetParams(LogicalUnit):
           utils.AllDiskOfType(inst_disks, [constants.DT_PLAIN])):
       # for conversions from the 'plain' to the 'drbd' disk template, check
       # only the remote node's capacity
-      req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks_info)
-      CheckNodesFreeDiskPerVG(self, [self.op.remote_node_uuid], req_sizes)
+      if self.op.remote_node_uuid:
+        req_sizes = ComputeDiskSizePerVG(self.op.disk_template, self.disks_info)
+        CheckNodesFreeDiskPerVG(self, [self.op.remote_node_uuid], req_sizes)
     elif self.op.disk_template in constants.DTS_LVM:
       # rest lvm-based capacity checks
       node_uuids = [pnode_uuid]
@@ -1406,6 +1414,24 @@ class LUInstanceSetParams(LogicalUnit):
     """
     feedback_fn("Converting disk template from 'plain' to 'drbd'")
 
+    if not self.op.remote_node_uuid:
+      feedback_fn("Using %s to choose new secondary" % self.op.iallocator)
+
+      req = iallocator.IAReqInstanceAllocateSecondary(
+        name=self.op.instance_name)
+      ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+      ial.Run(self.op.iallocator)
+
+      if not ial.success:
+        raise errors.OpPrereqError("Can's find secondary node using"
+                                   " iallocator %s: %s" %
+                                   (self.op.iallocator, ial.info),
+                                   errors.ECODE_NORES)
+      feedback_fn("%s choose %s as new secondary"
+                  % (self.op.iallocator, ial.result))
+      self.op.remote_node = ial.result
+      self.op.remote_node_uuid = self.cfg.GetNodeInfoByName(ial.result).uuid
+
     pnode_uuid = self.instance.primary_node
     snode_uuid = self.op.remote_node_uuid
     old_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
index c0eccce..210fd97 100644 (file)
@@ -311,8 +311,6 @@ class LUNodeAdd(LogicalUnit):
       result = rpcrunner.call_node_verify_light(
           [node_name], vparams, cname,
           self.cfg.GetClusterInfo().hvparams,
-          {node_name: self.node_group},
-          self.cfg.GetAllNodeGroupsInfoDict()
         )[node_name]
       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
       if errmsgs:
@@ -438,10 +436,7 @@ class LUNodeAdd(LogicalUnit):
     result = self.rpc.call_node_verify(
                node_verifier_uuids, node_verify_param,
                self.cfg.GetClusterName(),
-               self.cfg.GetClusterInfo().hvparams,
-               {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
-               self.cfg.GetAllNodeGroupsInfoDict()
-               )
+               self.cfg.GetClusterInfo().hvparams)
     for verifier in node_verifier_uuids:
       result[verifier].Raise("Cannot communicate with node %s" % verifier)
       nl_payload = result[verifier].payload[constants.NV_NODELIST]
@@ -455,7 +450,6 @@ class LUNodeAdd(LogicalUnit):
     self._InitOpenVSwitch()
 
     if self.op.readd:
-      self.context.ReaddNode(self.new_node)
       RedistributeAncillaryFiles(self)
       # make sure we redistribute the config
       self.cfg.Update(self.new_node, feedback_fn)
@@ -465,7 +459,7 @@ class LUNodeAdd(LogicalUnit):
         result.Warn("Node failed to demote itself from master candidate status",
                     self.LogWarning)
     else:
-      self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
+      self.cfg.AddNode(self.new_node, self.proc.GetECId())
       RedistributeAncillaryFiles(self)
 
     # We create a new certificate even if the node is readded
@@ -868,7 +862,6 @@ class LUNodeSetParams(LogicalUnit):
     # this will trigger job queue propagation or cleanup if the mc
     # flag changed
     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
-      self.context.ReaddNode(node)
 
       if modify_ssh_setup:
         if self.old_role == self._ROLE_CANDIDATE:
@@ -1591,7 +1584,7 @@ class LUNodeRemove(LogicalUnit):
     AdjustCandidatePool(
         self, [self.node.uuid], master_node, potential_master_candidates,
         feedback_fn, modify_ssh_setup)
-    self.context.RemoveNode(self.cfg, self.node)
+    self.cfg.RemoveNode(self.node.uuid)
 
     # Run post hooks on the node before it's removed
     RunPostHook(self, self.node.name)
index 5ec7c92..167be3d 100644 (file)
@@ -353,6 +353,23 @@ class LUTestJqueue(NoHooksLU):
     return True
 
 
+class LUTestOsParams(NoHooksLU):
+  """Utility LU to test secret OS parameter transmission.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    if self.op.osparams_secret:
+      msg = "Secret OS parameters: %s" % self.op.osparams_secret.Unprivate()
+      feedback_fn(msg)
+    else:
+      raise errors.OpExecError("Opcode needs secret parameters")
+
+
 class LUTestAllocator(NoHooksLU):
   """Run allocator tests.
 
index dfe2625..179e08b 100644 (file)
@@ -147,16 +147,6 @@ def _CheckInstanceDiskIvNames(disks):
   return result
 
 
-def _UpdateIvNames(base_idx, disks):
-  """Update the C{iv_name} attribute of disks.
-
-  @type disks: list of L{objects.Disk}
-
-  """
-  for (idx, disk) in enumerate(disks):
-    disk.iv_name = "disk/%s" % (base_idx + idx)
-
-
 class ConfigWriter(object):
   """The interface to the cluster configuration.
 
@@ -308,116 +298,21 @@ class ConfigWriter(object):
     """
     return self._UnlockedGetInstanceDisks(inst_uuid)
 
-  def _UnlockedAddDisk(self, disk, replace=False):
-    """Add a disk to the config.
-
-    @type disk: L{objects.Disk}
-    @param disk: The disk object
-
-    """
+  def AddInstanceDisk(self, inst_uuid, disk, idx=None, replace=False):
+    """Add a disk to the config and attach it to instance."""
     if not isinstance(disk, objects.Disk):
-      raise errors.ProgrammerError("Invalid type passed to _UnlockedAddDisk")
-
-    logging.info("Adding disk %s to configuration", disk.uuid)
+      raise errors.ProgrammerError("Invalid type passed to AddInstanceDisk")
 
-    if replace:
-      self._CheckUUIDpresent(disk)
-    else:
-      self._CheckUniqueUUID(disk, include_temporary=False)
-      disk.serial_no = 1
-      disk.ctime = disk.mtime = time.time()
     disk.UpgradeConfig()
-    self._ConfigData().disks[disk.uuid] = disk
-    self._ConfigData().cluster.serial_no += 1
-    self._UnlockedReleaseDRBDMinors(disk.uuid)
-
-  def _UnlockedAttachInstanceDisk(self, inst_uuid, disk_uuid, idx=None):
-    """Attach a disk to an instance.
-
-    @type inst_uuid: string
-    @param inst_uuid: The UUID of the instance object
-    @type disk_uuid: string
-    @param disk_uuid: The UUID of the disk object
-    @type idx: int
-    @param idx: the index of the newly attached disk; if not
-      passed, the disk will be attached as the last one.
-
-    """
-    instance = self._UnlockedGetInstanceInfo(inst_uuid)
-    if instance is None:
-      raise errors.ConfigurationError("Instance %s doesn't exist"
-                                      % inst_uuid)
-    if disk_uuid not in self._ConfigData().disks:
-      raise errors.ConfigurationError("Disk %s doesn't exist" % disk_uuid)
-
-    if idx is None:
-      idx = len(instance.disks)
-    else:
-      if idx < 0:
-        raise IndexError("Not accepting negative indices other than -1")
-      elif idx > len(instance.disks):
-        raise IndexError("Got disk index %s, but there are only %s" %
-                         (idx, len(instance.disks)))
-
-    # Disk must not be attached anywhere else
-    for inst in self._ConfigData().instances.values():
-      if disk_uuid in inst.disks:
-        raise errors.ReservationError("Disk %s already attached to instance %s"
-                                      % (disk_uuid, inst.name))
-
-    instance.disks.insert(idx, disk_uuid)
-    instance_disks = self._UnlockedGetInstanceDisks(inst_uuid)
-    _UpdateIvNames(idx, instance_disks[idx:])
-    instance.serial_no += 1
-    instance.mtime = time.time()
-
-  @ConfigSync()
-  def AddInstanceDisk(self, inst_uuid, disk, idx=None, replace=False):
-    """Add a disk to the config and attach it to instance.
-
-    This is a simple wrapper over L{_UnlockedAddDisk} and
-    L{_UnlockedAttachInstanceDisk}.
-
-    """
-    self._UnlockedAddDisk(disk, replace=replace)
-    self._UnlockedAttachInstanceDisk(inst_uuid, disk.uuid, idx)
+    utils.SimpleRetry(True, self._wconfd.AddInstanceDisk, 0.1, 30,
+                      args=[inst_uuid, disk.ToDict(), idx, replace])
+    self.OutDate()
 
-  @ConfigSync()
   def AttachInstanceDisk(self, inst_uuid, disk_uuid, idx=None):
-    """Attach an existing disk to an instance.
-
-    This is a simple wrapper over L{_UnlockedAttachInstanceDisk}.
-
-    """
-    self._UnlockedAttachInstanceDisk(inst_uuid, disk_uuid, idx)
-
-  def _UnlockedDetachInstanceDisk(self, inst_uuid, disk_uuid):
-    """Detach a disk from an instance.
-
-    @type inst_uuid: string
-    @param inst_uuid: The UUID of the instance object
-    @type disk_uuid: string
-    @param disk_uuid: The UUID of the disk object
-
-    """
-    instance = self._UnlockedGetInstanceInfo(inst_uuid)
-    if instance is None:
-      raise errors.ConfigurationError("Instance %s doesn't exist"
-                                      % inst_uuid)
-    if disk_uuid not in self._ConfigData().disks:
-      raise errors.ConfigurationError("Disk %s doesn't exist" % disk_uuid)
-
-    # Check if disk is attached to the instance
-    if disk_uuid not in instance.disks:
-      raise errors.ProgrammerError("Disk %s is not attached to an instance"
-                                   % disk_uuid)
-
-    idx = instance.disks.index(disk_uuid)
-    instance.disks.remove(disk_uuid)
-    instance_disks = self._UnlockedGetInstanceDisks(inst_uuid)
-    _UpdateIvNames(idx, instance_disks[idx:])
-    instance.serial_no += 1
-    instance.mtime = time.time()
+    """Attach an existing disk to an instance."""
+    utils.SimpleRetry(True, self._wconfd.AttachInstanceDisk, 0.1, 30,
+                      args=[inst_uuid, disk_uuid, idx])
+    self.OutDate()
 
   def _UnlockedRemoveDisk(self, disk_uuid):
     """Remove the disk from the configuration.
@@ -440,24 +335,17 @@ class ConfigWriter(object):
     del self._ConfigData().disks[disk_uuid]
     self._ConfigData().cluster.serial_no += 1
 
-  @ConfigSync()
   def RemoveInstanceDisk(self, inst_uuid, disk_uuid):
-    """Detach a disk from an instance and remove it from the config.
-
-    This is a simple wrapper over L{_UnlockedDetachInstanceDisk} and
-    L{_UnlockedRemoveDisk}.
-
-    """
-    self._UnlockedDetachInstanceDisk(inst_uuid, disk_uuid)
-    self._UnlockedRemoveDisk(disk_uuid)
+    """Detach a disk from an instance and remove it from the config."""
+    utils.SimpleRetry(True, self._wconfd.RemoveInstanceDisk, 0.1, 30,
+                      args=[inst_uuid, disk_uuid])
+    self.OutDate()
 
-  @ConfigSync()
   def DetachInstanceDisk(self, inst_uuid, disk_uuid):
-    """Detach a disk from an instance.
-
-    This is a simple wrapper over L{_UnlockedDetachInstanceDisk}.
-    """
-    self._UnlockedDetachInstanceDisk(inst_uuid, disk_uuid)
+    """Detach a disk from an instance."""
+    utils.SimpleRetry(True, self._wconfd.DetachInstanceDisk, 0.1, 30,
+                      args=[inst_uuid, disk_uuid])
+    self.OutDate()
 
   def _UnlockedGetDiskInfo(self, disk_uuid):
     """Returns information about a disk.
@@ -730,31 +618,10 @@ class ConfigWriter(object):
     """
     self._wconfd.ReserveMAC(self._GetWConfdContext(), mac)
 
-  def _UnlockedCommitTemporaryIps(self, _ec_id):
-    """Commit all reserved IP address to their respective pools
-
-    """
-    if self._offline:
-      raise errors.ProgrammerError("Can't call CommitTemporaryIps"
-                                   " in offline mode")
-    ips = self._wconfd.ListReservedIps(self._GetWConfdContext())
-    for action, address, net_uuid in ips:
-      self._UnlockedCommitIp(action, net_uuid, address)
-
-  def _UnlockedCommitIp(self, action, net_uuid, address):
-    """Commit a reserved IP address to an IP pool.
-
-    The IP address is taken from the network's IP pool and marked as free.
-
-    """
-    nobj = self._UnlockedGetNetwork(net_uuid)
-    if nobj is None:
-      raise errors.ProgrammerError("Network '%s' not found" % (net_uuid, ))
-    pool = network.AddressPool(nobj)
-    if action == constants.RESERVE_ACTION:
-      pool.Reserve(address)
-    elif action == constants.RELEASE_ACTION:
-      pool.Release(address)
+  @ConfigSync(shared=1)
+  def CommitTemporaryIps(self, _ec_id):
+    """Tell WConfD to commit all temporary ids"""
+    self._wconfd.CommitTemporaryIps(self._GetWConfdContext())
 
   def ReleaseIp(self, net_uuid, address, _ec_id):
     """Give a specific IP address back to an IP pool.
@@ -1154,6 +1021,11 @@ class ConfigWriter(object):
 
     return result
 
+  @ConfigSync(shared=1)
+  def VerifyConfigAndLog(self, feedback_fn=None):
+    """A simple wrapper around L{_UnlockedVerifyConfigAndLog}"""
+    return self._UnlockedVerifyConfigAndLog(feedback_fn=feedback_fn)
+
   def _UnlockedVerifyConfigAndLog(self, feedback_fn=None):
     """Verify the configuration and log any errors.
 
@@ -1194,19 +1066,10 @@ class ConfigWriter(object):
     """
     return self._UnlockedVerifyConfig()
 
-  @ConfigSync()
   def AddTcpUdpPort(self, port):
-    """Adds a new port to the available port pool.
-
-    @warning: this method does not "flush" the configuration (via
-        L{_WriteConfig}); callers should do that themselves once the
-        configuration is stable
-
-    """
-    if not isinstance(port, int):
-      raise errors.ProgrammerError("Invalid type passed for port")
-
-    self._ConfigData().cluster.tcpudp_port_pool.add(port)
+    """Adds a new port to the available port pool."""
+    utils.SimpleRetry(True, self._wconfd.AddTcpUdpPort, 0.1, 30, args=[port])
+    self.OutDate()
 
   @ConfigSync(shared=1)
   def GetPortList(self):
@@ -1215,26 +1078,17 @@ class ConfigWriter(object):
     """
     return self._ConfigData().cluster.tcpudp_port_pool.copy()
 
-  @ConfigSync()
   def AllocatePort(self):
-    """Allocate a port.
-
-    The port will be taken from the available port pool or from the
-    default port range (and in this case we increase
-    highest_used_port).
+    """Allocate a port."""
+    def WithRetry():
+      port = self._wconfd.AllocatePort()
+      self.OutDate()
 
-    """
-    # If there are TCP/IP ports configured, we use them first.
-    if self._ConfigData().cluster.tcpudp_port_pool:
-      port = self._ConfigData().cluster.tcpudp_port_pool.pop()
-    else:
-      port = self._ConfigData().cluster.highest_used_port + 1
-      if port >= constants.LAST_DRBD_PORT:
-        raise errors.ConfigurationError("The highest used port is greater"
-                                        " than %s. Aborting." %
-                                        constants.LAST_DRBD_PORT)
-      self._ConfigData().cluster.highest_used_port = port
-    return port
+      if port is None:
+        raise utils.RetryAgain()
+      else:
+        return port
+    return utils.Retry(WithRetry, 0.1, 30)
 
   @ConfigSync(shared=1)
   def ComputeDRBDMap(self):
@@ -1283,7 +1137,7 @@ class ConfigWriter(object):
                   node_uuids, result)
     return result
 
-  def _UnlockedReleaseDRBDMinors(self, disk_uuid):
+  def ReleaseDRBDMinors(self, disk_uuid):
     """Release temporary drbd minors allocated for a given disk.
 
     This is just a wrapper over a call to WConfd.
@@ -1300,22 +1154,6 @@ class ConfigWriter(object):
     if not self._offline:
       self._wconfd.ReleaseDRBDMinors(disk_uuid)
 
-  @ConfigSync()
-  def ReleaseDRBDMinors(self, disk_uuid):
-    """Release temporary drbd minors allocated for a given disk.
-
-    This should be called on the error paths, on the success paths
-    it's automatically called by the ConfigWriter add and update
-    functions.
-
-    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
-
-    @type disk_uuid: string
-    @param disk_uuid: the disk for which temporary minors should be released
-
-    """
-    self._UnlockedReleaseDRBDMinors(disk_uuid)
-
   @ConfigSync(shared=1)
   def GetInstanceDiskTemplate(self, inst_uuid):
     """Return the disk template of an instance.
@@ -1767,23 +1605,12 @@ class ConfigWriter(object):
     if not isinstance(instance, objects.Instance):
       raise errors.ProgrammerError("Invalid type passed to AddInstance")
 
-    all_macs = self._AllMACs()
-    for nic in instance.nics:
-      if nic.mac in all_macs:
-        raise errors.ConfigurationError("Cannot add instance %s:"
-                                        " MAC address '%s' already in use." %
-                                        (instance.name, nic.mac))
-
-    if replace:
-      self._CheckUUIDpresent(instance)
-    else:
-      self._CheckUniqueUUID(instance, include_temporary=False)
-
     instance.serial_no = 1
-    instance.ctime = instance.mtime = time.time()
 
     utils.SimpleRetry(True, self._wconfd.AddInstance, 0.1, 30,
-                      args=[instance.ToDict(), self._GetWConfdContext()])
+                      args=[instance.ToDict(),
+                            self._GetWConfdContext(),
+                            replace])
     self.OutDate()
 
   def _EnsureUUID(self, item, ec_id):
@@ -1834,32 +1661,17 @@ class ConfigWriter(object):
     @return: the updated instance object
 
     """
-    if inst_uuid not in self._ConfigData().instances:
-      raise errors.ConfigurationError("Unknown instance '%s'" %
-                                      inst_uuid)
-    instance = self._ConfigData().instances[inst_uuid]
-
-    if status is None:
-      status = instance.admin_state
-    if disks_active is None:
-      disks_active = instance.disks_active
-    if admin_state_source is None:
-      admin_state_source = instance.admin_state_source
-
-    assert status in constants.ADMINST_ALL, \
-           "Invalid status '%s' passed to SetInstanceStatus" % (status,)
-
-    if instance.admin_state != status or \
-       instance.disks_active != disks_active or \
-       instance.admin_state_source != admin_state_source:
-      instance.admin_state = status
-      instance.disks_active = disks_active
-      instance.admin_state_source = admin_state_source
-      instance.serial_no += 1
-      instance.mtime = time.time()
-    return instance
+    def WithRetry():
+      result = self._wconfd.SetInstanceStatus(inst_uuid, status,
+                                              disks_active, admin_state_source)
+      self.OutDate()
+
+      if result is None:
+        raise utils.RetryAgain()
+      else:
+        return result
+    return objects.Instance.FromDict(utils.Retry(WithRetry, 0.1, 30))
 
-  @ConfigSync()
   def MarkInstanceUp(self, inst_uuid):
     """Mark the instance status to up in the config.
 
@@ -1872,7 +1684,6 @@ class ConfigWriter(object):
     return self._SetInstanceStatus(inst_uuid, constants.ADMINST_UP, True,
                                    constants.ADMIN_SOURCE)
 
-  @ConfigSync()
   def MarkInstanceOffline(self, inst_uuid):
     """Mark the instance status to down in the config.
 
@@ -1885,32 +1696,13 @@ class ConfigWriter(object):
     return self._SetInstanceStatus(inst_uuid, constants.ADMINST_OFFLINE, False,
                                    constants.ADMIN_SOURCE)
 
-  @ConfigSync()
   def RemoveInstance(self, inst_uuid):
     """Remove the instance from the configuration.
 
     """
-    if inst_uuid not in self._ConfigData().instances:
-      raise errors.ConfigurationError("Unknown instance '%s'" % inst_uuid)
-
-    # If a network port has been allocated to the instance,
-    # return it to the pool of free ports.
-    inst = self._ConfigData().instances[inst_uuid]
-    network_port = getattr(inst, "network_port", None)
-    if network_port is not None:
-      self._ConfigData().cluster.tcpudp_port_pool.add(network_port)
-
-    instance = self._UnlockedGetInstanceInfo(inst_uuid)
-
-    # FIXME: After RemoveInstance is moved to WConfd, use its internal
-    # function from TempRes module.
-    for nic in instance.nics:
-      if nic.network and nic.ip:
-        # Return all IP addresses to the respective address pools
-        self._UnlockedCommitIp(constants.RELEASE_ACTION, nic.network, nic.ip)
-
-    del self._ConfigData().instances[inst_uuid]
-    self._ConfigData().cluster.serial_no += 1
+    utils.SimpleRetry(True, self._wconfd.RemoveInstance, 0.1, 30,
+                      args=[inst_uuid])
+    self.OutDate()
 
   @ConfigSync()
   def RenameInstance(self, inst_uuid, new_name):
@@ -1939,7 +1731,6 @@ class ConfigWriter(object):
     # Force update of ssconf files
     self._ConfigData().cluster.serial_no += 1
 
-  @ConfigSync()
   def MarkInstanceDown(self, inst_uuid):
     """Mark the status of an instance to down in the configuration.
 
@@ -1953,7 +1744,6 @@ class ConfigWriter(object):
     return self._SetInstanceStatus(inst_uuid, constants.ADMINST_DOWN, None,
                                    constants.ADMIN_SOURCE)
 
-  @ConfigSync()
   def MarkInstanceUserDown(self, inst_uuid):
     """Mark the status of an instance to user down in the configuration.
 
@@ -1965,7 +1755,6 @@ class ConfigWriter(object):
     self._SetInstanceStatus(inst_uuid, constants.ADMINST_DOWN, None,
                             constants.USER_SOURCE)
 
-  @ConfigSync()
   def MarkInstanceDisksActive(self, inst_uuid):
     """Mark the status of instance disks active.
 
@@ -1975,7 +1764,6 @@ class ConfigWriter(object):
     """
     return self._SetInstanceStatus(inst_uuid, None, True, None)
 
-  @ConfigSync()
   def MarkInstanceDisksInactive(self, inst_uuid):
     """Mark the status of instance disks inactive.
 
@@ -2195,7 +1983,6 @@ class ConfigWriter(object):
     """
     return self._UnlockedGetInstanceNames(inst_uuids)
 
-  @ConfigSync()
   def SetInstancePrimaryNode(self, inst_uuid, target_node_uuid):
     """Sets the primary node of an existing instance
 
@@ -2205,7 +1992,9 @@ class ConfigWriter(object):
     @type target_node_uuid: string
 
     """
-    self._UnlockedGetInstanceInfo(inst_uuid).primary_node = target_node_uuid
+    utils.SimpleRetry(True, self._wconfd.SetInstancePrimaryNode, 0.1, 30,
+                      args=[inst_uuid, target_node_uuid])
+    self.OutDate()
 
   @ConfigSync()
   def SetDiskNodes(self, disk_uuid, nodes):
@@ -3202,7 +2991,6 @@ class ConfigWriter(object):
     """
     return DetachedConfig(self._ConfigData())
 
-  @ConfigSync()
   def Update(self, target, feedback_fn, ec_id=None):
     """Notify function to be called after updates.
 
@@ -3218,60 +3006,57 @@ class ConfigWriter(object):
     @param feedback_fn: Callable feedback function
 
     """
-    if self._ConfigData() is None:
-      raise errors.ProgrammerError("Configuration file not read,"
-                                   " cannot save.")
-
-    def check_serial(target, current):
-      if current is None:
-        raise errors.ConfigurationError("Configuration object unknown")
-      elif current.serial_no != target.serial_no:
-        raise errors.ConfigurationError("Configuration object updated since"
-                                        " it has been read: %d != %d",
-                                        current.serial_no, target.serial_no)
 
-    def replace_in(target, tdict):
-      check_serial(target, tdict.get(target.uuid))
-      tdict[target.uuid] = target
-
-    update_serial = False
+    update_function = None
     if isinstance(target, objects.Cluster):
-      check_serial(target, self._ConfigData().cluster)
-      self._ConfigData().cluster = target
+      if self._offline:
+        self.UpdateOfflineCluster(target, feedback_fn)
+        return
+      else:
+        update_function = self._wconfd.UpdateCluster
     elif isinstance(target, objects.Node):
-      replace_in(target, self._ConfigData().nodes)
-      update_serial = True
+      update_function = self._wconfd.UpdateNode
     elif isinstance(target, objects.Instance):
-      replace_in(target, self._ConfigData().instances)
+      update_function = self._wconfd.UpdateInstance
     elif isinstance(target, objects.NodeGroup):
-      replace_in(target, self._ConfigData().nodegroups)
+      update_function = self._wconfd.UpdateNodeGroup
     elif isinstance(target, objects.Network):
-      replace_in(target, self._ConfigData().networks)
+      update_function = self._wconfd.UpdateNetwork
     elif isinstance(target, objects.Disk):
-      replace_in(target, self._ConfigData().disks)
+      update_function = self._wconfd.UpdateDisk
     else:
       raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                    " ConfigWriter.Update" % type(target))
-    target.serial_no += 1
-    target.mtime = now = time.time()
 
-    if update_serial:
-      # for node updates, we need to increase the cluster serial too
-      self._ConfigData().cluster.serial_no += 1
-      self._ConfigData().cluster.mtime = now
+    def WithRetry():
+      result = update_function(target.ToDict())
+      self.OutDate()
 
-    if isinstance(target, objects.Disk):
-      self._UnlockedReleaseDRBDMinors(target.uuid)
+      if result is None:
+        raise utils.RetryAgain()
+      else:
+        return result
+    vals = utils.Retry(WithRetry, 0.1, 30)
+    self.OutDate()
+    target.serial_no = vals[0]
+    target.mtime = float(vals[1])
 
     if ec_id is not None:
       # Commit all ips reserved by OpInstanceSetParams and OpGroupSetParams
       # FIXME: After RemoveInstance is moved to WConfd, use its internal
       # functions from TempRes module.
-      self._UnlockedCommitTemporaryIps(ec_id)
+      self.CommitTemporaryIps(ec_id)
 
     # Just verify the configuration with our feedback function.
     # It will get written automatically by the decorator.
-    self._UnlockedVerifyConfigAndLog(feedback_fn=feedback_fn)
+    self.VerifyConfigAndLog(feedback_fn=feedback_fn)
+
+  @ConfigSync()
+  def UpdateOfflineCluster(self, target, feedback_fn):
+    self._ConfigData().cluster = target
+    target.serial_no += 1
+    target.mtime = time.time()
+    self.VerifyConfigAndLog(feedback_fn=feedback_fn)
 
   def _UnlockedDropECReservations(self, _ec_id):
     """Drop per-execution-context reservations
index cff0d84..3fff5d5 100644 (file)
@@ -142,58 +142,6 @@ class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
     return False
 
 
-class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
-  """A stream server to use with asyncore.
-
-  Each request is accepted, and then dispatched to a separate asyncore
-  dispatcher to handle.
-
-  """
-
-  _REQUEST_QUEUE_SIZE = 5
-
-  def __init__(self, family, address):
-    """Constructor for AsyncStreamServer
-
-    @type family: integer
-    @param family: socket family (one of socket.AF_*)
-    @type address: address family dependent
-    @param address: address to bind the socket to
-
-    """
-    GanetiBaseAsyncoreDispatcher.__init__(self)
-    self.family = family
-    self.create_socket(self.family, socket.SOCK_STREAM)
-    self.set_reuse_addr()
-    self.bind(address)
-    self.listen(self._REQUEST_QUEUE_SIZE)
-
-  # this method is overriding an asyncore.dispatcher method
-  def handle_accept(self):
-    """Accept a new client connection.
-
-    Creates a new instance of the handler class, which will use asyncore to
-    serve the client.
-
-    """
-    accept_result = utils.IgnoreSignals(self.accept)
-    if accept_result is not None:
-      connected_socket, client_address = accept_result
-      if self.family == socket.AF_UNIX:
-        # override the client address, as for unix sockets nothing meaningful
-        # is passed in from accept anyway
-        client_address = netutils.GetSocketCredentials(connected_socket)
-      logging.info("Accepted connection from %s",
-                   netutils.FormatAddress(client_address, family=self.family))
-      self.handle_connection(connected_socket, client_address)
-
-  def handle_connection(self, connected_socket, client_address):
-    """Handle an already accepted connection.
-
-    """
-    raise NotImplementedError
-
-
 class AsyncTerminatedMessageStream(asynchat.async_chat):
   """A terminator separated message stream asyncore module.
 
index 3a194e0..edadc3b 100644 (file)
--- a/lib/ht.py
+++ b/lib/ht.py
@@ -442,6 +442,20 @@ def TPrivate(val_type):
   return desc(fn)
 
 
+def TSecret(val_type):
+  """Checks if a given value is an instance of Private.
+
+  However, the type is named Secret in the Haskell equivalent.
+
+  """
+  def fn(val):
+    return isinstance(val, Private) and val_type(val.Get())
+
+  desc = WithDesc("Private %s" % Parens(val_type))
+
+  return desc(fn)
+
+
 def TListOf(my_type):
   """Checks if a given value is a list with all elements of the same type.
 
@@ -637,6 +651,7 @@ def TStorageType(val):
 TTagKind = TElemOf(constants.VALID_TAG_TYPES)
 TDdmSimple = TElemOf(constants.DDMS_VALUES)
 TVerifyOptionalChecks = TElemOf(constants.VERIFY_OPTIONAL_CHECKS)
+TSshKeyType = TElemOf(constants.SSHK_ALL)
 
 
 @WithDesc("IPv4 network")
index 4df0246..1e89c82 100644 (file)
@@ -109,6 +109,53 @@ _RUNTIME_ENTRY = {
   constants.HOTPLUG_TARGET_DISK: lambda d, e: (d, e[0], e[1])
   }
 
+_DEVICE_TYPE = {
+  constants.HOTPLUG_TARGET_NIC: lambda hvp: hvp[constants.HV_NIC_TYPE],
+  constants.HOTPLUG_TARGET_DISK: lambda hvp: hvp[constants.HV_DISK_TYPE],
+  }
+
+_DEVICE_DRIVER = {
+  constants.HOTPLUG_TARGET_NIC:
+    lambda ht: "virtio-net-pci" if ht == constants.HT_NIC_PARAVIRTUAL else ht,
+  constants.HOTPLUG_TARGET_DISK:
+    lambda ht: "virtio-blk-pci" if ht == constants.HT_DISK_PARAVIRTUAL else ht,
+  }
+
+
+# NICs and paravirtual disks
+# show up as devices on the PCI bus (one slot per device).
+# SCSI disks will be placed on the SCSI bus.
+_DEVICE_BUS = {
+  constants.HOTPLUG_TARGET_NIC:
+    lambda _: _PCI_BUS,
+  constants.HOTPLUG_TARGET_DISK:
+    lambda ht: _SCSI_BUS if ht in constants.HT_SCSI_DEVICE_TYPES else _PCI_BUS
+  }
+
+_HOTPLUGGABLE_DEVICE_TYPES = {
+  # All available NIC types except for ne2k_isa
+  constants.HOTPLUG_TARGET_NIC: [
+    constants.HT_NIC_E1000,
+    constants.HT_NIC_I82551,
+    constants.HT_NIC_I8259ER,
+    constants.HT_NIC_I85557B,
+    constants.HT_NIC_NE2K_PCI,
+    constants.HT_NIC_PARAVIRTUAL,
+    constants.HT_NIC_PCNET,
+    constants.HT_NIC_RTL8139,
+    ],
+  constants.HOTPLUG_TARGET_DISK: [
+    constants.HT_DISK_PARAVIRTUAL,
+    constants.HT_DISK_SCSI_BLOCK,
+    constants.HT_DISK_SCSI_GENERIC,
+    constants.HT_DISK_SCSI_HD,
+    constants.HT_DISK_SCSI_CD,
+    ]
+  }
+
+_PCI_BUS = "pci.0"
+_SCSI_BUS = "scsi.0"
+
 _MIGRATION_CAPS_DELIM = ":"
 
 
@@ -155,23 +202,93 @@ def _GetDriveURI(disk, link, uri):
 def _GenerateDeviceKVMId(dev_type, dev):
   """Helper function to generate a unique device name used by KVM
 
-  QEMU monitor commands use names to identify devices. Here we use their pci
-  slot and a part of their UUID to name them. dev.pci might be None for old
-  devices in the cluster.
+  QEMU monitor commands use names to identify devices. Since the UUID
+  is too long for a device ID (36 chars vs. 30), we choose to use
+  only the part until the third '-' with a disk/nic prefix.
+  For example if a disk has UUID '932df160-7a22-4067-a566-7e0ca8386133'
+  the resulting device ID would be 'disk-932df160-7a22-4067'.
 
-  @type dev_type: sting
-  @param dev_type: device type of param dev
+  @type dev_type: string
+  @param dev_type: device type of param dev (HOTPLUG_TARGET_DISK|NIC)
   @type dev: L{objects.Disk} or L{objects.NIC}
   @param dev: the device object for which we generate a kvm name
-  @raise errors.HotplugError: in case a device has no pci slot (old devices)
 
   """
+  return "%s-%s" % (dev_type.lower(), dev.uuid.rsplit("-", 2)[0])
 
-  if not dev.pci:
-    raise errors.HotplugError("Hotplug is not supported for %s with UUID %s" %
-                              (dev_type, dev.uuid))
 
-  return "%s-%s-pci-%d" % (dev_type.lower(), dev.uuid.split("-")[0], dev.pci)
+def _GenerateDeviceHVInfoStr(hvinfo):
+  """Construct the -device option string for hvinfo dict
+
+  PV disk: virtio-blk-pci,id=disk-1234,bus=pci.0,addr=0x9
+  PV NIC:  virtio-net-pci,id=nic-1234,bus=pci.0,addr=0x9
+  SG disk: scsi-generic,id=disk-1234,bus=scsi.0,channel=0,scsi-id=1,lun=0
+
+  @type hvinfo: dict
+  @param hvinfo: dictionary created by _GenerateDeviceHVInfo()
+
+  @rtype: string
+  @return: The constructed string to be passed along with a -device option
+
+  """
+
+  # work on a copy
+  d = dict(hvinfo)
+  hvinfo_str = d.pop("driver")
+  for k, v in d.items():
+    hvinfo_str += ",%s=%s" % (k, v)
+
+  return hvinfo_str
+
+
+def _GenerateDeviceHVInfo(dev_type, kvm_devid, hv_dev_type, bus_slots):
+  """Helper function to generate hvinfo of a device (disk, NIC)
+
+  hvinfo will hold all necessary info for generating the -device QEMU option.
+  We have two main buses: a PCI bus and a SCSI bus (created by a SCSI
+  controller on the PCI bus).
+
+  In case of PCI devices we add them on a free PCI slot (addr) on the first PCI
+  bus (pci.0), and in case of SCSI devices we decide to put each disk on a
+  different SCSI target (scsi-id) on the first SCSI bus (scsi.0).
+
+  @type dev_type: string
+  @param dev_type: either HOTPLUG_TARGET_DISK or HOTPLUG_TARGET_NIC
+  @type kvm_devid: string
+  @param kvm_devid: the id of the device
+  @type hv_dev_type: string
+  @param hv_dev_type: either disk_type or nic_type hvparam
+  @type bus_slots: dict
+  @param bus_slots: the current slots of the first PCI and SCSI buses
+
+  @rtype: dict
+  @return: dict including all necessary info (driver, id, bus and bus location)
+           for generating a -device QEMU option for either a disk or a NIC
+
+  """
+  driver = _DEVICE_DRIVER[dev_type](hv_dev_type)
+  bus = _DEVICE_BUS[dev_type](hv_dev_type)
+  slots = bus_slots[bus]
+  slot = utils.GetFreeSlot(slots, reserve=True)
+
+  hvinfo = {
+    "driver": driver,
+    "id": kvm_devid,
+    "bus": bus,
+    }
+
+  if bus == _PCI_BUS:
+    hvinfo.update({
+      "addr": hex(slot),
+      })
+  elif bus == _SCSI_BUS:
+    hvinfo.update({
+      "channel": 0,
+      "scsi-id": slot,
+      "lun": 0,
+      })
+
+  return hvinfo
 
 
 def _GetExistingDeviceInfo(dev_type, device, runtime):
@@ -187,8 +304,7 @@ def _GetExistingDeviceInfo(dev_type, device, runtime):
   @type runtime: tuple (cmd, nics, hvparams, disks)
   @param runtime: the runtime data to search for the device
   @raise errors.HotplugError: in case the requested device does not
-    exist (e.g. device has been added without --hotplug option) or
-    device info has not pci slot (e.g. old devices in the cluster)
+    exist (e.g. device has been added without --hotplug option)
 
   """
   index = _DEVICE_RUNTIME_INDEX[dev_type]
@@ -220,10 +336,38 @@ def _UpgradeSerializedRuntime(serialized_runtime):
   else:
     serialized_disks = []
 
+  def update_hvinfo(dev, dev_type):
+    """ Remove deprecated pci slot and substitute it with hvinfo """
+    if "hvinfo" not in dev:
+      dev["hvinfo"] = {}
+      uuid = dev["uuid"]
+      # Ganeti used to save the PCI slot of paravirtual devices
+      # (virtio-blk-pci, virtio-net-pci) in runtime files during
+      # _GenerateKVMRuntime() and HotAddDevice().
+      # In this case we had a -device QEMU option in the command line with id,
+      # drive|netdev, bus, and addr params. All other devices did not have an
+      # id nor placed explicitly on a bus.
+      # hot- prefix is removed in 2.16. Here we add it explicitly to
+      # handle old instances in the cluster properly.
+      if "pci" in dev:
+        # This is practically the old _GenerateDeviceKVMId()
+        dev["hvinfo"]["id"] = "hot%s-%s-%s-%s" % (dev_type.lower(),
+                                                  uuid.split("-")[0],
+                                                  "pci",
+                                                  dev["pci"])
+        dev["hvinfo"]["addr"] = hex(dev["pci"])
+        dev["hvinfo"]["bus"] = _PCI_BUS
+        del dev["pci"]
+
   for nic in serialized_nics:
     # Add a dummy uuid slot if an pre-2.8 NIC is found
     if "uuid" not in nic:
       nic["uuid"] = utils.NewUUID()
+    update_hvinfo(nic, constants.HOTPLUG_TARGET_NIC)
+
+  for disk_entry in serialized_disks:
+    # We have a (Disk, link, uri) tuple
+    update_hvinfo(disk_entry[0], constants.HOTPLUG_TARGET_DISK)
 
   return kvm_cmd, serialized_nics, hvparams, serialized_disks
 
@@ -332,6 +476,8 @@ class KVMHypervisor(hv_base.BaseHypervisor):
       hv_base.ParamInSet(True, constants.HT_KVM_VALID_NIC_TYPES),
     constants.HV_DISK_TYPE:
       hv_base.ParamInSet(True, constants.HT_KVM_VALID_DISK_TYPES),
+    constants.HV_KVM_SCSI_CONTROLLER_TYPE:
+      hv_base.ParamInSet(True, constants.HT_KVM_VALID_SCSI_CONTROLLER_TYPES),
     constants.HV_KVM_CDROM_DISK_TYPE:
       hv_base.ParamInSet(False, constants.HT_KVM_VALID_DISK_TYPES),
     constants.HV_USB_MOUSE:
@@ -369,6 +515,11 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     constants.HV_KVM_EXTRA: hv_base.NO_CHECK,
     constants.HV_KVM_MACHINE_VERSION: hv_base.NO_CHECK,
     constants.HV_KVM_MIGRATION_CAPS: hv_base.NO_CHECK,
+    constants.HV_KVM_PCI_RESERVATIONS:
+      (False, lambda x: (x >= 0 and x <= constants.QEMU_PCI_SLOTS),
+       "The number of PCI slots managed by QEMU (max: %s)" %
+       constants.QEMU_PCI_SLOTS,
+       None, None),
     constants.HV_VNET_HDR: hv_base.NO_CHECK,
     }
 
@@ -405,8 +556,9 @@ class KVMHypervisor(hv_base.BaseHypervisor):
   _NETDEV_RE = re.compile(r"^-netdev\s", re.M)
   _DISPLAY_RE = re.compile(r"^-display\s", re.M)
   _MACHINE_RE = re.compile(r"^-machine\s", re.M)
-  _VIRTIO_NET_RE = re.compile(r"^name \"%s\"" % _VIRTIO_NET_PCI, re.M)
-  _VIRTIO_BLK_RE = re.compile(r"^name \"%s\"" % _VIRTIO_BLK_PCI, re.M)
+  _DEVICE_DRIVER_SUPPORTED = \
+    staticmethod(lambda drv, devlist:
+                 re.compile(r"^name \"%s\"" % drv, re.M).search(devlist))
   # match  -drive.*boot=on|off on different lines, but in between accept only
   # dashes not preceeded by a new line (which would mean another option
   # different than -drive is starting)
@@ -418,8 +570,25 @@ class KVMHypervisor(hv_base.BaseHypervisor):
   _INFO_VERSION_CMD = "info version"
 
   # Slot 0 for Host bridge, Slot 1 for ISA bridge, Slot 2 for VGA controller
-  _DEFAULT_PCI_RESERVATIONS = "11100000000000000000000000000000"
-  _SOUNDHW_WITH_PCI_SLOT = ["ac97", "es1370", "hda"]
+  # and the rest up to slot 11 will be used by QEMU implicitly.
+  # Ganeti will add disks and NICs from slot 12 onwards.
+  # NOTE: This maps to the default PCI bus created by pc machine type
+  # by default (pci.0). The q35 creates a PCIe bus that is not hotpluggable
+  # and should be handled differently (pcie.0).
+  # NOTE: This bitarray here is defined for more fine-grained control.
+  # Currently the number of slots is QEMU_PCI_SLOTS and the reserved
+  # ones are the first QEMU_DEFAULT_PCI_RESERVATIONS.
+  # If the above constants change without updating _DEFAULT_PCI_RESERVATIONS
+  # properly, TestGenerateDeviceHVInfo() will probably break.
+  _DEFAULT_PCI_RESERVATIONS = "11111111111100000000000000000000"
+  # The SCSI bus is created on demand or automatically and is empty.
+  # For simplicity we decide to use a different target (scsi-id)
+  # for each SCSI disk. Here we support 16 SCSI disks which is
+  # actually the current hard limit (constants.MAX_DISKS).
+  # NOTE: Max device counts depend on the SCSI controller type;
+  # Just for the record, lsi supports up to 7, megasas 64,
+  # and virtio-scsi-pci 255.
+  _DEFAULT_SCSI_RESERVATIONS = "0000000000000000"
 
   ANCILLARY_FILES = [
     _KVM_NETWORK_SCRIPT,
@@ -898,19 +1067,23 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     needs_boot_flag = self._BOOT_RE.search(kvmhelp)
 
     dev_opts = []
-    device_driver = None
     disk_type = up_hvp[constants.HV_DISK_TYPE]
+    # paravirtual implies either '-device virtio-blk-pci... -drive if=none...'
+    # for new QEMU versions or '-drive if=virtio' for old QEMU versions
     if disk_type == constants.HT_DISK_PARAVIRTUAL:
-      if_val = ",if=%s" % self._VIRTIO
-      try:
-        if self._VIRTIO_BLK_RE.search(devlist):
-          if_val = ",if=none"
-          # will be passed in -device option as driver
-          device_driver = self._VIRTIO_BLK_PCI
-      except errors.HypervisorError, _:
-        pass
+      driver = self._VIRTIO_BLK_PCI
+      iface = self._VIRTIO
+    else:
+      driver = iface = disk_type
+
+    # Check if a specific driver is supported by QEMU device model.
+    if self._DEVICE_DRIVER_SUPPORTED(driver, devlist):
+      if_val = ",if=none" # for the -drive option
+      device_driver = driver # for the -device option
     else:
-      if_val = ",if=%s" % disk_type
+      if_val = ",if=%s" % iface # for the -drive option
+      device_driver = None # without -device option
+
     # AIO mode
     aio_mode = up_hvp[constants.HV_KVM_DISK_AIO]
     if aio_mode == constants.HT_KVM_AIO_NATIVE:
@@ -947,16 +1120,16 @@ class KVMHypervisor(hv_base.BaseHypervisor):
       drive_val = "file=%s,format=raw%s%s%s%s" % \
                   (drive_uri, if_val, boot_val, cache_val, aio_val)
 
-      if device_driver:
-        # kvm_disks are the 4th entry of runtime file that did not exist in
-        # the past. That means that cfdev should always have pci slot and
-        # _GenerateDeviceKVMId() will not raise a exception.
-        kvm_devid = _GenerateDeviceKVMId(constants.HOTPLUG_TARGET_DISK, cfdev)
-        drive_val += (",id=%s" % kvm_devid)
-        drive_val += (",bus=0,unit=%d" % cfdev.pci)
-        dev_val = ("%s,drive=%s,id=%s" %
-                   (device_driver, kvm_devid, kvm_devid))
-        dev_val += ",bus=pci.0,addr=%s" % hex(cfdev.pci)
+      # virtio-blk-pci case
+      if device_driver is not None:
+        # hvinfo will exist for paravirtual devices either due to
+        # _UpgradeSerializedRuntime() for old instances or due to
+        # _GenerateKVMRuntime() for new instances.
+        kvm_devid = cfdev.hvinfo["id"]
+        drive_val += ",id=%s" % kvm_devid
+        # Add driver, id, bus, and addr or channel, scsi-id, lun if any.
+        dev_val = _GenerateDeviceHVInfoStr(cfdev.hvinfo)
+        dev_val += ",drive=%s" % kvm_devid
         dev_opts.extend(["-device", dev_val])
 
       dev_opts.extend(["-drive", drive_val])
@@ -1062,26 +1235,23 @@ class KVMHypervisor(hv_base.BaseHypervisor):
 
     kvm_cmd.extend(["-pidfile", pidfile])
 
-    pci_reservations = bitarray(self._DEFAULT_PCI_RESERVATIONS)
+    bus_slots = self._GetBusSlots(hvp)
 
     # As requested by music lovers
     if hvp[constants.HV_SOUNDHW]:
       soundhw = hvp[constants.HV_SOUNDHW]
-      # For some reason only few sound devices require a PCI slot
-      # while the Audio controller *must* be in slot 3.
-      # That's why we bridge this option early in command line
-      if soundhw in self._SOUNDHW_WITH_PCI_SLOT:
-        _ = utils.GetFreeSlot(pci_reservations, reserve=True)
       kvm_cmd.extend(["-soundhw", soundhw])
 
-    if hvp[constants.HV_DISK_TYPE] == constants.HT_DISK_SCSI:
-      # The SCSI controller requires another PCI slot.
-      _ = utils.GetFreeSlot(pci_reservations, reserve=True)
+    if hvp[constants.HV_DISK_TYPE] in constants.HT_SCSI_DEVICE_TYPES:
+      # In case a SCSI disk is given, QEMU adds a SCSI contorller
+      # (LSI Logic / Symbios Logic 53c895a) implicitly.
+      # Here, we add the controller explicitly with the default id.
+      kvm_cmd.extend([
+        "-device",
+        "%s,id=scsi" % hvp[constants.HV_KVM_SCSI_CONTROLLER_TYPE]
+        ])
 
-    # Add id to ballon and place to the first available slot (3 or 4)
-    addr = utils.GetFreeSlot(pci_reservations, reserve=True)
-    pci_info = ",bus=pci.0,addr=%s" % hex(addr)
-    kvm_cmd.extend(["-balloon", "virtio,id=balloon%s" % pci_info])
+    kvm_cmd.extend(["-balloon", "virtio"])
     kvm_cmd.extend(["-daemonize"])
     if not instance.hvparams[constants.HV_ACPI]:
       kvm_cmd.extend(["-no-acpi"])
@@ -1317,9 +1487,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
       else:
         # Enable the spice agent communication channel between the host and the
         # agent.
-        addr = utils.GetFreeSlot(pci_reservations, reserve=True)
-        pci_info = ",bus=pci.0,addr=%s" % hex(addr)
-        kvm_cmd.extend(["-device", "virtio-serial-pci,id=spice%s" % pci_info])
+        kvm_cmd.extend(["-device", "virtio-serial-pci,id=spice"])
         kvm_cmd.extend([
           "-device",
           "virtserialport,chardev=spicechannel0,name=com.redhat.spice.0",
@@ -1366,14 +1534,21 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     if hvp[constants.HV_KVM_EXTRA]:
       kvm_cmd.extend(hvp[constants.HV_KVM_EXTRA].split(" "))
 
+    def _generate_kvm_device(dev_type, dev):
+      """Helper for generating a kvm device out of a Ganeti device."""
+      kvm_devid = _GenerateDeviceKVMId(dev_type, dev)
+      hv_dev_type = _DEVICE_TYPE[dev_type](hvp)
+      dev.hvinfo = _GenerateDeviceHVInfo(dev_type, kvm_devid,
+                                         hv_dev_type, bus_slots)
+
     kvm_disks = []
     for disk, link_name, uri in block_devices:
-      disk.pci = utils.GetFreeSlot(pci_reservations, disk.pci, True)
+      _generate_kvm_device(constants.HOTPLUG_TARGET_DISK, disk)
       kvm_disks.append((disk, link_name, uri))
 
     kvm_nics = []
     for nic in instance.nics:
-      nic.pci = utils.GetFreeSlot(pci_reservations, nic.pci, True)
+      _generate_kvm_device(constants.HOTPLUG_TARGET_NIC, nic)
       kvm_nics.append(nic)
 
     hvparams = hvp
@@ -1496,15 +1671,13 @@ class KVMHypervisor(hv_base.BaseHypervisor):
       }
     update_features = {}
     if nic_type == constants.HT_NIC_PARAVIRTUAL:
-      nic_model = self._VIRTIO
-      try:
-        if self._VIRTIO_NET_RE.search(devlist):
-          nic_model = self._VIRTIO_NET_PCI
-          update_features["vnet_hdr"] = up_hvp[constants.HV_VNET_HDR]
-      except errors.HypervisorError, _:
+      if self._DEVICE_DRIVER_SUPPORTED(self._VIRTIO_NET_PCI, devlist):
+        nic_model = self._VIRTIO_NET_PCI
+        update_features["vnet_hdr"] = up_hvp[constants.HV_VNET_HDR]
+      else:
         # Older versions of kvm don't support DEVICE_LIST, but they don't
         # have new virtio syntax either.
-        pass
+        nic_model = self._VIRTIO
 
       if up_hvp[constants.HV_VHOST_NET]:
         # Check for vhost_net support.
@@ -1620,16 +1793,14 @@ class KVMHypervisor(hv_base.BaseHypervisor):
           vhostfd = ""
 
         if kvm_supports_netdev:
-          nic_val = "%s,mac=%s" % (nic_model, nic.mac)
-          try:
-            # kvm_nics already exist in old runtime files and thus there might
-            # be some entries without pci slot (therefore try: except:)
-            kvm_devid = _GenerateDeviceKVMId(constants.HOTPLUG_TARGET_NIC, nic)
-            netdev = kvm_devid
-            nic_val += (",id=%s,bus=pci.0,addr=%s" % (kvm_devid, hex(nic.pci)))
-          except errors.HotplugError:
+          # Non paravirtual NICs hvinfo is empty
+          if "id" in nic.hvinfo:
+            nic_val = _GenerateDeviceHVInfoStr(nic.hvinfo)
+            netdev = nic.hvinfo["id"]
+          else:
+            nic_val = "%s" % nic_model
             netdev = "netdev%d" % nic_seq
-          nic_val += (",netdev=%s%s" % (netdev, nic_extra))
+          nic_val += (",netdev=%s,mac=%s%s" % (netdev, nic.mac, nic_extra))
           tap_val = ("type=tap,id=%s,%s%s%s" %
                      (netdev, tapfd, vhostfd, tap_extra))
           kvm_cmd.extend(["-netdev", tap_val, "-device", nic_val])
@@ -1828,9 +1999,21 @@ class KVMHypervisor(hv_base.BaseHypervisor):
   def VerifyHotplugSupport(self, instance, action, dev_type):
     """Verifies that hotplug is supported.
 
-    @raise errors.HypervisorError: in one of the previous cases
+    Hotplug is not supported if:
+
+      - the instance is not running
+      - the device type is not hotplug-able
+      - the QMP version does not support the corresponding commands
+
+    @raise errors.HypervisorError: if one of the above applies
 
     """
+    runtime = self._LoadKVMRuntime(instance)
+    device_type = _DEVICE_TYPE[dev_type](runtime[2])
+    if device_type not in _HOTPLUGGABLE_DEVICE_TYPES[dev_type]:
+      msg = "Hotplug is not supported for device type %s" % device_type
+      raise errors.HypervisorError(msg)
+
     if dev_type == constants.HOTPLUG_TARGET_DISK:
       if action == constants.HOTPLUG_ACTION_ADD:
         self.qmp.CheckDiskHotAddSupport()
@@ -1862,19 +2045,69 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     if (int(v_major), int(v_min)) < (1, 7):
       raise errors.HotplugError("Hotplug not supported for qemu versions < 1.7")
 
+  def _GetBusSlots(self, hvp=None, runtime=None):
+    """Helper function to get the slots of PCI and SCSI QEMU buses.
+
+    This will return the status of the first PCI and SCSI buses. By default
+    QEMU boots with one PCI bus (pci.0) and occupies the first 3 PCI slots. If
+    a SCSI disk is found then a SCSI controller is added on the PCI bus and a
+    SCSI bus (scsi.0) is created.
+
+    During hotplug we could query QEMU via info qtree HMP command but parsing
+    the result is too complicated. Instead we use the info stored in runtime
+    files. We parse NIC and disk entries and based on their hvinfo we reserve
+    the corresponding slots.
+
+    The runtime argument is a tuple as returned by _LoadKVMRuntime(). Obtain
+    disks and NICs from it. In case a runtime file is not available (see
+    _GenerateKVMRuntime()) we return the bus slots that QEMU boots with by
+    default.
+
+    """
+    # This is by default and returned during _GenerateKVMRuntime()
+    bus_slots = {
+      _PCI_BUS: bitarray(self._DEFAULT_PCI_RESERVATIONS),
+      _SCSI_BUS: bitarray(self._DEFAULT_SCSI_RESERVATIONS),
+      }
+
+    # Adjust the empty slots depending of the corresponding hvparam
+    if hvp and constants.HV_KVM_PCI_RESERVATIONS in hvp:
+      res = hvp[constants.HV_KVM_PCI_RESERVATIONS]
+      pci = bitarray(constants.QEMU_PCI_SLOTS)
+      pci.setall(False) # pylint: disable=E1101
+      pci[0:res:1] = True
+      bus_slots[_PCI_BUS] = pci
+
+    # This is during hot-add
+    if runtime:
+      _, nics, _, disks = runtime
+      disks = [d for d, _, _ in disks]
+      for d in disks + nics:
+        if not d.hvinfo or "bus" not in d.hvinfo:
+          continue
+        bus = d.hvinfo["bus"]
+        slots = bus_slots[bus]
+        if bus == _PCI_BUS:
+          slot = d.hvinfo["addr"]
+          slots[int(slot, 16)] = True
+        elif bus == _SCSI_BUS:
+          slot = d.hvinfo["scsi-id"]
+          slots[slot] = True
+
+    return bus_slots
+
   @_with_qmp
-  def _VerifyHotplugCommand(self, _instance,
-                            device, kvm_devid, should_exist):
+  def _VerifyHotplugCommand(self, _instance, kvm_devid, should_exist):
     """Checks if a previous hotplug command has succeeded.
 
     Depending on the should_exist value, verifies that an entry identified by
-    the PCI slot and device ID is present or not.
+    device ID is present or not.
 
     @raise errors.HypervisorError: if result is not the expected one
 
     """
     for i in range(5):
-      found = self.qmp.HasPCIDevice(device, kvm_devid)
+      found = self.qmp.HasDevice(kvm_devid)
       logging.info("Verifying hotplug command (retry %s): %s", i, found)
       if found and should_exist:
         break
@@ -1896,30 +2129,46 @@ class KVMHypervisor(hv_base.BaseHypervisor):
   def HotAddDevice(self, instance, dev_type, device, extra, seq):
     """ Helper method to hot-add a new device
 
-    It gets free pci slot generates the device name and invokes the
-    device specific method.
+    It generates the device ID and hvinfo, and invokes the
+    device-specific method.
 
     """
-    # in case of hot-mod this is given
-    if device.pci is None:
-      device.pci = self.qmp.GetFreePCISlot()
     kvm_devid = _GenerateDeviceKVMId(dev_type, device)
     runtime = self._LoadKVMRuntime(instance)
+    up_hvp = runtime[2]
+    device_type = _DEVICE_TYPE[dev_type](up_hvp)
+    bus_state = self._GetBusSlots(up_hvp, runtime)
+    # in case of hot-mod this is given
+    if not device.hvinfo:
+      device.hvinfo = _GenerateDeviceHVInfo(dev_type, kvm_devid,
+                                            device_type, bus_state)
     if dev_type == constants.HOTPLUG_TARGET_DISK:
       uri = _GetDriveURI(device, extra[0], extra[1])
-      self.qmp.HotAddDisk(device, kvm_devid, uri)
+
+      def drive_add_fn(filename):
+        """Helper function that uses HMP to hot-add a drive."""
+        cmd = "drive_add dummy file=%s,if=none,id=%s,format=raw" % \
+          (filename, kvm_devid)
+        self._CallMonitorCommand(instance.name, cmd)
+
+      # This must be done indirectly due to the fact that we pass the drive's
+      # file descriptor via QMP first, then we add the corresponding drive that
+      # refers to this fd. Note that if the QMP connection terminates before
+      # a drive which keeps a reference to the fd passed via the add-fd QMP
+      # command has been created, then the fd gets closed and cannot be used
+      # later (e.g., via an drive_add HMP command).
+      self.qmp.HotAddDisk(device, kvm_devid, uri, drive_add_fn)
     elif dev_type == constants.HOTPLUG_TARGET_NIC:
       kvmpath = instance.hvparams[constants.HV_KVM_PATH]
       kvmhelp = self._GetKVMOutput(kvmpath, self._KVMOPT_HELP)
       devlist = self._GetKVMOutput(kvmpath, self._KVMOPT_DEVICELIST)
-      up_hvp = runtime[2]
       features, _, _ = self._GetNetworkDeviceFeatures(up_hvp, devlist, kvmhelp)
       (tap, tapfds, vhostfds) = OpenTap(features=features)
       self._ConfigureNIC(instance, seq, device, tap)
       self.qmp.HotAddNic(device, kvm_devid, tapfds, vhostfds, features)
       utils.WriteFile(self._InstanceNICFile(instance.name, seq), data=tap)
 
-    self._VerifyHotplugCommand(instance, device, kvm_devid, True)
+    self._VerifyHotplugCommand(instance, kvm_devid, True)
     # update relevant entries in runtime file
     index = _DEVICE_RUNTIME_INDEX[dev_type]
     entry = _RUNTIME_ENTRY[dev_type](device, extra)
@@ -1931,7 +2180,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     """ Helper method for hot-del device
 
     It gets device info from runtime file, generates the device name and
-    invokes the device specific method.
+    invokes the device-specific method.
 
     """
     runtime = self._LoadKVMRuntime(instance)
@@ -1946,23 +2195,23 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     elif dev_type == constants.HOTPLUG_TARGET_NIC:
       self.qmp.HotDelNic(kvm_devid)
       utils.RemoveFile(self._InstanceNICFile(instance.name, seq))
-    self._VerifyHotplugCommand(instance, kvm_device, kvm_devid, False)
+    self._VerifyHotplugCommand(instance, kvm_devid, False)
     index = _DEVICE_RUNTIME_INDEX[dev_type]
     runtime[index].remove(entry)
     self._SaveKVMRuntime(instance, runtime)
 
-    return kvm_device.pci
+    return kvm_device.hvinfo
 
   def HotModDevice(self, instance, dev_type, device, _, seq):
     """ Helper method for hot-mod device
 
     It gets device info from runtime file, generates the device name and
-    invokes the device specific method. Currently only NICs support hot-mod
+    invokes the device-specific method. Currently only NICs support hot-mod
 
     """
     if dev_type == constants.HOTPLUG_TARGET_NIC:
-      # putting it back in the same pci slot
-      device.pci = self.HotDelDevice(instance, dev_type, device, _, seq)
+      # putting it back in the same bus and slot
+      device.hvinfo = self.HotDelDevice(instance, dev_type, device, _, seq)
       self.HotAddDevice(instance, dev_type, device, _, seq)
 
   @classmethod
index f2b7d02..658f20c 100644 (file)
@@ -48,6 +48,7 @@ from bitarray import bitarray
 
 from ganeti import errors
 from ganeti import utils
+from ganeti import constants
 from ganeti import serializer
 
 
@@ -255,7 +256,11 @@ class QmpConnection(MonitorSocket):
   _CAPABILITIES_COMMAND = "qmp_capabilities"
   _QUERY_COMMANDS = "query-commands"
   _MESSAGE_END_TOKEN = "\r\n"
-  _QEMU_PCI_SLOTS = 32 # The number of PCI slots QEMU exposes by default
+  # List of valid attributes for the device_add QMP command.
+  # Extra attributes found in device's hvinfo will be ignored.
+  _DEVICE_ATTRIBUTES = [
+    "driver", "id", "bus", "addr", "channel", "scsi-id", "lun"
+    ]
 
   def __init__(self, monitor_filename):
     super(QmpConnection, self).__init__(monitor_filename)
@@ -459,6 +464,15 @@ class QmpConnection(MonitorSocket):
 
       return response[self._RETURN_KEY]
 
+  def _filter_hvinfo(self, hvinfo):
+    """Filter non valid keys of the device's hvinfo (if any)."""
+    ret = {}
+    for k in self._DEVICE_ATTRIBUTES:
+      if k in hvinfo:
+        ret[k] = hvinfo[k]
+
+    return ret
+
   @_ensure_connection
   def HotAddNic(self, nic, devid, tapfds=None, vhostfds=None, features=None):
     """Hot-add a NIC
@@ -501,13 +515,12 @@ class QmpConnection(MonitorSocket):
     self.Execute("netdev_add", arguments)
 
     arguments = {
-      "driver": "virtio-net-pci",
-      "id": devid,
-      "bus": "pci.0",
-      "addr": hex(nic.pci),
       "netdev": devid,
       "mac": nic.mac,
     }
+    # Note that hvinfo that _GenerateDeviceHVInfo() creates
+    # sould include *only* the driver, id, bus, and addr keys
+    arguments.update(self._filter_hvinfo(nic.hvinfo))
     if enable_mq:
       arguments.update({
         "mq": "on",
@@ -524,12 +537,13 @@ class QmpConnection(MonitorSocket):
     self.Execute("netdev_del", {"id": devid})
 
   @_ensure_connection
-  def HotAddDisk(self, disk, devid, uri):
+  def HotAddDisk(self, disk, devid, uri, drive_add_fn=None):
     """Hot-add a disk
 
     Try opening the device to obtain a fd and pass it with SCM_RIGHTS. This
     will be omitted in case of userspace access mode (open will fail).
-    Then use blockdev-add and then device_add.
+    Then use blockdev-add QMP command or drive_add_fn() callback if any.
+    The add the guest device.
 
     """
     if os.path.exists(uri):
@@ -543,28 +557,41 @@ class QmpConnection(MonitorSocket):
       filename = uri
       fdset = None
 
-    arguments = {
-      "options": {
-        "driver": "raw",
-        "id": devid,
-        "file": {
-          "driver": "file",
-          "filename": filename,
+    # FIXME: Use blockdev-add/blockdev-del when properly implemented in QEMU.
+    # This is an ugly hack to work around QEMU commits 48f364dd and da2cf4e8:
+    #  * HMP's drive_del is not supported any more on a drive added
+    #    via QMP's blockdev-add
+    #  * Stay away from immature blockdev-add unless you want to help
+    #     with development.
+    # Using drive_add here must be done via a callback due to the fact that if
+    # a QMP connection terminates before a drive keeps a reference to the fd
+    # passed via the add-fd QMP command, then the fd gets closed and
+    # cannot be used later.
+    if drive_add_fn:
+      drive_add_fn(filename)
+    else:
+      arguments = {
+        "options": {
+          "driver": "raw",
+          "id": devid,
+          "file": {
+            "driver": "file",
+            "filename": filename,
+          }
         }
       }
-    }
-    self.Execute("blockdev-add", arguments)
+      self.Execute("blockdev-add", arguments)
 
     if fdset is not None:
       self._RemoveFdset(fdset)
 
     arguments = {
-      "driver": "virtio-blk-pci",
-      "id": devid,
-      "bus": "pci.0",
-      "addr": hex(disk.pci),
       "drive": devid,
     }
+    # Note that hvinfo that _GenerateDeviceHVInfo() creates
+    # sould include *only* the driver, id, bus, and
+    # addr or channel, scsi-id, and lun keys
+    arguments.update(self._filter_hvinfo(disk.hvinfo))
     self.Execute("device_add", arguments)
 
   @_ensure_connection
@@ -589,17 +616,51 @@ class QmpConnection(MonitorSocket):
     devices = bus["devices"]
     return devices
 
+  def _HasPCIDevice(self, devid):
+    """Check if a specific device ID exists on the PCI bus.
+
+    """
+    for d in self._GetPCIDevices():
+      if d["qdev_id"] == devid:
+        return True
+
+    return False
+
+  def _GetBlockDevices(self):
+    """Get the block devices of a running instance.
+
+    The query-block QMP command returns a list of dictionaries
+    including information for each virtual disk. For example:
+
+    [{"device": "disk-049f140d", "inserted": {"file": ..., "image": ...}}]
+
+    @rtype: list of dicts
+    @return: Info about the virtual disks of the instance.
+
+    """
+    self._check_connection()
+    devices = self.Execute("query-block")
+    return devices
+
+  def _HasBlockDevice(self, devid):
+    """Check if a specific device ID exists among block devices.
+
+    """
+    for d in self._GetBlockDevices():
+      if d["device"] == devid:
+        return True
+
+    return False
+
   @_ensure_connection
-  def HasPCIDevice(self, device, devid):
+  def HasDevice(self, devid):
     """Check if a specific device exists or not on a running instance.
 
-    It will match the PCI slot of the device and the id currently
-    obtained by _GenerateDeviceKVMId().
+    It first checks the PCI devices and then the block devices.
 
     """
-    for d in self._GetPCIDevices():
-      if d["qdev_id"] == devid and d["slot"] == device.pci:
-        return True
+    if (self._HasPCIDevice(devid) or self._HasBlockDevice(devid)):
+      return True
 
     return False
 
@@ -608,7 +669,7 @@ class QmpConnection(MonitorSocket):
     """Get the first available PCI slot of a running instance.
 
     """
-    slots = bitarray(self._QEMU_PCI_SLOTS)
+    slots = bitarray(constants.QEMU_PCI_SLOTS)
     slots.setall(False) # pylint: disable=E1101
     for d in self._GetPCIDevices():
       slot = d["slot"]
index 2b90d00..fcddaef 100644 (file)
 
 """Module implementing the job queue handling.
 
-Locking: there's a single, large lock in the L{JobQueue} class. It's
-used by all other classes in this module.
-
-@var JOBQUEUE_THREADS: the number of worker threads we start for
-    processing jobs
-
 """
 
 import logging
@@ -56,7 +50,6 @@ except ImportError:
 from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
-from ganeti import workerpool
 from ganeti import locking
 from ganeti import luxi
 from ganeti import opcodes
@@ -77,12 +70,6 @@ from ganeti import vcluster
 from ganeti.cmdlib import cluster
 
 
-JOBQUEUE_THREADS = 1
-
-# member lock names to be passed to @ssynchronized decorator
-_LOCK = "_lock"
-_QUEUE = "_queue"
-
 #: Retrieves "id" attribute
 _GetIdAttr = operator.attrgetter("id")
 
@@ -208,7 +195,7 @@ class _QueuedJob(object):
   # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "processor_lock", "writable", "archived",
+               "writable", "archived",
                "livelock", "process_id",
                "__weakref__"]
 
@@ -275,12 +262,6 @@ class _QueuedJob(object):
     obj.ops_iter = None
     obj.cur_opctx = None
 
-    # Read-only jobs are not processed and therefore don't need a lock
-    if writable:
-      obj.processor_lock = threading.Lock()
-    else:
-      obj.processor_lock = None
-
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
               "id=%s" % self.id,
@@ -584,7 +565,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -609,7 +589,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # And finally replicate the job status
     self._queue.UpdateJobUnlocked(self._job)
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyRetry(self):
     """Mark opcode again as lock-waiting.
 
@@ -620,7 +599,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._op.status = constants.OP_STATUS_WAITING
     logging.debug("Opcode will be retried. Back to waiting.")
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def _AppendFeedback(self, timestamp, log_type, log_msg):
     """Internal feedback append function, with locks
 
@@ -972,7 +950,6 @@ class _JobProcessor(object):
 
     logging.debug("Processing job %s", job.id)
 
-    queue.acquire(shared=1)
     try:
       opcount = len(job.ops)
 
@@ -1035,11 +1012,7 @@ class _JobProcessor(object):
 
           assert not opctx.jobdeps, "Not all dependencies were removed"
 
-          queue.release()
-          try:
-            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
-          finally:
-            queue.acquire(shared=1)
+          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
 
           op.status = op_status
           op.result = op_result
@@ -1144,115 +1117,6 @@ class _JobProcessor(object):
         return self.DEFER
     finally:
       assert job.writable, "Job became read-only while being processed"
-      queue.release()
-
-
-def _EvaluateJobProcessorResult(depmgr, job, result):
-  """Looks at a result from L{_JobProcessor} for a job.
-
-  To be used in a L{_JobQueueWorker}.
-
-  """
-  if result == _JobProcessor.FINISHED:
-    # Notify waiting jobs
-    depmgr.NotifyWaiters(job.id)
-
-  elif result == _JobProcessor.DEFER:
-    # Schedule again
-    raise workerpool.DeferTask(priority=job.CalcPriority())
-
-  elif result == _JobProcessor.WAITDEP:
-    # No-op, dependency manager will re-schedule
-    pass
-
-  else:
-    raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                 (result, ))
-
-
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
-
-  """
-  def RunTask(self, job): # pylint: disable=W0221
-    """Job executor.
-
-    @type job: L{_QueuedJob}
-    @param job: the job to be processed
-
-    """
-    assert job.writable, "Expected writable job"
-
-    # Ensure only one worker is active on a single job. If a job registers for
-    # a dependency job, and the other job notifies before the first worker is
-    # done, the job can end up in the tasklist more than once.
-    job.processor_lock.acquire()
-    try:
-      return self._RunTaskInner(job)
-    finally:
-      job.processor_lock.release()
-
-  def _RunTaskInner(self, job):
-    """Executes a job.
-
-    Must be called with per-job lock acquired.
-
-    """
-    queue = job.queue
-    assert queue == self.pool.queue
-
-    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
-    setname_fn(None)
-
-    proc = mcpu.Processor(queue.context, job.id)
-
-    # Create wrapper for setting thread name
-    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
-                                    proc.ExecOpCode)
-
-    _EvaluateJobProcessorResult(queue.depmgr, job,
-                                _JobProcessor(queue, wrap_execop_fn, job)())
-
-  @staticmethod
-  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
-    """Updates the worker thread name to include a short summary of the opcode.
-
-    @param setname_fn: Callable setting worker thread name
-    @param execop_fn: Callable for executing opcode (usually
-                      L{mcpu.Processor.ExecOpCode})
-
-    """
-    setname_fn(op)
-    try:
-      return execop_fn(op, *args, **kwargs)
-    finally:
-      setname_fn(None)
-
-  @staticmethod
-  def _GetWorkerName(job, op):
-    """Sets the worker thread name.
-
-    @type job: L{_QueuedJob}
-    @type op: L{opcodes.OpCode}
-
-    """
-    parts = ["Job%s" % job.id]
-
-    if op:
-      parts.append(op.TinySummary())
-
-    return "/".join(parts)
-
-
-class _JobQueueWorkerPool(workerpool.WorkerPool):
-  """Simple class implementing a job-processing workerpool.
-
-  """
-  def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("Jq",
-                                              JOBQUEUE_THREADS,
-                                              _JobQueueWorker)
-    self.queue = queue
 
 
 class _JobDependencyManager:
@@ -1265,33 +1129,14 @@ class _JobDependencyManager:
    CONTINUE,
    WRONGSTATUS) = range(1, 6)
 
-  def __init__(self, getstatus_fn, enqueue_fn):
+  def __init__(self, getstatus_fn):
     """Initializes this class.
 
     """
     self._getstatus_fn = getstatus_fn
-    self._enqueue_fn = enqueue_fn
 
     self._waiters = {}
-    self._lock = locking.SharedLock("JobDepMgr")
 
-  @locking.ssynchronized(_LOCK, shared=1)
-  def GetLockInfo(self, requested): # pylint: disable=W0613
-    """Retrieves information about waiting jobs.
-
-    @type requested: set
-    @param requested: Requested information, see C{query.LQ_*}
-
-    """
-    # No need to sort here, that's being done by the lock manager and query
-    # library. There are no priorities for notifying jobs, hence all show up as
-    # one item under "pending".
-    return [("job/%s" % job_id, None, None,
-             [("job", [job.id for job in waiters])])
-            for job_id, waiters in self._waiters.items()
-            if waiters]
-
-  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1299,7 +1144,6 @@ class _JobDependencyManager:
     return compat.any(job in jobs
                       for jobs in self._waiters.values())
 
-  @locking.ssynchronized(_LOCK)
   def CheckAndRegister(self, job, dep_job_id, dep_status):
     """Checks if a dependency job has the requested status.
 
@@ -1365,31 +1209,6 @@ class _JobDependencyManager:
                    if not waiters]:
       del self._waiters[job_id]
 
-  def NotifyWaiters(self, job_id):
-    """Notifies all jobs waiting for a certain job ID.
-
-    @attention: Do not call until L{CheckAndRegister} returned a status other
-      than C{WAITDEP} for C{job_id}, or behaviour is undefined
-    @type job_id: int
-    @param job_id: Job ID
-
-    """
-    assert ht.TJobId(job_id)
-
-    self._lock.acquire()
-    try:
-      self._RemoveEmptyWaitersUnlocked()
-
-      jobs = self._waiters.pop(job_id, None)
-    finally:
-      self._lock.release()
-
-    if jobs:
-      # Re-add jobs to workerpool
-      logging.debug("Re-adding %s jobs which were waiting for job %s",
-                    len(jobs), job_id)
-      self._enqueue_fn(jobs)
-
 
 class JobQueue(object):
   """Queue used to manage the jobs.
@@ -1408,26 +1227,10 @@ class JobQueue(object):
         data and other ganeti objects
 
     """
-    self.primary_jid = None
     self.context = context
     self._memcache = weakref.WeakValueDictionary()
     self._my_hostname = netutils.Hostname.GetSysName()
 
-    # The Big JobQueue lock. If a code block or method acquires it in shared
-    # mode safe it must guarantee concurrency with all the code acquiring it in
-    # shared mode, including itself. In order not to acquire it at all
-    # concurrency must be guaranteed with all code acquiring it in shared mode
-    # and all code acquiring it exclusively.
-    self._lock = locking.SharedLock("JobQueue")
-
-    self.acquire = self._lock.acquire
-    self.release = self._lock.release
-
-    # Read serial file
-    self._last_serial = jstore.ReadSerial()
-    assert self._last_serial is not None, ("Serial file was modified between"
-                                           " check in jstore and here")
-
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
                        for n in cfg.GetAllNodesInfo().values()
@@ -1436,66 +1239,8 @@ class JobQueue(object):
     # Remove master node
     self._nodes.pop(self._my_hostname, None)
 
-    # TODO: Check consistency across nodes
-
-    self._queue_size = None
-    self._UpdateQueueSizeUnlocked()
-    assert ht.TInt(self._queue_size)
-
     # Job dependencies
-    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
-                                        self._EnqueueJobs)
-
-    # Setup worker pool
-    self._wpool = _JobQueueWorkerPool(self)
-
-  def _PickupJobUnlocked(self, job_id):
-    """Load a job from the job queue
-
-    Pick up a job that already is in the job queue and start/resume it.
-
-    """
-    if self.primary_jid:
-      logging.warning("Job process asked to pick up %s, but already has %s",
-                      job_id, self.primary_jid)
-
-    self.primary_jid = int(job_id)
-
-    job = self._LoadJobUnlocked(job_id)
-
-    if job is None:
-      logging.warning("Job %s could not be read", job_id)
-      return
-
-    job.AddReasons(pickup=True)
-
-    status = job.CalcStatus()
-    if status == constants.JOB_STATUS_QUEUED:
-      job.SetPid(os.getpid())
-      self._EnqueueJobsUnlocked([job])
-      logging.info("Restarting job %s", job.id)
-
-    elif status in (constants.JOB_STATUS_RUNNING,
-                    constants.JOB_STATUS_WAITING,
-                    constants.JOB_STATUS_CANCELING):
-      logging.warning("Unfinished job %s found: %s", job.id, job)
-
-      if status == constants.JOB_STATUS_WAITING:
-        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
-        job.SetPid(os.getpid())
-        self._EnqueueJobsUnlocked([job])
-        logging.info("Restarting job %s", job.id)
-      else:
-        to_encode = errors.OpExecError("Unclean master daemon shutdown")
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              _EncodeOpError(to_encode))
-        job.Finalize()
-
-    self.UpdateJobUnlocked(job)
-
-  @locking.ssynchronized(_LOCK)
-  def PickupJob(self, job_id):
-    self._PickupJobUnlocked(job_id)
+    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
 
   def _GetRpc(self, address_list):
     """Gets RPC runner with context.
@@ -1503,67 +1248,6 @@ class JobQueue(object):
     """
     return rpc.JobQueueRunner(self.context, address_list)
 
-  @locking.ssynchronized(_LOCK)
-  def AddNode(self, node):
-    """Register a new node with the queue.
-
-    @type node: L{objects.Node}
-    @param node: the node object to be added
-
-    """
-    node_name = node.name
-    assert node_name != self._my_hostname
-
-    # Clean queue directory on added node
-    result = self._GetRpc(None).call_jobqueue_purge(node_name)
-    msg = result.fail_msg
-    if msg:
-      logging.warning("Cannot cleanup queue directory on node %s: %s",
-                      node_name, msg)
-
-    if not node.master_candidate:
-      # remove if existing, ignoring errors
-      self._nodes.pop(node_name, None)
-      # and skip the replication of the job ids
-      return
-
-    # Upload the whole queue excluding archived jobs
-    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
-
-    # Upload current serial file
-    files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
-
-    # Static address list
-    addrs = [node.primary_ip]
-
-    for file_name in files:
-      # Read file content
-      content = utils.ReadFile(file_name)
-
-      result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
-                             file_name, content)
-      msg = result[node_name].fail_msg
-      if msg:
-        logging.error("Failed to upload file %s to node %s: %s",
-                      file_name, node_name, msg)
-
-    msg = result[node_name].fail_msg
-    if msg:
-      logging.error("Failed to set queue drained flag on node %s: %s",
-                    node_name, msg)
-
-    self._nodes[node_name] = node.primary_ip
-
-  @locking.ssynchronized(_LOCK)
-  def RemoveNode(self, node_name):
-    """Callback called when removing nodes from the cluster.
-
-    @type node_name: str
-    @param node_name: the name of the node to remove
-
-    """
-    self._nodes.pop(node_name, None)
-
   @staticmethod
   def _CheckRpcResult(result, nodes, failmsg):
     """Verifies the status of an RPC call.
@@ -1799,15 +1483,6 @@ class JobQueue(object):
 
     if not raw_data:
       logging.debug("No data available for job %s", job_id)
-      if int(job_id) == self.primary_jid:
-        logging.warning("My own job file (%s) disappeared;"
-                        " this should only happy at cluster desctruction",
-                        job_id)
-        if mcpu.lusExecuting[0] == 0:
-          logging.warning("Not in execution; cleaning up myself due to missing"
-                          " job file")
-          logging.shutdown()
-          os._exit(1) # pylint: disable=W0212
       return None
 
     if writable is None:
@@ -1842,12 +1517,6 @@ class JobQueue(object):
       logging.exception("Can't load/parse job %s", job_id)
       return None
 
-  def _UpdateQueueSizeUnlocked(self):
-    """Update the queue size.
-
-    """
-    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
-
   @classmethod
   def SubmitManyJobs(cls, jobs):
     """Create and store multiple jobs.
@@ -1856,14 +1525,6 @@ class JobQueue(object):
     return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
 
   @staticmethod
-  def _FormatSubmitError(msg, ops):
-    """Formats errors which occurred while submitting a job.
-
-    """
-    return ("%s; opcodes %s" %
-            (msg, utils.CommaJoin(op.Summary() for op in ops)))
-
-  @staticmethod
   def _ResolveJobDependencies(resolve_fn, deps):
     """Resolves relative job IDs in dependencies.
 
@@ -1894,28 +1555,6 @@ class JobQueue(object):
 
     return (True, result)
 
-  @locking.ssynchronized(_LOCK)
-  def _EnqueueJobs(self, jobs):
-    """Helper function to add jobs to worker pool's queue.
-
-    @type jobs: list
-    @param jobs: List of all jobs
-
-    """
-    return self._EnqueueJobsUnlocked(jobs)
-
-  def _EnqueueJobsUnlocked(self, jobs):
-    """Helper function to add jobs to worker pool's queue.
-
-    @type jobs: list
-    @param jobs: List of all jobs
-
-    """
-    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
-    self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs],
-                             task_id=map(_GetIdAttr, jobs))
-
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
 
@@ -1983,7 +1622,6 @@ class JobQueue(object):
     else:
       return None
 
-  @locking.ssynchronized(_LOCK)
   def CancelJob(self, job_id):
     """Cancels a job.
 
@@ -1997,7 +1635,6 @@ class JobQueue(object):
 
     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
-  @locking.ssynchronized(_LOCK)
   def ChangeJobPriority(self, job_id, priority):
     """Changes a job's priority.
 
@@ -2016,13 +1653,6 @@ class JobQueue(object):
 
     def fn(job):
       (success, msg) = job.ChangePriority(priority)
-
-      if success:
-        try:
-          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
-        except workerpool.NoSuchTask:
-          logging.debug("Job %s is not in workerpool at this time", job.id)
-
       return (success, msg)
 
     return self._ModifyJobUnlocked(job_id, fn)
@@ -2053,124 +1683,3 @@ class JobQueue(object):
       self.UpdateJobUnlocked(job)
 
     return (success, msg)
-
-  def _ArchiveJobsUnlocked(self, jobs):
-    """Archives jobs.
-
-    @type jobs: list of L{_QueuedJob}
-    @param jobs: Job objects
-    @rtype: int
-    @return: Number of archived jobs
-
-    """
-    archive_jobs = []
-    rename_files = []
-    for job in jobs:
-      assert job.writable, "Can't archive read-only job"
-      assert not job.archived, "Can't cancel archived job"
-
-      if job.CalcStatus() not in constants.JOBS_FINALIZED:
-        logging.debug("Job %s is not yet done", job.id)
-        continue
-
-      archive_jobs.append(job)
-
-      old = self._GetJobPath(job.id)
-      new = self._GetArchivedJobPath(job.id)
-      rename_files.append((old, new))
-
-    # TODO: What if 1..n files fail to rename?
-    self._RenameFilesUnlocked(rename_files)
-
-    logging.debug("Successfully archived job(s) %s",
-                  utils.CommaJoin(job.id for job in archive_jobs))
-
-    # Since we haven't quite checked, above, if we succeeded or failed renaming
-    # the files, we update the cached queue size from the filesystem. When we
-    # get around to fix the TODO: above, we can use the number of actually
-    # archived jobs to fix this.
-    self._UpdateQueueSizeUnlocked()
-    return len(archive_jobs)
-
-  def _Query(self, fields, qfilter):
-    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
-                       namefield="id")
-
-    # Archived jobs are only looked at if the "archived" field is referenced
-    # either as a requested field or in the filter. By default archived jobs
-    # are ignored.
-    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
-
-    job_ids = qobj.RequestedNames()
-
-    list_all = (job_ids is None)
-
-    if list_all:
-      # Since files are added to/removed from the queue atomically, there's no
-      # risk of getting the job ids in an inconsistent state.
-      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
-
-    jobs = []
-
-    for job_id in job_ids:
-      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
-      if job is not None or not list_all:
-        jobs.append((job_id, job))
-
-    return (qobj, jobs, list_all)
-
-  def QueryJobs(self, fields, qfilter):
-    """Returns a list of jobs in queue.
-
-    @type fields: sequence
-    @param fields: List of wanted fields
-    @type qfilter: None or query2 filter (list)
-    @param qfilter: Query filter
-
-    """
-    (qobj, ctx, _) = self._Query(fields, qfilter)
-
-    return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
-
-  def OldStyleQueryJobs(self, job_ids, fields):
-    """Returns a list of jobs in queue.
-
-    @type job_ids: list
-    @param job_ids: sequence of job identifiers or None for all
-    @type fields: list
-    @param fields: names of fields to return
-    @rtype: list
-    @return: list one element per job, each element being list with
-        the requested fields
-
-    """
-    # backwards compat:
-    job_ids = [int(jid) for jid in job_ids]
-    qfilter = qlang.MakeSimpleFilter("id", job_ids)
-
-    (qobj, ctx, _) = self._Query(fields, qfilter)
-
-    return qobj.OldStyleQuery(ctx, sort_by_name=False)
-
-  @locking.ssynchronized(_LOCK)
-  def PrepareShutdown(self):
-    """Prepare to stop the job queue.
-
-    Returns whether there are any jobs currently running. If the latter is the
-    case, the job queue is not yet ready for shutdown. Once this function
-    returns C{True} L{Shutdown} can be called without interfering with any job.
-
-    @rtype: bool
-    @return: Whether there are any running jobs
-
-    """
-    return self._wpool.HasRunningTasks()
-
-  @locking.ssynchronized(_LOCK)
-  def Shutdown(self):
-    """Stops the job queue.
-
-    This shutdowns all the worker threads an closes the queue.
-
-    """
-    self._wpool.TerminateWorkers()
index a69a1e9..8e61805 100644 (file)
@@ -44,16 +44,21 @@ import time
 from ganeti import mcpu
 from ganeti.server import masterd
 from ganeti.rpc import transport
+from ganeti import serializer
 from ganeti import utils
 from ganeti import pathutils
 from ganeti.utils import livelock
 
+from ganeti.jqueue import _JobProcessor
+
 
 def _GetMasterInfo():
-  """Retrieves the job id and lock file name from the master process
+  """Retrieve job id, lock file name and secret params from the master process
 
   This also closes standard input/output
 
+  @rtype: (int, string, json encoding of a list of dicts)
+
   """
   logging.debug("Opening transport over stdin/out")
   with contextlib.closing(transport.FdTransport((0, 1))) as trans:
@@ -63,7 +68,27 @@ def _GetMasterInfo():
     logging.debug("Reading the livelock name from the master process")
     livelock_name = livelock.LiveLockName(trans.Call(""))
     logging.debug("Got livelock %s", livelock_name)
-  return (job_id, livelock_name)
+    logging.debug("Reading secret parameters from the master process")
+    secret_params = trans.Call("")
+    logging.debug("Got secret parameters.")
+  return (job_id, livelock_name, secret_params)
+
+
+def RestorePrivateValueWrapping(json):
+  """Wrap private values in JSON decoded structure.
+
+  @param json: the json-decoded value to protect.
+
+  """
+  result = []
+
+  for secrets_dict in json:
+    if secrets_dict is None:
+      data = serializer.PrivateDict()
+    else:
+      data = serializer.PrivateDict(secrets_dict)
+    result.append(data)
+  return result
 
 
 def main():
@@ -73,11 +98,15 @@ def main():
   logname = pathutils.GetLogFilename("jobs")
   utils.SetupLogging(logname, "job-startup", debug=debug)
 
-  (job_id, livelock_name) = _GetMasterInfo()
+  (job_id, livelock_name, secret_params_serialized) = _GetMasterInfo()
+
+  secret_params = ""
+  if secret_params_serialized:
+    secret_params_json = serializer.LoadJson(secret_params_serialized)
+    secret_params = RestorePrivateValueWrapping(secret_params_json)
 
   utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)
 
-  exit_code = 1
   try:
     logging.debug("Preparing the context and the configuration")
     context = masterd.GanetiContext(livelock_name)
@@ -103,15 +132,30 @@ def main():
       prio_change[0] = True
     signal.signal(signal.SIGUSR1, _User1Handler)
 
-    logging.debug("Picking up job %d", job_id)
-    context.jobqueue.PickupJob(job_id)
-
-    # waiting for the job to finish
-    time.sleep(1)
-    while not context.jobqueue.HasJobBeenFinalized(job_id):
+    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+
+    job.SetPid(os.getpid())
+
+    if secret_params:
+      for i in range(0, len(secret_params)):
+        if hasattr(job.ops[i].input, "osparams_secret"):
+          job.ops[i].input.osparams_secret = secret_params[i]
+
+    execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
+    proc = _JobProcessor(context.jobqueue, execfun, job)
+    result = _JobProcessor.DEFER
+    while result != _JobProcessor.FINISHED:
+      result = proc()
+      if result == _JobProcessor.WAITDEP and not cancel[0]:
+        # Normally, the scheduler should avoid starting a job where the
+        # dependencies are not yet finalised. So warn, but wait an continue.
+        logging.warning("Got started despite a dependency not yet finished")
+        time.sleep(5)
       if cancel[0]:
         logging.debug("Got cancel request, cancelling job %d", job_id)
         r = context.jobqueue.CancelJob(job_id)
+        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+        proc = _JobProcessor(context.jobqueue, execfun, job)
         logging.debug("CancelJob result for job %d: %s", job_id, r)
         cancel[0] = False
       if prio_change[0]:
@@ -122,21 +166,15 @@ def main():
           utils.RemoveFile(fname)
           logging.debug("Changing priority of job %d to %d", job_id, new_prio)
           r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
+          job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+          proc = _JobProcessor(context.jobqueue, execfun, job)
           logging.debug("Result of changing priority of %d to %d: %s", job_id,
                         new_prio, r)
         except Exception: # pylint: disable=W0703
           logging.warning("Informed of priority change, but could not"
                           " read new priority")
         prio_change[0] = False
-      time.sleep(1)
-
-    # wait until the queue finishes
-    logging.debug("Waiting for the queue to finish")
-    while context.jobqueue.PrepareShutdown():
-      time.sleep(1)
-    logging.debug("Shutting the queue down")
-    context.jobqueue.Shutdown()
-    exit_code = 0
+
   except Exception: # pylint: disable=W0703
     logging.exception("Exception when trying to run job %d", job_id)
   finally:
@@ -144,7 +182,7 @@ def main():
     logging.debug("Removing livelock file %s", livelock_name.GetPath())
     os.remove(livelock_name.GetPath())
 
-  sys.exit(exit_code)
+  sys.exit(0)
 
 if __name__ == '__main__':
   main()
index 728a02b..6d58a78 100644 (file)
@@ -232,6 +232,23 @@ class IAReqInstanceAlloc(IARequestBase):
                                          (len(result), self.RequiredNodes()))
 
 
+class IAReqInstanceAllocateSecondary(IARequestBase):
+  """Request to find a secondary node for plain to DRBD conversion.
+
+  """
+  # pylint: disable=E1101
+  MODE = constants.IALLOCATOR_MODE_ALLOCATE_SECONDARY
+  REQ_PARAMS = [
+    _INST_NAME,
+    ]
+  REQ_RESULT = ht.TString
+
+  def GetRequest(self, cfg):
+    return {
+      "name": self.name
+    }
+
+
 class IAReqMultiInstanceAlloc(IARequestBase):
   """An multi instance allocation request.
 
@@ -491,13 +508,10 @@ class IAllocator(object):
 
     if isinstance(self.req, IAReqInstanceAlloc):
       hypervisor_name = self.req.hypervisor
-      node_whitelist = self.req.node_whitelist
     elif isinstance(self.req, IAReqRelocate):
       hypervisor_name = iinfo[self.req.inst_uuid].hypervisor
-      node_whitelist = None
     else:
       hypervisor_name = cluster_info.primary_hypervisor
-      node_whitelist = None
 
     if not disk_template:
       disk_template = cluster_info.enabled_disk_templates[0]
@@ -512,7 +526,7 @@ class IAllocator(object):
 
     data["nodegroups"] = self._ComputeNodeGroupData(cluster_info, ginfo)
 
-    config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
+    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
     data["nodes"] = self._ComputeDynamicNodeData(
         ninfo, node_data, node_iinfo, i_list, config_ndata, disk_template)
     assert len(data["nodes"]) == len(ninfo), \
@@ -539,7 +553,7 @@ class IAllocator(object):
     return ng
 
   @staticmethod
-  def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
+  def _ComputeBasicNodeData(cfg, node_cfg):
     """Compute global node data.
 
     @rtype: dict
@@ -551,9 +565,7 @@ class IAllocator(object):
       "tags": list(ninfo.GetTags()),
       "primary_ip": ninfo.primary_ip,
       "secondary_ip": ninfo.secondary_ip,
-      "offline": (ninfo.offline or
-                  not (node_whitelist is None or
-                       ninfo.name in node_whitelist)),
+      "offline": ninfo.offline,
       "drained": ninfo.drained,
       "master_candidate": ninfo.master_candidate,
       "group": ninfo.group,
@@ -814,6 +826,14 @@ class IAllocator(object):
     self._ComputeClusterData(disk_template=disk_template)
 
     request["type"] = req.MODE
+
+    if isinstance(self.req, IAReqInstanceAlloc):
+      node_whitelist = self.req.node_whitelist
+    else:
+      node_whitelist = None
+    if node_whitelist is not None:
+      request["restrict-to-nodes"] = node_whitelist
+
     self.in_data["request"] = request
 
     self.in_text = serializer.Dump(self.in_data)
index e0289bf..1da0766 100644 (file)
@@ -276,6 +276,17 @@ def _LockList(names):
     return list(names)
 
 
+def _CheckSecretParameters(op):
+  """Check if secret parameters are expected, but missing.
+
+  """
+  if hasattr(op, "osparams_secret") and op.osparams_secret:
+    for secret_param in op.osparams_secret:
+      if op.osparams_secret[secret_param].Get() == constants.REDACTED:
+        raise errors.OpPrereqError("Please re-submit secret parameters to job.",
+                                   errors.ECODE_INVAL)
+
+
 class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
@@ -289,7 +300,6 @@ class Processor(object):
     @param ec_id: execution context identifier
 
     """
-    self.context = context
     self._ec_id = ec_id
     self._cbs = None
     self.cfg = context.GetConfig(ec_id)
@@ -685,9 +695,10 @@ class Processor(object):
         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                      " disabled" % op.OP_ID)
 
-      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
+      lu = lu_class(self, op, self.cfg, self.rpc,
                     self._wconfdcontext, self.wconfd)
       lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
+      _CheckSecretParameters(op)
       lu.ExpandNames()
       assert lu.needed_locks is not None, "needed_locks not set by LU"
 
index 96e7092..e91719e 100644 (file)
@@ -522,7 +522,7 @@ class ConfigData(ConfigObject):
 class NIC(ConfigObject):
   """Config object representing a network card."""
   __slots__ = ["name", "mac", "ip", "network",
-               "nicparams", "netinfo", "pci"] + _UUID
+               "nicparams", "netinfo", "pci", "hvinfo"] + _UUID
 
   @classmethod
   def CheckParameterSyntax(cls, nicparams):
@@ -564,6 +564,7 @@ class Disk(ConfigObject):
     "params",
     "spindles",
     "pci",
+    "hvinfo",
     "serial_no",
     # dynamic_params is special. It depends on the node this instance
     # is sent to, and should not be persisted.
@@ -1674,6 +1675,8 @@ class Cluster(TaggableObject):
     "compression_tools",
     "enabled_user_shutdown",
     "data_collectors",
+    "ssh_key_type",
+    "ssh_key_bits",
     ] + _TIMESTAMPS + _UUID
 
   def UpgradeConfig(self):
@@ -1829,6 +1832,12 @@ class Cluster(TaggableObject):
     if self.enabled_user_shutdown is None:
       self.enabled_user_shutdown = False
 
+    if self.ssh_key_type is None:
+      self.ssh_key_type = constants.SSH_DEFAULT_KEY_TYPE
+
+    if self.ssh_key_bits is None:
+      self.ssh_key_bits = constants.SSH_DEFAULT_KEY_BITS
+
   @property
   def primary_hypervisor(self):
     """The first hypervisor is the primary.
index b6ef576..c1f0aba 100644 (file)
@@ -337,6 +337,8 @@ def GenericCurlConfig(verbose=False, use_signal=False,
       if capath:
         raise Error("cURL linked against GnuTLS has no support for a"
                     " CA path (%s)" % (pycurl.version, ))
+    elif lcsslver.startswith("boringssl"):
+      pass
     else:
       raise NotImplementedError("cURL uses unsupported SSL version '%s'" %
                                 sslver)
index 8271016..47f1433 100644 (file)
@@ -70,7 +70,8 @@ class Transport:
     There are two timeouts used since we might want to wait for a long
     time for a response, but the connect timeout should be lower.
 
-    If not passed, we use a default of 10 and respectively 60 seconds.
+    If not passed, we use the default luxi timeouts from the global
+    constants file.
 
     Note that on reading data, since the timeout applies to an
     invidual receive, it might be that the total duration is longer
index 74b74d1..71fa231 100644 (file)
@@ -513,9 +513,6 @@ _NODE_CALLS = [
     ("checkdict", None, "What to verify"),
     ("cluster_name", None, "Cluster name"),
     ("all_hvparams", None, "Dictionary mapping hypervisor names to hvparams"),
-    ("node_groups", None, "node names mapped to their group uuids"),
-    ("groups_cfg", None,
-      "a dictionary mapping group uuids to their configuration"),
     ], None, None, "Request verification of given parameters"),
   ("node_volumes", MULTI, None, constants.RPC_TMO_FAST, [], None, None,
    "Gets all volumes on node(s)"),
@@ -568,7 +565,10 @@ _NODE_CALLS = [
     ("node_uuids", None, "UUIDs of the nodes whose key is renewed"),
     ("node_names", None, "Names of the nodes whose key is renewed"),
     ("master_candidate_uuids", None, "List of UUIDs of master candidates."),
-    ("potential_master_candidates", None, "Potential master candidates")],
+    ("potential_master_candidates", None, "Potential master candidates"),
+    ("old_key_type", None, "The type of key previously used"),
+    ("new_key_type", None, "The type of key to generate"),
+    ("new_key_bits", None, "The length of the key to generate")],
     None, None, "Renew all SSH key pairs of all nodes nodes."),
   ]
 
@@ -683,9 +683,6 @@ CALLS = {
       ("checkdict", None, "What to verify"),
       ("cluster_name", None, "Cluster name"),
       ("hvparams", None, "Dictionary mapping hypervisor names to hvparams"),
-      ("node_groups", None, "node names mapped to their group uuids"),
-      ("groups_cfg", None,
-       "a dictionary mapping group uuids to their configuration"),
       ], None, None, "Request verification of given parameters"),
     ]),
   "RpcClientConfig": _Prepare([
index be98f69..8f127ba 100644 (file)
@@ -38,26 +38,13 @@ inheritance from parent classes requires it.
 # pylint: disable=C0103
 # C0103: Invalid name ganeti-masterd
 
-import os
-import sys
-import socket
-import time
-import tempfile
 import logging
 
-
 from ganeti import config
 from ganeti import constants
-from ganeti import daemon
 from ganeti import jqueue
-from ganeti import luxi
-import ganeti.rpc.errors as rpcerr
 from ganeti import utils
-from ganeti import errors
-from ganeti import workerpool
 import ganeti.rpc.node as rpc
-import ganeti.rpc.client as rpccl
-from ganeti import ht
 
 
 CLIENT_REQUEST_WORKERS = 16
@@ -66,248 +53,6 @@ EXIT_NOTMASTER = constants.EXIT_NOTMASTER
 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 
-def _LogNewJob(status, info, ops):
-  """Log information about a recently submitted job.
-
-  """
-  op_summary = utils.CommaJoin(op.Summary() for op in ops)
-
-  if status:
-    logging.info("New job with id %s, summary: %s", info, op_summary)
-  else:
-    logging.info("Failed to submit job, reason: '%s', summary: %s",
-                 info, op_summary)
-
-
-class ClientRequestWorker(workerpool.BaseWorker):
-  # pylint: disable=W0221
-  def RunTask(self, server, message, client):
-    """Process the request.
-
-    """
-    client_ops = ClientOps(server)
-
-    try:
-      (method, args, ver) = rpccl.ParseRequest(message)
-    except rpcerr.ProtocolError, err:
-      logging.error("Protocol Error: %s", err)
-      client.close_log()
-      return
-
-    success = False
-    try:
-      # Verify client's version if there was one in the request
-      if ver is not None and ver != constants.LUXI_VERSION:
-        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
-                               (constants.LUXI_VERSION, ver))
-
-      result = client_ops.handle_request(method, args)
-      success = True
-    except errors.GenericError, err:
-      logging.exception("Unexpected exception")
-      success = False
-      result = errors.EncodeException(err)
-    except:
-      logging.exception("Unexpected exception")
-      err = sys.exc_info()
-      result = "Caught exception: %s" % str(err[1])
-
-    try:
-      reply = rpccl.FormatResponse(success, result)
-      client.send_message(reply)
-      # awake the main thread so that it can write out the data.
-      server.awaker.signal()
-    except: # pylint: disable=W0702
-      logging.exception("Send error")
-      client.close_log()
-
-
-class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
-  """Handler for master peers.
-
-  """
-  _MAX_UNHANDLED = 1
-
-  def __init__(self, server, connected_socket, client_address, family):
-    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
-                                                 client_address,
-                                                 constants.LUXI_EOM,
-                                                 family, self._MAX_UNHANDLED)
-    self.server = server
-
-  def handle_message(self, message, _):
-    self.server.request_workers.AddTask((self.server, message, self))
-
-
-class _MasterShutdownCheck(object):
-  """Logic for master daemon shutdown.
-
-  """
-  #: How long to wait between checks
-  _CHECK_INTERVAL = 5.0
-
-  #: How long to wait after all jobs are done (e.g. to give clients time to
-  #: retrieve the job status)
-  _SHUTDOWN_LINGER = 5.0
-
-  def __init__(self):
-    """Initializes this class.
-
-    """
-    self._had_active_jobs = None
-    self._linger_timeout = None
-
-  def __call__(self, jq_prepare_result):
-    """Determines if master daemon is ready for shutdown.
-
-    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
-    @rtype: None or number
-    @return: None if master daemon is ready, timeout if the check must be
-             repeated
-
-    """
-    if jq_prepare_result:
-      # Check again shortly
-      logging.info("Job queue has been notified for shutdown but is still"
-                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
-      self._had_active_jobs = True
-      return self._CHECK_INTERVAL
-
-    if not self._had_active_jobs:
-      # Can shut down as there were no active jobs on the first check
-      return None
-
-    # No jobs are running anymore, but maybe some clients want to collect some
-    # information. Give them a short amount of time.
-    if self._linger_timeout is None:
-      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
-
-    remaining = self._linger_timeout.Remaining()
-
-    logging.info("Job queue no longer busy; shutting down master daemon"
-                 " in %s seconds", remaining)
-
-    # TODO: Should the master daemon socket be closed at this point? Doing so
-    # wouldn't affect existing connections.
-
-    if remaining < 0:
-      return None
-    else:
-      return remaining
-
-
-class MasterServer(daemon.AsyncStreamServer):
-  """Master Server.
-
-  This is the main asynchronous master server. It handles connections to the
-  master socket.
-
-  """
-  family = socket.AF_UNIX
-
-  def __init__(self, address, uid, gid):
-    """MasterServer constructor
-
-    @param address: the unix socket address to bind the MasterServer to
-    @param uid: The uid of the owner of the socket
-    @param gid: The gid of the owner of the socket
-
-    """
-    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
-    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
-    os.chmod(temp_name, 0770)
-    os.chown(temp_name, uid, gid)
-    os.rename(temp_name, address)
-
-    self.awaker = daemon.AsyncAwaker()
-
-    # We'll only start threads once we've forked.
-    self.context = None
-    self.request_workers = None
-
-    self._shutdown_check = None
-
-  def handle_connection(self, connected_socket, client_address):
-    # TODO: add connection count and limit the number of open connections to a
-    # maximum number to avoid breaking for lack of file descriptors or memory.
-    MasterClientHandler(self, connected_socket, client_address, self.family)
-
-  def setup_context(self):
-    self.context = GanetiContext()
-    self.request_workers = workerpool.WorkerPool("ClientReq",
-                                                 CLIENT_REQUEST_WORKERS,
-                                                 ClientRequestWorker)
-
-  def WaitForShutdown(self):
-    """Prepares server for shutdown.
-
-    """
-    if self._shutdown_check is None:
-      self._shutdown_check = _MasterShutdownCheck()
-
-    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
-
-  def server_cleanup(self):
-    """Cleanup the server.
-
-    This involves shutting down the processor threads and the master
-    socket.
-
-    """
-    try:
-      self.close()
-    finally:
-      if self.request_workers:
-        self.request_workers.TerminateWorkers()
-      if self.context:
-        self.context.jobqueue.Shutdown()
-        self.context.livelock.close()
-
-
-class ClientOps(object):
-  """Class holding high-level client operations."""
-  def __init__(self, server):
-    self.server = server
-
-  @staticmethod
-  def _PickupJob(args, queue):
-    logging.info("Picking up new job from queue")
-    (job_id, ) = args
-    queue.PickupJob(job_id)
-    return job_id
-
-  @staticmethod
-  def _ChangeJobPriority(args, queue):
-    (job_id, priority) = args
-    logging.info("Received request to change priority for job %s to %s",
-                 job_id, priority)
-    return queue.ChangeJobPriority(job_id, priority)
-
-  def handle_request(self, method, args): # pylint: disable=R0911
-    context = self.server.context
-    queue = context.jobqueue
-
-    # TODO: Parameter validation
-    if not isinstance(args, (tuple, list)):
-      logging.info("Received invalid arguments of type '%s'", type(args))
-      raise ValueError("Invalid arguments type '%s'" % type(args))
-
-    if method not in luxi.REQ_ALL:
-      logging.info("Received invalid request '%s'", method)
-      raise ValueError("Invalid operation '%s'" % method)
-
-    job_id = None
-    if method == luxi.REQ_PICKUP_JOB:
-      job_id = self._PickupJob(args, queue)
-    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
-      job_id = self._ChangeJobPriority(args, queue)
-    else:
-      logging.info("Request '%s' not supported by masterd", method)
-      raise ValueError("Unsupported operation '%s'" % method)
-
-    return job_id
-
-
 class GanetiContext(object):
   """Context common to all ganeti threads.
 
@@ -366,56 +111,9 @@ class GanetiContext(object):
     # Add it to the configuration
     cfg.AddNode(node, ec_id)
 
-    # If preseeding fails it'll not be added
-    self.jobqueue.AddNode(node)
-
-  def ReaddNode(self, node):
-    """Updates a node that's already in the configuration
-
-    """
-    # Synchronize the queue again
-    self.jobqueue.AddNode(node)
-
   def RemoveNode(self, cfg, node):
     """Removes a node from the configuration and lock manager.
 
     """
     # Remove node from configuration
     cfg.RemoveNode(node.uuid)
-
-    # Notify job queue
-    self.jobqueue.RemoveNode(node.name)
-
-
-def _SetWatcherPause(context, ec_id, until):
-  """Creates or removes the watcher pause file.
-
-  @type context: L{GanetiContext}
-  @param context: Global Ganeti context
-  @type until: None or int
-  @param until: Unix timestamp saying until when the watcher shouldn't run
-
-  """
-  node_names = context.GetConfig(ec_id).GetNodeList()
-
-  if until is None:
-    logging.info("Received request to no longer pause watcher")
-  else:
-    if not ht.TNumber(until):
-      raise TypeError("Duration must be numeric")
-
-    if until < time.time():
-      raise errors.GenericError("Unable to set pause end time in the past")
-
-    logging.info("Received request to pause watcher until %s", until)
-
-  result = context.rpc.call_set_watcher_pause(node_names, until)
-
-  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
-                           for (node_name, nres) in result.items()
-                           if nres.fail_msg and not nres.offline)
-  if errmsg:
-    raise errors.OpExecError("Watcher pause was set where possible, but failed"
-                             " on the following node(s): %s" % errmsg)
-
-  return until
index 880f2e1..a5e05dd 100644 (file)
@@ -821,9 +821,8 @@ class NodeRequestHandler(http.server.HttpServerHandler):
     """Run a verify sequence on this node.
 
     """
-    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
-    return backend.VerifyNode(what, cluster_name, hvparams,
-                              node_groups, groups_cfg)
+    (what, cluster_name, hvparams) = params
+    return backend.VerifyNode(what, cluster_name, hvparams)
 
   @classmethod
   def perspective_node_verify_light(cls, params):
@@ -946,10 +945,11 @@ class NodeRequestHandler(http.server.HttpServerHandler):
 
     """
     (node_uuids, node_names, master_candidate_uuids,
-     potential_master_candidates) = params
-    return backend.RenewSshKeys(node_uuids, node_names,
-                                master_candidate_uuids,
-                                potential_master_candidates)
+     potential_master_candidates, old_key_type, new_key_type,
+     new_key_bits) = params
+    return backend.RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
+                                potential_master_candidates, old_key_type,
+                                new_key_type, new_key_bits)
 
   @staticmethod
   def perspective_node_ssh_key_remove(params):
index 111493e..843d8af 100644 (file)
@@ -39,7 +39,6 @@ import sys
 import errno
 import logging
 
-from ganeti import compat
 from ganeti import errors
 from ganeti import constants
 from ganeti import utils
@@ -50,42 +49,7 @@ from ganeti import pathutils
 SSCONF_LOCK_TIMEOUT = 10
 
 #: Valid ssconf keys
-_VALID_KEYS = compat.UniqueFrozenset([
-  constants.SS_CLUSTER_NAME,
-  constants.SS_CLUSTER_TAGS,
-  constants.SS_FILE_STORAGE_DIR,
-  constants.SS_SHARED_FILE_STORAGE_DIR,
-  constants.SS_GLUSTER_STORAGE_DIR,
-  constants.SS_MASTER_CANDIDATES,
-  constants.SS_MASTER_CANDIDATES_IPS,
-  constants.SS_MASTER_CANDIDATES_CERTS,
-  constants.SS_MASTER_IP,
-  constants.SS_MASTER_NETDEV,
-  constants.SS_MASTER_NETMASK,
-  constants.SS_MASTER_NODE,
-  constants.SS_NODE_LIST,
-  constants.SS_NODE_PRIMARY_IPS,
-  constants.SS_NODE_SECONDARY_IPS,
-  constants.SS_NODE_VM_CAPABLE,
-  constants.SS_OFFLINE_NODES,
-  constants.SS_ONLINE_NODES,
-  constants.SS_PRIMARY_IP_FAMILY,
-  constants.SS_INSTANCE_LIST,
-  constants.SS_RELEASE_VERSION,
-  constants.SS_HYPERVISOR_LIST,
-  constants.SS_MAINTAIN_NODE_HEALTH,
-  constants.SS_UID_POOL,
-  constants.SS_NODEGROUPS,
-  constants.SS_NETWORKS,
-  constants.SS_HVPARAMS_XEN_PVM,
-  constants.SS_HVPARAMS_XEN_FAKE,
-  constants.SS_HVPARAMS_XEN_HVM,
-  constants.SS_HVPARAMS_XEN_KVM,
-  constants.SS_HVPARAMS_XEN_CHROOT,
-  constants.SS_HVPARAMS_XEN_LXC,
-  constants.SS_ENABLED_USER_SHUTDOWN,
-  constants.SS_SSH_PORTS,
-  ])
+_VALID_KEYS = constants.VALID_SS_KEYS
 
 #: Maximum size for ssconf files
 _MAX_SIZE = 128 * 1024
index 7d34f29..a8fe86d 100644 (file)
@@ -37,6 +37,7 @@ import logging
 import os
 import tempfile
 
+from collections import namedtuple
 from functools import partial
 
 from ganeti import utils
@@ -677,16 +678,19 @@ def QueryPubKeyFile(target_uuids, key_file=pathutils.SSH_PUB_KEYS,
   return result
 
 
-def InitSSHSetup(error_fn=errors.OpPrereqError, _homedir_fn=None,
-                 _suffix=""):
+def InitSSHSetup(key_type, key_bits, error_fn=errors.OpPrereqError,
+                 _homedir_fn=None, _suffix=""):
   """Setup the SSH configuration for the node.
 
   This generates a dsa keypair for root, adds the pub key to the
   permitted hosts and adds the hostkey to its own known hosts.
 
+  @param key_type: the type of SSH keypair to be generated
+  @param key_bits: the key length, in bits, to be used
+
   """
-  priv_key, _, auth_keys = GetUserFiles(constants.SSH_LOGIN_USER,
-                                        _homedir_fn=_homedir_fn)
+  priv_key, _, auth_keys = GetUserFiles(constants.SSH_LOGIN_USER, kind=key_type,
+                                        mkdir=True, _homedir_fn=_homedir_fn)
 
   new_priv_key_name = priv_key + _suffix
   new_pub_key_name = priv_key + _suffix + ".pub"
@@ -696,7 +700,7 @@ def InitSSHSetup(error_fn=errors.OpPrereqError, _homedir_fn=None,
       utils.CreateBackup(name)
     utils.RemoveFile(name)
 
-  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
+  result = utils.RunCmd(["ssh-keygen", "-b", str(key_bits), "-t", key_type,
                          "-f", new_priv_key_name,
                          "-q", "-N", ""])
   if result.failed:
@@ -706,16 +710,18 @@ def InitSSHSetup(error_fn=errors.OpPrereqError, _homedir_fn=None,
   AddAuthorizedKey(auth_keys, utils.ReadFile(new_pub_key_name))
 
 
-def InitPubKeyFile(master_uuid, key_file=pathutils.SSH_PUB_KEYS):
+def InitPubKeyFile(master_uuid, key_type, key_file=pathutils.SSH_PUB_KEYS):
   """Creates the public key file and adds the master node's SSH key.
 
   @type master_uuid: str
   @param master_uuid: the master node's UUID
+  @type key_type: one of L{constants.SSHK_ALL}
+  @param key_type: the type of ssh key to be used
   @type key_file: str
   @param key_file: name of the file containing the public keys
 
   """
-  _, pub_key, _ = GetUserFiles(constants.SSH_LOGIN_USER)
+  _, pub_key, _ = GetUserFiles(constants.SSH_LOGIN_USER, kind=key_type)
   ClearPubKeyFile(key_file=key_file)
   key = utils.ReadFile(pub_key)
   AddPublicKey(master_uuid, key, key_file=key_file)
@@ -1069,7 +1075,7 @@ def RunSshCmdWithStdin(cluster_name, node, basecmd, port, data,
 
 def ReadRemoteSshPubKeys(pub_key_file, node, cluster_name, port, ask_key,
                          strict_host_check):
-  """Fetches the public DSA SSH key from a node via SSH.
+  """Fetches a public SSH key from a node via SSH.
 
   @type pub_key_file: string
   @param pub_key_file: a tuple consisting of the file name of the public DSA key
@@ -1087,7 +1093,47 @@ def ReadRemoteSshPubKeys(pub_key_file, node, cluster_name, port, ask_key,
 
   result = utils.RunCmd(ssh_cmd)
   if result.failed:
-    raise errors.OpPrereqError("Could not fetch a public DSA SSH key from node"
+    raise errors.OpPrereqError("Could not fetch a public SSH key (%s) from node"
                                " '%s': ran command '%s', failure reason: '%s'."
-                               % (node, cmd, result.fail_reason))
+                               % (pub_key_file, node, cmd, result.fail_reason),
+                               errors.ECODE_INVAL)
   return result.stdout
+
+
+# Update gnt-cluster.rst when changing which combinations are valid.
+KeyBitInfo = namedtuple('KeyBitInfo', ['default', 'validation_fn'])
+SSH_KEY_VALID_BITS = {
+  constants.SSHK_DSA: KeyBitInfo(1024, lambda b: b == 1024),
+  constants.SSHK_RSA: KeyBitInfo(2048, lambda b: b >= 768),
+  constants.SSHK_ECDSA: KeyBitInfo(384, lambda b: b in [256, 384, 521]),
+}
+
+
+def DetermineKeyBits(key_type, key_bits, old_key_type, old_key_bits):
+  """Checks the key bits to be used for a given key type, or provides defaults.
+
+  @type key_type: one of L{constants.SSHK_ALL}
+  @param key_type: The key type to use.
+  @type key_bits: positive int or None
+  @param key_bits: The number of bits to use, if supplied by user.
+  @type old_key_type: one of L{constants.SSHK_ALL} or None
+  @param old_key_type: The previously used key type, if any.
+  @type old_key_bits: positive int or None
+  @param old_key_bits: The previously used number of bits, if any.
+
+  @rtype: positive int
+  @return: The number of bits to use.
+
+  """
+  if key_bits is None:
+    if old_key_type is not None and old_key_type == key_type:
+      key_bits = old_key_bits
+    else:
+      key_bits = SSH_KEY_VALID_BITS[key_type].default
+
+  if not SSH_KEY_VALID_BITS[key_type].validation_fn(key_bits):
+    raise errors.OpPrereqError("Invalid key type and bit size combination:"
+                               " %s with %s bits" % (key_type, key_bits),
+                               errors.ECODE_INVAL)
+
+  return key_bits
index e071b79..14e2e20 100644 (file)
@@ -59,11 +59,11 @@ from ganeti.utils import version
 #: Target major version we will upgrade to
 TARGET_MAJOR = 2
 #: Target minor version we will upgrade to
-TARGET_MINOR = 15
+TARGET_MINOR = 16
 #: Target major version for downgrade
 DOWNGRADE_MAJOR = 2
 #: Target minor version for downgrade
-DOWNGRADE_MINOR = 14
+DOWNGRADE_MINOR = 15
 
 # map of legacy device types
 # (mapping differing old LD_* constants to new DT_* constants)
@@ -183,8 +183,8 @@ class CfgUpgrade(object):
       self._Downgrade(config_major, config_minor, config_version,
                       config_revision)
 
-    # Upgrade from 2.{0..14} to 2.15
-    elif config_major == 2 and config_minor in range(0, 15):
+    # Upgrade from 2.{0..15} to 2.16
+    elif config_major == 2 and config_minor in range(0, 16):
       if config_revision != 0:
         logging.warning("Config revision is %s, not 0", config_revision)
       if not self.UpgradeAll():
@@ -307,24 +307,33 @@ class CfgUpgrade(object):
     cluster = self.config_data.get("cluster", None)
     if cluster is None:
       raise Error("Cannot find cluster")
+
     ipolicy = cluster.setdefault("ipolicy", None)
     if ipolicy:
       self.UpgradeIPolicy(ipolicy, constants.IPOLICY_DEFAULTS, False)
     ial_params = cluster.get("default_iallocator_params", None)
+
     if not ial_params:
       cluster["default_iallocator_params"] = {}
+
     if not "candidate_certs" in cluster:
       cluster["candidate_certs"] = {}
+
     cluster["instance_communication_network"] = \
       cluster.get("instance_communication_network", "")
+
     cluster["install_image"] = \
       cluster.get("install_image", "")
+
     cluster["zeroing_image"] = \
       cluster.get("zeroing_image", "")
+
     cluster["compression_tools"] = \
       cluster.get("compression_tools", constants.IEC_DEFAULT_TOOLS)
+
     if "enabled_user_shutdown" not in cluster:
       cluster["enabled_user_shutdown"] = False
+
     cluster["data_collectors"] = cluster.get("data_collectors", {})
     for name in constants.DATA_COLLECTOR_NAMES:
       cluster["data_collectors"][name] = \
@@ -332,6 +341,14 @@ class CfgUpgrade(object):
             name, dict(active=True,
                        interval=constants.MOND_TIME_INTERVAL * 1e6))
 
+    # These parameters are set to pre-2.16 default values, which
+    # differ from post-2.16 default values
+    if "ssh_key_type" not in cluster:
+      cluster["ssh_key_type"] = constants.SSHK_DSA
+
+    if "ssh_key_bits" not in cluster:
+      cluster["ssh_key_bits"] = 1024
+
   @OrFail("Upgrading groups")
   def UpgradeGroups(self):
     cl_ipolicy = self.config_data["cluster"].get("ipolicy")
@@ -701,19 +718,42 @@ class CfgUpgrade(object):
 
   # DOWNGRADE ------------------------------------------------------------
 
-  @classmethod
-  def DowngradeCollectors(cls, collectors):
-    if constants.DATA_COLLECTOR_XEN_CPU_LOAD in collectors:
-      del collectors[constants.DATA_COLLECTOR_XEN_CPU_LOAD]
+  @OrFail("Removing SSH parameters")
+  def DowngradeSshKeyParams(self):
+    """Removes the SSH key type and bits parameters from the config.
+
+    Also fails if these have been changed from values appropriate in lower
+    Ganeti versions.
 
-  def DowngradeCluster(self, cluster):
-    self.DowngradeCollectors(cluster["data_collectors"])
+    """
+    # pylint: disable=E1103
+    # Because config_data is a dictionary which has the get method.
+    cluster = self.config_data.get("cluster", None)
+    if cluster is None:
+      raise Error("Can't find the cluster entry in the configuration")
+
+    def _FetchAndDelete(key):
+      val = cluster.get(key, None)
+      if key in cluster:
+        del cluster[key]
+      return val
+
+    ssh_key_type = _FetchAndDelete("ssh_key_type")
+    _FetchAndDelete("ssh_key_bits")
+
+    if ssh_key_type is not None and ssh_key_type != "dsa":
+      raise Error("The current Ganeti setup is using non-DSA SSH keys, and"
+                  " versions below 2.16 do not support these. To downgrade,"
+                  " please perform a gnt-cluster renew-crypto using the "
+                  " --new-ssh-keys and --ssh-key-type=dsa options, generating"
+                  " DSA keys that older versions can also use.")
 
   def DowngradeAll(self):
-    self.DowngradeCluster(self.config_data["cluster"])
     self.config_data["version"] = version.BuildVersion(DOWNGRADE_MAJOR,
                                                        DOWNGRADE_MINOR, 0)
-    return True
+
+    self.DowngradeSshKeyParams()
+    return not self.errors
 
   def _ComposePaths(self):
     # We need to keep filenames locally because they might be renamed between
index 3297025..60fe169 100644 (file)
@@ -198,11 +198,13 @@ def LoadData(raw, data_check):
   return result
 
 
-def GenerateRootSshKeys(error_fn, _suffix="", _homedir_fn=None):
+def GenerateRootSshKeys(key_type, key_bits, error_fn, _suffix="",
+                        _homedir_fn=None):
   """Generates root's SSH keys for this node.
 
   """
-  ssh.InitSSHSetup(error_fn=error_fn, _homedir_fn=_homedir_fn, _suffix=_suffix)
+  ssh.InitSSHSetup(key_type, key_bits, error_fn=error_fn,
+                   _homedir_fn=_homedir_fn, _suffix=_suffix)
 
 
 def GenerateClientCertificate(
index 82a35dc..fa45a58 100644 (file)
@@ -50,7 +50,7 @@ from ganeti.tools import common
 _SSH_KEY_LIST_ITEM = \
   ht.TAnd(ht.TIsLength(3),
           ht.TItems([
-            ht.TElemOf(constants.SSHK_ALL),
+            ht.TSshKeyType,
             ht.Comment("public")(ht.TNonEmptyString),
             ht.Comment("private")(ht.TNonEmptyString),
           ]))
@@ -64,6 +64,8 @@ _DATA_CHECK = ht.TStrictDict(False, True, {
   constants.SSHS_SSH_ROOT_KEY: _SSH_KEY_LIST,
   constants.SSHS_SSH_AUTHORIZED_KEYS:
     ht.TDictOf(ht.TNonEmptyString, ht.TListOf(ht.TNonEmptyString)),
+  constants.SSHS_SSH_KEY_TYPE: ht.TSshKeyType,
+  constants.SSHS_SSH_KEY_BITS: ht.TPositive,
   })
 
 
@@ -172,7 +174,10 @@ def UpdateSshRoot(data, dry_run, _homedir_fn=None):
   if dry_run:
     logging.info("This is a dry run, not replacing the SSH keys.")
   else:
-    common.GenerateRootSshKeys(error_fn=JoinError, _homedir_fn=_homedir_fn)
+    ssh_key_type = data.get(constants.SSHS_SSH_KEY_TYPE)
+    ssh_key_bits = data.get(constants.SSHS_SSH_KEY_BITS)
+    common.GenerateRootSshKeys(ssh_key_type, ssh_key_bits, error_fn=JoinError,
+                               _homedir_fn=_homedir_fn)
 
   if authorized_keys:
     if dry_run:
index f9d1b6d..b37972e 100644 (file)
@@ -62,7 +62,13 @@ _DATA_CHECK = ht.TStrictDict(False, True, {
     ht.TItems(
       [ht.TElemOf(constants.SSHS_ACTIONS),
        ht.TDictOf(ht.TNonEmptyString, ht.TListOf(ht.TNonEmptyString))]),
-  constants.SSHS_GENERATE: ht.TDictOf(ht.TNonEmptyString, ht.TString),
+  constants.SSHS_GENERATE:
+    ht.TItems(
+      [ht.TSshKeyType, # The type of key to generate
+       ht.TPositive, # The number of bits in the key
+       ht.TString]), # The suffix
+  constants.SSHS_SSH_KEY_TYPE: ht.TSshKeyType,
+  constants.SSHS_SSH_KEY_BITS: ht.TPositive,
   })
 
 
@@ -190,11 +196,12 @@ def GenerateRootSshKeys(data, dry_run):
   """
   generate_info = data.get(constants.SSHS_GENERATE)
   if generate_info:
-    suffix = generate_info[constants.SSHS_SUFFIX]
+    key_type, key_bits, suffix = generate_info
     if dry_run:
       logging.info("This is a dry run, not generating any files.")
     else:
-      common.GenerateRootSshKeys(SshUpdateError, _suffix=suffix)
+      common.GenerateRootSshKeys(key_type, key_bits, SshUpdateError,
+                                 _suffix=suffix)
 
 
 def Main():
index bdd9761..ce89869 100644 (file)
@@ -65,6 +65,7 @@ from ganeti.utils.process import *
 from ganeti.utils.retry import *
 from ganeti.utils.security import *
 from ganeti.utils.storage import *
+from ganeti.utils.tags import *
 from ganeti.utils.text import *
 from ganeti.utils.wrapper import *
 from ganeti.utils.version import *
similarity index 68%
copy from test/py/cmdlib/testsupport/pathutils_mock.py
copy to lib/utils/tags.py
index 9e59dc4..1353901 100644 (file)
 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+"""Utility functions for tag related operations
 
-"""Support for mocking the ssh module"""
+"""
 
+from ganeti import constants
 
-from cmdlib.testsupport.util import patchModule
 
+def GetExclusionPrefixes(ctags):
+  """Extract the exclusion tag prefixes from the cluster tags
+
+  """
+  prefixes = set([])
+  for tag in ctags:
+    if tag.startswith(constants.EX_TAGS_PREFIX):
+      prefixes.add(tag[len(constants.EX_TAGS_PREFIX):])
+  return prefixes
 
-# pylint: disable=C0103
-def patchPathutils(module_under_test):
-  """Patches the L{ganeti.pathutils} module for tests.
 
-  This function is meant to be used as a decorator for test methods.
+def IsGoodTag(prefixes, tag):
+  """Decide if a string is a tag
 
-  @type module_under_test: string
-  @param module_under_test: the module within cmdlib which is tested. The
-        "ganeti.cmdlib" prefix is optional.
+  @param prefixes: set of prefixes that would indicate
+      the tag being suitable
+  @param tag: the tag in question
 
   """
-  return patchModule(module_under_test, "pathutils")
+  for prefix in prefixes:
+    if tag.startswith(prefix):
+      return True
+  return False
index dd0766e..41778f6 100644 (file)
@@ -43,6 +43,8 @@ from ganeti.rpc import errors
 
 
 class Client(cl.AbstractStubClient, stub.ClientRpcStub):
+  # R0904: Too many public methods
+  # pylint: disable=R0904
   """High-level WConfD client implementation.
 
   This uses a backing Transport-like class on top of which it
index f34677a..9b0374c 100644 (file)
@@ -206,6 +206,8 @@ INIT
 | [\--zeroing-image *image*]
 | [\--compression-tools [*tool*, [*tool*]]]
 | [\--user-shutdown {yes \| no}]
+| [\--ssh-key-type *type*]
+| [\--ssh-key-bits *bits*]
 | {*clustername*}
 
 This commands is only run once initially on the first node of the
@@ -271,7 +273,10 @@ The ``--no-etc-hosts`` option allows you to initialize the cluster
 without modifying the /etc/hosts file.
 
 The ``--no-ssh-init`` option allows you to initialize the cluster
-without creating or distributing SSH key pairs.
+without creating or distributing SSH key pairs. This also sets the
+cluster-wide configuration parameter ``modify ssh setup`` to False.
+When adding nodes, Ganeti will consider this parameter to determine
+whether to create and distributed SSH key pairs on new nodes as well.
 
 The ``--file-storage-dir``, ``--shared-file-storage-dir`` and
 ``--gluster-storage-dir`` options allow you set the directory to use for
@@ -632,6 +637,18 @@ of testing whether the executable exists. These requirements are
 compatible with the gzip command line options, allowing many tools to
 be easily wrapped and used.
 
+The ``--ssh-key-type`` and ``--ssh-key-bits`` options determine the
+properties of the SSH keys Ganeti generates and uses to execute
+commands on nodes. The supported types are currently 'dsa', 'rsa', and
+'ecdsa'. The supported bit sizes vary across keys, reflecting the
+options **ssh-keygen**\(1) exposes. These are currently:
+
+- dsa: 1024 bits
+- rsa: >=768 bits
+- ecdsa: 256, 384, or 521 bits
+
+Ganeti defaults to using 2048-bit RSA keys.
+
 MASTER-FAILOVER
 ~~~~~~~~~~~~~~~
 
@@ -864,6 +881,7 @@ RENEW-CRYPTO
 | \--spice-ca-certificate *spice-ca-cert*]
 | [\--new-ssh-keys] [\--no-ssh-key-check]
 | [\--new-cluster-domain-secret] [\--cluster-domain-secret *filename*]
+| [\--ssh-key-type *type*] | [\--ssh-key-bits *bits*]
 
 This command will stop all Ganeti daemons in the cluster and start
 them again once the new certificates and keys are replicated. The
@@ -905,6 +923,10 @@ cluster domain secret, and ``--cluster-domain-secret`` reads the
 secret from a file. The cluster domain secret is used to sign
 information exchanged between separate clusters via a third party.
 
+The options ``--ssh-key-type`` and ``ssh-key-bits`` determine the
+properties of the disk types used. They are described in more detail
+in the ``init`` option description.
+
 REPAIR-DISK-SIZES
 ~~~~~~~~~~~~~~~~~
 
@@ -1010,11 +1032,14 @@ List of error codes:
 VERIFY-DISKS
 ~~~~~~~~~~~~
 
-**verify-disks**
+**verify-disks** [\--node-group *nodegroup*]
 
 The command checks which instances have degraded DRBD disks and
 activates the disks of those instances.
 
+With ``--node-group``, restrict the verification to those nodes and
+instances that live in the named group.
+
 This command is run from the **ganeti-watcher** tool, which also
 has a different, complementary algorithm for doing this check.
 Together, these two should ensure that DRBD disks are kept
index 6614c12..e934d0e 100644 (file)
@@ -99,6 +99,13 @@ TEST-JOBQUEUE
 Executes a few tests on the job queue. This command might generate
 failed jobs deliberately.
 
+TEST_OSPARAMS
+~~~~~~~~~~~~~
+
+**test-osparams** {--os-parameters-secret *param*=*value*... }
+
+Tests secret os parameter transmission.
+
 LOCKS
 ~~~~~
 
index a29fd79..caad1d0 100644 (file)
@@ -333,6 +333,25 @@ vif\_type
     - ioemu
     - vif
 
+scsi\_controller\_type
+    Valid for the KVM hypervisor.
+
+    This parameter specifies which type of SCSI controller to use.
+    The possible options are:
+
+    - lsi [default]
+    - megasas
+    - virtio-scsi-pci
+
+kvm\_pci\_reservations
+    Valid for the KVM hypervisor.
+
+    The nubmer of PCI slots that QEMU will manage implicitly. By default Ganeti
+    will let QEMU use the first 12 slots (i.e. PCI slots 0-11) on its own and
+    will start adding disks and NICs from the 13rd slot (i.e. PCI slot 12)
+    onwards. So by default one can add 20 PCI devices (32 - 12). To support more
+    than that, this hypervisor parameter should be set accordingly (e.g. to 8).
+
 disk\_type
     Valid for the Xen HVM and KVM hypervisors.
 
index 0940d7f..bf3fff3 100644 (file)
@@ -62,13 +62,15 @@ The ``-g (--node-group)`` option is used to add the new node into a
 specific node group, specified by UUID or name. If only one node group
 exists you can skip this option, otherwise it's mandatory.
 
-The ``--no-node-setup`` option prevents Ganeti from performing the
-initial SSH setup on the new node. This means that Ganeti will not
-touch the SSH keys or the ``authorized_keys`` file of the node at
-all. Using this option, it lies in the administrators responsibility
-to ensure SSH connectivity between the hosts by other means. Note,
-that the equivalent of this option in ``gnt-cluster init`` is called
-``--no-ssh-init``.
+The ``--no-node-setup`` option that used to prevent Ganeti from performing
+the initial SSH setup on the new node is no longer valid. Instead,
+Ganeti consideres the ``modify ssh setup`` configuration parameter
+(which is set using ``--no-ssh-init`` during cluster initialization)
+to determine whether or not to do the SSH setup on a new node or not.
+If this parameter is set to ``False``, Ganeti will not touch the SSH
+keys or the ``authorized_keys`` file of the node at all. Using this option,
+it lies in the administrators responsibility to ensure SSH connectivity
+between the hosts by other means.
 
 The ``vm_capable``, ``master_capable``, ``ndparams``, ``diskstate`` and
 ``hvstate`` options are described in **ganeti**\(7), and are used to set
index 67408db..fd38baf 100644 (file)
@@ -116,6 +116,15 @@ The options that can be passed to the program are as follows:
   for possible allocations. In this way a useful decission can be made
   even in overloaded clusters.
 
+\--no-capacity-checks
+  Normally, hail will only consider those allocations where all instances
+  of a node can immediately restarted should that node fail. With this
+  option given, hail will check only N+1 redundancy for DRBD instances.
+
+\--restrict-allocation-to
+  Only consider alloctions on the specified nodes. This overrides any
+  restrictions given in the allocation request.
+
 \--simulate *description*
   Backend specification: similar to the **-t** option, this allows
   overriding the cluster data with a simulated cluster. For details
index fa93660..960c3f4 100644 (file)
@@ -10,6 +10,7 @@ SYNOPSIS
 --------
 
 **harep** [ [**-L** | **\--luxi** ] = *socket* ] [ --job-delay = *seconds* ]
+[ --dry-run ]
 
 **harep** \--version
 
@@ -18,7 +19,8 @@ DESCRIPTION
 
 Harep is the Ganeti auto-repair tool. It is able to detect that an instance is
 broken and to generate a sequence of jobs that will fix it, in accordance to the
-policies set by the administrator.
+policies set by the administrator. At the moment, only repairs for instances
+using the disk templates ``plain`` or ``drbd`` are supported.
 
 Harep is able to recognize what state an instance is in (healthy, suspended,
 needs repair, repair disallowed, pending repair, repair failed)
@@ -41,10 +43,13 @@ contain. The possible tags share the common structure::
 
 where ``<type>`` can have the following values:
 
-* ``fix-storage``: allow disk replacement or fix the backend without affecting the instance
-  itself (broken DRBD secondary)
-* ``migrate``: allow instance migration
-* ``failover``: allow instance reboot on the secondary
+* ``fix-storage``: allow disk replacement or fix the backend without affecting
+  the instance itself (broken DRBD secondary)
+* ``migrate``: allow instance migration. Note, however, that current harep does
+  not submit migrate jobs; so, currently, this permission level is equivalent to
+  ``fix-storage``.
+* ``failover``: allow instance reboot on the secondary; this action is taken, if
+  the primary node is offline.
 * ``reinstall``: allow disks to be recreated and the instance to be reinstalled
 
 Each element in the list of tags, includes all the authorizations of the
@@ -74,7 +79,14 @@ nodes being marked as offline by the administrator.
 Also harep currently works only for instances with the ``drbd`` and
 ``plain`` disk templates.
 
-Both these issues will be addressed by a new maintenance daemon in
+Using the data model of **htools**\(1), harep cannot distinguish between drained
+and offline nodes. In particular, it will (permission provided) failover
+instances also in situations where a migration would have been enough.
+In particular, handling of node draining is better done using **hbal**\(1),
+which will always submit migration jobs, however is the permission to fall
+back to failover.
+
+These issues will be addressed by a new maintenance daemon in
 future Ganeti versions, which will supersede harep.
 
 
@@ -90,6 +102,12 @@ The options that can be passed to the program are as follows:
   insert this much delay before the execution of repair jobs to allow the tool
   to continue processing instances.
 
+\--dry-run
+  only show which operations would be carried out, but do nothing, even on
+  instances where tags grant the appropriate permissions. Note that harep
+  keeps the state of repair operations in instance tags; therefore, only
+  the operations of the next round of actions can be inspected.
+
 .. vim: set textwidth=72 :
 .. Local Variables:
 .. mode: rst
index 8cc4a72..ec2e3d1 100644 (file)
@@ -139,6 +139,10 @@ following components:
 - standard deviation of the CPU load provided by MonD
 - the count of instances with primary and secondary in the same failure
   domain
+- the count of instances sharing the same exclusion tags which primary
+  instances placed in the same failure domain
+- the overall sum of dissatisfied desired locations among all cluster
+  instances
 
 The free memory and free disk values help ensure that all nodes are
 somewhat balanced in their resource usage. The reserved memory helps
@@ -147,8 +151,8 @@ instances, and that no node keeps too much memory reserved for
 N+1. And finally, the N+1 percentage helps guide the algorithm towards
 eliminating N+1 failures, if possible.
 
-Except for the N+1 failures, offline instances counts, and failure
-domain violation counts, we use the
+Except for the N+1 failures, offline instances counts, failure
+domain violation counts and desired locations count, we use the
 standard deviation since when used with values within a fixed range
 (we use percents expressed as values between zero and one) it gives
 consistent results across all metrics (there are some small issues
@@ -186,10 +190,10 @@ heuristic, instances from nodes with high CPU load will tend to move to
 nodes with less CPU load.
 
 On a perfectly balanced cluster (all nodes the same size, all
-instances the same size and spread across the nodes equally), the
-values for all metrics would be zero, with the exception of the total
-percentage of reserved memory. This doesn't happen too often in
-practice :)
+instances the same size and spread across the nodes equally,
+all desired locations satisfied), the values for all metrics
+would be zero, with the exception of the total percentage of
+reserved memory. This doesn't happen too often in practice :)
 
 OFFLINE INSTANCES
 ~~~~~~~~~~~~~~~~~
@@ -200,6 +204,21 @@ wrong calculations. For this reason, the algorithm subtracts the
 memory size of down instances from the free node memory of their
 primary node, in effect simulating the startup of such instances.
 
+DESIRED LOCATION TAGS
+~~~~~~~~~~~~~~~~~~~~~
+
+Sometimes, administrators want specific instances located in a particular,
+typically geographic, location. To suppoer this desired location tags are
+introduced.
+
+If the cluster is tagged *htools:desiredlocation:x* then tags starting with
+*x* are desired location tags. Instances can be assigned tags of the form *x*
+that means that instance wants to be placed on a node tagged with a location
+tag *x*. (That means that cluster should be tagged *htools:nlocation:x* too).
+
+Instance pinning is just heuristics, not a hard enforced requirement;
+it will only be achieved by the cluster metrics favouring such placements.
+
 EXCLUSION TAGS
 ~~~~~~~~~~~~~~
 
@@ -260,9 +279,10 @@ cluster tags *htools:nlocation:a*, *htools:nlocation:b*, etc
   This make make node tags of the form *a:\**, *b:\**, etc be considered
   to have a common cause of failure.
 
-Instances with primary and secondary node having a common cause of failure are
-considered badly placed. While such placements are always allowed, they count
-heavily towards the cluster score.
+Instances with primary and secondary node having a common cause of failure and
+instances sharing the same exclusion tag with primary nodes having a common
+failure are considered badly placed. While such placements are always allowed,
+they count heavily towards the cluster score.
 
 OPTIONS
 -------
index 1259f25..543d180 100644 (file)
@@ -26,7 +26,7 @@ Algorithm options:
 **[ \--min-disk *disk-ratio* ]**
 **[ -O *name...* ]**
 **[ \--independent-groups ]**
-
+**[ \--no-capacity-checks ]**
 
 Request options:
 
@@ -246,6 +246,11 @@ The options that can be passed to the program are as follows:
   tends to overestimate the capacity, as instances still have to be
   moved away from the existing not N+1 happy nodes.
 
+\--no-capacity-checks
+  Normally, hspace will only consider those allocations where all instances
+  of a node can immediately restarted should that node fail. With this
+  option given, hspace will check only N+1 redundancy for DRBD instances.
+
 -l *rounds*, \--max-length=*rounds*
   Restrict the number of instance allocations to this length. This is
   not very useful in practice, but can be used for testing hspace
index 84468b2..55f397c 100755 (executable)
@@ -1063,6 +1063,9 @@ def RunQa():
     "instance-add-restricted-by-disktemplates",
     qa_instance.TestInstanceCreationRestrictedByDiskTemplates)
 
+  RunTestIf("instance-add-osparams", qa_instance.TestInstanceAddOsParams)
+  RunTestIf("instance-add-osparams", qa_instance.TestSecretOsParams)
+
   # Test removing instance with offline drbd secondary
   if qa_config.TestEnabled(["instance-remove-drbd-offline",
                             "instance-add-drbd-disk"]):
index ac1d3a8..2199d00 100644 (file)
@@ -1195,6 +1195,63 @@ def _AssertSsconfCertFiles():
                       " '%s'." % (node, first_node))
 
 
+def _TestSSHKeyChanges(master_node):
+  """Tests a lot of SSH key type- and size- related functionality.
+
+  @type master_node: L{qa_config._QaNode}
+  @param master_node: The cluster master.
+
+  """
+  # Helper fn to avoid specifying base params too many times
+  def _RenewWithParams(new_params, verify=True, fail=False):
+    AssertCommand(["gnt-cluster", "renew-crypto", "--new-ssh-keys", "-f",
+                   "--no-ssh-key-check"] + new_params, fail=fail)
+    if not fail and verify:
+      AssertCommand(["gnt-cluster", "verify"])
+
+  # First test the simplest change
+  _RenewWithParams([])
+
+  # And stop here if vcluster
+  (vcluster_master, _) = qa_config.GetVclusterSettings()
+  if vcluster_master:
+    print "Skipping further SSH key replacement checks for vcluster"
+    return
+
+  # And the actual tests
+  with qa_config.AcquireManyNodesCtx(1, exclude=[master_node]) as nodes:
+    node_name = nodes[0].primary
+
+    # Another helper function for checking whether a specific key can log in
+    def _CheckLoginWithKey(key_path, fail=False):
+      AssertCommand(["ssh", "-oIdentityFile=%s" % key_path, "-oBatchMode=yes",
+                     "-oStrictHostKeyChecking=no", "-oIdentitiesOnly=yes",
+                     "-F/dev/null", node_name, "true"],
+                    fail=fail, forward_agent=False)
+
+    _RenewWithParams(["--ssh-key-type=dsa"])
+    _CheckLoginWithKey("/root/.ssh/id_dsa")
+    # Stash the key for now
+    old_key_backup = qa_utils.BackupFile(master_node.primary,
+                                         "/root/.ssh/id_dsa")
+
+    try:
+      _RenewWithParams(["--ssh-key-type=rsa"])
+      _CheckLoginWithKey("/root/.ssh/id_rsa")
+      # And check that we cannot log in with the old key
+      _CheckLoginWithKey(old_key_backup, fail=True)
+    finally:
+      AssertCommand(["rm", "-f", old_key_backup])
+
+    _RenewWithParams(["--ssh-key-bits=4096"])
+    _RenewWithParams(["--ssh-key-bits=521"], fail=True)
+
+    # Restore the cluster to its pristine state, skipping the verify as we did
+    # way too many already
+    _RenewWithParams(["--ssh-key-type=rsa", "--ssh-key-bits=2048"],
+                     verify=False)
+
+
 def TestClusterRenewCrypto():
   """gnt-cluster renew-crypto"""
   master = qa_config.GetMasterNode()
@@ -1266,9 +1323,8 @@ def TestClusterRenewCrypto():
     _AssertSsconfCertFiles()
     AssertCommand(["gnt-cluster", "verify"])
 
-    # Only renew SSH keys
-    AssertCommand(["gnt-cluster", "renew-crypto", "--force",
-                   "--new-ssh-keys", "--no-ssh-key-check"])
+    # Comprehensively test various types of SSH key changes
+    _TestSSHKeyChanges(master)
 
     # Restore RAPI certificate
     AssertCommand(["gnt-cluster", "renew-crypto", "--force",
@@ -1371,7 +1427,7 @@ def TestUpgrade():
 
   This tests the 'gnt-cluster upgrade' command by flipping
   between the current and a different version of Ganeti.
-  To also recover subtile points in the configuration up/down
+  To also recover subtle points in the configuration up/down
   grades, instances are left over both upgrades.
 
   """
@@ -1394,6 +1450,14 @@ def TestUpgrade():
     nodes = qa_config.AcquireManyNodes(n)
     live_instances.append(cf(nodes))
 
+  # 2.16 only - prior to performing a downgrade, we have to make sure that the
+  # SSH keys used are such that the lower version can still use them,
+  # regardless of cluster defaults.
+  if constants.VERSION_MINOR != 16:
+    raise qa_error.Error("Please remove the key type downgrade code in 2.17")
+  AssertCommand(["gnt-cluster", "renew-crypto", "--no-ssh-key-check", "-f",
+                 "--new-ssh-keys", "--ssh-key-type=dsa"])
+
   AssertRedirectedCommand(["gnt-cluster", "upgrade", "--to", other_version])
   AssertRedirectedCommand(["gnt-cluster", "verify"])
 
index da4381e..3650052 100644 (file)
@@ -47,6 +47,7 @@ import qa_daemon
 import qa_utils
 import qa_error
 
+from qa_filters import stdout_of
 from qa_utils import AssertCommand, AssertEqual, AssertIn
 from qa_utils import InstanceCheck, INST_DOWN, INST_UP, FIRST_ARG, RETURN_VALUE
 from qa_instance_utils import CheckSsconfInstanceList, \
@@ -1519,6 +1520,70 @@ def TestInstanceCommunication(instance, master):
   print result_output
 
 
+def _TestRedactionOfSecretOsParams(node, cmd, secret_keys):
+  """Tests redaction of secret os parameters
+
+  """
+  AssertCommand(["gnt-cluster", "modify", "--max-running-jobs", "1"])
+  debug_delay_id = int(stdout_of(["gnt-debug", "delay", "--print-jobid",
+                       "--submit", "300"]))
+  cmd_jid = int(stdout_of(cmd))
+  job_file_abspath = "%s/job-%s" % (pathutils.QUEUE_DIR, cmd_jid)
+  job_file = qa_utils.MakeNodePath(node, job_file_abspath)
+
+  for k in secret_keys:
+    grep_cmd = ["grep", "\"%s\":\"<redacted>\"" % k, job_file]
+    AssertCommand(grep_cmd)
+
+  AssertCommand(["gnt-job", "cancel", "--kill", "--yes-do-it",
+                str(debug_delay_id)])
+  AssertCommand(["gnt-cluster", "modify", "--max-running-jobs", "20"])
+  AssertCommand(["gnt-job", "wait", str(cmd_jid)])
+
+
+def TestInstanceAddOsParams():
+  """Tests instance add with secret os parameters"""
+
+  if not qa_config.IsTemplateSupported(constants.DT_PLAIN):
+    return
+
+  master = qa_config.GetMasterNode()
+  instance = qa_config.AcquireInstance()
+
+  secret_keys = ["param1", "param2"]
+  cmd = (["gnt-instance", "add",
+          "--os-type=%s" % qa_config.get("os"),
+          "--disk-template=%s" % constants.DT_PLAIN,
+          "--os-parameters-secret",
+          "param1=secret1,param2=secret2",
+          "--node=%s" % master.primary] +
+          GetGenericAddParameters(instance, constants.DT_PLAIN))
+  cmd.append("--submit")
+  cmd.append("--print-jobid")
+  cmd.append(instance.name)
+
+  _TestRedactionOfSecretOsParams(master.primary, cmd, secret_keys)
+
+  TestInstanceRemove(instance)
+  instance.Release()
+
+
+def TestSecretOsParams():
+  """Tests secret os parameter transmission"""
+
+  master = qa_config.GetMasterNode()
+  secret_keys = ["param1", "param2"]
+  cmd = (["gnt-debug", "test-osparams", "--os-parameters-secret",
+         "param1=secret1,param2=secret2", "--submit", "--print-jobid"])
+  _TestRedactionOfSecretOsParams(master.primary, cmd, secret_keys)
+
+  cmd_output = stdout_of(["gnt-debug", "test-osparams",
+                         "--os-parameters-secret",
+                         "param1=secret1,param2=secret2"])
+  AssertIn("\'param1\': \'secret1\'", cmd_output)
+  AssertIn("\'param2\': \'secret2\'", cmd_output)
+
+
 available_instance_tests = [
   ("instance-add-plain-disk", constants.DT_PLAIN,
    TestInstanceAddWithPlainDisk, 1),
index 3dfe03f..a519b22 100644 (file)
@@ -175,7 +175,8 @@ def _PrintCommandOutput(stdout, stderr):
     print >> sys.stderr, stderr.rstrip('\n')
 
 
-def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
+def AssertCommand(cmd, fail=False, node=None, log_cmd=True, forward_agent=True,
+                  max_seconds=None):
   """Checks that a remote command succeeds.
 
   @param cmd: either a string (the command to execute) or a list (to
@@ -188,6 +189,10 @@ def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
       dict or a string)
   @param log_cmd: if False, the command won't be logged (simply passed to
       StartSSH)
+  @type forward_agent: boolean
+  @param forward_agent: whether to forward the agent when starting the SSH
+                        session or not, sometimes useful for crypto-related
+                        operations which can use a key they should not
   @type max_seconds: double
   @param max_seconds: fail if the command takes more than C{max_seconds}
       seconds
@@ -206,7 +211,8 @@ def AssertCommand(cmd, fail=False, node=None, log_cmd=True, max_seconds=None):
     cmdstr = utils.ShellQuoteArgs(cmd)
 
   start = datetime.datetime.now()
-  popen = StartSSH(nodename, cmdstr, log_cmd=log_cmd)
+  popen = StartSSH(nodename, cmdstr, log_cmd=log_cmd,
+                   forward_agent=forward_agent)
   # Run the command
   stdout, stderr = popen.communicate()
   rcode = popen.returncode
@@ -263,7 +269,7 @@ def AssertRedirectedCommand(cmd, fail=False, node=None, log_cmd=True):
 
 
 def GetSSHCommand(node, cmd, strict=True, opts=None, tty=False,
-                  use_multiplexer=True):
+                  use_multiplexer=True, forward_agent=True):
   """Builds SSH command to be executed.
 
   @type node: string
@@ -279,6 +285,8 @@ def GetSSHCommand(node, cmd, strict=True, opts=None, tty=False,
   @param tty: if we should use tty; if None, will be auto-detected
   @type use_multiplexer: boolean
   @param use_multiplexer: if the multiplexer for the node should be used
+  @type forward_agent: boolean
+  @param forward_agent: whether to forward the ssh agent or not
 
   """
   args = ["ssh", "-oEscapeChar=none", "-oBatchMode=yes", "-lroot"]
@@ -289,9 +297,14 @@ def GetSSHCommand(node, cmd, strict=True, opts=None, tty=False,
   if tty:
     args.append("-t")
 
+  # Multiplexers we use right now forward agents, so even if we ought to be
+  # using one, ignore it if agent forwarding is disabled.
+  if not forward_agent:
+    use_multiplexer = False
+
   args.append("-oStrictHostKeyChecking=%s" % ("yes" if strict else "no", ))
   args.append("-oClearAllForwardings=yes")
-  args.append("-oForwardAgent=yes")
+  args.append("-oForwardAgent=%s" % ("yes" if forward_agent else "no", ))
   if opts:
     args.extend(opts)
   if node in _MULTIPLEXERS and use_multiplexer:
@@ -335,12 +348,13 @@ def StartLocalCommand(cmd, _nolog_opts=False, log_cmd=True, **kwargs):
   return subprocess.Popen(cmd, shell=False, **kwargs)
 
 
-def StartSSH(node, cmd, strict=True, log_cmd=True):
+def StartSSH(node, cmd, strict=True, log_cmd=True, forward_agent=True):
   """Starts SSH.
 
   """
-  return StartLocalCommand(GetSSHCommand(node, cmd, strict=strict),
-                           _nolog_opts=True, log_cmd=log_cmd,
+  ssh_command = GetSSHCommand(node, cmd, strict=strict,
+                              forward_agent=forward_agent)
+  return StartLocalCommand(ssh_command, _nolog_opts=True, log_cmd=log_cmd,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
 
index ace1df7..6a61cf2 100644 (file)
@@ -38,6 +38,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 module Ganeti.ConstantUtils where
 
 import Data.Char (ord)
+import Data.Monoid (Monoid(..))
 import Data.Set (Set)
 import qualified Data.Set as Set (difference, fromList, toList, union)
 
@@ -62,6 +63,10 @@ instance PyValue PythonNone where
 newtype FrozenSet a = FrozenSet { unFrozenSet :: Set a }
   deriving (Eq, Ord, Show)
 
+instance (Ord a) => Monoid (FrozenSet a) where
+  mempty = FrozenSet mempty
+  mappend (FrozenSet s) (FrozenSet t) = FrozenSet (mappend s t)
+
 -- | Converts a Haskell 'Set' into a Python 'frozenset'
 --
 -- This instance was supposed to be for 'Set' instead of 'FrozenSet'.
index 6452962..db5e565 100644 (file)
@@ -49,6 +49,7 @@ import Control.Arrow ((***),(&&&))
 import Data.List ((\\))
 import Data.Map (Map)
 import qualified Data.Map as Map (empty, fromList, keys, insert)
+import Data.Monoid
 
 import qualified AutoConf
 import Ganeti.ConstantUtils (PythonChar(..), FrozenSet, Protocol(..),
@@ -68,6 +69,7 @@ import Ganeti.Confd.Types (ConfdRequestType(..), ConfdReqField(..),
                            ConfdReplyStatus(..), ConfdNodeRole(..),
                            ConfdErrorType(..))
 import qualified Ganeti.Confd.Types as Types
+import qualified Ganeti.HTools.Tags.Constants as Tags
 
 {-# ANN module "HLint: ignore Use camelCase" #-}
 
@@ -1695,6 +1697,12 @@ hvKvmPath = "kvm_path"
 hvKvmDiskAio :: String
 hvKvmDiskAio = "disk_aio"
 
+hvKvmScsiControllerType :: String
+hvKvmScsiControllerType = "scsi_controller_type"
+
+hvKvmPciReservations :: String
+hvKvmPciReservations = "kvm_pci_reservations"
+
 hvKvmSpiceAudioCompr :: String
 hvKvmSpiceAudioCompr = "spice_playback_compression"
 
@@ -1901,6 +1909,8 @@ hvsParameterTypes = Map.fromList
   , (hvKvmMigrationCaps,                VTypeString)
   , (hvKvmPath,                         VTypeString)
   , (hvKvmDiskAio,                      VTypeString)
+  , (hvKvmScsiControllerType,           VTypeString)
+  , (hvKvmPciReservations,              VTypeInt)
   , (hvKvmSpiceAudioCompr,              VTypeBool)
   , (hvKvmSpiceBind,                    VTypeString)
   , (hvKvmSpiceIpVersion,               VTypeInt)
@@ -2641,6 +2651,12 @@ vncBasePort = 5900
 vncDefaultBindAddress :: String
 vncDefaultBindAddress = ip4AddressAny
 
+qemuPciSlots :: Int
+qemuPciSlots = 32
+
+qemuDefaultPciReservations :: Int
+qemuDefaultPciReservations = 12
+
 -- * NIC types
 
 htNicE1000 :: String
@@ -2725,6 +2741,25 @@ htDiskScsi = "scsi"
 htDiskSd :: String
 htDiskSd = "sd"
 
+htDiskScsiGeneric :: String
+htDiskScsiGeneric = "scsi-generic"
+
+htDiskScsiBlock :: String
+htDiskScsiBlock = "scsi-block"
+
+htDiskScsiCd :: String
+htDiskScsiCd = "scsi-cd"
+
+htDiskScsiHd :: String
+htDiskScsiHd = "scsi-hd"
+
+htScsiDeviceTypes :: FrozenSet String
+htScsiDeviceTypes =
+  ConstantUtils.mkSet [htDiskScsiGeneric,
+                       htDiskScsiBlock,
+                       htDiskScsiCd,
+                       htDiskScsiHd]
+
 htHvmValidDiskTypes :: FrozenSet String
 htHvmValidDiskTypes = ConstantUtils.mkSet [htDiskIoemu, htDiskParavirtual]
 
@@ -2735,7 +2770,28 @@ htKvmValidDiskTypes =
                        htDiskParavirtual,
                        htDiskPflash,
                        htDiskScsi,
-                       htDiskSd]
+                       htDiskSd,
+                       htDiskScsiGeneric,
+                       htDiskScsiBlock,
+                       htDiskScsiHd,
+                       htDiskScsiCd]
+
+-- * SCSI controller types
+
+htScsiControllerLsi :: String
+htScsiControllerLsi = "lsi"
+
+htScsiControllerVirtio :: String
+htScsiControllerVirtio = "virtio-scsi-pci"
+
+htScsiControllerMegasas :: String
+htScsiControllerMegasas = "megasas"
+
+htKvmValidScsiControllerTypes :: FrozenSet String
+htKvmValidScsiControllerTypes =
+  ConstantUtils.mkSet [htScsiControllerLsi,
+                       htScsiControllerVirtio,
+                       htScsiControllerMegasas]
 
 htCacheDefault :: String
 htCacheDefault = "default"
@@ -3089,6 +3145,12 @@ cvEnoden1 =
    Types.cVErrorCodeToRaw CvENODEN1,
    "Not enough memory to accommodate instance failovers")
 
+cvEextags :: (String, String, String)
+cvEextags =
+  ("node",
+   Types.cVErrorCodeToRaw CvEEXTAGS,
+   "Instances with same exclusion tag on the