Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_history_table.py @ 0bd9c069

History | View | Annotate | Download (8.13 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Vérifie que la page qui affiche l'historique des actions sur un événement
5
brut fonctionne correctement.
6
"""
7

    
8
from nose.tools import assert_true, assert_equal
9
from datetime import datetime
10
import transaction
11

    
12
from vigilo.models.session import DBSession
13
from vigilo.models.tables import Event, EventHistory, CorrEvent, \
14
                            Permission, StateName, Host, \
15
                            SupItemGroup, LowLevelService, \
16
                            Permission, DataPermission, User, \
17
                            UserGroup
18
from vigiboard.tests import TestController
19

    
20
def populate_DB():
21
    """ Peuple la base de données. """
22
    # On ajoute un groupe d'hôtes et un groupe de services.
23
    supitemmanagers = SupItemGroup(name=u'managersgroup')
24
    DBSession.add(supitemmanagers)
25
    DBSession.flush()
26

    
27
    usergroup = UserGroup.by_group_name(u'users_with_access')
28
    DBSession.add(DataPermission(
29
        group=supitemmanagers,
30
        usergroup=usergroup,
31
        access=u'r',
32
    ))
33
    DBSession.flush()
34

    
35
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
36
    managerhost = Host(
37
        name = u'managerhost',      
38
        checkhostcmd = u'halt',
39
        snmpcommunity = u'public',
40
        hosttpl = u'/dev/null',
41
        address = u'192.168.1.1',
42
        snmpport = 42,
43
        weight = 42,
44
    )
45
    DBSession.add(managerhost)
46
    supitemmanagers.supitems.append(managerhost)
47
    DBSession.flush()
48

    
49
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
50
    managerservice = LowLevelService(
51
        host = managerhost,
52
        servicename = u'managerservice',
53
        command = u'halt',
54
        weight = 42,
55
    )
56
    DBSession.add(managerservice)
57
    supitemmanagers.supitems.append(managerservice)
58
    DBSession.flush()
59
    
60
    return (managerhost, managerservice)
61

    
62
def add_correvent_caused_by(supitem):
63
    """
64
    Ajoute dans la base de données un évènement corrélé causé 
65
    par un incident survenu sur l'item passé en paramètre.
66
    Génère un historique pour les tests.
67
    """
68

    
69
    # Ajout d'un événement
70
    event = Event(
71
        supitem = supitem, 
72
        message = u'foo',
73
        current_state = StateName.statename_to_value(u"WARNING"),
74
        timestamp = datetime.now(),
75
    )
76
    DBSession.add(event)
77
    DBSession.flush()
78

    
79
    # Ajout des historiques
80
    DBSession.add(EventHistory(
81
        type_action=u'Nagios update state',
82
        idevent=event.idevent, 
83
        timestamp=datetime.now()))
84
    DBSession.add(EventHistory(
85
        type_action=u'Acknowlegement change state',
86
        idevent=event.idevent, 
87
        timestamp=datetime.now()))
88
    DBSession.flush()
89

    
90
    # Ajout d'un événement corrélé
91
    aggregate = CorrEvent(
92
        idcause = event.idevent, 
93
        timestamp_active = datetime.now(),
94
        priority = 1,
95
        status = u"None")
96
    aggregate.events.append(event)
97
    DBSession.add(aggregate)
98
    DBSession.flush()
99
    
100
    return event.idevent
101
    
102

    
103
class TestHistoryTable(TestController):
104
    """
105
    Teste la table qui affiche l'historique des actions
106
    sur un événement brut.
107
    """
108
    def setUp(self):
109
        super(TestHistoryTable, self).setUp()
110
        perm = Permission.by_permission_name(u'vigiboard-access')
111

    
112
        user = User(
113
            user_name=u'access',
114
            fullname=u'',
115
            email=u'user.has@access',
116
        )
117
        usergroup = UserGroup(
118
            group_name=u'users_with_access',
119
        )
120
        usergroup.permissions.append(perm)
121
        user.usergroups.append(usergroup)
122
        DBSession.add(user)
123
        DBSession.add(usergroup)
124
        DBSession.flush()
125

    
126
        user = User(
127
            user_name=u'limited_access',
128
            fullname=u'',
129
            email=u'user.has.no@access',
130
        )
131
        usergroup = UserGroup(
132
            group_name=u'users_with_limited_access',
133
        )
134
        usergroup.permissions.append(perm)
135
        user.usergroups.append(usergroup)
136
        DBSession.add(user)
137
        DBSession.add(usergroup)
138
        DBSession.flush()
139

    
140

    
141
    def test_cause_host_history(self):
142
        """Historique de la cause d'un événement corrélé sur un hôte."""
143

    
144
        # On peuple la BDD avec un hôte, un service de bas niveau,
145
        # et un groupe d'hôtes et de services associés à ces items.
146
        (managerhost, managerservice) = populate_DB()
147
        
148
        # On ajoute un évènement corrélé causé par l'hôte
149
        idevent = add_correvent_caused_by(managerhost)
150
        transaction.commit()
151

    
152
        # L'utilisateur n'est pas authentifié.
153
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
154
        # demandant à l'utilisateur de s'authentifier.
155
        response = self.app.get(
156
            '/event/%d' % idevent,
157
            status = 401)
158

    
159
        # L'utilisateur N'A PAS les bonnes permissions.
160
        environ = {'REMOTE_USER': 'limited_access'}
161
        
162
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
163
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
164
        # n'a pas accès aux informations concernant cet évènement.
165
        response = self.app.get(
166
            '/event/%d' % idevent,
167
            status = 302, 
168
            extra_environ = environ)
169

    
170
        # On suit la redirection.
171
        response = response.follow(status = 200, extra_environ = environ)
172
        assert_true(len(response.lxml.xpath(
173
            '//div[@id="flash"]/div[@class="error"]')))
174

    
175
        # L'utilisateur a les bonnes permissions.
176
        environ = {'REMOTE_USER': 'access'}
177
        
178
        # On s'attend à ce que le statut de la requête soit 200.
179
        response = self.app.get(
180
            '/event/%d' % idevent,
181
            status = 200, 
182
            extra_environ = environ)
183

    
184
        # Il doit y avoir 2 lignes de résultats.
185
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
186
        # de rechercher directement une valeur dans une liste. Elle devrait
187
        # néanmoins suffire pour les besoins des tests.
188
        rows = response.lxml.xpath(
189
            '//table[contains(@class, "vigitable")]/tbody/tr')
190
        assert_equal(len(rows), 2)
191

    
192
    def test_cause_service_history(self):
193
        """Historique de la cause d'un événement corrélé sur un LLS."""
194

    
195
        # On peuple la BDD avec un hôte, un service de bas niveau,
196
        # et un groupe d'hôtes et de services associés à ces items.
197
        (managerhost, managerservice) = populate_DB()
198
        
199
        # On ajoute un évènement corrélé causé par le service
200
        idevent = add_correvent_caused_by(managerservice)
201
        
202
        transaction.commit()
203

    
204
        # L'utilisateur n'est pas authentifié.
205
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
206
        # demandant à l'utilisateur de s'authentifier.
207
        response = self.app.get(
208
            '/event/%d' % idevent,
209
            status = 401)
210

    
211
        # L'utilisateur N'A PAS les bonnes permissions.
212
        environ = {'REMOTE_USER': 'limited_access'}
213
        
214
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
215
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
216
        # n'a pas accès aux informations concernant cet évènement.
217
        response = self.app.get(
218
            '/event/%d' % idevent,
219
            status = 302, 
220
            extra_environ = environ)
221

    
222
        # On suit la redirection.
223
        response = response.follow(status = 200, extra_environ = environ)
224
        assert_true(len(response.lxml.xpath(
225
            '//div[@id="flash"]/div[@class="error"]')))
226

    
227
        # L'utilisateur a les bonnes permissions.
228
        environ = {'REMOTE_USER': 'access'}
229
        
230
        # On s'attend à ce que le statut de la requête soit 200.
231
        response = self.app.get(
232
            '/event/%d' % idevent,
233
            status = 200, 
234
            extra_environ = environ)
235

    
236
        # Il doit y avoir 2 lignes de résultats.
237
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
238
        # de rechercher directement une valeur dans une liste. Elle devrait
239
        # néanmoins suffire pour les besoins des tests.
240
        rows = response.lxml.xpath(
241
            '//table[contains(@class,"vigitable")]/tbody/tr')
242
        assert_equal(len(rows), 2)
243