Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ 0bd9c069

History | View | Annotate | Download (9.19 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from nose.tools import assert_equal
7

    
8
from vigilo.models.session import DBSession
9
from vigilo.models.tables import Permission, DataPermission, StateName, \
10
                            SupItemGroup, Host, HighLevelService, \
11
                            Event, CorrEvent, ImpactedPath, ImpactedHLS, \
12
                            User, UserGroup
13
from vigiboard.tests import TestController
14

    
15
def populate_DB():
16
    """ Peuple la base de données. """
17

    
18
    # On ajoute un groupe d'hôtes
19
    hostmanagers = SupItemGroup(name=u'managersgroup')
20
    DBSession.add(hostmanagers)
21
    DBSession.flush()
22

    
23
    # On lui octroie les permissions
24
    usergroup = UserGroup.by_group_name(u'users_with_access')
25
    DBSession.add(DataPermission(
26
        group=hostmanagers,
27
        usergroup=usergroup,
28
        access=u'r',
29
    ))
30
    DBSession.flush()
31

    
32
    # On crée un hôte de test.
33
    host = Host(
34
        name = u'host', 
35
        checkhostcmd = u'halt',
36
        snmpcommunity = u'public',
37
        hosttpl = u'/dev/null',
38
        address = u'192.168.1.1',
39
        snmpport = 42,
40
        weight = 42,
41
        )
42
    DBSession.add(host)
43

    
44
    # On affecte cet hôte au groupe précédemment créé.
45
    hostmanagers.supitems.append(host)
46
    DBSession.flush()
47

    
48
    # On ajoute un évènement causé par cet hôte.
49
    event1 = Event(
50
        supitem = host,
51
        message = u'foo',
52
        current_state = StateName.statename_to_value(u'WARNING'),
53
        timestamp = datetime.now(),
54
    )
55
    DBSession.add(event1)
56
    DBSession.flush()
57

    
58
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
59
    aggregate = CorrEvent(
60
        idcause = event1.idevent, 
61
        timestamp_active = datetime.now(),
62
        priority = 1,
63
        status = u'None')
64
    aggregate.events.append(event1)
65
    DBSession.add(aggregate)
66
    DBSession.flush()
67
    
68
    transaction.commit()
69
    return aggregate
70

    
71
def add_paths(path_number, path_length, idsupitem):
72
    """ 
73
    Ajoute path_number chemins de services de haut niveau impactés
74
    dans la base de donnée. Leur longeur sera égale à path_length.
75
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
76
     
77
    path_number * path_length services de 
78
    haut niveau sont créés dans l'opération.
79
    """
80

    
81
    # Création de services de haut niveau dans la BDD.
82
    hls_template = {
83
        'message': u'Bar',
84
        'warning_threshold': 60,
85
        'critical_threshold': 80,
86
        'weight': None,
87
        'priority': 2,
88
    }
89

    
90
    # Création des chemins de services de haut niveau impactés.
91
    for j in range(path_number):
92
        
93
        # On crée le chemin en lui-même
94
        path = ImpactedPath(idsupitem = idsupitem)
95
        DBSession.add(path)
96
        DBSession.flush()
97
        
98
        # Pour chaque étage du chemin,
99
        for i in range(path_length):
100
            # on ajoute un service de haut niveau dans la BDD,
101
            hls = HighLevelService(
102
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
103
                **hls_template)
104
            DBSession.add(hls)
105
            # et on ajoute un étage au chemin contenant ce service. 
106
            DBSession.add(
107
                ImpactedHLS(
108
                    path = path,
109
                    hls = hls,
110
                    distance = i + 1,
111
                    ))
112
    
113
    DBSession.flush()
114
    transaction.commit()
115

    
116

    
117
class TestHLSPlugin(TestController):
118
    """
119
    Classe de test du contrôleur listant les services 
120
    de haut niveau impactés par un évènement corrélé.
121
    """
122
    def setUp(self):
123
        super(TestHLSPlugin, self).setUp()
124
        perm = Permission.by_permission_name(u'vigiboard-access')
125

    
126
        user = User(
127
            user_name=u'access',
128
            fullname=u'',
129
            email=u'user.has@access',
130
        )
131
        usergroup = UserGroup(
132
            group_name=u'users_with_access',
133
        )
134
        usergroup.permissions.append(perm)
135
        user.usergroups.append(usergroup)
136
        DBSession.add(user)
137
        DBSession.add(usergroup)
138
        DBSession.flush()
139

    
140
        user = User(
141
            user_name=u'no_access',
142
            fullname=u'',
143
            email=u'user.has.no@access',
144
        )
145
        usergroup = UserGroup(
146
            group_name=u'users_without_access',
147
        )
148
        usergroup.permissions.append(perm)
149
        user.usergroups.append(usergroup)
150
        DBSession.add(user)
151
        DBSession.add(usergroup)
152
        DBSession.flush()
153

    
154
        self.aggregate = populate_DB()
155

    
156
    def test_no_impacted_hls(self):
157
        """
158
        Retour du plugin HLS pour 0 HLS impacté
159
        Teste la valeur de retour du plugin lorsque
160
        aucun service de haut niveau n'est impacté.
161
        """
162
        
163
        # On peuple la base de données avant le test.
164
        DBSession.add(self.aggregate)
165
        add_paths(0, 0, self.aggregate.events[0].idsupitem)
166
        DBSession.add(self.aggregate)
167
        
168
        ### 1er cas : l'utilisateur n'est pas connecté.
169
        # On vérifie que le plugin retourne bien une erreur 401.
170
        resp = self.app.post(
171
            '/get_plugin_value', 
172
            {"idcorrevent" : str(self.aggregate.idcorrevent),
173
             "plugin_name" : "hls"},
174
            status = 401,)
175
        
176
        ### 2ème cas : l'utilisateur n'a pas les
177
        ### droits sur l'hôte ayant causé le correvent.
178
        # On vérifie que le plugin retourne bien une erreur 404.
179
        resp = self.app.post(
180
            '/get_plugin_value', 
181
            {"idcorrevent" : str(self.aggregate.idcorrevent),
182
             "plugin_name" : "hls"},
183
            status = 404,
184
            extra_environ={'REMOTE_USER': 'no_access'})
185
        
186
        ### 3ème cas : l'utilisateur a cette fois les droits.
187
        resp = self.app.post(
188
            '/get_plugin_value', 
189
            {"idcorrevent" : str(self.aggregate.idcorrevent),
190
             "plugin_name" : "hls"},
191
            extra_environ={'REMOTE_USER': 'access'})
192
        # On vérifie que le plugin ne retourne toujours rien.
193
        assert_equal(resp.json, {"services": []})
194
    
195
    def test_1_impacted_hls_path(self):
196
        """
197
        Retour du plugin HLS pour 1 chemin impacté
198
        Teste la valeur de retour du plugin lorsqu'un
199
        chemin de services de haut niveau est impacté.
200
        """
201
        
202
        # On peuple la base de données avant le test.
203
        DBSession.add(self.aggregate)
204
        add_paths(1, 2, self.aggregate.events[0].idsupitem)
205
        DBSession.add(self.aggregate)
206
        
207
        ### 1er cas : l'utilisateur n'est pas connecté.
208
        # On vérifie que le plugin retourne bien une erreur 401.
209
        resp = self.app.post(
210
            '/get_plugin_value', 
211
            {"idcorrevent" : str(self.aggregate.idcorrevent),
212
             "plugin_name" : "hls"},
213
            status = 401,)
214
        
215
        ### 2ème cas : l'utilisateur n'a pas les droits
216
        ### sur l'hôte ayant causé le correvent, on doit
217
        ### obtenir une erreur 404 (pas d'événement trouvé
218
        ### avec les informations liées à cet utilisateur).
219
        resp = self.app.post(
220
            '/get_plugin_value', 
221
            {"idcorrevent" : str(self.aggregate.idcorrevent),
222
             "plugin_name" : "hls"},
223
            status = 404,
224
            extra_environ={'REMOTE_USER': 'no_access'})
225
        
226
        ### 3ème cas : l'utilisateur a cette fois les droits.
227
        resp = self.app.post(
228
            '/get_plugin_value', 
229
            {"idcorrevent" : str(self.aggregate.idcorrevent),
230
             "plugin_name" : "hls"},
231
            extra_environ={'REMOTE_USER': 'access'})
232
        # On vérifie que le plugin retourne bien les 2 HLS impactés.
233
        assert_equal(resp.json, {"services": ['HLS12']})
234
    
235
    def test_2_impacted_hls_path(self):
236
        """
237
        Retour du plugin HLS pour 2 chemins impactés
238
        Teste la valeur de retour du plugin lorsque deux
239
        chemins de services de haut niveau sont impactés.
240
        """
241
        
242
        # On peuple la base de données avant le test.
243
        DBSession.add(self.aggregate)
244
        add_paths(2, 2, self.aggregate.events[0].idsupitem)
245
        DBSession.add(self.aggregate)
246
        
247
        ### 1er cas : l'utilisateur n'est pas connecté.
248
        # On vérifie que le plugin retourne bien une erreur 401.
249
        resp = self.app.post(
250
            '/get_plugin_value', 
251
            {"idcorrevent" : str(self.aggregate.idcorrevent),
252
             "plugin_name" : "hls"},
253
            status = 401,)
254
        
255
        ### 2ème cas : l'utilisateur n'a pas les
256
        ### droits sur l'hôte ayant causé le correvent.
257
        resp = self.app.post(
258
            '/get_plugin_value', 
259
            {"idcorrevent" : str(self.aggregate.idcorrevent),
260
             "plugin_name" : "hls"},
261
            status = 404,
262
            extra_environ={'REMOTE_USER': 'no_access'})
263
        
264
        ### 3ème cas : l'utilisateur a cette fois les droits.
265
        resp = self.app.post(
266
            '/get_plugin_value', 
267
            {"idcorrevent" : str(self.aggregate.idcorrevent),
268
             "plugin_name" : "hls"},
269
            extra_environ={'REMOTE_USER': 'access'})
270
        # On vérifie que le plugin retourne bien les 4 HLS impactés.
271
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
272