Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_history_table.py @ 6f6efdcd

History | View | Annotate | Download (7.99 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Vérifie que la page qui affiche l'historique des actions sur un événement
5
brut fonctionne correctement.
6
"""
7

    
8
from nose.tools import assert_true, assert_equal
9
from datetime import datetime
10
import transaction
11

    
12
from vigilo.models.session import DBSession
13
from vigilo.models.tables import Event, EventHistory, CorrEvent, \
14
                            Permission, StateName, Host, \
15
                            Permission, DataPermission, User, \
16
                            LowLevelService, UserGroup
17
from vigilo.models.demo.functions import *
18
from vigiboard.tests import TestController
19

    
20
def populate_DB():
21
    """ Peuple la base de données. """
22
    # On ajoute un groupe d'hôtes et un groupe de services.
23
    supitemmanagers = add_supitemgroup('managersgroup')
24

    
25
    usergroup = UserGroup.by_group_name(u'users_with_access')
26
    DBSession.add(DataPermission(
27
        group=supitemmanagers,
28
        usergroup=usergroup,
29
        access=u'r',
30
    ))
31
    DBSession.flush()
32

    
33
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
34
    managerhost = Host(
35
        name = u'managerhost',
36
        checkhostcmd = u'halt',
37
        snmpcommunity = u'public',
38
        hosttpl = u'/dev/null',
39
        address = u'192.168.1.1',
40
        snmpport = 42,
41
        weight = 42,
42
    )
43
    DBSession.add(managerhost)
44
    supitemmanagers.supitems.append(managerhost)
45
    DBSession.flush()
46

    
47
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
48
    managerservice = LowLevelService(
49
        host = managerhost,
50
        servicename = u'managerservice',
51
        command = u'halt',
52
        weight = 42,
53
    )
54
    DBSession.add(managerservice)
55
    supitemmanagers.supitems.append(managerservice)
56
    DBSession.flush()
57

    
58
    return (managerhost, managerservice)
59

    
60
def add_correvent_caused_by(supitem):
61
    """
62
    Ajoute dans la base de données un évènement corrélé causé
63
    par un incident survenu sur l'item passé en paramètre.
64
    Génère un historique pour les tests.
65
    """
66

    
67
    # Ajout d'un événement
68
    event = Event(
69
        supitem = supitem,
70
        message = u'foo',
71
        current_state = StateName.statename_to_value(u"WARNING"),
72
        timestamp = datetime.now(),
73
    )
74
    DBSession.add(event)
75
    DBSession.flush()
76

    
77
    # Ajout des historiques
78
    DBSession.add(EventHistory(
79
        type_action=u'Nagios update state',
80
        idevent=event.idevent,
81
        timestamp=datetime.now()))
82
    DBSession.add(EventHistory(
83
        type_action=u'Acknowlegement change state',
84
        idevent=event.idevent,
85
        timestamp=datetime.now()))
86
    DBSession.flush()
87

    
88
    # Ajout d'un événement corrélé
89
    aggregate = CorrEvent(
90
        idcause = event.idevent,
91
        timestamp_active = datetime.now(),
92
        priority = 1,
93
        status = u"None")
94
    aggregate.events.append(event)
95
    DBSession.add(aggregate)
96
    DBSession.flush()
97

    
98
    return event.idevent
99

    
100

    
101
class TestHistoryTable(TestController):
102
    """
103
    Teste la table qui affiche l'historique des actions
104
    sur un événement brut.
105
    """
106
    def setUp(self):
107
        super(TestHistoryTable, self).setUp()
108
        perm = Permission.by_permission_name(u'vigiboard-access')
109

    
110
        user = User(
111
            user_name=u'access',
112
            fullname=u'',
113
            email=u'user.has@access',
114
        )
115
        usergroup = UserGroup(
116
            group_name=u'users_with_access',
117
        )
118
        usergroup.permissions.append(perm)
119
        user.usergroups.append(usergroup)
120
        DBSession.add(user)
121
        DBSession.add(usergroup)
122
        DBSession.flush()
123

    
124
        user = User(
125
            user_name=u'limited_access',
126
            fullname=u'',
127
            email=u'user.has.no@access',
128
        )
129
        usergroup = UserGroup(
130
            group_name=u'users_with_limited_access',
131
        )
132
        usergroup.permissions.append(perm)
133
        user.usergroups.append(usergroup)
134
        DBSession.add(user)
135
        DBSession.add(usergroup)
136
        DBSession.flush()
137

    
138

    
139
    def test_cause_host_history(self):
140
        """Historique de la cause d'un événement corrélé sur un hôte."""
141

    
142
        # On peuple la BDD avec un hôte, un service de bas niveau,
143
        # et un groupe d'hôtes et de services associés à ces items.
144
        (managerhost, managerservice) = populate_DB()
145

    
146
        # On ajoute un évènement corrélé causé par l'hôte
147
        idevent = add_correvent_caused_by(managerhost)
148
        transaction.commit()
149

    
150
        # L'utilisateur n'est pas authentifié.
151
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
152
        # demandant à l'utilisateur de s'authentifier.
153
        response = self.app.get(
154
            '/event/%d' % idevent,
155
            status = 401)
156

    
157
        # L'utilisateur N'A PAS les bonnes permissions.
158
        environ = {'REMOTE_USER': 'limited_access'}
159

    
160
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
161
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
162
        # n'a pas accès aux informations concernant cet évènement.
163
        response = self.app.get(
164
            '/event/%d' % idevent,
165
            status = 302,
166
            extra_environ = environ)
167

    
168
        # On suit la redirection.
169
        response = response.follow(status = 200, extra_environ = environ)
170
        assert_true(len(response.lxml.xpath(
171
            '//div[@id="flash"]/div[@class="error"]')))
172

    
173
        # L'utilisateur a les bonnes permissions.
174
        environ = {'REMOTE_USER': 'access'}
175

    
176
        # On s'attend à ce que le statut de la requête soit 200.
177
        response = self.app.get(
178
            '/event/%d' % idevent,
179
            status = 200,
180
            extra_environ = environ)
181

    
182
        # Il doit y avoir 2 lignes de résultats.
183
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
184
        # de rechercher directement une valeur dans une liste. Elle devrait
185
        # néanmoins suffire pour les besoins des tests.
186
        rows = response.lxml.xpath(
187
            '//table[contains(@class, "vigitable")]/tbody/tr')
188
        assert_equal(len(rows), 2)
189

    
190
    def test_cause_service_history(self):
191
        """Historique de la cause d'un événement corrélé sur un LLS."""
192

    
193
        # On peuple la BDD avec un hôte, un service de bas niveau,
194
        # et un groupe d'hôtes et de services associés à ces items.
195
        (managerhost, managerservice) = populate_DB()
196

    
197
        # On ajoute un évènement corrélé causé par le service
198
        idevent = add_correvent_caused_by(managerservice)
199

    
200
        transaction.commit()
201

    
202
        # L'utilisateur n'est pas authentifié.
203
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
204
        # demandant à l'utilisateur de s'authentifier.
205
        response = self.app.get(
206
            '/event/%d' % idevent,
207
            status = 401)
208

    
209
        # L'utilisateur N'A PAS les bonnes permissions.
210
        environ = {'REMOTE_USER': 'limited_access'}
211

    
212
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
213
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
214
        # n'a pas accès aux informations concernant cet évènement.
215
        response = self.app.get(
216
            '/event/%d' % idevent,
217
            status = 302,
218
            extra_environ = environ)
219

    
220
        # On suit la redirection.
221
        response = response.follow(status = 200, extra_environ = environ)
222
        assert_true(len(response.lxml.xpath(
223
            '//div[@id="flash"]/div[@class="error"]')))
224

    
225
        # L'utilisateur a les bonnes permissions.
226
        environ = {'REMOTE_USER': 'access'}
227

    
228
        # On s'attend à ce que le statut de la requête soit 200.
229
        response = self.app.get(
230
            '/event/%d' % idevent,
231
            status = 200,
232
            extra_environ = environ)
233

    
234
        # Il doit y avoir 2 lignes de résultats.
235
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
236
        # de rechercher directement une valeur dans une liste. Elle devrait
237
        # néanmoins suffire pour les besoins des tests.
238
        rows = response.lxml.xpath(
239
            '//table[contains(@class,"vigitable")]/tbody/tr')
240
        assert_equal(len(rows), 2)