vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ 0bd9c069
History | View | Annotate | Download (9.19 KB)
1 |
# -*- coding: utf-8 -*-
|
---|---|
2 |
""" Test du plugin listant les services de haut niveau impactés. """
|
3 |
|
4 |
from datetime import datetime |
5 |
import transaction |
6 |
from nose.tools import assert_equal |
7 |
|
8 |
from vigilo.models.session import DBSession |
9 |
from vigilo.models.tables import Permission, DataPermission, StateName, \ |
10 |
SupItemGroup, Host, HighLevelService, \ |
11 |
Event, CorrEvent, ImpactedPath, ImpactedHLS, \ |
12 |
User, UserGroup |
13 |
from vigiboard.tests import TestController |
14 |
|
15 |
def populate_DB(): |
16 |
""" Peuple la base de données. """
|
17 |
|
18 |
# On ajoute un groupe d'hôtes
|
19 |
hostmanagers = SupItemGroup(name=u'managersgroup')
|
20 |
DBSession.add(hostmanagers) |
21 |
DBSession.flush() |
22 |
|
23 |
# On lui octroie les permissions
|
24 |
usergroup = UserGroup.by_group_name(u'users_with_access')
|
25 |
DBSession.add(DataPermission( |
26 |
group=hostmanagers, |
27 |
usergroup=usergroup, |
28 |
access=u'r',
|
29 |
)) |
30 |
DBSession.flush() |
31 |
|
32 |
# On crée un hôte de test.
|
33 |
host = Host( |
34 |
name = u'host',
|
35 |
checkhostcmd = u'halt',
|
36 |
snmpcommunity = u'public',
|
37 |
hosttpl = u'/dev/null',
|
38 |
address = u'192.168.1.1',
|
39 |
snmpport = 42,
|
40 |
weight = 42,
|
41 |
) |
42 |
DBSession.add(host) |
43 |
|
44 |
# On affecte cet hôte au groupe précédemment créé.
|
45 |
hostmanagers.supitems.append(host) |
46 |
DBSession.flush() |
47 |
|
48 |
# On ajoute un évènement causé par cet hôte.
|
49 |
event1 = Event( |
50 |
supitem = host, |
51 |
message = u'foo',
|
52 |
current_state = StateName.statename_to_value(u'WARNING'),
|
53 |
timestamp = datetime.now(), |
54 |
) |
55 |
DBSession.add(event1) |
56 |
DBSession.flush() |
57 |
|
58 |
# On ajoute un évènement corrélé causé par cet évènement 'brut'.
|
59 |
aggregate = CorrEvent( |
60 |
idcause = event1.idevent, |
61 |
timestamp_active = datetime.now(), |
62 |
priority = 1,
|
63 |
status = u'None')
|
64 |
aggregate.events.append(event1) |
65 |
DBSession.add(aggregate) |
66 |
DBSession.flush() |
67 |
|
68 |
transaction.commit() |
69 |
return aggregate
|
70 |
|
71 |
def add_paths(path_number, path_length, idsupitem): |
72 |
"""
|
73 |
Ajoute path_number chemins de services de haut niveau impactés
|
74 |
dans la base de donnée. Leur longeur sera égale à path_length.
|
75 |
La 3ème valeur passée en paramètre est l'id du supitem impactant.
|
76 |
|
77 |
path_number * path_length services de
|
78 |
haut niveau sont créés dans l'opération.
|
79 |
"""
|
80 |
|
81 |
# Création de services de haut niveau dans la BDD.
|
82 |
hls_template = { |
83 |
'message': u'Bar', |
84 |
'warning_threshold': 60, |
85 |
'critical_threshold': 80, |
86 |
'weight': None, |
87 |
'priority': 2, |
88 |
} |
89 |
|
90 |
# Création des chemins de services de haut niveau impactés.
|
91 |
for j in range(path_number): |
92 |
|
93 |
# On crée le chemin en lui-même
|
94 |
path = ImpactedPath(idsupitem = idsupitem) |
95 |
DBSession.add(path) |
96 |
DBSession.flush() |
97 |
|
98 |
# Pour chaque étage du chemin,
|
99 |
for i in range(path_length): |
100 |
# on ajoute un service de haut niveau dans la BDD,
|
101 |
hls = HighLevelService( |
102 |
servicename = u'HLS' + str(j + 1) + str(i + 1), |
103 |
**hls_template) |
104 |
DBSession.add(hls) |
105 |
# et on ajoute un étage au chemin contenant ce service.
|
106 |
DBSession.add( |
107 |
ImpactedHLS( |
108 |
path = path, |
109 |
hls = hls, |
110 |
distance = i + 1,
|
111 |
)) |
112 |
|
113 |
DBSession.flush() |
114 |
transaction.commit() |
115 |
|
116 |
|
117 |
class TestHLSPlugin(TestController): |
118 |
"""
|
119 |
Classe de test du contrôleur listant les services
|
120 |
de haut niveau impactés par un évènement corrélé.
|
121 |
"""
|
122 |
def setUp(self): |
123 |
super(TestHLSPlugin, self).setUp() |
124 |
perm = Permission.by_permission_name(u'vigiboard-access')
|
125 |
|
126 |
user = User( |
127 |
user_name=u'access',
|
128 |
fullname=u'',
|
129 |
email=u'user.has@access',
|
130 |
) |
131 |
usergroup = UserGroup( |
132 |
group_name=u'users_with_access',
|
133 |
) |
134 |
usergroup.permissions.append(perm) |
135 |
user.usergroups.append(usergroup) |
136 |
DBSession.add(user) |
137 |
DBSession.add(usergroup) |
138 |
DBSession.flush() |
139 |
|
140 |
user = User( |
141 |
user_name=u'no_access',
|
142 |
fullname=u'',
|
143 |
email=u'user.has.no@access',
|
144 |
) |
145 |
usergroup = UserGroup( |
146 |
group_name=u'users_without_access',
|
147 |
) |
148 |
usergroup.permissions.append(perm) |
149 |
user.usergroups.append(usergroup) |
150 |
DBSession.add(user) |
151 |
DBSession.add(usergroup) |
152 |
DBSession.flush() |
153 |
|
154 |
self.aggregate = populate_DB()
|
155 |
|
156 |
def test_no_impacted_hls(self): |
157 |
"""
|
158 |
Retour du plugin HLS pour 0 HLS impacté
|
159 |
Teste la valeur de retour du plugin lorsque
|
160 |
aucun service de haut niveau n'est impacté.
|
161 |
"""
|
162 |
|
163 |
# On peuple la base de données avant le test.
|
164 |
DBSession.add(self.aggregate)
|
165 |
add_paths(0, 0, self.aggregate.events[0].idsupitem) |
166 |
DBSession.add(self.aggregate)
|
167 |
|
168 |
### 1er cas : l'utilisateur n'est pas connecté.
|
169 |
# On vérifie que le plugin retourne bien une erreur 401.
|
170 |
resp = self.app.post(
|
171 |
'/get_plugin_value',
|
172 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
173 |
"plugin_name" : "hls"}, |
174 |
status = 401,)
|
175 |
|
176 |
### 2ème cas : l'utilisateur n'a pas les
|
177 |
### droits sur l'hôte ayant causé le correvent.
|
178 |
# On vérifie que le plugin retourne bien une erreur 404.
|
179 |
resp = self.app.post(
|
180 |
'/get_plugin_value',
|
181 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
182 |
"plugin_name" : "hls"}, |
183 |
status = 404,
|
184 |
extra_environ={'REMOTE_USER': 'no_access'}) |
185 |
|
186 |
### 3ème cas : l'utilisateur a cette fois les droits.
|
187 |
resp = self.app.post(
|
188 |
'/get_plugin_value',
|
189 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
190 |
"plugin_name" : "hls"}, |
191 |
extra_environ={'REMOTE_USER': 'access'}) |
192 |
# On vérifie que le plugin ne retourne toujours rien.
|
193 |
assert_equal(resp.json, {"services": []})
|
194 |
|
195 |
def test_1_impacted_hls_path(self): |
196 |
"""
|
197 |
Retour du plugin HLS pour 1 chemin impacté
|
198 |
Teste la valeur de retour du plugin lorsqu'un
|
199 |
chemin de services de haut niveau est impacté.
|
200 |
"""
|
201 |
|
202 |
# On peuple la base de données avant le test.
|
203 |
DBSession.add(self.aggregate)
|
204 |
add_paths(1, 2, self.aggregate.events[0].idsupitem) |
205 |
DBSession.add(self.aggregate)
|
206 |
|
207 |
### 1er cas : l'utilisateur n'est pas connecté.
|
208 |
# On vérifie que le plugin retourne bien une erreur 401.
|
209 |
resp = self.app.post(
|
210 |
'/get_plugin_value',
|
211 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
212 |
"plugin_name" : "hls"}, |
213 |
status = 401,)
|
214 |
|
215 |
### 2ème cas : l'utilisateur n'a pas les droits
|
216 |
### sur l'hôte ayant causé le correvent, on doit
|
217 |
### obtenir une erreur 404 (pas d'événement trouvé
|
218 |
### avec les informations liées à cet utilisateur).
|
219 |
resp = self.app.post(
|
220 |
'/get_plugin_value',
|
221 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
222 |
"plugin_name" : "hls"}, |
223 |
status = 404,
|
224 |
extra_environ={'REMOTE_USER': 'no_access'}) |
225 |
|
226 |
### 3ème cas : l'utilisateur a cette fois les droits.
|
227 |
resp = self.app.post(
|
228 |
'/get_plugin_value',
|
229 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
230 |
"plugin_name" : "hls"}, |
231 |
extra_environ={'REMOTE_USER': 'access'}) |
232 |
# On vérifie que le plugin retourne bien les 2 HLS impactés.
|
233 |
assert_equal(resp.json, {"services": ['HLS12']}) |
234 |
|
235 |
def test_2_impacted_hls_path(self): |
236 |
"""
|
237 |
Retour du plugin HLS pour 2 chemins impactés
|
238 |
Teste la valeur de retour du plugin lorsque deux
|
239 |
chemins de services de haut niveau sont impactés.
|
240 |
"""
|
241 |
|
242 |
# On peuple la base de données avant le test.
|
243 |
DBSession.add(self.aggregate)
|
244 |
add_paths(2, 2, self.aggregate.events[0].idsupitem) |
245 |
DBSession.add(self.aggregate)
|
246 |
|
247 |
### 1er cas : l'utilisateur n'est pas connecté.
|
248 |
# On vérifie que le plugin retourne bien une erreur 401.
|
249 |
resp = self.app.post(
|
250 |
'/get_plugin_value',
|
251 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
252 |
"plugin_name" : "hls"}, |
253 |
status = 401,)
|
254 |
|
255 |
### 2ème cas : l'utilisateur n'a pas les
|
256 |
### droits sur l'hôte ayant causé le correvent.
|
257 |
resp = self.app.post(
|
258 |
'/get_plugin_value',
|
259 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
260 |
"plugin_name" : "hls"}, |
261 |
status = 404,
|
262 |
extra_environ={'REMOTE_USER': 'no_access'}) |
263 |
|
264 |
### 3ème cas : l'utilisateur a cette fois les droits.
|
265 |
resp = self.app.post(
|
266 |
'/get_plugin_value',
|
267 |
{"idcorrevent" : str(self.aggregate.idcorrevent), |
268 |
"plugin_name" : "hls"}, |
269 |
extra_environ={'REMOTE_USER': 'access'}) |
270 |
# On vérifie que le plugin retourne bien les 4 HLS impactés.
|
271 |
assert_equal(resp.json, {"services": ['HLS12', 'HLS22']}) |
272 |
|