from ganeti import errors
from ganeti import pathutils
from ganeti import utils
-from ganeti import serializer
from ganeti import ht
from ganeti import ssh
-from ganeti import ssconf
+from ganeti.tools import common
_SSH_KEY_LIST_ITEM = \
(opts, args) = parser.parse_args()
- return VerifyOptions(parser, opts, args)
-
-
-def VerifyOptions(parser, opts, args):
- """Verifies options and arguments for correctness.
-
- """
- if args:
- parser.error("No arguments are expected")
-
- return opts
+ return common.VerifyOptions(parser, opts, args)
def _VerifyCertificate(cert_pem, _check_fn=utils.CheckNodeCertificate):
_verify_fn(cert)
-def VerifyClusterName(data, _verify_fn=ssconf.VerifyClusterName):
- """Verifies cluster name.
-
- @type data: dict
-
- """
- name = data.get(constants.SSHS_CLUSTER_NAME)
- if name:
- _verify_fn(name)
- else:
- raise JoinError("Cluster name must be specified")
-
-
def _UpdateKeyFiles(keys, dry_run, keyfiles):
"""Updates SSH key files.
utils.AddAuthorizedKey(auth_keys_file, public_key)
-def LoadData(raw):
- """Parses and verifies input data.
-
- @rtype: dict
-
- """
- return serializer.LoadAndVerifyJson(raw, _DATA_CHECK)
-
-
def Main():
"""Main routine.
utils.SetupToolLogging(opts.debug, opts.verbose)
try:
- data = LoadData(sys.stdin.read())
+ data = common.LoadData(sys.stdin.read(), _DATA_CHECK)
# Check if input data is correct
- VerifyClusterName(data)
+ common.VerifyClusterName(data, JoinError)
VerifyCertificate(data)
# Update SSH files
import time
from ganeti import constants
+from ganeti import errors
+from ganeti import serializer
from ganeti import utils
from ganeti.tools import common
self.assertEqual(client_cert.get_subject().CN, my_node_name)
+class TestLoadData(unittest.TestCase):
+
+ def testNoJson(self):
+ self.assertRaises(errors.ParseError, common.LoadData, Exception, "")
+ self.assertRaises(errors.ParseError, common.LoadData, Exception, "}")
+
+ def testInvalidDataStructure(self):
+ raw = serializer.DumpJson({
+ "some other thing": False,
+ })
+ self.assertRaises(errors.ParseError, common.LoadData, Exception, raw)
+
+ raw = serializer.DumpJson([])
+ self.assertRaises(errors.ParseError, common.LoadData, Exception, raw)
+
+ def testValidData(self):
+ raw = serializer.DumpJson({})
+ self.assertEqual(common.LoadData(raw, Exception), {})
+
+
+class TestVerifyClusterName(unittest.TestCase):
+
+ class MyException(Exception):
+ pass
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ self.tmpdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ unittest.TestCase.tearDown(self)
+ shutil.rmtree(self.tmpdir)
+
+ def testNoName(self):
+ self.assertRaises(self.MyException, common.VerifyClusterName,
+ {}, self.MyException, _verify_fn=NotImplemented)
+
+ @staticmethod
+ def _FailingVerify(name):
+ assert name == "cluster.example.com"
+ raise errors.GenericError()
+
+ def testFailingVerification(self):
+ data = {
+ constants.SSHS_CLUSTER_NAME: "cluster.example.com",
+ }
+
+ self.assertRaises(errors.GenericError, common.VerifyClusterName,
+ data, Exception, _verify_fn=self._FailingVerify)
+
+
if __name__ == "__main__":
testutils.GanetiTestProgram()
import shutil
import tempfile
import os.path
-import OpenSSL
from ganeti import errors
from ganeti import constants
-from ganeti import serializer
from ganeti import pathutils
from ganeti import compat
from ganeti import utils
_JoinError = prepare_node_join.JoinError
-class TestLoadData(unittest.TestCase):
- def testNoJson(self):
- self.assertRaises(errors.ParseError, prepare_node_join.LoadData, "")
- self.assertRaises(errors.ParseError, prepare_node_join.LoadData, "}")
-
- def testInvalidDataStructure(self):
- raw = serializer.DumpJson({
- "some other thing": False,
- })
- self.assertRaises(errors.ParseError, prepare_node_join.LoadData, raw)
-
- raw = serializer.DumpJson([])
- self.assertRaises(errors.ParseError, prepare_node_join.LoadData, raw)
-
- def testValidData(self):
- raw = serializer.DumpJson({})
- self.assertEqual(prepare_node_join.LoadData(raw), {})
-
-
class TestVerifyCertificate(testutils.GanetiTestCase):
def setUp(self):
testutils.GanetiTestCase.setUp(self)
prepare_node_join._VerifyCertificate(cert_pem, _check_fn=self._Check)
-class TestVerifyClusterName(unittest.TestCase):
- def setUp(self):
- unittest.TestCase.setUp(self)
- self.tmpdir = tempfile.mkdtemp()
-
- def tearDown(self):
- unittest.TestCase.tearDown(self)
- shutil.rmtree(self.tmpdir)
-
- def testNoName(self):
- self.assertRaises(_JoinError, prepare_node_join.VerifyClusterName,
- {}, _verify_fn=NotImplemented)
-
- @staticmethod
- def _FailingVerify(name):
- assert name == "cluster.example.com"
- raise errors.GenericError()
-
- def testFailingVerification(self):
- data = {
- constants.SSHS_CLUSTER_NAME: "cluster.example.com",
- }
-
- self.assertRaises(errors.GenericError, prepare_node_join.VerifyClusterName,
- data, _verify_fn=self._FailingVerify)
-
-
class TestUpdateSshDaemon(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)