#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""

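# The watcher is normally run periodically (e.g. from cron). Invoked without a
# node group option it acts as the global watcher and re-executes itself once
# per node group; with a group UUID it handles only that group (see
# _GlobalWatcher and _GroupWatcher below).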
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client  # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


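# Instances in one of BAD_STATES are candidates for an automatic restart (up
# to MAXTRIES attempts, tracked per instance in the watcher state file), while
# instances in HELPLESS_STATES cannot be helped because their nodes are down
# or offline.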
MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


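# Watcher hooks are executables placed in the "watcher" hooks directory
# (HOOKS_BASE_DIR/HOOKS_NAME_WATCHER) and are run on every watcher invocation;
# hook failures are logged but never abort the watcher itself.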
def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


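# Restart bookkeeping: the per-instance attempt counter is only cleared once
# the instance is seen outside BAD_STATES again, so a persistently broken
# instance stops being restarted after MAXTRIES attempts until an operator
# intervenes.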
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


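# A node whose boot ID differs from the one recorded in the watcher state is
# assumed to have rebooted; disks of instances using it as a secondary node
# are then re-activated.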
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


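# The per-group verify-disks job reports instances whose disks are not active;
# those disks are re-activated below with a single follow-up job.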
def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


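# Note: --job-age values are run through cli.ParseTimespec, so suffixed
# durations (e.g. "6h") should be accepted in addition to plain seconds.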
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


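# Both the per-group and the merged status files use a line-based format, one
# "<instance name> <status>" pair per line, sorted by name, for example
# (illustrative values only):
#   inst1.example.com running
#   inst2.example.com ERROR_down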
def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


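# When several per-group files report a status for the same instance (e.g.
# shortly after an instance moved between groups), the entry from the most
# recently written file wins, based on that file's mtime.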
def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


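# The global watcher re-executes its own command line once per node group,
# appending the node group option and the group's UUID, so that each child
# runs _GroupWatcher for exactly one group; the assert below guards against
# spawning children recursively.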
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:  # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


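# Instances whose secondary nodes live in a different group than their primary
# node ("split" instances) are logged and skipped, since the rest of this run
# only queries nodes belonging to the primary node's group.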
def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


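# The watcher lock is taken in shared, non-blocking mode: concurrent watcher
# processes can coexist, and a run is skipped (with a successful exit code,
# after logging an error) if the lock is currently held exclusively elsewhere.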
def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS