#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client  # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


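#: How many times the watcher tries to restart a downed instance before
#: giving up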
MAXTRIES = 5
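#: Instance states that the watcher tries to fix by restarting the instance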
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
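#: Instance states the watcher cannot fix because the node is down or offline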
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err:  # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

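      # Retry bookkeeping: an instance gets at most MAXTRIES restart attempts;
      # the attempt counter is persisted in the watcher state file (notepad).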
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception:  # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for reboots and activate disks of affected instances.

  """
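  # A changed boot ID means the node rebooted since the last watcher run, so
  # disks of instances using it as a secondary node may need re-activation.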
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they could be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception:  # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
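  # The single opcode's result is a 3-tuple; only its middle element, the
  # list of instances with offline disks, is used here.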
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception:  # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

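  # Render each (name, status) pair as a "name status" line, sorted by
  # instance name.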
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

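  # FileStatHelper captures the file's stat() result while the file is open,
  # so the mtime returned below matches the content that was read.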
  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
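  # Only the global watcher may spawn children; a child started with the
  # node group option must never end up here.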
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception:  # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun():  # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec()  # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
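    # Instances whose secondary nodes are in a different group than the
    # primary node are split between node groups and are skipped here.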
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of the UUIDs of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path)  # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile)  # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

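    # Merge all per-group status files into the cluster-wide status file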
    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS