Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_raw_events_table.py @ 0dcb87f7

History | View | Annotate | Download (10.8 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, EventHistory, CorrEvent, User, \
13
                            Permission, StateName, Host, UserGroup, \
14
                            LowLevelService, SupItemGroup, DataPermission
15
from vigilo.models.tables.grouphierarchy import GroupHierarchy
16
from vigiboard.tests import TestController
17

    
18
def populate_accounts():
19
        perm = Permission.by_permission_name(u'vigiboard-access')
20

    
21
        user = User(
22
            user_name=u'access',
23
            fullname=u'',
24
            email=u'user.has@access',
25
        )
26
        usergroup = UserGroup(
27
            group_name=u'users_with_access',
28
        )
29
        usergroup.permissions.append(perm)
30
        user.usergroups.append(usergroup)
31
        DBSession.add(user)
32
        DBSession.add(usergroup)
33
        DBSession.flush()
34

    
35
        user = User(
36
            user_name=u'limited_access',
37
            fullname=u'',
38
            email=u'user.has.no@access',
39
        )
40
        usergroup = UserGroup(
41
            group_name=u'users_with_limited_access',
42
        )
43
        usergroup.permissions.append(perm)
44
        user.usergroups.append(usergroup)
45
        DBSession.add(user)
46
        DBSession.add(usergroup)
47
        DBSession.flush()
48
        transaction.commit()
49

    
50
def populate_DB(caused_by_service):
51
    """ Peuple la base de données. """
52
    # On ajoute un groupe d'hôtes et un groupe de services.
53
    supitemmanagers = SupItemGroup(name = u'managersgroup')
54
    DBSession.add(supitemmanagers)
55
    DBSession.flush()
56

    
57
    DBSession.add(GroupHierarchy(
58
        parent=supitemmanagers,
59
        child=supitemmanagers,
60
        hops=0,
61
    ))
62
    DBSession.flush()
63

    
64
    usergroup = UserGroup.by_group_name(u'users_with_access')
65
    DBSession.add(DataPermission(
66
        group=supitemmanagers,
67
        usergroup=usergroup,
68
        access=u'r',
69
    ))
70
    DBSession.flush()
71

    
72
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
73
    managerhost = Host(
74
        name = u'managerhost',      
75
        checkhostcmd = u'halt',
76
        snmpcommunity = u'public',
77
        hosttpl = u'/dev/null',
78
        address = u'192.168.1.1',
79
        snmpport = 42,
80
        weight = 42,
81
    )
82
    DBSession.add(managerhost)
83
    supitemmanagers.supitems.append(managerhost)
84
    DBSession.flush()
85

    
86
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
87
    managerservice = LowLevelService(
88
        host = managerhost,
89
        servicename = u'managerservice',
90
        command = u'halt',
91
        op_dep = u'+',
92
        weight = 42,
93
    )
94
    DBSession.add(managerservice)
95
    supitemmanagers.supitems.append(managerservice)
96
    DBSession.flush()
97

    
98
    if caused_by_service:
99
        supitem = managerservice
100
    else:
101
        supitem = managerhost
102

    
103
    # Ajout d'un événement
104
    event = Event(
105
        supitem = supitem, 
106
        message = u'foo',
107
        current_state = StateName.statename_to_value(u"WARNING"),
108
        timestamp = datetime.now(),
109
    )
110
    DBSession.add(event)
111
    DBSession.flush()
112

    
113
    # Ajout des historiques
114
    DBSession.add(EventHistory(
115
        type_action=u'Nagios update state',
116
        idevent=event.idevent, 
117
        timestamp=datetime.now()))
118
    DBSession.add(EventHistory(
119
        type_action=u'Acknowlegement change state',
120
        idevent=event.idevent, 
121
        timestamp=datetime.now()))
122
    DBSession.flush()
123

    
124
    # Ajout d'un événement corrélé
125
    aggregate = CorrEvent(
126
        idcause = event.idevent, 
127
        timestamp_active = datetime.now(),
128
        priority = 1,
129
        status = u"None")
130
    aggregate.events.append(event)
131
    DBSession.add(aggregate)
132
    DBSession.flush()
133
    return aggregate.idcorrevent
134

    
135
def add_masked_event(idcorrevent):
136
    """Ajoute un événement masqué à un événement corrélé."""
137
    transaction.begin()
138
    hostmanagers = SupItemGroup.by_group_name(u'managersgroup')
139
    nb_hosts = DBSession.query(Host).count()
140

    
141
    masked_host = Host(
142
        name = u'masked host #%d' % nb_hosts,      
143
        checkhostcmd = u'halt',
144
        snmpcommunity = u'public',
145
        hosttpl = u'/dev/null',
146
        address = u'192.168.1.%d' % nb_hosts,
147
        snmpport = 42,
148
        weight = 42,
149
    )
150
    DBSession.add(masked_host)
151
    hostmanagers.supitems.append(masked_host)
152
    DBSession.flush()
153

    
154
    event = Event(
155
        supitem = masked_host,
156
        message = u'masked event #%d' % nb_hosts,
157
        current_state = StateName.statename_to_value(u"CRITICAL"),
158
        timestamp = datetime.now(),
159
    )
160
    DBSession.add(event)
161
    DBSession.flush()
162

    
163
    aggregate = DBSession.query(CorrEvent).filter(
164
        CorrEvent.idcorrevent == idcorrevent).one()
165
    aggregate.events.append(event)
166
    DBSession.add(aggregate)
167
    DBSession.flush()
168
    transaction.commit()
169

    
170

    
171
class TestRawEventsTableAnonymousLLS(TestController):
172
    """
173
    Teste l'affichage des événements bruts masqués par un agrégat.
174
    Dans ces tests, l'utilisateur n'est pas authentifié et l'agrégat
175
    a été causé par un LLS.
176
    """
177
    test_service = True
178

    
179
    def setUp(self):
180
        super(TestRawEventsTableAnonymousLLS, self).setUp()
181
        populate_accounts()
182

    
183
    def test_table(self):
184
        """Événements masqués d'un agrégat sur un LLS en anonyme."""
185
        # On peuple la BDD avec un hôte, un service de bas niveau,
186
        # et un groupe d'hôtes et de services associés à ces items.
187
        idcorrevent = populate_DB(self.test_service)
188
        transaction.commit()
189

    
190
        # L'utilisateur n'est pas authentifié.
191
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
192
        # demandant à l'utilisateur de s'authentifier.
193
        response = self.app.get(
194
            '/masked_events/%d' % idcorrevent,
195
            status = 401)
196

    
197
class TestRawEventsTableAnonymousHost(TestRawEventsTableAnonymousLLS):
198
    """Idem que TestRawEventsTableAnonymousLLS mais avec un hôte."""
199
    test_service = False
200

    
201
    def test_table(self):
202
        """Événements masqués d'un agrégat sur un hôte en anonyme."""
203
        super(TestRawEventsTableAnonymousHost, self).test_table()
204

    
205

    
206
class TestRawEventsTableWithoutPermsLLS(TestController):
207
    """
208
    Teste l'affichage des événements bruts masqués par un agrégat.
209
    Dans ces tests, l'utilisateur n'a pas les bonnes permissions
210
    et l'agrégat a été causé par un LLS.
211
    """
212
    test_service = True
213

    
214
    def setUp(self):
215
        super(TestRawEventsTableWithoutPermsLLS, self).setUp()
216
        populate_accounts()
217

    
218
    def test_table(self):
219
        """Événements masqués d'un agrégat sur un LLS sans permissions."""
220
        # On peuple la BDD avec un hôte, un service de bas niveau,
221
        # et un groupe d'hôtes et de services associés à ces items.
222
        idcorrevent = populate_DB(self.test_service)
223
        transaction.commit()
224

    
225
        environ = {'REMOTE_USER': 'limited_access'}
226

    
227
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
228
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
229
        # n'a pas accès aux informations concernant cet évènement.
230
        response = self.app.get(
231
            '/masked_events/%d' % idcorrevent,
232
            status = 302, 
233
            extra_environ = environ)
234
        response = response.follow(status = 200, extra_environ = environ)
235
        assert_true(len(response.lxml.xpath(
236
            '//div[@id="flash"]/div[@class="error"]')))
237

    
238
        # L'erreur 302 peut venir de l'absence de permissions
239
        # ou bien simplement de l'absence d'événements masqués.
240
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
241
        # on ajoute un événement masqué. On doit à nouveau obtenir
242
        # une erreur 302.
243
        add_masked_event(idcorrevent)
244
        response = self.app.get(
245
            '/masked_events/%d' % idcorrevent,
246
            status = 302, 
247
            extra_environ = environ)
248
        response = response.follow(status = 200, extra_environ = environ)
249
        assert_true(len(response.lxml.xpath(
250
            '//div[@id="flash"]/div[@class="error"]')))
251

    
252
class TestRawEventsTableWithoutPermsHost(TestRawEventsTableWithoutPermsLLS):
253
    """Idem que TestRawEventsTableWithoutPermsLLS mais avec un hôte."""
254
    test_service = False
255

    
256
    def test_table(self):
257
        """Événements masqués d'un agrégat sur un hôte sans permissions."""
258
        super(TestRawEventsTableWithoutPermsHost, self).test_table()
259

    
260
class TestRawEventsTableWithPermsLLS(TestController):
261
    """
262
    Teste l'affichage d'une table des événements bruts
263
    rattachés à un agrégat, hormis la cause de cet agrégat.
264
    """
265
    test_service = True
266

    
267
    def setUp(self):
268
        super(TestRawEventsTableWithPermsLLS, self).setUp()
269
        populate_accounts()
270

    
271
    def test_table(self):
272
        """Événements masqués d'un agrégat sur un LLS avec permissions."""
273
        # On peuple la BDD avec un hôte, un service de bas niveau,
274
        # et un groupe d'hôtes et de services associés à ces items.
275
        idcorrevent = populate_DB(True)
276
        transaction.commit()
277

    
278
        environ = {'REMOTE_USER': 'access'}
279

    
280
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
281
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
282
        # n'a pas accès aux informations concernant cet évènement.
283
        response = self.app.get(
284
            '/masked_events/%d' % idcorrevent,
285
            status = 302,
286
            extra_environ = environ)
287
        response = response.follow(status = 200, extra_environ = environ)
288
        assert_true(len(response.lxml.xpath(
289
            '//div[@id="flash"]/div[@class="error"]')))
290

    
291
        # L'erreur 302 peut venir de l'absence de permissions
292
        # ou bien simplement de l'absence d'événements masqués.
293
        # Pour vérifier qu'il s'agit bien d'un problème de permissions,
294
        # on ajoute un événement masqué. On doit avoir accès à exactement
295
        # 1 événement masqué à présent.
296
        add_masked_event(idcorrevent)
297
        response = self.app.get(
298
            '/masked_events/%d' % idcorrevent,
299
            status = 200,
300
            extra_environ = environ)
301

    
302
        # On s'attend à trouver exactement 1 événement masqué.
303
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
304
        # de rechercher directement une valeur dans une liste. Elle devrait
305
        # néanmoins suffire pour les besoins des tests.
306
        rows = response.lxml.xpath(
307
            '//table[contains(@class, "vigitable")]/tbody/tr')
308
        assert_equal(len(rows), 1)
309

    
310
class TestRawEventsTableWithPermsHost(TestRawEventsTableWithPermsLLS):
311
    """Idem que TestRawEventsTableWithPermsLLS mais avec un hôte."""
312
    test_service = False
313

    
314
    def test_table(self):
315
        """Événements masqués d'un agrégat sur un hôte avec permissions."""
316
        super(TestRawEventsTableWithPermsHost, self).test_table()
317