vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ 2bcebf54
History | View | Annotate | Download (7.32 KB)
1 |
# -*- coding: utf-8 -*-
|
---|---|
2 |
# Copyright (C) 2006-2016 CS-SI
|
3 |
# License: GNU GPL v2 <http://www.gnu.org/licenses/gpl-2.0.html>
|
4 |
|
5 |
""" Test du plugin listant les services de haut niveau impactés. """
|
6 |
|
7 |
import transaction |
8 |
from nose.tools import assert_equal |
9 |
|
10 |
from vigilo.models.session import DBSession |
11 |
from vigilo.models.demo import functions |
12 |
from vigilo.models.tables import Permission, DataPermission, SupItemGroup, \ |
13 |
ImpactedPath, ImpactedHLS, User, UserGroup |
14 |
from vigiboard.tests import TestController |
15 |
|
16 |
def populate_DB(): |
17 |
""" Peuple la base de données. """
|
18 |
|
19 |
# On ajoute un groupe d'hôtes
|
20 |
hostmanagers = SupItemGroup(name=u'managersgroup', parent=None) |
21 |
DBSession.add(hostmanagers) |
22 |
DBSession.flush() |
23 |
|
24 |
# On lui octroie les permissions
|
25 |
usergroup = UserGroup.by_group_name(u'users_with_access')
|
26 |
DBSession.add(DataPermission( |
27 |
group=hostmanagers, |
28 |
usergroup=usergroup, |
29 |
access=u'r',
|
30 |
)) |
31 |
DBSession.flush() |
32 |
|
33 |
# On crée un hôte de test.
|
34 |
host = functions.add_host(u'host')
|
35 |
|
36 |
# On affecte cet hôte au groupe précédemment créé.
|
37 |
hostmanagers.supitems.append(host) |
38 |
DBSession.flush() |
39 |
|
40 |
# On ajoute un évènement brut et un événement corrélé causé par cet hôte.
|
41 |
event1 = functions.add_event(host, u'WARNING', u'foo') |
42 |
aggregate = functions.add_correvent([event1]) |
43 |
|
44 |
transaction.commit() |
45 |
return aggregate
|
46 |
|
47 |
def add_paths(path_number, path_length, idsupitem): |
48 |
"""
|
49 |
Ajoute path_number chemins de services de haut niveau impactés
|
50 |
dans la base de donnée. Leur longeur sera égale à path_length.
|
51 |
La 3ème valeur passée en paramètre est l'id du supitem impactant.
|
52 |
|
53 |
path_number * path_length services de
|
54 |
haut niveau sont créés dans l'opération.
|
55 |
"""
|
56 |
|
57 |
# Création des chemins de services de haut niveau impactés.
|
58 |
for j in range(path_number): |
59 |
|
60 |
# On crée le chemin en lui-même
|
61 |
path = ImpactedPath(idsupitem = idsupitem) |
62 |
DBSession.add(path) |
63 |
DBSession.flush() |
64 |
|
65 |
# Pour chaque étage du chemin,
|
66 |
for i in range(path_length): |
67 |
# on ajoute un service de haut niveau dans la BDD...
|
68 |
hls = functions.add_highlevelservice( |
69 |
u'HLS' + str(j + 1) + str(i + 1), None) |
70 |
|
71 |
# ...et on ajoute un étage au chemin contenant ce service.
|
72 |
DBSession.add( |
73 |
ImpactedHLS( |
74 |
path = path, |
75 |
hls = hls, |
76 |
distance = i + 1,
|
77 |
)) |
78 |
|
79 |
DBSession.flush() |
80 |
transaction.commit() |
81 |
|
82 |
|
83 |
class TestHLSPlugin(TestController): |
84 |
"""
|
85 |
Classe de test du contrôleur listant les services
|
86 |
de haut niveau impactés par un évènement corrélé.
|
87 |
"""
|
88 |
def setUp(self): |
89 |
super(TestHLSPlugin, self).setUp() |
90 |
perm = Permission.by_permission_name(u'vigiboard-access')
|
91 |
|
92 |
user = User( |
93 |
user_name=u'access',
|
94 |
fullname=u'',
|
95 |
email=u'user.has@access',
|
96 |
) |
97 |
usergroup = UserGroup(group_name=u'users_with_access')
|
98 |
usergroup.permissions.append(perm) |
99 |
user.usergroups.append(usergroup) |
100 |
DBSession.add(user) |
101 |
DBSession.add(usergroup) |
102 |
DBSession.flush() |
103 |
|
104 |
self.aggregate = populate_DB()
|
105 |
|
106 |
def test_no_impacted_hls(self): |
107 |
"""
|
108 |
Données affichées par le plugin HLS pour 0 HLS impacté
|
109 |
Teste les données affichées par le plugin lorsque
|
110 |
aucun service de haut niveau n'est impacté.
|
111 |
"""
|
112 |
|
113 |
# On peuple la base de données avant le test.
|
114 |
DBSession.add(self.aggregate)
|
115 |
add_paths(0, 0, self.aggregate.events[0].idsupitem) |
116 |
DBSession.add(self.aggregate)
|
117 |
|
118 |
# On accède à la page principale de VigiBoard
|
119 |
resp = self.app.post(
|
120 |
'/', extra_environ={'REMOTE_USER': 'access'}) |
121 |
|
122 |
# On s'assure que la colonne des HLS
|
123 |
# impactés est vide pour notre évènement.
|
124 |
plugin_data = resp.lxml.xpath( |
125 |
'//table[contains(concat(" ", @class, " "), " vigitable ")]'
|
126 |
'/tbody/tr/td[@class="plugin_hls"]/text()')
|
127 |
assert_equal(plugin_data[0].strip(), "") |
128 |
|
129 |
def test_1_impacted_hls_path(self): |
130 |
"""
|
131 |
Données affichées par le plugin HLS pour 1 chemin impacté
|
132 |
Teste les données affichées par le plugin lorsque
|
133 |
1 chemin de services de haut niveau est impacté.
|
134 |
"""
|
135 |
|
136 |
# On peuple la base de données avant le test.
|
137 |
DBSession.add(self.aggregate)
|
138 |
add_paths(1, 2, self.aggregate.events[0].idsupitem) |
139 |
DBSession.add(self.aggregate)
|
140 |
|
141 |
# On accède à la page principale de VigiBoard
|
142 |
resp = self.app.post(
|
143 |
'/', extra_environ={'REMOTE_USER': 'access'}) |
144 |
|
145 |
# On s'assure que la colonne des HLS impactés contient
|
146 |
# bien le nom de notre HLS de plus haut niveau impacté.
|
147 |
plugin_data = resp.lxml.xpath( |
148 |
'//table[contains(concat(" ", @class, " "), " vigitable ")]'
|
149 |
'/tbody/tr/td[@class="plugin_hls"]/text()')
|
150 |
assert_equal(plugin_data[0].strip(), "HLS12") |
151 |
|
152 |
def test_2_impacted_hls_path(self): |
153 |
"""
|
154 |
Données affichées par le plugin HLS pour 2 chemins impactés
|
155 |
Teste les données affichées par le plugin lorsque
|
156 |
2 chemins de services de haut niveau sont impactés.
|
157 |
"""
|
158 |
|
159 |
# On peuple la base de données avant le test.
|
160 |
DBSession.add(self.aggregate)
|
161 |
add_paths(2, 2, self.aggregate.events[0].idsupitem) |
162 |
DBSession.add(self.aggregate)
|
163 |
|
164 |
# On accède à la page principale de VigiBoard
|
165 |
resp = self.app.post(
|
166 |
'/', extra_environ={'REMOTE_USER': 'access'}) |
167 |
|
168 |
# On s'assure que la colonne des HLS contient bien
|
169 |
# le nombre de HLS de plus haut niveau impactés,
|
170 |
plugin_data = resp.lxml.xpath( |
171 |
'//table[contains(concat(" ", @class, " "), " vigitable ")]'
|
172 |
'/tbody/tr/td[@class="plugin_hls"]/a/text()')
|
173 |
assert_equal(plugin_data[0].strip(), "2") |
174 |
|
175 |
def test_same_hls_impacted_twice(self): |
176 |
"""
|
177 |
Pas de doublons dans les HLS impactés.
|
178 |
Ticket #732.
|
179 |
"""
|
180 |
|
181 |
# On peuple la base de données avant le test.
|
182 |
DBSession.add(self.aggregate)
|
183 |
hls = functions.add_highlevelservice(u'HLS', None, u'Bar') |
184 |
path1 = ImpactedPath(idsupitem = self.aggregate.events[0].idsupitem) |
185 |
DBSession.add(path1) |
186 |
path2 = ImpactedPath(idsupitem = self.aggregate.events[0].idsupitem) |
187 |
DBSession.add(path2) |
188 |
DBSession.flush() |
189 |
DBSession.add( |
190 |
ImpactedHLS( |
191 |
path = path1, |
192 |
hls = hls, |
193 |
distance = 1,
|
194 |
) |
195 |
) |
196 |
DBSession.add( |
197 |
ImpactedHLS( |
198 |
path = path2, |
199 |
hls = hls, |
200 |
distance = 2,
|
201 |
) |
202 |
) |
203 |
DBSession.flush() |
204 |
transaction.commit() |
205 |
DBSession.add(self.aggregate)
|
206 |
|
207 |
# On accède à la page principale de VigiBoard
|
208 |
resp = self.app.post(
|
209 |
'/', extra_environ={'REMOTE_USER': 'access'}) |
210 |
|
211 |
# On s'assure que la colonne des HLS contient bien
|
212 |
# le nom de notre HLS de plus haut niveau impacté.
|
213 |
plugin_data = resp.lxml.xpath( |
214 |
'//table[contains(concat(" ", @class, " "), " vigitable ")]'
|
215 |
'/tbody/tr/td[@class="plugin_hls"]/text()')
|
216 |
assert_equal(plugin_data[0].strip(), "HLS") |