Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ 6f6efdcd

History | View | Annotate | Download (9.02 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from nose.tools import assert_equal
7

    
8
from vigilo.models.session import DBSession
9
from vigilo.models.tables import Permission, DataPermission, StateName, \
10
                            Host, HighLevelService, \
11
                            Event, CorrEvent, ImpactedPath, ImpactedHLS, \
12
                            User, UserGroup
13
from vigilo.models.demo.functions import *
14
from vigiboard.tests import TestController
15

    
16
def populate_DB():
17
    """ Peuple la base de données. """
18

    
19
    # On ajoute un groupe d'hôtes
20
    hostmanagers = add_supitemgroup(name=u'managersgroup')
21

    
22
    # On lui octroie les permissions
23
    usergroup = UserGroup.by_group_name(u'users_with_access')
24
    DBSession.add(DataPermission(
25
        group=hostmanagers,
26
        usergroup=usergroup,
27
        access=u'r',
28
    ))
29
    DBSession.flush()
30

    
31
    # On crée un hôte de test.
32
    host = Host(
33
        name = u'host',
34
        checkhostcmd = u'halt',
35
        snmpcommunity = u'public',
36
        hosttpl = u'/dev/null',
37
        address = u'192.168.1.1',
38
        snmpport = 42,
39
        weight = 42,
40
        )
41
    DBSession.add(host)
42

    
43
    # On affecte cet hôte au groupe précédemment créé.
44
    hostmanagers.supitems.append(host)
45
    DBSession.flush()
46

    
47
    # On ajoute un évènement causé par cet hôte.
48
    event1 = Event(
49
        supitem = host,
50
        message = u'foo',
51
        current_state = StateName.statename_to_value(u'WARNING'),
52
        timestamp = datetime.now(),
53
    )
54
    DBSession.add(event1)
55
    DBSession.flush()
56

    
57
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
58
    aggregate = CorrEvent(
59
        idcause = event1.idevent,
60
        timestamp_active = datetime.now(),
61
        priority = 1,
62
        status = u'None')
63
    aggregate.events.append(event1)
64
    DBSession.add(aggregate)
65
    DBSession.flush()
66

    
67
    transaction.commit()
68
    return aggregate
69

    
70
def add_paths(path_number, path_length, idsupitem):
71
    """
72
    Ajoute path_number chemins de services de haut niveau impactés
73
    dans la base de donnée. Leur longeur sera égale à path_length.
74
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
75

76
    path_number * path_length services de
77
    haut niveau sont créés dans l'opération.
78
    """
79

    
80
    # Création de services de haut niveau dans la BDD.
81
    hls_template = {
82
        'message': u'Bar',
83
        'warning_threshold': 60,
84
        'critical_threshold': 80,
85
        'weight': None,
86
        'priority': 2,
87
    }
88

    
89
    # Création des chemins de services de haut niveau impactés.
90
    for j in range(path_number):
91

    
92
        # On crée le chemin en lui-même
93
        path = ImpactedPath(idsupitem = idsupitem)
94
        DBSession.add(path)
95
        DBSession.flush()
96

    
97
        # Pour chaque étage du chemin,
98
        for i in range(path_length):
99
            # on ajoute un service de haut niveau dans la BDD,
100
            hls = HighLevelService(
101
                servicename = u'HLS' + str(j + 1) + str(i + 1),
102
                **hls_template)
103
            DBSession.add(hls)
104
            # et on ajoute un étage au chemin contenant ce service.
105
            DBSession.add(
106
                ImpactedHLS(
107
                    path = path,
108
                    hls = hls,
109
                    distance = i + 1,
110
                    ))
111

    
112
    DBSession.flush()
113
    transaction.commit()
114

    
115

    
116
class TestHLSPlugin(TestController):
117
    """
118
    Classe de test du contrôleur listant les services
119
    de haut niveau impactés par un évènement corrélé.
120
    """
121
    def setUp(self):
122
        super(TestHLSPlugin, self).setUp()
123
        perm = Permission.by_permission_name(u'vigiboard-access')
124

    
125
        user = User(
126
            user_name=u'access',
127
            fullname=u'',
128
            email=u'user.has@access',
129
        )
130
        usergroup = UserGroup(
131
            group_name=u'users_with_access',
132
        )
133
        usergroup.permissions.append(perm)
134
        user.usergroups.append(usergroup)
135
        DBSession.add(user)
136
        DBSession.add(usergroup)
137
        DBSession.flush()
138

    
139
        user = User(
140
            user_name=u'no_access',
141
            fullname=u'',
142
            email=u'user.has.no@access',
143
        )
144
        usergroup = UserGroup(
145
            group_name=u'users_without_access',
146
        )
147
        usergroup.permissions.append(perm)
148
        user.usergroups.append(usergroup)
149
        DBSession.add(user)
150
        DBSession.add(usergroup)
151
        DBSession.flush()
152

    
153
        self.aggregate = populate_DB()
154

    
155
    def test_no_impacted_hls(self):
156
        """
157
        Retour du plugin HLS pour 0 HLS impacté
158
        Teste la valeur de retour du plugin lorsque
159
        aucun service de haut niveau n'est impacté.
160
        """
161

    
162
        # On peuple la base de données avant le test.
163
        DBSession.add(self.aggregate)
164
        add_paths(0, 0, self.aggregate.events[0].idsupitem)
165
        DBSession.add(self.aggregate)
166

    
167
        ### 1er cas : l'utilisateur n'est pas connecté.
168
        # On vérifie que le plugin retourne bien une erreur 401.
169
        resp = self.app.post(
170
            '/get_plugin_value',
171
            {"idcorrevent" : str(self.aggregate.idcorrevent),
172
             "plugin_name" : "hls"},
173
            status = 401,)
174

    
175
        ### 2ème cas : l'utilisateur n'a pas les
176
        ### droits sur l'hôte ayant causé le correvent.
177
        # On vérifie que le plugin retourne bien une erreur 404.
178
        resp = self.app.post(
179
            '/get_plugin_value',
180
            {"idcorrevent" : str(self.aggregate.idcorrevent),
181
             "plugin_name" : "hls"},
182
            status = 404,
183
            extra_environ={'REMOTE_USER': 'no_access'})
184

    
185
        ### 3ème cas : l'utilisateur a cette fois les droits.
186
        resp = self.app.post(
187
            '/get_plugin_value',
188
            {"idcorrevent" : str(self.aggregate.idcorrevent),
189
             "plugin_name" : "hls"},
190
            extra_environ={'REMOTE_USER': 'access'})
191
        # On vérifie que le plugin ne retourne toujours rien.
192
        assert_equal(resp.json, {"services": []})
193

    
194
    def test_1_impacted_hls_path(self):
195
        """
196
        Retour du plugin HLS pour 1 chemin impacté
197
        Teste la valeur de retour du plugin lorsqu'un
198
        chemin de services de haut niveau est impacté.
199
        """
200

    
201
        # On peuple la base de données avant le test.
202
        DBSession.add(self.aggregate)
203
        add_paths(1, 2, self.aggregate.events[0].idsupitem)
204
        DBSession.add(self.aggregate)
205

    
206
        ### 1er cas : l'utilisateur n'est pas connecté.
207
        # On vérifie que le plugin retourne bien une erreur 401.
208
        resp = self.app.post(
209
            '/get_plugin_value',
210
            {"idcorrevent" : str(self.aggregate.idcorrevent),
211
             "plugin_name" : "hls"},
212
            status = 401,)
213

    
214
        ### 2ème cas : l'utilisateur n'a pas les droits
215
        ### sur l'hôte ayant causé le correvent, on doit
216
        ### obtenir une erreur 404 (pas d'événement trouvé
217
        ### avec les informations liées à cet utilisateur).
218
        resp = self.app.post(
219
            '/get_plugin_value',
220
            {"idcorrevent" : str(self.aggregate.idcorrevent),
221
             "plugin_name" : "hls"},
222
            status = 404,
223
            extra_environ={'REMOTE_USER': 'no_access'})
224

    
225
        ### 3ème cas : l'utilisateur a cette fois les droits.
226
        resp = self.app.post(
227
            '/get_plugin_value',
228
            {"idcorrevent" : str(self.aggregate.idcorrevent),
229
             "plugin_name" : "hls"},
230
            extra_environ={'REMOTE_USER': 'access'})
231
        # On vérifie que le plugin retourne bien les 2 HLS impactés.
232
        assert_equal(resp.json, {"services": ['HLS12']})
233

    
234
    def test_2_impacted_hls_path(self):
235
        """
236
        Retour du plugin HLS pour 2 chemins impactés
237
        Teste la valeur de retour du plugin lorsque deux
238
        chemins de services de haut niveau sont impactés.
239
        """
240

    
241
        # On peuple la base de données avant le test.
242
        DBSession.add(self.aggregate)
243
        add_paths(2, 2, self.aggregate.events[0].idsupitem)
244
        DBSession.add(self.aggregate)
245

    
246
        ### 1er cas : l'utilisateur n'est pas connecté.
247
        # On vérifie que le plugin retourne bien une erreur 401.
248
        resp = self.app.post(
249
            '/get_plugin_value',
250
            {"idcorrevent" : str(self.aggregate.idcorrevent),
251
             "plugin_name" : "hls"},
252
            status = 401,)
253

    
254
        ### 2ème cas : l'utilisateur n'a pas les
255
        ### droits sur l'hôte ayant causé le correvent.
256
        resp = self.app.post(
257
            '/get_plugin_value',
258
            {"idcorrevent" : str(self.aggregate.idcorrevent),
259
             "plugin_name" : "hls"},
260
            status = 404,
261
            extra_environ={'REMOTE_USER': 'no_access'})
262

    
263
        ### 3ème cas : l'utilisateur a cette fois les droits.
264
        resp = self.app.post(
265
            '/get_plugin_value',
266
            {"idcorrevent" : str(self.aggregate.idcorrevent),
267
             "plugin_name" : "hls"},
268
            extra_environ={'REMOTE_USER': 'access'})
269
        # On vérifie que le plugin retourne bien les 4 HLS impactés.
270
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})