Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / vigiboard_plugin / test_plugin_hls.py @ 5dbfa80d

History | View | Annotate | Download (8.21 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from vigiboard.tests import TestController
7
from nose.tools import assert_equal
8
from vigilo.models import Permission, StateName, \
9
                            HostGroup, Host, HighLevelService, \
10
                            Event, CorrEvent, ImpactedPath, ImpactedHLS
11
from vigilo.models.configure import DBSession
12

    
13
def populate_DB():
14
    """ Peuple la base de données. """
15

    
16
    # On ajoute des noms d'états.
17
    DBSession.add(StateName(statename = u'OK', order=0))
18
    DBSession.add(StateName(statename = u'WARNING', order=2))
19
    DBSession.flush()
20
    transaction.commit()
21

    
22
    # On ajoute un groupe d'hôtes
23
    hostmanagers = HostGroup(name = u'managersgroup')
24
    DBSession.add(hostmanagers)
25
    DBSession.flush()
26

    
27
    # On lui octroie les permissions
28
    manage_perm = Permission.by_permission_name(u'manage')
29
    hostmanagers.permissions.append(manage_perm)
30
    DBSession.flush()
31

    
32
    # On crée un hôte de test.
33
    host = Host(
34
        name = u'host', 
35
        checkhostcmd = u'halt',
36
        snmpcommunity = u'public',
37
        hosttpl = u'/dev/null',
38
        mainip = u'192.168.1.1',
39
        snmpport = 42,
40
        weight = 42,
41
        )
42
    DBSession.add(host)
43

    
44
    # On affecte cet hôte au groupe précédemment créé.
45
    hostmanagers.hosts.append(host)
46
    DBSession.flush()
47

    
48
    # On ajoute un évènement causé par cet hôte.
49
    event1 = Event(
50
        supitem = host,
51
        message = u'foo',
52
        current_state = StateName.statename_to_value(u'WARNING'))
53
    DBSession.add(event1)
54
    DBSession.flush()
55

    
56
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
57
    aggregate = CorrEvent(
58
        idcause = event1.idevent, 
59
        timestamp_active = datetime.now(),
60
        priority = 1,
61
        status = u'None')
62
    aggregate.events.append(event1)
63
    DBSession.add(aggregate)
64
    DBSession.flush()
65
    
66
    transaction.commit()
67

    
68
    return aggregate
69

    
70
def add_paths(path_number, path_length, idsupitem):
71
    """ 
72
    Ajoute path_number chemins de services de haut niveau impactés
73
    dans la base de donnée. Leur longeur sera égale à path_length.
74
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
75
     
76
    path_number * path_length services de 
77
    haut niveau sont créés dans l'opération.
78
    """
79

    
80
    # Création de services de haut niveau dans la BDD.
81
    hls_template = {
82
        'op_dep': u'&',
83
        'message': u'Bar',
84
        'warning_threshold': 60,
85
        'critical_threshold': 80,
86
        'weight': None,
87
        'priority': 2,
88
    }
89

    
90
    # Création des chemins de services de haut niveau impactés.
91
    for j in range(path_number):
92
        
93
        # On crée le chemin en lui-même
94
        path = ImpactedPath(idsupitem = idsupitem)
95
        DBSession.add(path)
96
        DBSession.flush()
97
        
98
        # Pour chaque étage du chemin,
99
        for i in range(path_length):
100
            # on ajoute un service de haut niveau dans la BDD,
101
            hls = HighLevelService(
102
                servicename = u'HLS' + str(j + 1) + str(i + 1), 
103
                **hls_template)
104
            DBSession.add(hls)
105
            # et on ajoute un étage au chemin contenant ce service. 
106
            DBSession.add(
107
                ImpactedHLS(
108
                    path = path,
109
                    hls = hls,
110
                    distance = i + 1,
111
                    ))
112
    
113
    DBSession.flush()
114
    transaction.commit()
115

    
116

    
117
class TestHLSPlugin(TestController):
118
    """
119
    Classe de test du contrôleur listant les services 
120
    de haut niveau impactés par un évènement corrélé.
121
    """
122
    
123
    def test_no_impacted_hls(self):
124
        """
125
        Retour du plugin SHN pour 0 SHN impacté
126
        Teste la valeur de retour du plugin lorsque
127
        aucun service de haut niveau n'est impacté.
128
        """
129
        
130
        # On peuple la base de données avant le test.
131
        aggregate = populate_DB()
132
        DBSession.add(aggregate)
133
        add_paths(0, 0, aggregate.events[0].idsupitem)
134
        DBSession.add(aggregate)
135
        
136
        ### 1er cas : l'utilisateur n'est pas connecté.
137
        # On vérifie que le plugin retourne bien une erreur 404.
138
        resp = self.app.post(
139
            '/get_plugin_value', 
140
            {"idcorrevent" : str(aggregate.idcorrevent),
141
             "plugin_name" : "shn"},
142
            status = 401,)
143
        
144
        ### 2ème cas : l'utilisateur n'a pas les
145
        ### droits sur l'hôte ayant causé le correvent.
146
        # On vérifie que le plugin retourne bien une erreur 404.
147
        resp = self.app.post(
148
            '/get_plugin_value', 
149
            {"idcorrevent" : str(aggregate.idcorrevent),
150
             "plugin_name" : "shn"},
151
            status = 404,
152
            extra_environ={'REMOTE_USER': 'editor'})
153
        
154
        ### 3ème cas : l'utilisateur a cette fois les droits.        
155
        resp = self.app.post(
156
            '/get_plugin_value', 
157
            {"idcorrevent" : str(aggregate.idcorrevent),
158
             "plugin_name" : "shn"},
159
            extra_environ={'REMOTE_USER': 'manager'})
160
        # On vérifie que le plugin ne retourne toujours rien.
161
        assert_equal(resp.json, {"services": []})
162
    
163
    def test_1_impacted_hls_path(self):
164
        """
165
        Retour du plugin SHN pour 1 chemin impacté
166
        Teste la valeur de retour du plugin lorsqu'un
167
        chemin de services de haut niveau est impacté.
168
        """
169
        
170
        # On peuple la base de données avant le test.
171
        aggregate = populate_DB()
172
        DBSession.add(aggregate)
173
        add_paths(1, 2, aggregate.events[0].idsupitem)
174
        DBSession.add(aggregate)
175
        
176
        ### 1er cas : l'utilisateur n'est pas connecté.
177
        # On vérifie que le plugin retourne bien une erreur 404.
178
        resp = self.app.post(
179
            '/get_plugin_value', 
180
            {"idcorrevent" : str(aggregate.idcorrevent),
181
             "plugin_name" : "shn"},
182
            status = 401,)
183
        
184
        ### 2ème cas : l'utilisateur n'a pas les
185
        ### droits sur l'hôte ayant causé le correvent.
186
        resp = self.app.post(
187
            '/get_plugin_value', 
188
            {"idcorrevent" : str(aggregate.idcorrevent),
189
             "plugin_name" : "shn"},
190
            status = 404,
191
            extra_environ={'REMOTE_USER': 'editor'})
192
        
193
        ### 3ème cas : l'utilisateur a cette fois les droits.        
194
        resp = self.app.post(
195
            '/get_plugin_value', 
196
            {"idcorrevent" : str(aggregate.idcorrevent),
197
             "plugin_name" : "shn"},
198
            extra_environ={'REMOTE_USER': 'manager'})
199
        # On vérifie que le plugin retourne bien les 2 SHN impactés..
200
        assert_equal(resp.json, {"services": ['HLS12']})
201
    
202
    def test_2_impacted_hls_path(self):
203
        """
204
        Retour du plugin SHN pour 2 chemins impactés
205
        Teste la valeur de retour du plugin lorsque deux
206
        chemins de services de haut niveau sont impactés.
207
        """
208
        
209
        # On peuple la base de données avant le test.
210
        aggregate = populate_DB()
211
        DBSession.add(aggregate)
212
        add_paths(2, 2, aggregate.events[0].idsupitem)
213
        DBSession.add(aggregate)
214
        
215
        ### 1er cas : l'utilisateur n'est pas connecté.
216
        # On vérifie que le plugin retourne bien une erreur 404.
217
        resp = self.app.post(
218
            '/get_plugin_value', 
219
            {"idcorrevent" : str(aggregate.idcorrevent),
220
             "plugin_name" : "shn"},
221
            status = 401,)
222
        
223
        ### 2ème cas : l'utilisateur n'a pas les
224
        ### droits sur l'hôte ayant causé le correvent.
225
        resp = self.app.post(
226
            '/get_plugin_value', 
227
            {"idcorrevent" : str(aggregate.idcorrevent),
228
             "plugin_name" : "shn"},
229
            status = 404,
230
            extra_environ={'REMOTE_USER': 'editor'})
231
        
232
        ### 3ème cas : l'utilisateur a cette fois les droits.        
233
        resp = self.app.post(
234
            '/get_plugin_value', 
235
            {"idcorrevent" : str(aggregate.idcorrevent),
236
             "plugin_name" : "shn"},
237
            extra_environ={'REMOTE_USER': 'manager'})
238
        # On vérifie que le plugin retourne bien les 4 SHN impactés..
239
        assert_equal(resp.json, {"services": ['HLS12', 'HLS22']})
240