Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ b373a5de

History | View | Annotate | Download (6.76 KB)

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2006-2011 CS-SI
3
# License: GNU GPL v2 <http://www.gnu.org/licenses/gpl-2.0.html>
4

    
5
""" Test du plugin listant les services de haut niveau impactés. """
6

    
7
from datetime import datetime
8
import transaction
9
from nose.tools import assert_equal
10

    
11
from vigilo.models.session import DBSession
12
from vigilo.models.tables import Permission, DataPermission, StateName, \
13
                            SupItemGroup, Host, HighLevelService, \
14
                            Event, CorrEvent, ImpactedPath, ImpactedHLS, \
15
                            User, UserGroup
16
from vigiboard.tests import TestController
17

    
18
def populate_DB():
19
    """ Peuple la base de données. """
20

    
21
    # On ajoute un groupe d'hôtes
22
    hostmanagers = SupItemGroup(name=u'managersgroup', parent=None)
23
    DBSession.add(hostmanagers)
24
    DBSession.flush()
25

    
26
    # On lui octroie les permissions
27
    usergroup = UserGroup.by_group_name(u'users_with_access')
28
    DBSession.add(DataPermission(
29
        group=hostmanagers,
30
        usergroup=usergroup,
31
        access=u'r',
32
    ))
33
    DBSession.flush()
34

    
35
    # On crée un hôte de test.
36
    host = Host(
37
        name = u'host',
38
        checkhostcmd = u'halt',
39
        snmpcommunity = u'public',
40
        hosttpl = u'/dev/null',
41
        address = u'192.168.1.1',
42
        snmpport = 42,
43
        weight = 42,
44
        )
45
    DBSession.add(host)
46

    
47
    # On affecte cet hôte au groupe précédemment créé.
48
    hostmanagers.supitems.append(host)
49
    DBSession.flush()
50

    
51
    # On ajoute un évènement causé par cet hôte.
52
    event1 = Event(
53
        supitem = host,
54
        message = u'foo',
55
        current_state = StateName.statename_to_value(u'WARNING'),
56
        timestamp = datetime.now(),
57
    )
58
    DBSession.add(event1)
59
    DBSession.flush()
60

    
61
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
62
    aggregate = CorrEvent(
63
        idcause = event1.idevent,
64
        timestamp_active = datetime.now(),
65
        priority = 1,
66
        status = u'None')
67
    aggregate.events.append(event1)
68
    DBSession.add(aggregate)
69
    DBSession.flush()
70

    
71
    transaction.commit()
72
    return aggregate
73

    
74
def add_paths(path_number, path_length, idsupitem):
75
    """
76
    Ajoute path_number chemins de services de haut niveau impactés
77
    dans la base de donnée. Leur longeur sera égale à path_length.
78
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
79

80
    path_number * path_length services de
81
    haut niveau sont créés dans l'opération.
82
    """
83

    
84
    # Création de services de haut niveau dans la BDD.
85
    hls_template = {
86
        'message': u'Bar',
87
        'warning_threshold': 60,
88
        'critical_threshold': 80,
89
        'weight': None,
90
        'priority': 2,
91
    }
92

    
93
    # Création des chemins de services de haut niveau impactés.
94
    for j in range(path_number):
95

    
96
        # On crée le chemin en lui-même
97
        path = ImpactedPath(idsupitem = idsupitem)
98
        DBSession.add(path)
99
        DBSession.flush()
100

    
101
        # Pour chaque étage du chemin,
102
        for i in range(path_length):
103
            # on ajoute un service de haut niveau dans la BDD,
104
            hls = HighLevelService(
105
                servicename = u'HLS' + str(j + 1) + str(i + 1),
106
                **hls_template)
107
            DBSession.add(hls)
108
            # et on ajoute un étage au chemin contenant ce service.
109
            DBSession.add(
110
                ImpactedHLS(
111
                    path = path,
112
                    hls = hls,
113
                    distance = i + 1,
114
                    ))
115

    
116
    DBSession.flush()
117
    transaction.commit()
118

    
119

    
120
class TestHLSPlugin(TestController):
121
    """
122
    Classe de test du contrôleur listant les services
123
    de haut niveau impactés par un évènement corrélé.
124
    """
125
    def setUp(self):
126
        super(TestHLSPlugin, self).setUp()
127
        perm = Permission.by_permission_name(u'vigiboard-access')
128

    
129
        user = User(
130
            user_name=u'access',
131
            fullname=u'',
132
            email=u'user.has@access',
133
        )
134
        usergroup = UserGroup(group_name=u'users_with_access')
135
        usergroup.permissions.append(perm)
136
        user.usergroups.append(usergroup)
137
        DBSession.add(user)
138
        DBSession.add(usergroup)
139
        DBSession.flush()
140

    
141
        self.aggregate = populate_DB()
142

    
143
    def test_no_impacted_hls(self):
144
        """
145
        Données affichées par le plugin HLS pour 0 HLS impacté
146
        Teste les données affichées par le  plugin lorsque
147
        aucun service de haut niveau n'est impacté.
148
        """
149

    
150
        # On peuple la base de données avant le test.
151
        DBSession.add(self.aggregate)
152
        add_paths(0, 0, self.aggregate.events[0].idsupitem)
153
        DBSession.add(self.aggregate)
154

    
155
        # On accède à la page principale de VigiBoard
156
        resp = self.app.post(
157
            '/', extra_environ={'REMOTE_USER': 'access'})
158

    
159
        # On s'assure que la colonne des HLS
160
        # impactés est vide pour notre évènement.
161
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
162
            '/tbody/tr/td[@class="plugin_hls"]/text()')
163
        assert_equal(plugin_data[0].strip(), "")
164

    
165
    def test_1_impacted_hls_path(self):
166
        """
167
        Données affichées par le plugin HLS pour 1 chemin impacté
168
        Teste les données affichées par le  plugin lorsque
169
        1 chemin de services de haut niveau est impacté.
170
        """
171

    
172
        # On peuple la base de données avant le test.
173
        DBSession.add(self.aggregate)
174
        add_paths(1, 2, self.aggregate.events[0].idsupitem)
175
        DBSession.add(self.aggregate)
176

    
177
        # On accède à la page principale de VigiBoard
178
        resp = self.app.post(
179
            '/', extra_environ={'REMOTE_USER': 'access'})
180

    
181
        # On s'assure que la colonne des HLS impactés contient
182
        # bien le nom de notre HLS de plus haut niveau impacté.
183
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
184
            '/tbody/tr/td[@class="plugin_hls"]/text()')
185
        assert_equal(plugin_data[0].strip(), "HLS12")
186

    
187
    def test_2_impacted_hls_path(self):
188
        """
189
        Données affichées par le plugin HLS pour 2 chemins impactés
190
        Teste les données affichées par le plugin lorsque
191
        2 chemins de services de haut niveau sont impactés.
192
        """
193

    
194
        # On peuple la base de données avant le test.
195
        DBSession.add(self.aggregate)
196
        add_paths(2, 2, self.aggregate.events[0].idsupitem)
197
        DBSession.add(self.aggregate)
198

    
199
        # On accède à la page principale de VigiBoard
200
        resp = self.app.post(
201
            '/', extra_environ={'REMOTE_USER': 'access'})
202

    
203
        # On s'assure que la colonne des HLS contient bien
204
        # le nombre de HLS de plus haut niveau impactés,
205
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
206
            '/tbody/tr/td[@class="plugin_hls"]/a/text()')
207
        assert_equal(plugin_data[0].strip(), "2")