Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_history_table.py @ b373a5de

History | View | Annotate | Download (8.12 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
# Copyright (C) 2006-2011 CS-SI
4
# License: GNU GPL v2 <http://www.gnu.org/licenses/gpl-2.0.html>
5

    
6
"""
7
Vérifie que la page qui affiche l'historique des actions sur un événement
8
brut fonctionne correctement.
9
"""
10

    
11
from nose.tools import assert_true, assert_equal
12
from datetime import datetime
13
import transaction
14

    
15
from vigilo.models.session import DBSession
16
from vigilo.models.tables import Event, EventHistory, CorrEvent, \
17
                            Permission, StateName, Host, \
18
                            SupItemGroup, LowLevelService, \
19
                            Permission, DataPermission, User, \
20
                            UserGroup
21
from vigiboard.tests import TestController
22

    
23
def populate_DB():
24
    """ Peuple la base de données. """
25
    # On ajoute un groupe d'hôtes et un groupe de services.
26
    supitemmanagers = SupItemGroup(name=u'managersgroup', parent=None)
27
    DBSession.add(supitemmanagers)
28
    DBSession.flush()
29

    
30
    usergroup = UserGroup.by_group_name(u'users_with_access')
31
    DBSession.add(DataPermission(
32
        group=supitemmanagers,
33
        usergroup=usergroup,
34
        access=u'r',
35
    ))
36
    DBSession.flush()
37

    
38
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
39
    managerhost = Host(
40
        name = u'managerhost',
41
        checkhostcmd = u'halt',
42
        snmpcommunity = u'public',
43
        hosttpl = u'/dev/null',
44
        address = u'192.168.1.1',
45
        snmpport = 42,
46
        weight = 42,
47
    )
48
    DBSession.add(managerhost)
49
    supitemmanagers.supitems.append(managerhost)
50
    DBSession.flush()
51

    
52
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
53
    managerservice = LowLevelService(
54
        host = managerhost,
55
        servicename = u'managerservice',
56
        command = u'halt',
57
        weight = 42,
58
    )
59
    DBSession.add(managerservice)
60
    supitemmanagers.supitems.append(managerservice)
61
    DBSession.flush()
62

    
63
    return (managerhost, managerservice)
64

    
65
def add_correvent_caused_by(supitem):
66
    """
67
    Ajoute dans la base de données un évènement corrélé causé
68
    par un incident survenu sur l'item passé en paramètre.
69
    Génère un historique pour les tests.
70
    """
71

    
72
    # Ajout d'un événement
73
    event = Event(
74
        supitem = supitem,
75
        message = u'foo',
76
        current_state = StateName.statename_to_value(u"WARNING"),
77
        timestamp = datetime.now(),
78
    )
79
    DBSession.add(event)
80
    DBSession.flush()
81

    
82
    # Ajout des historiques
83
    DBSession.add(EventHistory(
84
        type_action=u'Nagios update state',
85
        idevent=event.idevent,
86
        timestamp=datetime.now()))
87
    DBSession.add(EventHistory(
88
        type_action=u'Acknowlegement change state',
89
        idevent=event.idevent,
90
        timestamp=datetime.now()))
91
    DBSession.flush()
92

    
93
    # Ajout d'un événement corrélé
94
    aggregate = CorrEvent(
95
        idcause = event.idevent,
96
        timestamp_active = datetime.now(),
97
        priority = 1,
98
        status = u"None")
99
    aggregate.events.append(event)
100
    DBSession.add(aggregate)
101
    DBSession.flush()
102

    
103
    return event.idevent
104

    
105

    
106
class TestHistoryTable(TestController):
107
    """
108
    Teste la table qui affiche l'historique des actions
109
    sur un événement brut.
110
    """
111
    def setUp(self):
112
        super(TestHistoryTable, self).setUp()
113
        perm = Permission.by_permission_name(u'vigiboard-access')
114

    
115
        user = User(
116
            user_name=u'access',
117
            fullname=u'',
118
            email=u'user.has@access',
119
        )
120
        usergroup = UserGroup(group_name=u'users_with_access')
121
        usergroup.permissions.append(perm)
122
        user.usergroups.append(usergroup)
123
        DBSession.add(user)
124
        DBSession.add(usergroup)
125
        DBSession.flush()
126

    
127
        user = User(
128
            user_name=u'limited_access',
129
            fullname=u'',
130
            email=u'user.has.no@access',
131
        )
132
        usergroup = UserGroup(group_name=u'users_with_limited_access')
133
        usergroup.permissions.append(perm)
134
        user.usergroups.append(usergroup)
135
        DBSession.add(user)
136
        DBSession.add(usergroup)
137
        DBSession.flush()
138

    
139

    
140
    def test_cause_host_history(self):
141
        """Historique de la cause d'un événement corrélé sur un hôte."""
142

    
143
        # On peuple la BDD avec un hôte, un service de bas niveau,
144
        # et un groupe d'hôtes et de services associés à ces items.
145
        (managerhost, managerservice) = populate_DB()
146

    
147
        # On ajoute un évènement corrélé causé par l'hôte
148
        idevent = add_correvent_caused_by(managerhost)
149
        transaction.commit()
150

    
151
        # L'utilisateur n'est pas authentifié.
152
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
153
        # demandant à l'utilisateur de s'authentifier.
154
        response = self.app.get(
155
            '/event/%d' % idevent,
156
            status = 401)
157

    
158
        # L'utilisateur N'A PAS les bonnes permissions.
159
        environ = {'REMOTE_USER': 'limited_access'}
160

    
161
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
162
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
163
        # n'a pas accès aux informations concernant cet évènement.
164
        response = self.app.get(
165
            '/event/%d' % idevent,
166
            status = 302,
167
            extra_environ = environ)
168

    
169
        # On suit la redirection.
170
        response = response.follow(status = 200, extra_environ = environ)
171
        assert_true(len(response.lxml.xpath(
172
            '//div[@id="flash"]/div[@class="error"]')))
173

    
174
        # L'utilisateur a les bonnes permissions.
175
        environ = {'REMOTE_USER': 'access'}
176

    
177
        # On s'attend à ce que le statut de la requête soit 200.
178
        response = self.app.get(
179
            '/event/%d' % idevent,
180
            status = 200,
181
            extra_environ = environ)
182

    
183
        # Il doit y avoir 2 lignes de résultats.
184
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
185
        # de rechercher directement une valeur dans une liste. Elle devrait
186
        # néanmoins suffire pour les besoins des tests.
187
        rows = response.lxml.xpath(
188
            '//table[contains(@class, "vigitable")]/tbody/tr')
189
        assert_equal(len(rows), 2)
190

    
191
    def test_cause_service_history(self):
192
        """Historique de la cause d'un événement corrélé sur un LLS."""
193

    
194
        # On peuple la BDD avec un hôte, un service de bas niveau,
195
        # et un groupe d'hôtes et de services associés à ces items.
196
        (managerhost, managerservice) = populate_DB()
197

    
198
        # On ajoute un évènement corrélé causé par le service
199
        idevent = add_correvent_caused_by(managerservice)
200

    
201
        transaction.commit()
202

    
203
        # L'utilisateur n'est pas authentifié.
204
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
205
        # demandant à l'utilisateur de s'authentifier.
206
        response = self.app.get(
207
            '/event/%d' % idevent,
208
            status = 401)
209

    
210
        # L'utilisateur N'A PAS les bonnes permissions.
211
        environ = {'REMOTE_USER': 'limited_access'}
212

    
213
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
214
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
215
        # n'a pas accès aux informations concernant cet évènement.
216
        response = self.app.get(
217
            '/event/%d' % idevent,
218
            status = 302,
219
            extra_environ = environ)
220

    
221
        # On suit la redirection.
222
        response = response.follow(status = 200, extra_environ = environ)
223
        assert_true(len(response.lxml.xpath(
224
            '//div[@id="flash"]/div[@class="error"]')))
225

    
226
        # L'utilisateur a les bonnes permissions.
227
        environ = {'REMOTE_USER': 'access'}
228

    
229
        # On s'attend à ce que le statut de la requête soit 200.
230
        response = self.app.get(
231
            '/event/%d' % idevent,
232
            status = 200,
233
            extra_environ = environ)
234

    
235
        # Il doit y avoir 2 lignes de résultats.
236
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
237
        # de rechercher directement une valeur dans une liste. Elle devrait
238
        # néanmoins suffire pour les besoins des tests.
239
        rows = response.lxml.xpath(
240
            '//table[contains(@class,"vigitable")]/tbody/tr')
241
        assert_equal(len(rows), 2)