Projet

Général

Profil

Paste
Télécharger au format
Statistiques
| Branche: | Révision:

root / plugins / celery / celery_tasks_states @ 17f78427

Historique | Voir | Annoter | Télécharger (3,39 ko)

1
#!/usr/bin/env python
2
"""=cut
3
=head1 NAME
4

    
5
celery_tasks_states - Munin plugin to monitor the number of Celery tasks in each state.
6

    
7
=head1 REQUIREMENTS
8

    
9
 - Python
10
 - celery (http://celeryproject.org/)
11
 - celerymon (http://github.com/ask/celerymon)
12

    
13
Note: don't forget to enable sending of the events on the celery daemon - run it with the --events option
14

    
15
=head1 CONFIGURATION
16

    
17
Default configuration:
18

    
19
  [celery_tasks_states]
20
	 env.api_url http://localhost:8989
21
	 env.workers all
22

    
23
If workers variable is not set or set to "all", task number for all the workers is monitored.
24

    
25
You can optionally set the workers variable to the string of hostnames you want to monitor separated by a comma.
26

    
27
For example:
28

    
29
  [celery_tasks]
30
	 env.workers localhost,foo.bar.net,bar.foo.net
31

    
32
This would only monitor the number of tasks for the workers with the hostnames "localhost", "foo.bar.net" and "bar.foo.net"
33

    
34
=head1 MAGIC MARKERS
35

    
36
  #%# family=manual
37
  #%# capabilities=autoconf
38

    
39
=head1 AUTHOR
40

    
41
Tomaz Muraus (http://github.com/Kami/munin-celery)
42

    
43
=head1 LICENSE
44

    
45
GPLv2
46

    
47
=cut"""
48

    
49
import os
50
import sys
51
import urllib
52

    
53
try:
54
	import json
55
except:
56
	import simplejson as json
57

    
58
API_URL = 'http://localhost:8989'
59
URL_ENDPOINTS = {
60
		'workers': '/api/worker/',
61
		'worker_tasks': '/api/worker/%s/tasks',
62
		'tasks': '/api/task/',
63
		'task_names': '/api/task/name/',
64
		'task_details': '/api/task/name/%s',
65
}
66
TASK_STATES = (
67
			'PENDING',
68
			'RECEIVED',
69
			'STARTED',
70
			'SUCCESS',
71
			'FAILURE',
72
			'REVOKED',
73
			'RETRY'
74
)
75

    
76
def get_data(what, api_url, *args):
77
	try:
78
		request = urllib.urlopen('%s%s' % (api_url, \
79
										   URL_ENDPOINTS[what] % (args)))
80
		response = request.read()
81
		return json.loads(response)
82
	except IOError:
83
		print 'Could not connect to the celerymon webserver'
84
		sys.exit(-1)
85

    
86
def check_web_server_status(api_url):
87
	try:
88
		request = urllib.urlopen(api_url)
89
		response = request.read()
90
	except IOError:
91
		print 'Could not connect to the celerymon webserver'
92
		sys.exit(-1)
93

    
94
def clean_state_name(state_name):
95
	return state_name.lower()
96

    
97
# Config
98
def print_config(workers = None):
99
	if workers:
100
		print 'graph_title Celery tasks in each state [workers = %s]' % (', ' . join(workers))
101
	else:
102
		print 'graph_title Celery tasks in each state'
103
	print 'graph_args --lower-limit 0'
104
	print 'graph_scale no'
105
	print 'graph_vlabel tasks per ${graph_period}'
106
	print 'graph_category cloud'
107

    
108
	for name in TASK_STATES:
109
		name = clean_state_name(name)
110
		print '%s.label %s' % (name, name)
111
		print '%s.type DERIVE' % (name)
112
		print '%s.min 0' % (name)
113
		print '%s.info number of %s tasks' % (name, name)
114

    
115
# Values
116
def print_values(workers = None, api_url = None):
117
	data = get_data('tasks', api_url)
118

    
119
	counters = dict([(key, 0) for key in TASK_STATES])
120
	for task_name, task_data in data:
121
		state = task_data['state']
122
		hostname = task_data['worker']['hostname']
123

    
124
		if workers and hostname not in workers:
125
			continue
126

    
127
		counters[state] += 1
128

    
129
	for name in TASK_STATES:
130
		name_cleaned = clean_state_name(name)
131
		value = counters[name]
132
		print '%s.value %d' % (name_cleaned, value)
133

    
134
if __name__ == '__main__':
135
	workers = os.environ.get('workers', 'all')
136
	api_url = os.environ.get('api_url', API_URL)
137

    
138
	check_web_server_status(api_url)
139

    
140
	if workers in [None, '', 'all']:
141
		workers = None
142
	else:
143
		workers = workers.split(',')
144

    
145
	if len(sys.argv) > 1:
146
		if sys.argv[1] == 'config':
147
			print_config(workers)
148
		elif sys.argv[1] == 'autoconf':
149
			print 'yes'
150
	else:
151
		print_values(workers, api_url)
152