Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_raw_events_table.py @ 011743be

History | View | Annotate | Download (9.55 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
# Copyright (C) 2006-2020 CS GROUP - France
4
# License: GNU GPL v2 <http://www.gnu.org/licenses/gpl-2.0.html>
5

    
6
"""
7
Test du tableau d'événements de Vigiboard
8
"""
9

    
10
from nose.tools import assert_true, assert_equal
11
from datetime import datetime
12
import transaction
13

    
14
from vigilo.models.session import DBSession
15
from vigilo.models.demo import functions
16
from vigilo.models.tables import EventHistory, CorrEvent, User, \
17
                            Permission, Host, UserGroup, \
18
                            SupItemGroup, DataPermission
19
from vigiboard.tests import TestController
20

    
21
def populate_accounts():
22
    perm = Permission.by_permission_name(u'vigiboard-access')
23

    
24
    user = User(
25
        user_name=u'access',
26
        fullname=u'',
27
        email=u'user.has@access',
28
    )
29
    usergroup = UserGroup(group_name=u'users_with_access')
30
    usergroup.permissions.append(perm)
31
    user.usergroups.append(usergroup)
32
    DBSession.add(user)
33
    DBSession.add(usergroup)
34
    DBSession.flush()
35

    
36
    user = User(
37
        user_name=u'limited_access',
38
        fullname=u'',
39
        email=u'user.has.no@access',
40
    )
41
    usergroup = UserGroup(group_name=u'users_with_limited_access')
42
    usergroup.permissions.append(perm)
43
    user.usergroups.append(usergroup)
44
    DBSession.add(user)
45
    DBSession.add(usergroup)
46
    DBSession.flush()
47
    transaction.commit()
48

    
49
def populate_DB(caused_by_service):
50
    """ Peuple la base de données. """
51
    # On ajoute un groupe d'hôtes et un groupe de services.
52
    supitemmanagers = SupItemGroup(name = u'managersgroup', parent=None)
53
    DBSession.add(supitemmanagers)
54
    DBSession.flush()
55

    
56
    usergroup = UserGroup.by_group_name(u'users_with_access')
57
    DBSession.add(DataPermission(
58
        group=supitemmanagers,
59
        usergroup=usergroup,
60
        access=u'r',
61
    ))
62
    DBSession.flush()
63

    
64
    # On crée un hôte de test et on l'ajoute au groupe d'hôtes.
65
    managerhost = functions.add_host(u'managerhost')
66
    supitemmanagers.supitems.append(managerhost)
67
    DBSession.flush()
68

    
69
    # On crée un services de bas niveau et on l'ajoute au groupe de services.
70
    managerservice = functions.add_lowlevelservice(
71
                        managerhost, u'managerservice')
72
    supitemmanagers.supitems.append(managerservice)
73
    DBSession.flush()
74

    
75
    if caused_by_service:
76
        event = functions.add_event(managerservice, u'WARNING', u'foo')
77
    else:
78
        event = functions.add_event(managerhost, u'WARNING', u'foo')
79

    
80
    # Ajout des historiques
81
    DBSession.add(EventHistory(
82
        type_action=u'Nagios update state',
83
        idevent=event.idevent,
84
        timestamp=datetime.utcnow()))
85
    DBSession.add(EventHistory(
86
        type_action=u'Acknowlegement change state',
87
        idevent=event.idevent,
88
        timestamp=datetime.utcnow()))
89
    DBSession.flush()
90

    
91
    # Ajout d'un événement corrélé
92
    aggregate = functions.add_correvent([event])
93
    return aggregate.idcorrevent
94

    
95
def add_masked_event(idcorrevent):
96
    """Ajoute un événement masqué à un événement corrélé."""
97
    transaction.begin()
98
    hostmanagers = SupItemGroup.by_group_name(u'managersgroup')
99
    nb_hosts = DBSession.query(Host).count()
100

    
101
    masked_host = functions.add_host(u'masked host #%d' % nb_hosts)
102
    hostmanagers.supitems.append(masked_host)
103
    DBSession.flush()
104

    
105
    event = functions.add_event(masked_host, u'CRITICAL',
106
                                u'masked event #%d' % nb_hosts)
107

    
108
    aggregate = DBSession.query(CorrEvent).filter(
109
        CorrEvent.idcorrevent == idcorrevent).one()
110
    aggregate.events.append(event)
111
    DBSession.add(aggregate)
112
    DBSession.flush()
113
    transaction.commit()
114

    
115

    
116
class TestRawEventsTableAnonymousLLS(TestController):
117
    """
118
    Teste l'affichage des événements bruts masqués par un agrégat.
119
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
120
    a été causé par un LLS.
121
    """
122
    test_service = True
123

    
124
    def setUp(self):
125
        super(TestRawEventsTableAnonymousLLS, self).setUp()
126
        populate_accounts()
127

    
128
    def test_table(self):
129
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
130
        # On peuple la BDD avec un hôte, un service de bas niveau,
131
        # et un groupe d'hôtes et de services associés à ces items.
132
        idcorrevent = populate_DB(self.test_service)
133
        transaction.commit()
134

    
135
        # L'utilisateur n'est pas authentifié.
136
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
137
        # demandant à l'utilisateur de s'authentifier.
138
        self.app.get(
139
            '/masked_events/%d' % idcorrevent,
140
            status = 401)
141

    
142
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
143
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
144
    test_service = False
145

    
146
    def test_table(self):
147
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
148
        super(TestRawEventsTableAnonymousHost, self).test_table()
149

    
150

    
151
class TestRawEventsTableWithoutPermsLLS(TestController):
152
    """
153
    Teste l'affichage des événements bruts masqués par un agrégat.
154
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
155
    et l'agrégat a été causé par un LLS.
156
    """
157
    test_service = True
158

    
159
    def setUp(self):
160
        super(TestRawEventsTableWithoutPermsLLS, self).setUp()
161
        populate_accounts()
162

    
163
    def test_table(self):
164
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
165
        # On peuple la BDD avec un hôte, un service de bas niveau,
166
        # et un groupe d'hôtes et de services associés à ces items.
167
        idcorrevent = populate_DB(self.test_service)
168
        transaction.commit()
169

    
170
        environ = {'REMOTE_USER': 'limited_access'}
171

    
172
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
173
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
174
        # n'a pas accès aux informations concernant cet évènement.
175
        response = self.app.get(
176
            '/masked_events/%d' % idcorrevent,
177
            status = 302,
178
            extra_environ = environ)
179
        response = response.follow(status = 200, extra_environ = environ)
180
        assert_true(len(response.lxml.xpath(
181
            '//div[@id="flash"]/div[@class="error"]')))
182

    
183
        # L'erreur 302 peut venir de l'absence de permissions
184
        # ou bien simplement de l'absence d'événements masqués.
185
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
186
        # on ajoute un événement masqué. On doit à nouveau obtenir
187
        # une erreur 302.
188
        add_masked_event(idcorrevent)
189
        response = self.app.get(
190
            '/masked_events/%d' % idcorrevent,
191
            status = 302,
192
            extra_environ = environ)
193
        response = response.follow(status = 200, extra_environ = environ)
194
        assert_true(len(response.lxml.xpath(
195
            '//div[@id="flash"]/div[@class="error"]')))
196

    
197
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
198
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
199
    test_service = False
200

    
201
    def test_table(self):
202
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
203
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
204

    
205
class TestRawEventsTableWithPermsLLS(TestController):
206
    """
207
    Teste l'affichage d'une table des événements bruts
208
    rattachés à un agrégat, hormis la cause de cet agrégat.
209
    """
210
    test_service = True
211

    
212
    def setUp(self):
213
        super(TestRawEventsTableWithPermsLLS, self).setUp()
214
        populate_accounts()
215

    
216
    def test_table(self):
217
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
218
        # On peuple la BDD avec un hôte, un service de bas niveau,
219
        # et un groupe d'hôtes et de services associés à ces items.
220
        idcorrevent = populate_DB(True)
221
        transaction.commit()
222

    
223
        environ = {'REMOTE_USER': 'access'}
224

    
225
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
226
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
227
        # n'a pas accès aux informations concernant cet évènement.
228
        response = self.app.get(
229
            '/masked_events/%d' % idcorrevent,
230
            status = 302,
231
            extra_environ = environ)
232
        response = response.follow(status = 200, extra_environ = environ)
233
        assert_true(len(response.lxml.xpath(
234
            '//div[@id="flash"]/div[@class="error"]')))
235

    
236
        # L'erreur 302 peut venir de l'absence de permissions
237
        # ou bien simplement de l'absence d'événements masqués.
238
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
239
        # on ajoute un événement masqué. On doit avoir accès à exactement
240
        # 1 événement masqué à présent.
241
        add_masked_event(idcorrevent)
242
        response = self.app.get(
243
            '/masked_events/%d' % idcorrevent,
244
            status = 200,
245
            extra_environ = environ)
246

    
247
        # On s'attend à trouver exactement 1 événement masqué.
248
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
249
        # de rechercher directement une valeur dans une liste. Elle devrait
250
        # néanmoins suffire pour les besoins des tests.
251
        rows = response.lxml.xpath(
252
            '//table[contains(@class, "vigitable")]/tbody/tr')
253
        assert_equal(len(rows), 1)
254

    
255
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
256
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
257
    test_service = False
258

    
259
    def test_table(self):
260
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
261
        super(TestRawEventsTableWithPermsHost, self).test_table()