Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_history_table.py @ 0dcb87f7

History | View | Annotate | Download (8.34 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Vérifie que la page qui affiche l'historique des actions sur un événement
5
brut fonctionne correctement.
6
"""
7

    
8
from nose.tools import assert_true, assert_equal
9
from datetime import datetime
10
import transaction
11

    
12
from vigilo.models.session import DBSession
13
from vigilo.models.tables import Event, EventHistory, CorrEvent, \
14
                            Permission, StateName, Host, \
15
                            SupItemGroup, LowLevelService, \
16
                            Permission, DataPermission, User, \
17
                            UserGroup
18
from vigilo.models.tables.grouphierarchy import GroupHierarchy
19
from vigiboard.tests import TestController
20

    
21
def populate_DB():
22
    """ Peuple la base de données. """
23
    # On ajoute un groupe d'hôtes et un groupe de services.
24
    supitemmanagers = SupItemGroup(name=u'managersgroup')
25
    DBSession.add(supitemmanagers)
26
    DBSession.flush()
27

    
28
    DBSession.add(GroupHierarchy(
29
        parent=supitemmanagers,
30
        child=supitemmanagers,
31
        hops=0,
32
    ))
33

    
34
    usergroup = UserGroup.by_group_name(u'users_with_access')
35
    DBSession.add(DataPermission(
36
        group=supitemmanagers,
37
        usergroup=usergroup,
38
        access=u'r',
39
    ))
40
    DBSession.flush()
41

    
42
    # On crée un hôte de test, et on l'ajoute au groupe d'hôtes.
43
    managerhost = Host(
44
        name = u'managerhost',      
45
        checkhostcmd = u'halt',
46
        snmpcommunity = u'public',
47
        hosttpl = u'/dev/null',
48
        address = u'192.168.1.1',
49
        snmpport = 42,
50
        weight = 42,
51
    )
52
    DBSession.add(managerhost)
53
    supitemmanagers.supitems.append(managerhost)
54
    DBSession.flush()
55

    
56
    # On crée un services de bas niveau, et on l'ajoute au groupe de services.
57
    managerservice = LowLevelService(
58
        host = managerhost,
59
        servicename = u'managerservice',
60
        command = u'halt',
61
        op_dep = u'+',
62
        weight = 42,
63
    )
64
    DBSession.add(managerservice)
65
    supitemmanagers.supitems.append(managerservice)
66
    DBSession.flush()
67
    
68
    return (managerhost, managerservice)
69

    
70
def add_correvent_caused_by(supitem):
71
    """
72
    Ajoute dans la base de données un évènement corrélé causé 
73
    par un incident survenu sur l'item passé en paramètre.
74
    Génère un historique pour les tests.
75
    """
76

    
77
    # Ajout d'un événement
78
    event = Event(
79
        supitem = supitem, 
80
        message = u'foo',
81
        current_state = StateName.statename_to_value(u"WARNING"),
82
        timestamp = datetime.now(),
83
    )
84
    DBSession.add(event)
85
    DBSession.flush()
86

    
87
    # Ajout des historiques
88
    DBSession.add(EventHistory(
89
        type_action=u'Nagios update state',
90
        idevent=event.idevent, 
91
        timestamp=datetime.now()))
92
    DBSession.add(EventHistory(
93
        type_action=u'Acknowlegement change state',
94
        idevent=event.idevent, 
95
        timestamp=datetime.now()))
96
    DBSession.flush()
97

    
98
    # Ajout d'un événement corrélé
99
    aggregate = CorrEvent(
100
        idcause = event.idevent, 
101
        timestamp_active = datetime.now(),
102
        priority = 1,
103
        status = u"None")
104
    aggregate.events.append(event)
105
    DBSession.add(aggregate)
106
    DBSession.flush()
107
    
108
    return event.idevent
109
    
110

    
111
class TestHistoryTable(TestController):
112
    """
113
    Teste la table qui affiche l'historique des actions
114
    sur un événement brut.
115
    """
116
    def setUp(self):
117
        super(TestHistoryTable, self).setUp()
118
        perm = Permission.by_permission_name(u'vigiboard-access')
119

    
120
        user = User(
121
            user_name=u'access',
122
            fullname=u'',
123
            email=u'user.has@access',
124
        )
125
        usergroup = UserGroup(
126
            group_name=u'users_with_access',
127
        )
128
        usergroup.permissions.append(perm)
129
        user.usergroups.append(usergroup)
130
        DBSession.add(user)
131
        DBSession.add(usergroup)
132
        DBSession.flush()
133

    
134
        user = User(
135
            user_name=u'limited_access',
136
            fullname=u'',
137
            email=u'user.has.no@access',
138
        )
139
        usergroup = UserGroup(
140
            group_name=u'users_with_limited_access',
141
        )
142
        usergroup.permissions.append(perm)
143
        user.usergroups.append(usergroup)
144
        DBSession.add(user)
145
        DBSession.add(usergroup)
146
        DBSession.flush()
147

    
148

    
149
    def test_cause_host_history(self):
150
        """Historique de la cause d'un événement corrélé sur un hôte."""
151

    
152
        # On peuple la BDD avec un hôte, un service de bas niveau,
153
        # et un groupe d'hôtes et de services associés à ces items.
154
        (managerhost, managerservice) = populate_DB()
155
        
156
        # On ajoute un évènement corrélé causé par l'hôte
157
        idevent = add_correvent_caused_by(managerhost)
158
        transaction.commit()
159

    
160
        # L'utilisateur n'est pas authentifié.
161
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
162
        # demandant à l'utilisateur de s'authentifier.
163
        response = self.app.get(
164
            '/event/%d' % idevent,
165
            status = 401)
166

    
167
        # L'utilisateur N'A PAS les bonnes permissions.
168
        environ = {'REMOTE_USER': 'limited_access'}
169
        
170
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
171
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
172
        # n'a pas accès aux informations concernant cet évènement.
173
        response = self.app.get(
174
            '/event/%d' % idevent,
175
            status = 302, 
176
            extra_environ = environ)
177

    
178
        # On suit la redirection.
179
        response = response.follow(status = 200, extra_environ = environ)
180
        assert_true(len(response.lxml.xpath(
181
            '//div[@id="flash"]/div[@class="error"]')))
182

    
183
        # L'utilisateur a les bonnes permissions.
184
        environ = {'REMOTE_USER': 'access'}
185
        
186
        # On s'attend à ce que le statut de la requête soit 200.
187
        response = self.app.get(
188
            '/event/%d' % idevent,
189
            status = 200, 
190
            extra_environ = environ)
191

    
192
        # Il doit y avoir 2 lignes de résultats.
193
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
194
        # de rechercher directement une valeur dans une liste. Elle devrait
195
        # néanmoins suffire pour les besoins des tests.
196
        rows = response.lxml.xpath(
197
            '//table[contains(@class, "vigitable")]/tbody/tr')
198
        assert_equal(len(rows), 2)
199

    
200
    def test_cause_service_history(self):
201
        """Historique de la cause d'un événement corrélé sur un LLS."""
202

    
203
        # On peuple la BDD avec un hôte, un service de bas niveau,
204
        # et un groupe d'hôtes et de services associés à ces items.
205
        (managerhost, managerservice) = populate_DB()
206
        
207
        # On ajoute un évènement corrélé causé par le service
208
        idevent = add_correvent_caused_by(managerservice)
209
        
210
        transaction.commit()
211

    
212
        # L'utilisateur n'est pas authentifié.
213
        # On s'attend à ce qu'une erreur 401 soit renvoyée,
214
        # demandant à l'utilisateur de s'authentifier.
215
        response = self.app.get(
216
            '/event/%d' % idevent,
217
            status = 401)
218

    
219
        # L'utilisateur N'A PAS les bonnes permissions.
220
        environ = {'REMOTE_USER': 'limited_access'}
221
        
222
        # On s'attend à ce qu'une erreur 302 soit renvoyée, et à
223
        # ce qu'un message d'erreur précise à l'utilisateur qu'il
224
        # n'a pas accès aux informations concernant cet évènement.
225
        response = self.app.get(
226
            '/event/%d' % idevent,
227
            status = 302, 
228
            extra_environ = environ)
229

    
230
        # On suit la redirection.
231
        response = response.follow(status = 200, extra_environ = environ)
232
        assert_true(len(response.lxml.xpath(
233
            '//div[@id="flash"]/div[@class="error"]')))
234

    
235
        # L'utilisateur a les bonnes permissions.
236
        environ = {'REMOTE_USER': 'access'}
237
        
238
        # On s'attend à ce que le statut de la requête soit 200.
239
        response = self.app.get(
240
            '/event/%d' % idevent,
241
            status = 200, 
242
            extra_environ = environ)
243

    
244
        # Il doit y avoir 2 lignes de résultats.
245
        # NB: la requête XPath est approchante, car XPath 1.0 ne permet pas
246
        # de rechercher directement une valeur dans une liste. Elle devrait
247
        # néanmoins suffire pour les besoins des tests.
248
        rows = response.lxml.xpath(
249
            '//table[contains(@class,"vigitable")]/tbody/tr')
250
        assert_equal(len(rows), 2)
251