Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_correvents_table.py @ 0bd9c069

History | View | Annotate | Download (9.35 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Event, CorrEvent, DataPermission, \
13
                            Permission, StateName, Host, SupItemGroup, \
14
                            LowLevelService, User, UserGroup, Permission
15
from vigiboard.tests import TestController
16

    
17
def populate_DB():
18
    """ Peuple la base de données. """
19
    # On ajoute les groupes et leurs dépendances
20
    hostmanagers = SupItemGroup(name=u'managersgroup')
21
    DBSession.add(hostmanagers)
22
    hosteditors = SupItemGroup(name=u'editorsgroup', parent=hostmanagers)
23
    DBSession.add(hosteditors)
24
    DBSession.flush()
25

    
26
    usergroup = UserGroup.by_group_name(u'users_with_access')
27
    DBSession.add(DataPermission(
28
        group=hostmanagers,
29
        usergroup=usergroup,
30
        access=u'r',
31
    ))
32
    DBSession.add(DataPermission(
33
        group=hosteditors,
34
        usergroup=usergroup,
35
        access=u'r',
36
    ))
37

    
38
    usergroup = UserGroup.by_group_name(u'users_with_limited_access')
39
    DBSession.add(DataPermission(
40
        group=hosteditors,
41
        usergroup=usergroup,
42
        access=u'r',
43
    ))
44
    DBSession.flush()
45

    
46
    # Création des hôtes de test.
47
    host_template = {
48
        'checkhostcmd': u'halt',
49
        'snmpcommunity': u'public',
50
        'hosttpl': u'/dev/null',
51
        'address': u'192.168.1.1',
52
        'snmpport': 42,
53
        'weight': 42,
54
    }
55

    
56
    managerhost = Host(name=u'managerhost', **host_template)
57
    editorhost = Host(name=u'editorhost', **host_template)
58
    DBSession.add(managerhost)
59
    DBSession.add(editorhost)
60

    
61
    # Affectation des hôtes aux groupes.
62
    hosteditors.supitems.append(editorhost)
63
    hostmanagers.supitems.append(managerhost)
64
    DBSession.flush()
65

    
66
    # Création des services techniques de test.
67
    service_template = {
68
        'command': u'halt',
69
        'weight': 42,
70
    }
71

    
72
    service1 = LowLevelService(
73
        host=managerhost,
74
        servicename=u'managerservice',
75
        **service_template
76
    )
77

    
78
    service2 = LowLevelService(
79
        host=editorhost,
80
        servicename=u'managerservice',
81
        **service_template
82
    )
83

    
84
    service3 = LowLevelService(
85
        host=managerhost,
86
        servicename=u'editorservice',
87
        **service_template
88
    )
89

    
90
    service4 = LowLevelService(
91
        host=editorhost,
92
        servicename=u'editorservice',
93
        **service_template
94
    )
95

    
96
    DBSession.add(service1)
97
    DBSession.add(service2)
98
    DBSession.add(service3)
99
    DBSession.add(service4)
100
    DBSession.flush()
101

    
102
    # Ajout des événements eux-mêmes
103
    event_template = {
104
        'message': u'foo',
105
        'current_state': StateName.statename_to_value(u'WARNING'),
106
        'timestamp': datetime.now(),
107
    }
108

    
109
    event1 = Event(supitem=service1, **event_template)
110
    event2 = Event(supitem=service2, **event_template)
111
    event3 = Event(supitem=service3, **event_template)
112
    event4 = Event(supitem=service4, **event_template)
113
    event5 = Event(supitem=editorhost, **event_template)
114
    event6 = Event(supitem=managerhost, **event_template)
115

    
116
    DBSession.add(event1)
117
    DBSession.add(event2)
118
    DBSession.add(event3)
119
    DBSession.add(event4)
120
    DBSession.add(event5)
121
    DBSession.add(event6)
122
    DBSession.flush()
123

    
124
    # Ajout des événements corrélés
125
    aggregate_template = {
126
        'timestamp_active': datetime.now(),
127
        'priority': 1,
128
        'status': u'None',
129
    }
130

    
131
    aggregate1 = CorrEvent(
132
        idcause=event1.idevent, **aggregate_template)
133
    aggregate2 = CorrEvent(
134
        idcause=event4.idevent, **aggregate_template)
135
    aggregate3 = CorrEvent(
136
        idcause=event5.idevent, **aggregate_template)
137
    aggregate4 = CorrEvent(
138
        idcause=event6.idevent, **aggregate_template)
139

    
140
    aggregate1.events.append(event1)
141
    aggregate1.events.append(event3)
142
    aggregate2.events.append(event4)
143
    aggregate2.events.append(event2)
144
    aggregate3.events.append(event5)
145
    aggregate4.events.append(event6)
146
    DBSession.add(aggregate1)
147
    DBSession.add(aggregate2)
148
    DBSession.add(aggregate3)
149
    DBSession.add(aggregate4)
150

    
151
    DBSession.flush()
152
    transaction.commit()
153

    
154
class TestEventTable(TestController):
155
    """
156
    Test du tableau d'événements de Vigiboard
157
    """
158
    def setUp(self):
159
        super(TestEventTable, self).setUp()
160
        perm = Permission.by_permission_name(u'vigiboard-access')
161

    
162
        user = User(
163
            user_name=u'access',
164
            fullname=u'',
165
            email=u'user.has@access',
166
        )
167
        usergroup = UserGroup(
168
            group_name=u'users_with_access',
169
        )
170
        usergroup.permissions.append(perm)
171
        user.usergroups.append(usergroup)
172
        DBSession.add(user)
173
        DBSession.add(usergroup)
174
        DBSession.flush()
175

    
176
        user = User(
177
            user_name=u'limited_access',
178
            fullname=u'',
179
            email=u'user.has.no@access',
180
        )
181
        usergroup = UserGroup(
182
            group_name=u'users_with_limited_access',
183
        )
184
        usergroup.permissions.append(perm)
185
        user.usergroups.append(usergroup)
186
        DBSession.add(user)
187
        DBSession.add(usergroup)
188
        DBSession.flush()
189

    
190
        populate_DB()
191

    
192

    
193
    def test_homepage(self):
194
        """
195
        Tableau des événements (page d'accueil).
196
        """
197
        # L'utilisateur n'est pas authentifié.
198
        response = self.app.get('/', status=401)
199

    
200
        # L'utilisateur est authentifié avec des permissions réduites.
201
        environ = {'REMOTE_USER': 'limited_access'}
202
        response = self.app.get('/', extra_environ=environ)
203

    
204
        # Il doit y avoir 2 lignes de résultats.
205
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
206
        print "There are %d rows in the result set" % len(rows)
207
        assert_equal(len(rows), 2)
208

    
209
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
210
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
211
        print "There are %d columns in the result set" % len(cols)
212
        assert_true(len(cols) > 1)
213

    
214
        # L'utilisateur est authentifié avec des permissions plus étendues.
215
        environ = {'REMOTE_USER': 'access'}
216
        response = self.app.get('/', extra_environ=environ)
217

    
218
        # Il doit y avoir 4 lignes de résultats.
219
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
220
        print "There are %d rows in the result set" % len(rows)
221
        assert_equal(len(rows), 4)
222

    
223
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
224
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
225
        print "There are %d columns in the result set" % len(cols)
226
        assert_true(len(cols) > 1)
227

    
228
    def test_correvents_table_for_LLS(self):
229
        """
230
        Tableau des événements corrélés pour un service de bas niveau.
231
        """
232
        url = '/item/1/%s/%s' % ('managerhost', 'managerservice')
233

    
234
        # L'utilisateur n'est pas authentifié.
235
        response = self.app.get(url, status=401)
236

    
237
        # L'utilisateur dispose de permissions restreintes.
238
        # Il n'a pas accès aux événements corrélés sur le service donné.
239
        # Donc, on s'attend à être redirigé avec un message d'erreur.
240
        environ = {'REMOTE_USER': 'limited_access'}
241
        response = self.app.get(url, extra_environ=environ, status=302)
242
        response = response.follow(status = 200, extra_environ = environ)
243
        assert_true(len(response.lxml.xpath(
244
            '//div[@id="flash"]/div[@class="error"]')))
245

    
246
        # L'utilisateur dispose de permissions plus étendues.
247
        # Il doit avoir accès à l'historique.
248
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
249
        environ = {'REMOTE_USER': 'access'}
250
        response = self.app.get(url, extra_environ=environ, status=200)
251

    
252
        # Il doit y avoir 1 ligne de résultats.
253
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
254
        print "There are %d rows in the result set" % len(rows)
255
        assert_equal(len(rows), 1)
256

    
257
    def test_correvents_table_for_host(self):
258
        """
259
        Tableau des événements corrélés pour un hôte.
260
        """
261
        url = '/item/1/%s/' % ('managerhost', )
262

    
263
        # L'utilisateur n'est pas authentifié.
264
        response = self.app.get(url, status=401)
265

    
266
        # L'utilisateur dispose de permissions restreintes.
267
        # Il n'a pas accès aux événements corrélés sur le service donné.
268
        # Donc, on s'attend à être redirigé avec un message d'erreur.
269
        environ = {'REMOTE_USER': 'limited_access'}
270
        response = self.app.get(url, extra_environ=environ, status=302)
271
        response = response.follow(status = 200, extra_environ = environ)
272
        assert_true(len(response.lxml.xpath(
273
            '//div[@id="flash"]/div[@class="error"]')))
274

    
275
        # L'utilisateur dispose de permissions plus étendues.
276
        # Il doit avoir accès à l'historique.
277
        # Ici, il n'y a qu'un seul événement corrélé pour ce service.
278
        environ = {'REMOTE_USER': 'access'}
279
        response = self.app.get(url, extra_environ=environ, status=200)
280

    
281
        # Il doit y avoir 1 ligne de résultats.
282
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
283
        print "There are %d rows in the result set" % len(rows)
284
        assert_equal(len(rows), 1)
285