#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""

import os
import os.path
import sys
import signal
import time
import logging
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import wconfd
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
import ganeti.rpc.errors as rpcerr
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client  # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


#: Maximum number of restart/cleanup attempts recorded before giving up
MAXTRIES = 5
#: Instance states in which the watcher tries to restart the instance
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
#: Instance states the watcher cannot remedy itself (the node is down/offline)
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)
  # start kvmd, which will quit if it is not needed
  utils.EnsureDaemon(constants.KVMD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts"
                                   % status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, config_state, config_state_source,
               disks_active, snodes, disk_template):
    self.name = name
    self.status = status
    self.config_state = config_state
    self.config_state_source = config_state_source
    self.disks_active = disks_active
    self.snodes = snodes
    self.disk_template = disk_template

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Restarting instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % self.name,
                  utils.EpochNano())]
    cli.SubmitOpCode(op, cl=cl)

  def NeedsCleanup(self):
    """Determines whether the instance needs cleanup.

    Determines whether the instance needs cleanup after having been
    shut down by the user.

    @rtype: bool
    @return: True if the instance needs cleanup, False otherwise.

    """
    return self.status == constants.INSTST_USERDOWN and \
        self.config_state != constants.ADMINST_DOWN


class Node(object):
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CleanupInstance(cl, notepad, inst, locks):
  n = notepad.NumberOfCleanupAttempts(inst.name)

  if inst.name in locks:
    logging.info("Not cleaning up instance '%s', instance is locked",
                 inst.name)
    return

  if n > MAXTRIES:
    logging.warning("Not cleaning up instance '%s', retries exhausted",
                    inst.name)
    return

  logging.info("Instance '%s' was shut down by the user, cleaning up"
               " instance", inst.name)
  op = opcodes.OpInstanceShutdown(instance_name=inst.name,
                                  admin_state_source=constants.USER_SOURCE)

  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Cleaning up instance %s" % inst.name,
                utils.EpochNano())]
  try:
    cli.SubmitOpCode(op, cl=cl)
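    # The shutdown opcode succeeded; forget any recorded cleanup attempts so
    # the counter starts fresh if the instance ever needs cleanup again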
    if notepad.NumberOfCleanupAttempts(inst.name):
      notepad.RemoveInstance(inst.name)
  except Exception:  # pylint: disable=W0703
    logging.exception("Error while cleaning up instance '%s'", inst.name)
    notepad.RecordCleanupAttempt(inst.name)


def _CheckInstances(cl, notepad, instances, locks):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.NeedsCleanup():
      _CleanupInstance(cl, notepad, inst, locks)
    elif inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

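      # Record one attempt beyond the limit so the "giving up" message below
      # is logged exactly once; later runs then hit the n > MAXTRIES branch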
      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
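      # The instance is no longer in a bad state; drop its restart counter
      # and report success unless the state merely reflects an unreachable
      # node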
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

  # Keep changed boot IDs
  for node in check_nodes:
    notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param nodes: Mapping of node names to L{Node} objects
  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _GetPendingVerifyDisks(cl, uuid):
  """Checks if there are any currently running or pending group verify jobs,
  and if so, returns their IDs.

  """
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING,
                                              constants.JOB_STATUS_QUEUED,
                                              constants.JOB_STATUS_WAITING]))
  qresult = cl.Query(constants.QR_JOB, ["id", "summary"], qfilter)

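  # Each row of qresult.data is a list of (status, value) pairs, one per
  # requested field; the job summary is itself a one-element list, hence the
  # nested unpacking below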
  ids = [jobid for ((_, jobid), (_, (job, ))) in qresult.data
         if job == ("GROUP_VERIFY_DISKS(%s)" % uuid)]
  return ids


def _VerifyDisks(cl, uuid, nodes, instances, is_strict):
  """Run a per-group "gnt-cluster verify-disks".

  """

  existing_jobs = _GetPendingVerifyDisks(cl, uuid)
  if existing_jobs:
    logging.info("There are verify disks jobs already pending (%s), skipping "
                 "VerifyDisks step for %s.",
                 utils.CommaJoin(existing_jobs), uuid)
    return

  op = opcodes.OpGroupVerifyDisks(
    group_name=uuid, priority=constants.OP_PRIO_LOW, is_strict=is_strict)
  op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                "Verifying disks of group %s" % uuid,
                utils.EpochNano())]
  job_id = cl.SubmitJob([op])
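  # The job carries a single opcode, so PollJob returns one result: a 3-tuple
  # whose middle element lists the instances whose disks were found offline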
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    op = opcodes.OpInstanceActivateDisks(instance_name=name)
    op.reason = [(constants.OPCODE_REASON_SRC_WATCHER,
                  "Activating disks for instance %s" % name,
                  utils.EpochNano())]
    job.append(op)

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is successful,
  because the aim of this function is to assess whether RAPI is responding,
  not whether the credentials used for the request are valid.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def IsWconfdResponding():
  """Probes an echo RPC to WConfD.

  """
  probe_string = "ganeti watcher probe %d" % time.time()

  try:
    result = wconfd.Client().Echo(probe_string)
  except Exception, err:  # pylint: disable=W0703
    logging.warning("WConfd connection error: %s", err)
    return False

  if result != probe_string:
    logging.warning("WConfd echo('%s') returned '%s'", probe_string, result)
    return False

  return True


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False,
                    action="store_true", help="Do not verify disk status")
  parser.add_option("--no-strict", dest="no_strict",
                    default=False, action="store_true",
                    help="Do not run group verify in strict mode")
  parser.add_option("--rapi-ip", dest="rapi_ip",
                    default=constants.IP4_ADDRESS_LOCALHOST,
                    help="Use this IP to talk to RAPI.")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.
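
  Each line of the resulting file has the form "<instance name> <status>",
  e.g. (with a made-up instance name) "instance1.example.com ERROR_down".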

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="\n".join("%s %s" % (n, s) for (n, s) in sorted(data)))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the luxi daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except (rpcerr.NoMasterError, rpcerr.TimeoutError), err:
    if not try_restart:
      raise

    logging.warning("Luxi daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.LUXID):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group %r (%s).", name, uuid)

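    # SIGCHLD is ignored so the kernel reaps children automatically; this
    # presumably avoids zombies when --no-wait-children is used (the waitpid
    # call below then handles the resulting error via EnvironmentError)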
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    try:
      pid = os.fork()
    except OSError:
      logging.exception("Failed to fork for group %r (%s)", name, uuid)
      # Without skipping this group, "pid" would be unset (or stale from a
      # previous iteration) in the check below
      continue

    if pid == 0:
      (options, _) = ParseOptions()
      options.nodegroup = uuid
      _GroupWatcher(options)
      return
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for child in children:
      logging.debug("Waiting for child PID %s", child)
      try:
        result = utils.RetryOnSignal(os.waitpid, child, 0)
      except EnvironmentError, err:
        result = str(err)
      logging.debug("Child PID %s exited with status %s", child, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)
  utils.EnsureDaemon(constants.WCONFD)
  utils.EnsureDaemon(constants.MAINTD)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                opts.rapi_ip)
  if not IsRapiResponding(opts.rapi_ip):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(opts.rapi_ip):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  # If WConfD isn't responding to queries, try one restart
  logging.debug("Attempting to talk to WConfD")
  if not IsWconfdResponding():
    logging.warning("WConfD not responsive, restarting daemon")
    utils.StopDaemon(constants.WCONFD)
    utils.EnsureDaemon(constants.WCONFD)
    logging.debug("Second attempt to talk to WConfD")
    if not IsWconfdResponding():
      logging.fatal("WConfD is not responding")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(qcl, uuid):
  """Retrieves instances and nodes per node group.

  """
  locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None)

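  # Lock names have the form "instance/<instance name>"; an entry with a
  # non-empty mode is a lock that is currently held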
  prefix = "instance/"
  prefix_len = len(prefix)

  locked_instances = set()

  for [[_, name], [_, lock]] in locks.data:
    if name.startswith(prefix) and lock:
      locked_instances.add(name[prefix_len:])

  queries = [
    (constants.QR_INSTANCE,
     ["name", "status", "admin_state", "admin_state_source", "disks_active",
      "snodes", "pnode.group.uuid", "snodes.group.uuid", "disk_template"],
     [qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    (constants.QR_NODE,
     ["name", "bootid", "offline"],
     [qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  results_data = [
    qcl.Query(what, field, qfilter).data
    for (what, field, qfilter) in queries
    ]

  # Ensure results are tuples with two values
  assert compat.all(
    ht.TListOf(ht.TListOf(ht.TIsLength(2)))(d) for d in results_data)

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, config_state, config_state_source, disks_active, snodes,
       pnode_group_uuid, snodes_group_uuid, disk_template) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, config_state,
                                config_state_source, disks_active, snodes,
                                disk_template))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances),
          locked_instances)


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

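  # Each ssconf line is expected to have the form "<uuid> <group name>"; only
  # the UUID is kept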
  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(utils.UUID_RE.match(r) for r in result):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Group watcher file lock
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances, locks) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances, locks)
    _CheckDisks(client, notepad, nodes, instances, started)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)
    notepad.Close()

  # Check if the node group only contains instances using the ext disk
  # template
  only_ext = compat.all(i.disk_template == constants.DT_EXT
                        for i in instances.values())

  # We skip disk verification for this node group if it only contains
  # instances backed by external storage. There is an interface through which
  # external storage providers could implement disk verification, but the
  # current ExtStorageDevice does not expose an API for it yet.
  #
  # This check needs to be revisited if ES_ACTION_VERIFY on ExtStorageDevice
  # is implemented.
  if not opts.no_verify_disks and not only_ext:
    is_strict = not opts.no_strict
    _VerifyDisks(client, group_uuid, nodes, instances, is_strict=is_strict)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode.
  # In the global watcher process this lock will also be held by all child
  # processes (one per node group) and will only be released when all of them
  # have finished running.
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS