Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_raw_events_table.py @ 0bd9c069

History | View | Annotate | Download (10.5 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, EventHistory, CorrEvent, User, \
13
                            Permission, StateName, Host, UserGroup, \
14
                            LowLevelService, SupItemGroup, DataPermission
15
from vigiboard.tests import TestController
16

    
17
def populate_accounts():
18
    perm = Permission.by_permission_name(u'vigiboard-access')
19

    
20
    user = User(
21
        user_name=u'access',
22
        fullname=u'',
23
        email=u'user.has@access',
24
    )
25
    usergroup = UserGroup(
26
        group_name=u'users_with_access',
27
    )
28
    usergroup.permissions.append(perm)
29
    user.usergroups.append(usergroup)
30
    DBSession.add(user)
31
    DBSession.add(usergroup)
32
    DBSession.flush()
33

    
34
    user = User(
35
        user_name=u'limited_access',
36
        fullname=u'',
37
        email=u'user.has.no@access',
38
    )
39
    usergroup = UserGroup(
40
        group_name=u'users_with_limited_access',
41
    )
42
    usergroup.permissions.append(perm)
43
    user.usergroups.append(usergroup)
44
    DBSession.add(user)
45
    DBSession.add(usergroup)
46
    DBSession.flush()
47
    transaction.commit()
48

    
49
def populate_DB(caused_by_service):
50
    """ Peuple la base de données. """
51
    # On ajoute un groupe d'hôtes et un groupe de services.
52
    supitemmanagers = SupItemGroup(name = u'managersgroup')
53
    DBSession.add(supitemmanagers)
54
    DBSession.flush()
55

    
56
    usergroup = UserGroup.by_group_name(u'users_with_access')
57
    DBSession.add(DataPermission(
58
        group=supitemmanagers,
59
        usergroup=usergroup,
60
        access=u'r',
61
    ))
62
    DBSession.flush()
63

    
64
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
65
    managerhost = Host(
66
        name = u'managerhost',
67
        checkhostcmd = u'halt',
68
        snmpcommunity = u'public',
69
        hosttpl = u'/dev/null',
70
        address = u'192.168.1.1',
71
        snmpport = 42,
72
        weight = 42,
73
    )
74
    DBSession.add(managerhost)
75
    supitemmanagers.supitems.append(managerhost)
76
    DBSession.flush()
77

    
78
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
79
    managerservice = LowLevelService(
80
        host = managerhost,
81
        servicename = u'managerservice',
82
        command = u'halt',
83
        weight = 42,
84
    )
85
    DBSession.add(managerservice)
86
    supitemmanagers.supitems.append(managerservice)
87
    DBSession.flush()
88

    
89
    if caused_by_service:
90
        supitem = managerservice
91
    else:
92
        supitem = managerhost
93

    
94
    # Ajout d'un événement
95
    event = Event(
96
        supitem = supitem,
97
        message = u'foo',
98
        current_state = StateName.statename_to_value(u"WARNING"),
99
        timestamp = datetime.now(),
100
    )
101
    DBSession.add(event)
102
    DBSession.flush()
103

    
104
    # Ajout des historiques
105
    DBSession.add(EventHistory(
106
        type_action=u'Nagios update state',
107
        idevent=event.idevent,
108
        timestamp=datetime.now()))
109
    DBSession.add(EventHistory(
110
        type_action=u'Acknowlegement change state',
111
        idevent=event.idevent,
112
        timestamp=datetime.now()))
113
    DBSession.flush()
114

    
115
    # Ajout d'un événement corrélé
116
    aggregate = CorrEvent(
117
        idcause = event.idevent,
118
        timestamp_active = datetime.now(),
119
        priority = 1,
120
        status = u"None")
121
    aggregate.events.append(event)
122
    DBSession.add(aggregate)
123
    DBSession.flush()
124
    return aggregate.idcorrevent
125

    
126
def add_masked_event(idcorrevent):
127
    """Ajoute un événement masqué à un événement corrélé."""
128
    transaction.begin()
129
    hostmanagers = SupItemGroup.by_group_name(u'managersgroup')
130
    nb_hosts = DBSession.query(Host).count()
131

    
132
    masked_host = Host(
133
        name = u'masked host #%d' % nb_hosts,
134
        checkhostcmd = u'halt',
135
        snmpcommunity = u'public',
136
        hosttpl = u'/dev/null',
137
        address = u'192.168.1.%d' % nb_hosts,
138
        snmpport = 42,
139
        weight = 42,
140
    )
141
    DBSession.add(masked_host)
142
    hostmanagers.supitems.append(masked_host)
143
    DBSession.flush()
144

    
145
    event = Event(
146
        supitem = masked_host,
147
        message = u'masked event #%d' % nb_hosts,
148
        current_state = StateName.statename_to_value(u"CRITICAL"),
149
        timestamp = datetime.now(),
150
    )
151
    DBSession.add(event)
152
    DBSession.flush()
153

    
154
    aggregate = DBSession.query(CorrEvent).filter(
155
        CorrEvent.idcorrevent == idcorrevent).one()
156
    aggregate.events.append(event)
157
    DBSession.add(aggregate)
158
    DBSession.flush()
159
    transaction.commit()
160

    
161

    
162
class TestRawEventsTableAnonymousLLS(TestController):
163
    """
164
    Teste l'affichage des événements bruts masqués par un agrégat.
165
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
166
    a été causé par un LLS.
167
    """
168
    test_service = True
169

    
170
    def setUp(self):
171
        super(TestRawEventsTableAnonymousLLS, self).setUp()
172
        populate_accounts()
173

    
174
    def test_table(self):
175
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
176
        # On peuple la BDD avec un hôte, un service de bas niveau,
177
        # et un groupe d'hôtes et de services associés à ces items.
178
        idcorrevent = populate_DB(self.test_service)
179
        transaction.commit()
180

    
181
        # L'utilisateur n'est pas authentifié.
182
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
183
        # demandant à l'utilisateur de s'authentifier.
184
        response = self.app.get(
185
            '/masked_events/%d' % idcorrevent,
186
            status = 401)
187

    
188
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
189
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
190
    test_service = False
191

    
192
    def test_table(self):
193
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
194
        super(TestRawEventsTableAnonymousHost, self).test_table()
195

    
196

    
197
class TestRawEventsTableWithoutPermsLLS(TestController):
198
    """
199
    Teste l'affichage des événements bruts masqués par un agrégat.
200
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
201
    et l'agrégat a été causé par un LLS.
202
    """
203
    test_service = True
204

    
205
    def setUp(self):
206
        super(TestRawEventsTableWithoutPermsLLS, self).setUp()
207
        populate_accounts()
208

    
209
    def test_table(self):
210
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
211
        # On peuple la BDD avec un hôte, un service de bas niveau,
212
        # et un groupe d'hôtes et de services associés à ces items.
213
        idcorrevent = populate_DB(self.test_service)
214
        transaction.commit()
215

    
216
        environ = {'REMOTE_USER': 'limited_access'}
217

    
218
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
219
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
220
        # n'a pas accès aux informations concernant cet évènement.
221
        response = self.app.get(
222
            '/masked_events/%d' % idcorrevent,
223
            status = 302,
224
            extra_environ = environ)
225
        response = response.follow(status = 200, extra_environ = environ)
226
        assert_true(len(response.lxml.xpath(
227
            '//div[@id="flash"]/div[@class="error"]')))
228

    
229
        # L'erreur 302 peut venir de l'absence de permissions
230
        # ou bien simplement de l'absence d'événements masqués.
231
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
232
        # on ajoute un événement masqué. On doit à nouveau obtenir
233
        # une erreur 302.
234
        add_masked_event(idcorrevent)
235
        response = self.app.get(
236
            '/masked_events/%d' % idcorrevent,
237
            status = 302,
238
            extra_environ = environ)
239
        response = response.follow(status = 200, extra_environ = environ)
240
        assert_true(len(response.lxml.xpath(
241
            '//div[@id="flash"]/div[@class="error"]')))
242

    
243
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
244
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
245
    test_service = False
246

    
247
    def test_table(self):
248
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
249
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
250

    
251
class TestRawEventsTableWithPermsLLS(TestController):
252
    """
253
    Teste l'affichage d'une table des événements bruts
254
    rattachés à un agrégat, hormis la cause de cet agrégat.
255
    """
256
    test_service = True
257

    
258
    def setUp(self):
259
        super(TestRawEventsTableWithPermsLLS, self).setUp()
260
        populate_accounts()
261

    
262
    def test_table(self):
263
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
264
        # On peuple la BDD avec un hôte, un service de bas niveau,
265
        # et un groupe d'hôtes et de services associés à ces items.
266
        idcorrevent = populate_DB(True)
267
        transaction.commit()
268

    
269
        environ = {'REMOTE_USER': 'access'}
270

    
271
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
272
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
273
        # n'a pas accès aux informations concernant cet évènement.
274
        response = self.app.get(
275
            '/masked_events/%d' % idcorrevent,
276
            status = 302,
277
            extra_environ = environ)
278
        response = response.follow(status = 200, extra_environ = environ)
279
        assert_true(len(response.lxml.xpath(
280
            '//div[@id="flash"]/div[@class="error"]')))
281

    
282
        # L'erreur 302 peut venir de l'absence de permissions
283
        # ou bien simplement de l'absence d'événements masqués.
284
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
285
        # on ajoute un événement masqué. On doit avoir accès à exactement
286
        # 1 événement masqué à présent.
287
        add_masked_event(idcorrevent)
288
        response = self.app.get(
289
            '/masked_events/%d' % idcorrevent,
290
            status = 200,
291
            extra_environ = environ)
292

    
293
        # On s'attend à trouver exactement 1 événement masqué.
294
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
295
        # de rechercher directement une valeur dans une liste. Elle devrait
296
        # néanmoins suffire pour les besoins des tests.
297
        rows = response.lxml.xpath(
298
            '//table[contains(@class, "vigitable")]/tbody/tr')
299
        assert_equal(len(rows), 1)
300

    
301
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
302
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
303
    test_service = False
304

    
305
    def test_table(self):
306
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
307
        super(TestRawEventsTableWithPermsHost, self).test_table()