Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_correvents_table.py @ 0dcb87f7

History | View | Annotate | Download (9.77 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, CorrEvent, DataPermission, \
13
                            Permission, StateName, Host, SupItemGroup, \
14
                            LowLevelService, User, UserGroup, Permission
15
from vigilo.models.tables.grouphierarchy import GroupHierarchy
16
from vigiboard.tests import TestController
17

    
18
def populate_DB():
19
    """ Peuple la base de données. """
20
    # On ajoute les groupes et leurs dépendances
21
    hosteditors = SupItemGroup(name=u'editorsgroup')
22
    DBSession.add(hosteditors)
23
    hostmanagers = SupItemGroup(name=u'managersgroup')
24
    DBSession.add(hostmanagers)
25
    DBSession.flush()
26

    
27
    DBSession.add(GroupHierarchy(
28
        parent=hosteditors,
29
        child=hosteditors,
30
        hops=0,
31
    ))
32
    DBSession.add(GroupHierarchy(
33
        parent=hostmanagers,
34
        child=hostmanagers,
35
        hops=0,
36
    ))
37
    DBSession.add(GroupHierarchy(
38
        parent=hostmanagers,
39
        child=hosteditors,
40
        hops=1,
41
    ))
42
    DBSession.flush()
43

    
44
    usergroup = UserGroup.by_group_name(u'users_with_access')
45
    DBSession.add(DataPermission(
46
        group=hostmanagers,
47
        usergroup=usergroup,
48
        access=u'r',
49
    ))
50
    DBSession.add(DataPermission(
51
        group=hosteditors,
52
        usergroup=usergroup,
53
        access=u'r',
54
    ))
55

    
56
    usergroup = UserGroup.by_group_name(u'users_with_limited_access')
57
    DBSession.add(DataPermission(
58
        group=hosteditors,
59
        usergroup=usergroup,
60
        access=u'r',
61
    ))
62
    DBSession.flush()
63

    
64
    # Création des hôtes de test.
65
    host_template = {
66
        'checkhostcmd': u'halt',
67
        'snmpcommunity': u'public',
68
        'hosttpl': u'/dev/null',
69
        'address': u'192.168.1.1',
70
        'snmpport': 42,
71
        'weight': 42,
72
    }
73

    
74
    managerhost = Host(name=u'managerhost', **host_template)
75
    editorhost = Host(name=u'editorhost', **host_template)
76
    DBSession.add(managerhost)
77
    DBSession.add(editorhost)
78

    
79
    # Affectation des hôtes aux groupes.
80
    hosteditors.supitems.append(editorhost)
81
    hostmanagers.supitems.append(managerhost)
82
    DBSession.flush()
83

    
84
    # Création des services techniques de test.
85
    service_template = {
86
        'command': u'halt',
87
        'op_dep': u'+',
88
        'weight': 42,
89
    }
90

    
91
    service1 = LowLevelService(
92
        host=managerhost,
93
        servicename=u'managerservice',
94
        **service_template
95
    )
96

    
97
    service2 = LowLevelService(
98
        host=editorhost,
99
        servicename=u'managerservice',
100
        **service_template
101
    )
102

    
103
    service3 = LowLevelService(
104
        host=managerhost,
105
        servicename=u'editorservice',
106
        **service_template
107
    )
108

    
109
    service4 = LowLevelService(
110
        host=editorhost,
111
        servicename=u'editorservice',
112
        **service_template
113
    )
114

    
115
    DBSession.add(service1)
116
    DBSession.add(service2)
117
    DBSession.add(service3)
118
    DBSession.add(service4)
119
    DBSession.flush()
120

    
121
    # Ajout des événements eux-mêmes
122
    event_template = {
123
        'message': u'foo',
124
        'current_state': StateName.statename_to_value(u'WARNING'),
125
        'timestamp': datetime.now(),
126
    }
127

    
128
    event1 = Event(supitem=service1, **event_template)
129
    event2 = Event(supitem=service2, **event_template)
130
    event3 = Event(supitem=service3, **event_template)
131
    event4 = Event(supitem=service4, **event_template)
132
    event5 = Event(supitem=editorhost, **event_template)
133
    event6 = Event(supitem=managerhost, **event_template)
134

    
135
    DBSession.add(event1)
136
    DBSession.add(event2)
137
    DBSession.add(event3)
138
    DBSession.add(event4)
139
    DBSession.add(event5)
140
    DBSession.add(event6)
141
    DBSession.flush()
142

    
143
    # Ajout des événements corrélés
144
    aggregate_template = {
145
        'timestamp_active': datetime.now(),
146
        'priority': 1,
147
        'status': u'None',
148
    }
149

    
150
    aggregate1 = CorrEvent(
151
        idcause=event1.idevent, **aggregate_template)
152
    aggregate2 = CorrEvent(
153
        idcause=event4.idevent, **aggregate_template)
154
    aggregate3 = CorrEvent(
155
        idcause=event5.idevent, **aggregate_template)
156
    aggregate4 = CorrEvent(
157
        idcause=event6.idevent, **aggregate_template)
158

    
159
    aggregate1.events.append(event1)
160
    aggregate1.events.append(event3)
161
    aggregate2.events.append(event4)
162
    aggregate2.events.append(event2)
163
    aggregate3.events.append(event5)
164
    aggregate4.events.append(event6)
165
    DBSession.add(aggregate1)
166
    DBSession.add(aggregate2)
167
    DBSession.add(aggregate3)
168
    DBSession.add(aggregate4)
169

    
170
    DBSession.flush()
171
    transaction.commit()
172

    
173
class TestEventTable(TestController):
174
    """
175
    Test du tableau d'événements de Vigiboard
176
    """
177
    def setUp(self):
178
        super(TestEventTable, self).setUp()
179
        perm = Permission.by_permission_name(u'vigiboard-access')
180

    
181
        user = User(
182
            user_name=u'access',
183
            fullname=u'',
184
            email=u'user.has@access',
185
        )
186
        usergroup = UserGroup(
187
            group_name=u'users_with_access',
188
        )
189
        usergroup.permissions.append(perm)
190
        user.usergroups.append(usergroup)
191
        DBSession.add(user)
192
        DBSession.add(usergroup)
193
        DBSession.flush()
194

    
195
        user = User(
196
            user_name=u'limited_access',
197
            fullname=u'',
198
            email=u'user.has.no@access',
199
        )
200
        usergroup = UserGroup(
201
            group_name=u'users_with_limited_access',
202
        )
203
        usergroup.permissions.append(perm)
204
        user.usergroups.append(usergroup)
205
        DBSession.add(user)
206
        DBSession.add(usergroup)
207
        DBSession.flush()
208

    
209
        populate_DB()
210

    
211

    
212
    def test_homepage(self):
213
        """
214
        Tableau des événements (page d'accueil).
215
        """
216
        # L'utilisateur n'est pas authentifié.
217
        response = self.app.get('/', status=401)
218

    
219
        # L'utilisateur est authentifié avec des permissions réduites.
220
        environ = {'REMOTE_USER': 'limited_access'}
221
        response = self.app.get('/', extra_environ=environ)
222

    
223
        # Il doit y avoir 2 lignes de résultats.
224
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
225
        print "There are %d rows in the result set" % len(rows)
226
        assert_equal(len(rows), 2)
227

    
228
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
229
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
230
        print "There are %d columns in the result set" % len(cols)
231
        assert_true(len(cols) > 1)
232

    
233
        # L'utilisateur est authentifié avec des permissions plus étendues.
234
        environ = {'REMOTE_USER': 'access'}
235
        response = self.app.get('/', extra_environ=environ)
236

    
237
        # Il doit y avoir 4 lignes de résultats.
238
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
239
        print "There are %d rows in the result set" % len(rows)
240
        assert_equal(len(rows), 4)
241

    
242
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
243
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
244
        print "There are %d columns in the result set" % len(cols)
245
        assert_true(len(cols) > 1)
246

    
247
    def test_correvents_table_for_LLS(self):
248
        """
249
        Tableau des événements corrélés pour un service de bas niveau.
250
        """
251
        url = '/item/1/%s/%s' % ('managerhost', 'managerservice')
252

    
253
        # L'utilisateur n'est pas authentifié.
254
        response = self.app.get(url, status=401)
255

    
256
        # L'utilisateur dispose de permissions restreintes.
257
        # Il n'a pas accès aux événements corrélés sur le service donné.
258
        # Donc, on s'attend à être redirigé avec un message d'erreur.
259
        environ = {'REMOTE_USER': 'limited_access'}
260
        response = self.app.get(url, extra_environ=environ, status=302)
261
        response = response.follow(status = 200, extra_environ = environ)
262
        assert_true(len(response.lxml.xpath(
263
            '//div[@id="flash"]/div[@class="error"]')))
264

    
265
        # L'utilisateur dispose de permissions plus étendues.
266
        # Il doit avoir accès à l'historique.
267
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
268
        environ = {'REMOTE_USER': 'access'}
269
        response = self.app.get(url, extra_environ=environ, status=200)
270

    
271
        # Il doit y avoir 1 ligne de résultats.
272
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
273
        print "There are %d rows in the result set" % len(rows)
274
        assert_equal(len(rows), 1)
275

    
276
    def test_correvents_table_for_host(self):
277
        """
278
        Tableau des événements corrélés pour un hôte.
279
        """
280
        url = '/item/1/%s/' % ('managerhost', )
281

    
282
        # L'utilisateur n'est pas authentifié.
283
        response = self.app.get(url, status=401)
284

    
285
        # L'utilisateur dispose de permissions restreintes.
286
        # Il n'a pas accès aux événements corrélés sur le service donné.
287
        # Donc, on s'attend à être redirigé avec un message d'erreur.
288
        environ = {'REMOTE_USER': 'limited_access'}
289
        response = self.app.get(url, extra_environ=environ, status=302)
290
        response = response.follow(status = 200, extra_environ = environ)
291
        assert_true(len(response.lxml.xpath(
292
            '//div[@id="flash"]/div[@class="error"]')))
293

    
294
        # L'utilisateur dispose de permissions plus étendues.
295
        # Il doit avoir accès à l'historique.
296
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
297
        environ = {'REMOTE_USER': 'access'}
298
        response = self.app.get(url, extra_environ=environ, status=200)
299

    
300
        # Il doit y avoir 1 ligne de résultats.
301
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
302
        print "There are %d rows in the result set" % len(rows)
303
        assert_equal(len(rows), 1)
304