vigiboard / vigiboard / tests / functional / test_host_vigiboardrequest.py @ 5dbfa80d
History | View | Annotate | Download (7.21 KB)
1 |
# -*- coding: utf-8 -*-
|
---|---|
2 |
# vim:set expandtab tabstop=4 shiftwidth=4:
|
3 |
"""
|
4 |
Test de la classe Vigiboard Request pour des requêtes concernant les hôtes
|
5 |
"""
|
6 |
|
7 |
from nose.tools import assert_true |
8 |
from datetime import datetime |
9 |
import tg |
10 |
import transaction |
11 |
|
12 |
from vigilo.models.configure import DBSession |
13 |
from vigilo.models import Event, EventHistory, CorrEvent, \ |
14 |
Permission, User, StateName, \ |
15 |
Host, HostGroup |
16 |
from vigiboard.tests import TestController |
17 |
from vigiboard.controllers.vigiboardrequest import VigiboardRequest |
18 |
from vigiboard.controllers.vigiboard_plugin.tests import MonPlugin |
19 |
|
20 |
|
21 |
class TestHostVigiboardRequest(TestController): |
22 |
""" Préparation de la base de données en vue des tests. """
|
23 |
|
24 |
def setUp(self): |
25 |
super(TestHostVigiboardRequest, self).setUp() |
26 |
|
27 |
# On peuple la base de données.
|
28 |
|
29 |
# On ajoute les noms d'états.
|
30 |
DBSession.add(StateName(statename=u'OK', order=0)) |
31 |
DBSession.add(StateName(statename=u'WARNING', order=2)) |
32 |
DBSession.flush() |
33 |
transaction.commit() |
34 |
|
35 |
# On ajoute les groupes et leurs dépendances
|
36 |
self.hosteditors = HostGroup(name=u'editorsgroup') |
37 |
DBSession.add(self.hosteditors)
|
38 |
self.hostmanagers = HostGroup(name=u'managersgroup', |
39 |
parent=self.hosteditors)
|
40 |
DBSession.add(self.hostmanagers)
|
41 |
DBSession.flush() |
42 |
|
43 |
manage_perm = Permission.by_permission_name(u'manage')
|
44 |
edit_perm = Permission.by_permission_name(u'edit')
|
45 |
|
46 |
self.hostmanagers.permissions.append(manage_perm)
|
47 |
self.hosteditors.permissions.append(edit_perm)
|
48 |
DBSession.flush() |
49 |
|
50 |
# Création des hôtes de test.
|
51 |
host_template = { |
52 |
'checkhostcmd': u'halt', |
53 |
'snmpcommunity': u'public', |
54 |
'hosttpl': u'/dev/null', |
55 |
'mainip': u'192.168.1.1', |
56 |
'snmpport': 42, |
57 |
'weight': 42, |
58 |
} |
59 |
|
60 |
managerhost = Host(name=u'managerhost', **host_template)
|
61 |
editorhost = Host(name=u'editorhost', **host_template)
|
62 |
DBSession.add(managerhost) |
63 |
DBSession.add(editorhost) |
64 |
|
65 |
# Affectation des hôtes aux groupes.
|
66 |
self.hosteditors.hosts.append(editorhost)
|
67 |
self.hostmanagers.hosts.append(managerhost)
|
68 |
DBSession.flush() |
69 |
|
70 |
# Ajout des événements eux-mêmes
|
71 |
event_template = { |
72 |
'message': u'foo', |
73 |
'current_state': StateName.statename_to_value(u'WARNING'), |
74 |
} |
75 |
event1 = Event(supitem=managerhost, **event_template) |
76 |
event2 = Event(supitem=editorhost, **event_template) |
77 |
DBSession.add(event1) |
78 |
DBSession.add(event2) |
79 |
DBSession.flush() |
80 |
|
81 |
# Ajout des historiques
|
82 |
DBSession.add(EventHistory(type_action=u'Nagios update state',
|
83 |
idevent=event1.idevent, timestamp=datetime.now())) |
84 |
DBSession.add(EventHistory(type_action=u'Acknowlegement change state',
|
85 |
idevent=event1.idevent, timestamp=datetime.now())) |
86 |
DBSession.add(EventHistory(type_action=u'Nagios update state',
|
87 |
idevent=event2.idevent, timestamp=datetime.now())) |
88 |
DBSession.add(EventHistory(type_action=u'Acknowlegement change state',
|
89 |
idevent=event2.idevent, timestamp=datetime.now())) |
90 |
DBSession.flush() |
91 |
|
92 |
# Ajout des événements corrélés
|
93 |
aggregate_template = { |
94 |
'timestamp_active': datetime.now(),
|
95 |
'priority': 1, |
96 |
'status': u'None', |
97 |
} |
98 |
self.aggregate1 = CorrEvent(
|
99 |
idcause=event1.idevent, **aggregate_template) |
100 |
self.aggregate2 = CorrEvent(
|
101 |
idcause=event2.idevent, **aggregate_template) |
102 |
|
103 |
self.aggregate1.events.append(event1)
|
104 |
self.aggregate2.events.append(event2)
|
105 |
DBSession.add(self.aggregate1)
|
106 |
DBSession.add(self.aggregate2)
|
107 |
DBSession.flush() |
108 |
transaction.commit() |
109 |
|
110 |
def tearDown(self): |
111 |
""" Nettoyage de la base de données après les tests. """
|
112 |
super(TestHostVigiboardRequest, self).tearDown() |
113 |
|
114 |
|
115 |
def test_request_creation(self): |
116 |
"""Génération d'une requête avec plugin et permissions."""
|
117 |
|
118 |
# On indique qui on est et on envoie une requête sur
|
119 |
# l'index pour obtenir toutes les variables de sessions.
|
120 |
environ = {'REMOTE_USER': 'editor'} |
121 |
response = self.app.get('/', extra_environ=environ) |
122 |
tg.request = response.request |
123 |
|
124 |
# Derrière, VigiboardRequest doit charger le plugin de tests tout seul
|
125 |
tg.config['vigiboard_plugins'] = [['tests', 'MonPlugin']] |
126 |
vigi_req = VigiboardRequest(User.by_user_name(u'editor'))
|
127 |
vigi_req.add_table( |
128 |
CorrEvent, |
129 |
vigi_req.items.c.hostname, |
130 |
vigi_req.items.c.servicename, |
131 |
) |
132 |
vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent)) |
133 |
vigi_req.add_join((vigi_req.items, |
134 |
Event.idsupitem == vigi_req.items.c.idsupitem)) |
135 |
|
136 |
# On vérifie que le nombre d'événements corrélés
|
137 |
# trouvés par la requête est bien égal à 1.
|
138 |
num_rows = vigi_req.num_rows() |
139 |
assert_true(num_rows == 1,
|
140 |
msg = "One history should be available for " +
|
141 |
"the user 'editor' but there are %d" % num_rows)
|
142 |
|
143 |
vigi_req.format_events(0, 10) |
144 |
vigi_req.format_history() |
145 |
assert_true(len(vigi_req.events) == 1 + 1, |
146 |
msg = "One history should be available for the user " +
|
147 |
"'editor' but there are %d" % (len(vigi_req.events) - 1)) |
148 |
|
149 |
# # On s'assure que le plugin fonctionne correctement
|
150 |
# assert_true(vigi_req.events[1][6][0][0] != 'Error',
|
151 |
# msg = "Probleme d'execution des plugins ou de " +
|
152 |
# "formatage des evenements")
|
153 |
|
154 |
|
155 |
# On recommence les tests précédents avec l'utilisateur
|
156 |
# manager (qui dispose de plus de droits).
|
157 |
environ = {'REMOTE_USER': 'manager'} |
158 |
response = self.app.get('/', extra_environ=environ) |
159 |
tg.request = response.request |
160 |
|
161 |
vigi_req = VigiboardRequest(User.by_user_name(u'manager'))
|
162 |
vigi_req.add_plugin(MonPlugin) |
163 |
vigi_req.add_table( |
164 |
CorrEvent, |
165 |
vigi_req.items.c.hostname, |
166 |
vigi_req.items.c.servicename, |
167 |
) |
168 |
vigi_req.add_join((Event, CorrEvent.idcause == Event.idevent)) |
169 |
vigi_req.add_join((vigi_req.items, |
170 |
Event.idsupitem == vigi_req.items.c.idsupitem)) |
171 |
|
172 |
# On vérifie que le nombre d'événements corrélés
|
173 |
# trouvés par la requête est bien égal à 2.
|
174 |
num_rows = vigi_req.num_rows() |
175 |
assert_true(num_rows == 2,
|
176 |
msg = "2 histories should be available for " +
|
177 |
"the user 'manager' but there are %d" % num_rows)
|
178 |
|
179 |
vigi_req.format_events(0, 10) |
180 |
vigi_req.format_history() |
181 |
assert_true(len(vigi_req.events) == 2 + 1, |
182 |
msg = "2 histories should be available for the user " +
|
183 |
"'manager' but there are %d" % (len(vigi_req.events) - 1)) |
184 |
|
185 |
# On s'assure que le plugin fonctionne correctement
|
186 |
# assert_true(vigi_req.events[1][6][0][0] != 'Error',
|
187 |
# msg = "Probleme d'execution des plugins ou de " +
|
188 |
# "formatage des evenements")
|
189 |
|