vigiboard / vigiboard / tests / functional / plugins / test_details_plugin.py @ b373a5de
History | View | Annotate | Download (9.23 KB)
1 |
# -*- coding: utf-8 -*-
|
---|---|
2 |
# Copyright (C) 2006-2011 CS-SI
|
3 |
# License: GNU GPL v2 <http://www.gnu.org/licenses/gpl-2.0.html>
|
4 |
|
5 |
"""
|
6 |
Teste le formulaire donnant les liens vers les outils extérieurs
|
7 |
et les données de l'historique.
|
8 |
"""
|
9 |
from nose.tools import assert_true, assert_equal |
10 |
from datetime import datetime |
11 |
import transaction |
12 |
|
13 |
from vigiboard.tests import TestController |
14 |
from vigilo.models.session import DBSession |
15 |
from vigilo.models.tables import SupItemGroup, User, UserGroup, \ |
16 |
Permission, DataPermission, StateName, \ |
17 |
LowLevelService, Event, CorrEvent, Host |
18 |
|
19 |
def insert_deps(return_service): |
20 |
"""
|
21 |
Insère les dépendances nécessaires aux tests.
|
22 |
|
23 |
@param return_service: Indique si les événements générés
|
24 |
concernent un hôte (False) ou un service de bas niveau (True).
|
25 |
@type return_service: C{bool}
|
26 |
@return: Renvoie un tuple avec le groupe d'hôte créé,
|
27 |
l'identifiant du L{CorrEvent} généré et enfin,
|
28 |
l'identifiant de l'L{Event} généré.
|
29 |
@rtype: C{tuple}
|
30 |
"""
|
31 |
timestamp = datetime.now() |
32 |
|
33 |
hostgroup = SupItemGroup(name=u'foo', parent=None) |
34 |
DBSession.add(hostgroup) |
35 |
|
36 |
host = Host( |
37 |
name=u'bar',
|
38 |
checkhostcmd=u'',
|
39 |
description=u'',
|
40 |
hosttpl=u'',
|
41 |
address=u'127.0.0.1',
|
42 |
snmpport=42,
|
43 |
snmpcommunity=u'public',
|
44 |
snmpversion=u'3',
|
45 |
weight=42,
|
46 |
) |
47 |
DBSession.add(host) |
48 |
DBSession.flush() |
49 |
|
50 |
hostgroup.supitems.append(host) |
51 |
DBSession.flush() |
52 |
|
53 |
servicegroup = SupItemGroup(name=u'bar', parent=None) |
54 |
DBSession.add(servicegroup) |
55 |
|
56 |
service = LowLevelService( |
57 |
host=host, |
58 |
command=u'',
|
59 |
weight=42,
|
60 |
servicename=u'baz',
|
61 |
) |
62 |
DBSession.add(service) |
63 |
DBSession.flush() |
64 |
|
65 |
servicegroup.supitems.append(service) |
66 |
DBSession.flush() |
67 |
|
68 |
event = Event( |
69 |
timestamp=timestamp, |
70 |
current_state=StateName.statename_to_value(u'WARNING'),
|
71 |
message=u'Hello world',
|
72 |
) |
73 |
if return_service:
|
74 |
event.supitem = service |
75 |
else:
|
76 |
event.supitem = host |
77 |
DBSession.add(event) |
78 |
DBSession.flush() |
79 |
|
80 |
correvent = CorrEvent( |
81 |
impact=42,
|
82 |
priority=42,
|
83 |
trouble_ticket=None,
|
84 |
status=u'None',
|
85 |
occurrence=42,
|
86 |
timestamp_active=timestamp, |
87 |
cause=event, |
88 |
) |
89 |
correvent.events.append(event) |
90 |
DBSession.add(correvent) |
91 |
DBSession.flush() |
92 |
|
93 |
usergroup = UserGroup.by_group_name(u'users_with_access')
|
94 |
DBSession.add(DataPermission( |
95 |
usergroup=usergroup, |
96 |
group=hostgroup, |
97 |
access=u'r',
|
98 |
)) |
99 |
DBSession.flush() |
100 |
|
101 |
transaction.commit() |
102 |
correvent = DBSession.query(CorrEvent).first() |
103 |
event = DBSession.query(Event).first() |
104 |
|
105 |
return (hostgroup, correvent.idcorrevent, event.idevent)
|
106 |
|
107 |
class TestDetailsPlugin(TestController): |
108 |
"""Teste le dialogue pour l'accès aux historiques."""
|
109 |
def setUp(self): |
110 |
super(TestDetailsPlugin, self).setUp() |
111 |
perm = Permission.by_permission_name(u'vigiboard-access')
|
112 |
|
113 |
user = User( |
114 |
user_name=u'access',
|
115 |
fullname=u'',
|
116 |
email=u'user.has@access',
|
117 |
) |
118 |
usergroup = UserGroup(group_name=u'users_with_access')
|
119 |
usergroup.permissions.append(perm) |
120 |
user.usergroups.append(usergroup) |
121 |
DBSession.add(user) |
122 |
DBSession.add(usergroup) |
123 |
DBSession.flush() |
124 |
|
125 |
user = User( |
126 |
user_name=u'no_access',
|
127 |
fullname=u'',
|
128 |
email=u'user.has.no@access',
|
129 |
) |
130 |
usergroup = UserGroup(group_name=u'users_without_access')
|
131 |
usergroup.permissions.append(perm) |
132 |
user.usergroups.append(usergroup) |
133 |
DBSession.add(user) |
134 |
DBSession.add(usergroup) |
135 |
DBSession.flush() |
136 |
|
137 |
transaction.commit() |
138 |
|
139 |
def test_details_plugin_LLS_alert_when_allowed(self): |
140 |
"""Dialogue des détails avec un LLS et les bons droits."""
|
141 |
hostgroup, idcorrevent, idcause = insert_deps(True)
|
142 |
|
143 |
response = self.app.post('/plugin_json', { |
144 |
'idcorrevent': idcorrevent,
|
145 |
'plugin_name': 'details', |
146 |
}, extra_environ={'REMOTE_USER': 'access'}) |
147 |
json = response.json |
148 |
|
149 |
# Le contenu de "eventdetails" varie facilement.
|
150 |
# On le teste séparément.
|
151 |
json.pop('eventdetails', None) |
152 |
assert_true('eventdetails' in response.json) |
153 |
|
154 |
assert_equal(json, { |
155 |
"idcorrevent": idcorrevent,
|
156 |
"idcause": idcause,
|
157 |
"service": "baz", |
158 |
"peak_state": "WARNING", |
159 |
"current_state": "WARNING", |
160 |
"host": "bar", |
161 |
"initial_state": "WARNING" |
162 |
}) |
163 |
|
164 |
def test_details_plugin_LLS_alert_when_manager(self): |
165 |
"""Dialogue des détails avec un LLS en tant que manager."""
|
166 |
hostgroup, idcorrevent, idcause = insert_deps(True)
|
167 |
|
168 |
response = self.app.post('/plugin_json', { |
169 |
'idcorrevent': idcorrevent,
|
170 |
'plugin_name': 'details', |
171 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
172 |
json = response.json |
173 |
|
174 |
# Le contenu de "eventdetails" varie facilement.
|
175 |
# On le teste séparément.
|
176 |
json.pop('eventdetails', None) |
177 |
assert_true('eventdetails' in response.json) |
178 |
|
179 |
assert_equal(json, { |
180 |
"idcorrevent": idcorrevent,
|
181 |
"idcause": idcause,
|
182 |
"service": "baz", |
183 |
"peak_state": "WARNING", |
184 |
"current_state": "WARNING", |
185 |
"host": "bar", |
186 |
"initial_state": "WARNING" |
187 |
}) |
188 |
|
189 |
def test_details_plugin_host_alert_when_allowed(self): |
190 |
"""Dialogue des détails avec un hôte et les bons droits."""
|
191 |
hostgroup, idcorrevent, idcause = insert_deps(False)
|
192 |
|
193 |
response = self.app.post('/plugin_json', { |
194 |
'idcorrevent': idcorrevent,
|
195 |
'plugin_name': 'details', |
196 |
}, extra_environ={'REMOTE_USER': 'access'}) |
197 |
json = response.json |
198 |
|
199 |
# Le contenu de "eventdetails" varie facilement.
|
200 |
# On le teste séparément.
|
201 |
json.pop('eventdetails', None) |
202 |
assert_true('eventdetails' in response.json) |
203 |
|
204 |
assert_equal(json, { |
205 |
"idcorrevent": idcorrevent,
|
206 |
"idcause": idcause,
|
207 |
"service": None, |
208 |
"peak_state": "WARNING", |
209 |
"current_state": "WARNING", |
210 |
"host": "bar", |
211 |
"initial_state": "WARNING" |
212 |
}) |
213 |
|
214 |
def test_details_plugin_host_alert_when_manager(self): |
215 |
"""Dialogue des détails avec un hôte en tant que manager."""
|
216 |
hostgroup, idcorrevent, idcause = insert_deps(False)
|
217 |
|
218 |
response = self.app.post('/plugin_json', { |
219 |
'idcorrevent': idcorrevent,
|
220 |
'plugin_name': 'details', |
221 |
}, extra_environ={'REMOTE_USER': 'manager'}) |
222 |
json = response.json |
223 |
|
224 |
# Le contenu de "eventdetails" varie facilement.
|
225 |
# On le teste séparément.
|
226 |
json.pop('eventdetails', None) |
227 |
assert_true('eventdetails' in response.json) |
228 |
|
229 |
assert_equal(json, { |
230 |
"idcorrevent": idcorrevent,
|
231 |
"idcause": idcause,
|
232 |
"service": None, |
233 |
"peak_state": "WARNING", |
234 |
"current_state": "WARNING", |
235 |
"host": "bar", |
236 |
"initial_state": "WARNING" |
237 |
}) |
238 |
|
239 |
def test_details_plugin_LLS_when_forbidden(self): |
240 |
"""Dialogue des détails avec un LLS et des droits insuffisants."""
|
241 |
idcorrevent = insert_deps(True)[1] |
242 |
|
243 |
# Le contrôleur renvoie une erreur 404 (HTTPNotFound)
|
244 |
# lorsque l'utilisateur n'a pas les permissions nécessaires sur
|
245 |
# les données ou qu'aucun événement correspondant n'est trouvé.
|
246 |
self.app.post('/plugin_json', { |
247 |
'idcorrevent': idcorrevent,
|
248 |
'plugin_name': 'details', |
249 |
}, extra_environ={'REMOTE_USER': 'no_access'}, |
250 |
status=404)
|
251 |
|
252 |
def test_details_plugin_host_when_forbidden(self): |
253 |
"""Dialogue des détails avec un hôte et des droits insuffisants."""
|
254 |
idcorrevent = insert_deps(False)[1] |
255 |
|
256 |
# Le contrôleur renvoie une erreur 404 (HTTPNotFound)
|
257 |
# lorsque l'utilisateur n'a pas les permissions nécessaires sur
|
258 |
# les données ou qu'aucun événement correspondant n'est trouvé.
|
259 |
self.app.post('/plugin_json', { |
260 |
'idcorrevent': idcorrevent,
|
261 |
'plugin_name': 'details', |
262 |
}, extra_environ={'REMOTE_USER': 'no_access'}, |
263 |
status=404)
|
264 |
|
265 |
def test_details_plugin_LLS_anonymous(self): |
266 |
"""Dialogue des détails avec un LLS et en anonyme."""
|
267 |
idcorrevent = insert_deps(True)[1] |
268 |
|
269 |
# Le contrôleur renvoie une erreur 401 (HTTPUnauthorized)
|
270 |
# lorsque l'utilisateur n'est pas authentifié.
|
271 |
self.app.post('/plugin_json', { |
272 |
'idcorrevent': idcorrevent,
|
273 |
'plugin_name': 'details', |
274 |
}, status=401)
|
275 |
|
276 |
def test_details_plugin_host_anonymous(self): |
277 |
"""Dialogue des détails avec un hôte et en anonyme."""
|
278 |
idcorrevent = insert_deps(False)[1] |
279 |
|
280 |
# Le contrôleur renvoie une erreur 401 (HTTPUnauthorized)
|
281 |
# lorsque l'utilisateur n'est pas authentifié.
|
282 |
self.app.post('/plugin_json', { |
283 |
'idcorrevent': idcorrevent,
|
284 |
'plugin_name': 'details', |
285 |
}, status=401)
|