Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / plugins / test_plugin_hls.py @ cf3c2494

History | View | Annotate | Download (6.67 KB)

1
# -*- coding: utf-8 -*-
2
""" Test du plugin listant les services de haut niveau impactés. """
3

    
4
from datetime import datetime
5
import transaction
6
from nose.tools import assert_equal
7

    
8
from vigilo.models.session import DBSession
9
from vigilo.models.tables import Permission, DataPermission, StateName, \
10
                            SupItemGroup, Host, HighLevelService, \
11
                            Event, CorrEvent, ImpactedPath, ImpactedHLS, \
12
                            User, UserGroup
13
from vigiboard.tests import TestController
14

    
15
def populate_DB():
16
    """ Peuple la base de données. """
17

    
18
    # On ajoute un groupe d'hôtes
19
    hostmanagers = SupItemGroup(name=u'managersgroup')
20
    DBSession.add(hostmanagers)
21
    DBSession.flush()
22

    
23
    # On lui octroie les permissions
24
    usergroup = UserGroup.by_group_name(u'users_with_access')
25
    DBSession.add(DataPermission(
26
        group=hostmanagers,
27
        usergroup=usergroup,
28
        access=u'r',
29
    ))
30
    DBSession.flush()
31

    
32
    # On crée un hôte de test.
33
    host = Host(
34
        name = u'host',
35
        checkhostcmd = u'halt',
36
        snmpcommunity = u'public',
37
        hosttpl = u'/dev/null',
38
        address = u'192.168.1.1',
39
        snmpport = 42,
40
        weight = 42,
41
        )
42
    DBSession.add(host)
43

    
44
    # On affecte cet hôte au groupe précédemment créé.
45
    hostmanagers.supitems.append(host)
46
    DBSession.flush()
47

    
48
    # On ajoute un évènement causé par cet hôte.
49
    event1 = Event(
50
        supitem = host,
51
        message = u'foo',
52
        current_state = StateName.statename_to_value(u'WARNING'),
53
        timestamp = datetime.now(),
54
    )
55
    DBSession.add(event1)
56
    DBSession.flush()
57

    
58
    # On ajoute un évènement corrélé causé par cet évènement 'brut'.
59
    aggregate = CorrEvent(
60
        idcause = event1.idevent,
61
        timestamp_active = datetime.now(),
62
        priority = 1,
63
        status = u'None')
64
    aggregate.events.append(event1)
65
    DBSession.add(aggregate)
66
    DBSession.flush()
67

    
68
    transaction.commit()
69
    return aggregate
70

    
71
def add_paths(path_number, path_length, idsupitem):
72
    """
73
    Ajoute path_number chemins de services de haut niveau impactés
74
    dans la base de donnée. Leur longeur sera égale à path_length.
75
    La 3ème valeur passée en paramètre est l'id du supitem impactant.
76

77
    path_number * path_length services de
78
    haut niveau sont créés dans l'opération.
79
    """
80

    
81
    # Création de services de haut niveau dans la BDD.
82
    hls_template = {
83
        'message': u'Bar',
84
        'warning_threshold': 60,
85
        'critical_threshold': 80,
86
        'weight': None,
87
        'priority': 2,
88
    }
89

    
90
    # Création des chemins de services de haut niveau impactés.
91
    for j in range(path_number):
92

    
93
        # On crée le chemin en lui-même
94
        path = ImpactedPath(idsupitem = idsupitem)
95
        DBSession.add(path)
96
        DBSession.flush()
97

    
98
        # Pour chaque étage du chemin,
99
        for i in range(path_length):
100
            # on ajoute un service de haut niveau dans la BDD,
101
            hls = HighLevelService(
102
                servicename = u'HLS' + str(j + 1) + str(i + 1),
103
                **hls_template)
104
            DBSession.add(hls)
105
            # et on ajoute un étage au chemin contenant ce service.
106
            DBSession.add(
107
                ImpactedHLS(
108
                    path = path,
109
                    hls = hls,
110
                    distance = i + 1,
111
                    ))
112

    
113
    DBSession.flush()
114
    transaction.commit()
115

    
116

    
117
class TestHLSPlugin(TestController):
118
    """
119
    Classe de test du contrôleur listant les services
120
    de haut niveau impactés par un évènement corrélé.
121
    """
122
    def setUp(self):
123
        super(TestHLSPlugin, self).setUp()
124
        perm = Permission.by_permission_name(u'vigiboard-access')
125

    
126
        user = User(
127
            user_name=u'access',
128
            fullname=u'',
129
            email=u'user.has@access',
130
        )
131
        usergroup = UserGroup(
132
            group_name=u'users_with_access',
133
        )
134
        usergroup.permissions.append(perm)
135
        user.usergroups.append(usergroup)
136
        DBSession.add(user)
137
        DBSession.add(usergroup)
138
        DBSession.flush()
139

    
140
        self.aggregate = populate_DB()
141

    
142
    def test_no_impacted_hls(self):
143
        """
144
        Données affichée par le plugin HLS pour 0 HLS impacté
145
        Teste les données affichées par le  plugin lorsque
146
        aucun service de haut niveau n'est impacté.
147
        """
148

    
149
        # On peuple la base de données avant le test.
150
        DBSession.add(self.aggregate)
151
        add_paths(0, 0, self.aggregate.events[0].idsupitem)
152
        DBSession.add(self.aggregate)
153

    
154
        # On accède à la page principale de VigiBoard
155
        resp = self.app.post(
156
            '/', extra_environ={'REMOTE_USER': 'access'})
157

    
158
        # On s'assure que la colonne des HLS
159
        # impactés est vide pour notre évènement.
160
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
161
            '/tbody/tr/td[@class="plugin_hls"]/text()')
162
        assert_equal(plugin_data[0].strip(), "")
163

    
164
    def test_1_impacted_hls_path(self):
165
        """
166
        Données affichée par le plugin HLS pour 1 chemin impacté
167
        Teste les données affichées par le  plugin lorsque
168
        1 chemin de services de haut niveau est impacté.
169
        """
170

    
171
        # On peuple la base de données avant le test.
172
        DBSession.add(self.aggregate)
173
        add_paths(1, 2, self.aggregate.events[0].idsupitem)
174
        DBSession.add(self.aggregate)
175

    
176
        # On accède à la page principale de VigiBoard
177
        resp = self.app.post(
178
            '/', extra_environ={'REMOTE_USER': 'access'})
179

    
180
        # On s'assure que la colonne des HLS impactés contient
181
        # bien le nom de notre HLS de plus haut niveau impacté.
182
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
183
            '/tbody/tr/td[@class="plugin_hls"]/text()')
184
        assert_equal(plugin_data[0].strip(), "HLS12")
185

    
186
    def test_2_impacted_hls_path(self):
187
        """
188
        Données affichée par le plugin HLS pour 2 chemins impactés
189
        Teste les données affichées par le plugin lorsque
190
        2 chemins de services de haut niveau sont impactés.
191
        """
192

    
193
        # On peuple la base de données avant le test.
194
        DBSession.add(self.aggregate)
195
        add_paths(2, 2, self.aggregate.events[0].idsupitem)
196
        DBSession.add(self.aggregate)
197

    
198
        # On accède à la page principale de VigiBoard
199
        resp = self.app.post(
200
            '/', extra_environ={'REMOTE_USER': 'access'})
201

    
202
        # On s'assure que la colonne des HLS contient bien
203
        # le nombre de HLS de plus haut niveau impactés,
204
        plugin_data = resp.lxml.xpath('//table[@class="vigitable"]'
205
            '/tbody/tr/td[@class="plugin_hls"]/a/text()')
206
        assert_equal(plugin_data[0].strip(), "2")
207