Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ 0dcb87f7

History | View | Annotate | Download (9.38 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from nose.tools import assert_equal
7

    
8
from vigilo.models.session import DBSession
9
from vigilo.models.tables import Permission, DataPermission, StateName, \
10
                            SupItemGroup, Host, HighLevelService, \
11
                            Event, CorrEvent, ImpactedPath, ImpactedHLS, \
12
                            User, UserGroup
13
from vigilo.models.tables.grouphierarchy import GroupHierarchy
14
from vigiboard.tests import TestController
15

    
16
def populate_DB():
17
    """ Peuple la base de données. """
18

    
19
    # On ajoute un groupe d'hôtes
20
    hostmanagers = SupItemGroup(name=u'managersgroup')
21
    DBSession.add(hostmanagers)
22
    DBSession.flush()
23

    
24
    DBSession.add(GroupHierarchy(
25
        parent=hostmanagers,
26
        child=hostmanagers,
27
        hops=0,
28
    ))
29

    
30
    # On lui octroie les permissions
31
    usergroup = UserGroup.by_group_name(u'users_with_access')
32
    DBSession.add(DataPermission(
33
        group=hostmanagers,
34
        usergroup=usergroup,
35
        access=u'r',
36
    ))
37
    DBSession.flush()
38

    
39
    # On crée un hôte de test.
40
    host = Host(
41
        name = u'host', 
42
        checkhostcmd = u'halt',
43
        snmpcommunity = u'public',
44
        hosttpl = u'/dev/null',
45
        address = u'192.168.1.1',
46
        snmpport = 42,
47
        weight = 42,
48
        )
49
    DBSession.add(host)
50

    
51
    # On affecte cet hôte au groupe précédemment créé.
52
    hostmanagers.supitems.append(host)
53
    DBSession.flush()
54

    
55
    # On ajoute un évènement causé par cet hôte.
56
    event1 = Event(
57
        supitem = host,
58
        message = u'foo',
59
        current_state = StateName.statename_to_value(u'WARNING'),
60
        timestamp = datetime.now(),
61
    )
62
    DBSession.add(event1)
63
    DBSession.flush()
64

    
65
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
66
    aggregate = CorrEvent(
67
        idcause = event1.idevent, 
68
        timestamp_active = datetime.now(),
69
        priority = 1,
70
        status = u'None')
71
    aggregate.events.append(event1)
72
    DBSession.add(aggregate)
73
    DBSession.flush()
74
    
75
    transaction.commit()
76
    return aggregate
77

    
78
def add_paths(path_number, path_length, idsupitem):
79
    """ 
80
    Ajoute path_number chemins de services de haut niveau impactés
81
    dans la base de donnée. Leur longeur sera égale à path_length.
82
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
83
     
84
    path_number * path_length services de 
85
    haut niveau sont créés dans l'opération.
86
    """
87

    
88
    # Création de services de haut niveau dans la BDD.
89
    hls_template = {
90
        'op_dep': u'&',
91
        'message': u'Bar',
92
        'warning_threshold': 60,
93
        'critical_threshold': 80,
94
        'weight': None,
95
        'priority': 2,
96
    }
97

    
98
    # Création des chemins de services de haut niveau impactés.
99
    for j in range(path_number):
100
        
101
        # On crée le chemin en lui-même
102
        path = ImpactedPath(idsupitem = idsupitem)
103
        DBSession.add(path)
104
        DBSession.flush()
105
        
106
        # Pour chaque étage du chemin,
107
        for i in range(path_length):
108
            # on ajoute un service de haut niveau dans la BDD,
109
            hls = HighLevelService(
110
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
111
                **hls_template)
112
            DBSession.add(hls)
113
            # et on ajoute un étage au chemin contenant ce service. 
114
            DBSession.add(
115
                ImpactedHLS(
116
                    path = path,
117
                    hls = hls,
118
                    distance = i + 1,
119
                    ))
120
    
121
    DBSession.flush()
122
    transaction.commit()
123

    
124

    
125
class TestHLSPlugin(TestController):
126
    """
127
    Classe de test du contrôleur listant les services 
128
    de haut niveau impactés par un évènement corrélé.
129
    """
130
    def setUp(self):
131
        super(TestHLSPlugin, self).setUp()
132
        perm = Permission.by_permission_name(u'vigiboard-access')
133

    
134
        user = User(
135
            user_name=u'access',
136
            fullname=u'',
137
            email=u'user.has@access',
138
        )
139
        usergroup = UserGroup(
140
            group_name=u'users_with_access',
141
        )
142
        usergroup.permissions.append(perm)
143
        user.usergroups.append(usergroup)
144
        DBSession.add(user)
145
        DBSession.add(usergroup)
146
        DBSession.flush()
147

    
148
        user = User(
149
            user_name=u'no_access',
150
            fullname=u'',
151
            email=u'user.has.no@access',
152
        )
153
        usergroup = UserGroup(
154
            group_name=u'users_without_access',
155
        )
156
        usergroup.permissions.append(perm)
157
        user.usergroups.append(usergroup)
158
        DBSession.add(user)
159
        DBSession.add(usergroup)
160
        DBSession.flush()
161

    
162
        self.aggregate = populate_DB()
163

    
164
    def test_no_impacted_hls(self):
165
        """
166
        Retour du plugin HLS pour 0 HLS impacté
167
        Teste la valeur de retour du plugin lorsque
168
        aucun service de haut niveau n'est impacté.
169
        """
170
        
171
        # On peuple la base de données avant le test.
172
        DBSession.add(self.aggregate)
173
        add_paths(0, 0, self.aggregate.events[0].idsupitem)
174
        DBSession.add(self.aggregate)
175
        
176
        ### 1er cas : l'utilisateur n'est pas connecté.
177
        # On vérifie que le plugin retourne bien une erreur 401.
178
        resp = self.app.post(
179
            '/get_plugin_value', 
180
            {"idcorrevent" : str(self.aggregate.idcorrevent),
181
             "plugin_name" : "hls"},
182
            status = 401,)
183
        
184
        ### 2ème cas : l'utilisateur n'a pas les
185
        ### droits sur l'hôte ayant causé le correvent.
186
        # On vérifie que le plugin retourne bien une erreur 404.
187
        resp = self.app.post(
188
            '/get_plugin_value', 
189
            {"idcorrevent" : str(self.aggregate.idcorrevent),
190
             "plugin_name" : "hls"},
191
            status = 404,
192
            extra_environ={'REMOTE_USER': 'no_access'})
193
        
194
        ### 3ème cas : l'utilisateur a cette fois les droits.
195
        resp = self.app.post(
196
            '/get_plugin_value', 
197
            {"idcorrevent" : str(self.aggregate.idcorrevent),
198
             "plugin_name" : "hls"},
199
            extra_environ={'REMOTE_USER': 'access'})
200
        # On vérifie que le plugin ne retourne toujours rien.
201
        assert_equal(resp.json, {"services": []})
202
    
203
    def test_1_impacted_hls_path(self):
204
        """
205
        Retour du plugin HLS pour 1 chemin impacté
206
        Teste la valeur de retour du plugin lorsqu'un
207
        chemin de services de haut niveau est impacté.
208
        """
209
        
210
        # On peuple la base de données avant le test.
211
        DBSession.add(self.aggregate)
212
        add_paths(1, 2, self.aggregate.events[0].idsupitem)
213
        DBSession.add(self.aggregate)
214
        
215
        ### 1er cas : l'utilisateur n'est pas connecté.
216
        # On vérifie que le plugin retourne bien une erreur 401.
217
        resp = self.app.post(
218
            '/get_plugin_value', 
219
            {"idcorrevent" : str(self.aggregate.idcorrevent),
220
             "plugin_name" : "hls"},
221
            status = 401,)
222
        
223
        ### 2ème cas : l'utilisateur n'a pas les droits
224
        ### sur l'hôte ayant causé le correvent, on doit
225
        ### obtenir une erreur 404 (pas d'événement trouvé
226
        ### avec les informations liées à cet utilisateur).
227
        resp = self.app.post(
228
            '/get_plugin_value', 
229
            {"idcorrevent" : str(self.aggregate.idcorrevent),
230
             "plugin_name" : "hls"},
231
            status = 404,
232
            extra_environ={'REMOTE_USER': 'no_access'})
233
        
234
        ### 3ème cas : l'utilisateur a cette fois les droits.
235
        resp = self.app.post(
236
            '/get_plugin_value', 
237
            {"idcorrevent" : str(self.aggregate.idcorrevent),
238
             "plugin_name" : "hls"},
239
            extra_environ={'REMOTE_USER': 'access'})
240
        # On vérifie que le plugin retourne bien les 2 HLS impactés.
241
        assert_equal(resp.json, {"services": ['HLS12']})
242
    
243
    def test_2_impacted_hls_path(self):
244
        """
245
        Retour du plugin HLS pour 2 chemins impactés
246
        Teste la valeur de retour du plugin lorsque deux
247
        chemins de services de haut niveau sont impactés.
248
        """
249
        
250
        # On peuple la base de données avant le test.
251
        DBSession.add(self.aggregate)
252
        add_paths(2, 2, self.aggregate.events[0].idsupitem)
253
        DBSession.add(self.aggregate)
254
        
255
        ### 1er cas : l'utilisateur n'est pas connecté.
256
        # On vérifie que le plugin retourne bien une erreur 401.
257
        resp = self.app.post(
258
            '/get_plugin_value', 
259
            {"idcorrevent" : str(self.aggregate.idcorrevent),
260
             "plugin_name" : "hls"},
261
            status = 401,)
262
        
263
        ### 2ème cas : l'utilisateur n'a pas les
264
        ### droits sur l'hôte ayant causé le correvent.
265
        resp = self.app.post(
266
            '/get_plugin_value', 
267
            {"idcorrevent" : str(self.aggregate.idcorrevent),
268
             "plugin_name" : "hls"},
269
            status = 404,
270
            extra_environ={'REMOTE_USER': 'no_access'})
271
        
272
        ### 3ème cas : l'utilisateur a cette fois les droits.
273
        resp = self.app.post(
274
            '/get_plugin_value', 
275
            {"idcorrevent" : str(self.aggregate.idcorrevent),
276
             "plugin_name" : "hls"},
277
            extra_environ={'REMOTE_USER': 'access'})
278
        # On vérifie que le plugin retourne bien les 4 HLS impactés.
279
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
280