Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_raw_events_table.py @ 6f6efdcd

History | View | Annotate | Download (10.5 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, EventHistory, CorrEvent, User, \
13
                            Permission, StateName, Host, UserGroup, \
14
                            LowLevelService, SupItemGroup, DataPermission
15
from vigilo.models.demo.functions import *
16
from vigiboard.tests import TestController
17

    
18
def populate_accounts():
19
    perm = Permission.by_permission_name(u'vigiboard-access')
20

    
21
    user = User(
22
        user_name=u'access',
23
        fullname=u'',
24
        email=u'user.has@access',
25
    )
26
    usergroup = UserGroup(
27
        group_name=u'users_with_access',
28
    )
29
    usergroup.permissions.append(perm)
30
    user.usergroups.append(usergroup)
31
    DBSession.add(user)
32
    DBSession.add(usergroup)
33
    DBSession.flush()
34

    
35
    user = User(
36
        user_name=u'limited_access',
37
        fullname=u'',
38
        email=u'user.has.no@access',
39
    )
40
    usergroup = UserGroup(
41
        group_name=u'users_with_limited_access',
42
    )
43
    usergroup.permissions.append(perm)
44
    user.usergroups.append(usergroup)
45
    DBSession.add(user)
46
    DBSession.add(usergroup)
47
    DBSession.flush()
48
    transaction.commit()
49

    
50
def populate_DB(caused_by_service):
51
    """ Peuple la base de données. """
52
    # On ajoute un groupe d'hôtes et un groupe de services.
53
    supitemmanagers = add_supitemgroup('managersgroup')
54

    
55
    usergroup = UserGroup.by_group_name(u'users_with_access')
56
    DBSession.add(DataPermission(
57
        group=supitemmanagers,
58
        usergroup=usergroup,
59
        access=u'r',
60
    ))
61
    DBSession.flush()
62

    
63
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
64
    managerhost = Host(
65
        name = u'managerhost',
66
        checkhostcmd = u'halt',
67
        snmpcommunity = u'public',
68
        hosttpl = u'/dev/null',
69
        address = u'192.168.1.1',
70
        snmpport = 42,
71
        weight = 42,
72
    )
73
    DBSession.add(managerhost)
74
    supitemmanagers.supitems.append(managerhost)
75
    DBSession.flush()
76

    
77
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
78
    managerservice = LowLevelService(
79
        host = managerhost,
80
        servicename = u'managerservice',
81
        command = u'halt',
82
        weight = 42,
83
    )
84
    DBSession.add(managerservice)
85
    supitemmanagers.supitems.append(managerservice)
86
    DBSession.flush()
87

    
88
    if caused_by_service:
89
        supitem = managerservice
90
    else:
91
        supitem = managerhost
92

    
93
    # Ajout d'un événement
94
    event = Event(
95
        supitem = supitem,
96
        message = u'foo',
97
        current_state = StateName.statename_to_value(u"WARNING"),
98
        timestamp = datetime.now(),
99
    )
100
    DBSession.add(event)
101
    DBSession.flush()
102

    
103
    # Ajout des historiques
104
    DBSession.add(EventHistory(
105
        type_action=u'Nagios update state',
106
        idevent=event.idevent,
107
        timestamp=datetime.now()))
108
    DBSession.add(EventHistory(
109
        type_action=u'Acknowlegement change state',
110
        idevent=event.idevent,
111
        timestamp=datetime.now()))
112
    DBSession.flush()
113

    
114
    # Ajout d'un événement corrélé
115
    aggregate = CorrEvent(
116
        idcause = event.idevent,
117
        timestamp_active = datetime.now(),
118
        priority = 1,
119
        status = u"None")
120
    aggregate.events.append(event)
121
    DBSession.add(aggregate)
122
    DBSession.flush()
123
    return aggregate.idcorrevent
124

    
125
def add_masked_event(idcorrevent):
126
    """Ajoute un événement masqué à un événement corrélé."""
127
    transaction.begin()
128
    hostmanagers = SupItemGroup.by_group_name(u'managersgroup')
129
    nb_hosts = DBSession.query(Host).count()
130

    
131
    masked_host = Host(
132
        name = u'masked host #%d' % nb_hosts,
133
        checkhostcmd = u'halt',
134
        snmpcommunity = u'public',
135
        hosttpl = u'/dev/null',
136
        address = u'192.168.1.%d' % nb_hosts,
137
        snmpport = 42,
138
        weight = 42,
139
    )
140
    DBSession.add(masked_host)
141
    hostmanagers.supitems.append(masked_host)
142
    DBSession.flush()
143

    
144
    event = Event(
145
        supitem = masked_host,
146
        message = u'masked event #%d' % nb_hosts,
147
        current_state = StateName.statename_to_value(u"CRITICAL"),
148
        timestamp = datetime.now(),
149
    )
150
    DBSession.add(event)
151
    DBSession.flush()
152

    
153
    aggregate = DBSession.query(CorrEvent).filter(
154
        CorrEvent.idcorrevent == idcorrevent).one()
155
    aggregate.events.append(event)
156
    DBSession.add(aggregate)
157
    DBSession.flush()
158
    transaction.commit()
159

    
160

    
161
class TestRawEventsTableAnonymousLLS(TestController):
162
    """
163
    Teste l'affichage des événements bruts masqués par un agrégat.
164
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
165
    a été causé par un LLS.
166
    """
167
    test_service = True
168

    
169
    def setUp(self):
170
        super(TestRawEventsTableAnonymousLLS, self).setUp()
171
        populate_accounts()
172

    
173
    def test_table(self):
174
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
175
        # On peuple la BDD avec un hôte, un service de bas niveau,
176
        # et un groupe d'hôtes et de services associés à ces items.
177
        idcorrevent = populate_DB(self.test_service)
178
        transaction.commit()
179

    
180
        # L'utilisateur n'est pas authentifié.
181
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
182
        # demandant à l'utilisateur de s'authentifier.
183
        response = self.app.get(
184
            '/masked_events/%d' % idcorrevent,
185
            status = 401)
186

    
187
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
188
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
189
    test_service = False
190

    
191
    def test_table(self):
192
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
193
        super(TestRawEventsTableAnonymousHost, self).test_table()
194

    
195

    
196
class TestRawEventsTableWithoutPermsLLS(TestController):
197
    """
198
    Teste l'affichage des événements bruts masqués par un agrégat.
199
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
200
    et l'agrégat a été causé par un LLS.
201
    """
202
    test_service = True
203

    
204
    def setUp(self):
205
        super(TestRawEventsTableWithoutPermsLLS, self).setUp()
206
        populate_accounts()
207

    
208
    def test_table(self):
209
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
210
        # On peuple la BDD avec un hôte, un service de bas niveau,
211
        # et un groupe d'hôtes et de services associés à ces items.
212
        idcorrevent = populate_DB(self.test_service)
213
        transaction.commit()
214

    
215
        environ = {'REMOTE_USER': 'limited_access'}
216

    
217
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
218
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
219
        # n'a pas accès aux informations concernant cet évènement.
220
        response = self.app.get(
221
            '/masked_events/%d' % idcorrevent,
222
            status = 302,
223
            extra_environ = environ)
224
        response = response.follow(status = 200, extra_environ = environ)
225
        assert_true(len(response.lxml.xpath(
226
            '//div[@id="flash"]/div[@class="error"]')))
227

    
228
        # L'erreur 302 peut venir de l'absence de permissions
229
        # ou bien simplement de l'absence d'événements masqués.
230
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
231
        # on ajoute un événement masqué. On doit à nouveau obtenir
232
        # une erreur 302.
233
        add_masked_event(idcorrevent)
234
        response = self.app.get(
235
            '/masked_events/%d' % idcorrevent,
236
            status = 302,
237
            extra_environ = environ)
238
        response = response.follow(status = 200, extra_environ = environ)
239
        assert_true(len(response.lxml.xpath(
240
            '//div[@id="flash"]/div[@class="error"]')))
241

    
242
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
243
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
244
    test_service = False
245

    
246
    def test_table(self):
247
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
248
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
249

    
250
class TestRawEventsTableWithPermsLLS(TestController):
251
    """
252
    Teste l'affichage d'une table des événements bruts
253
    rattachés à un agrégat, hormis la cause de cet agrégat.
254
    """
255
    test_service = True
256

    
257
    def setUp(self):
258
        super(TestRawEventsTableWithPermsLLS, self).setUp()
259
        populate_accounts()
260

    
261
    def test_table(self):
262
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
263
        # On peuple la BDD avec un hôte, un service de bas niveau,
264
        # et un groupe d'hôtes et de services associés à ces items.
265
        idcorrevent = populate_DB(True)
266
        transaction.commit()
267

    
268
        environ = {'REMOTE_USER': 'access'}
269

    
270
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
271
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
272
        # n'a pas accès aux informations concernant cet évènement.
273
        response = self.app.get(
274
            '/masked_events/%d' % idcorrevent,
275
            status = 302,
276
            extra_environ = environ)
277
        response = response.follow(status = 200, extra_environ = environ)
278
        assert_true(len(response.lxml.xpath(
279
            '//div[@id="flash"]/div[@class="error"]')))
280

    
281
        # L'erreur 302 peut venir de l'absence de permissions
282
        # ou bien simplement de l'absence d'événements masqués.
283
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
284
        # on ajoute un événement masqué. On doit avoir accès à exactement
285
        # 1 événement masqué à présent.
286
        add_masked_event(idcorrevent)
287
        response = self.app.get(
288
            '/masked_events/%d' % idcorrevent,
289
            status = 200,
290
            extra_environ = environ)
291

    
292
        # On s'attend à trouver exactement 1 événement masqué.
293
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
294
        # de rechercher directement une valeur dans une liste. Elle devrait
295
        # néanmoins suffire pour les besoins des tests.
296
        rows = response.lxml.xpath(
297
            '//table[contains(@class, "vigitable")]/tbody/tr')
298
        assert_equal(len(rows), 1)
299

    
300
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
301
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
302
    test_service = False
303

    
304
    def test_table(self):
305
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
306
        super(TestRawEventsTableWithPermsHost, self).test_table()