Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_vigiboardrequest.py @ 3e6ee4db

History | View | Annotate | Download (8.71 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test de la classe Vigiboard Request
5
"""
6

    
7
from nose.tools import assert_true
8
from datetime import datetime
9
import tg
10
import transaction
11

    
12
from vigiboard.model import DBSession, \
13
    Event, EventHistory, EventsAggregate, \
14
    Permission, User, StateName, \
15
    Group, Host, HostGroup, ServiceLowLevel, ServiceGroup
16
from vigiboard.tests import TestController
17
from vigiboard.controllers.vigiboardrequest import VigiboardRequest
18
from vigiboard.controllers.vigiboard_plugin.tests import MonPlugin
19

    
20

    
21
class TestVigiboardRequest(TestController):
22
    """Test de la classe Vigiboard Request"""
23

    
24
    def setUp(self):
25
        super(TestVigiboardRequest, self).setUp()
26

    
27
        # Les noms d'états.
28
        DBSession.add(StateName(statename=u'OK', order=0))
29
        DBSession.add(StateName(statename=u'WARNING', order=2))
30
        DBSession.flush()
31
        transaction.commit()
32

    
33
        # On peuple la base de données.
34

    
35
        # Les groupes et leurs dépendances
36
        self.editorsgroup = Group(name=u'editorsgroup')
37
        DBSession.add(self.editorsgroup)
38
        DBSession.flush()
39

    
40
        self.managersgroup = Group(name=u'managersgroup', parent=self.editorsgroup)
41
        DBSession.add(self.managersgroup)
42
        DBSession.flush()
43

    
44
        manage_perm = Permission.by_permission_name(u'manage')
45
        edit_perm = Permission.by_permission_name(u'edit')
46

    
47
        self.managersgroup.permissions.append(manage_perm)
48
        self.editorsgroup.permissions.append(edit_perm)
49
        DBSession.flush()
50

    
51
        # Les dépendances des événements
52
        host_template = {
53
            'checkhostcmd': u'halt',
54
            'snmpcommunity': u'public',
55
            'fqhn': u'localhost',
56
            'hosttpl': u'/dev/null',
57
            'mainip': u'192.168.1.1',
58
            'snmpport': 42,
59
        }
60

    
61
        service_template = {
62
            'command': u'halt',
63
            'op_dep': u'+',
64
        }
65

    
66
        DBSession.add(Host(name=u'managerhost', **host_template))
67
        DBSession.add(ServiceLowLevel(name=u'managerservice', **service_template))
68
        DBSession.add(Host(name=u'editorhost', **host_template))
69
        DBSession.add(ServiceLowLevel(name=u'editorservice', **service_template))
70
        DBSession.flush()
71

    
72
        # Table de jointure entre les hôtes/services et les groupes
73
        DBSession.add(HostGroup(hostname = u"managerhost",
74
            idgroup=self.managersgroup.idgroup))
75
        DBSession.add(HostGroup(hostname = u"editorhost",
76
            idgroup=self.editorsgroup.idgroup))
77
        DBSession.add(ServiceGroup(servicename = u"managerservice",
78
            idgroup=self.managersgroup.idgroup))
79
        DBSession.add(ServiceGroup(servicename = u"editorservice",
80
            idgroup=self.editorsgroup.idgroup))
81
        DBSession.flush()
82

    
83
        # Les événements eux-mêmes
84
        event_template = {
85
            'message': u'foo',
86
            'current_state': StateName.statename_to_value(u'WARNING'),
87
        }
88

    
89
        event1 = Event(hostname=u'managerhost',
90
            servicename=u'managerservice', **event_template)
91
        event2 = Event(hostname=u'editorhost',
92
            servicename=u'managerservice', **event_template)
93
        event3 = Event(hostname=u'managerhost',
94
            servicename=u'editorservice', **event_template)
95
        event4 = Event(hostname=u'editorhost',
96
            servicename=u'editorservice', **event_template)
97

    
98
        DBSession.add(event1)
99
        DBSession.add(event2)
100
        DBSession.add(event3)
101
        DBSession.add(event4)
102
        DBSession.flush()
103

    
104

    
105
        # Les historiques
106
        DBSession.add(EventHistory(type_action = u'Nagios update state',
107
            idevent=event1.idevent, timestamp=datetime.now()))
108
        DBSession.add(EventHistory(type_action = u'Acknowlegement change state',
109
            idevent=event1.idevent, timestamp=datetime.now()))
110
        DBSession.add(EventHistory(type_action = u'Nagios update state',
111
            idevent=event2.idevent, timestamp=datetime.now()))
112
        DBSession.add(EventHistory(type_action = u'Acknowlegement change state',
113
            idevent=event2.idevent, timestamp=datetime.now()))
114
        DBSession.add(EventHistory(type_action = u'Nagios update state',
115
            idevent=event3.idevent, timestamp=datetime.now()))
116
        DBSession.add(EventHistory(type_action = u'Acknowlegement change state',
117
            idevent=event3.idevent, timestamp=datetime.now()))
118
        DBSession.add(EventHistory(type_action = u'Nagios update state',
119
            idevent=event4.idevent, timestamp=datetime.now()))
120
        DBSession.add(EventHistory(type_action = u'Acknowlegement change state',
121
            idevent=event4.idevent, timestamp=datetime.now()))
122
        DBSession.flush()
123

    
124
        # Les événements corrélés
125
        aggregate_template = {
126
            'timestamp_active': datetime.now(),
127
            'priority': 1,
128
            'status': u'None',
129
        }
130
        self.aggregate1 = EventsAggregate(
131
            idcause=event1.idevent, **aggregate_template)
132
        self.aggregate2 = EventsAggregate(
133
            idcause=event4.idevent, **aggregate_template)
134

    
135
        self.aggregate1.events.append(event1)
136
        self.aggregate1.events.append(event3)
137
        self.aggregate2.events.append(event4)
138
        self.aggregate2.events.append(event2)
139
        DBSession.add(self.aggregate1)
140
        DBSession.add(self.aggregate2)
141
        DBSession.flush()
142

    
143
        for e in DBSession.query(Event).all():
144
            print "Event", e.idevent, e.hostname, e.servicename, e.current_state
145
        for ea in DBSession.query(EventsAggregate).all():
146
            print "EAggr", ea.idcause, ea.status
147
        for g in DBSession.query(Group).all():
148
            print "Group", g.idgroup, g.name, repr(g.idparent)
149
        for hg in DBSession.query(HostGroup).all():
150
            print "HGrup", hg.idgroup, hg.hostname
151
        for sg in DBSession.query(ServiceGroup).all():
152
            print "SGrup", sg.idgroup, sg.servicename
153
        transaction.commit()
154

    
155
    def tearDown(self):
156
        # This operation is only necessary for DBMS which are
157
        # really strict about table locks, such as PostgreSQL.
158
        # For our tests, we use an (in-memory) SQLite database,
159
        # so we're unaffected. This is done only for completeness.
160
        TestController.tearDown(self)
161

    
162

    
163
    def test_creation_requete(self):
164
        """Génération d'une requête avec plugin et permissions."""
165

    
166
        # On indique qui on est et on requête l'index pour obtenir
167
        # toutes les variables de sessions
168
        environ = {'REMOTE_USER': 'editor'}
169
        response = self.app.get('/', extra_environ=environ)
170
        tg.request = response.request
171

    
172
        # Derrière, VigiboardRequest doit charger le plugin de tests tout seul
173
        tg.config['vigiboard_plugins'] = [['tests', 'MonPlugin']]
174
        vigi_req = VigiboardRequest(User.by_user_name(u'editor'))
175

    
176
        # On effectue les tests suivants :
177
        #   le nombre de lignes (historique et événements) doivent
178
        #       correspondre (vérification des droits imposés par les groupes)
179
        #   le plugin fonctionne correctement
180

    
181
        num_rows = vigi_req.num_rows()
182
        assert_true(num_rows == 1, msg = "1 historique devrait " +
183
                "etre disponible pour l'utilisateur 'editor' mais il " +
184
                "y en a %d" % num_rows)
185

    
186
        vigi_req.format_events(0, 10)
187
        vigi_req.format_history()
188
        print vigi_req.events
189
        assert_true(len(vigi_req.events) == 1 + 1,
190
                msg = "1 evenement devrait etre disponible pour " +
191
                        "l'utilisateur 'editor' mais il y en a %d" %
192
                        (len(vigi_req.events) - 1))
193
        assert_true(vigi_req.events[1][6][0][0] != 'Error', 
194
                    msg = "Probleme d'execution des plugins ou de " +
195
                        "formatage des evenements") 
196

    
197
        # On recommence les tests précédents avec l'utilisateur
198
        # manager (plus de droits)
199

    
200
        environ = {'REMOTE_USER': 'manager'}
201
        response = self.app.get('/', extra_environ=environ)
202
        tg.request = response.request
203

    
204
        vigi_req = VigiboardRequest(User.by_user_name(u'manager'))
205
        vigi_req.add_plugin(MonPlugin)
206

    
207
        num_rows = vigi_req.num_rows()
208
        assert_true(num_rows == 2, 
209
                msg = "2 historiques devraient etre disponibles pour " +
210
                        "l'utilisateur 'manager' mais il y en a %d" % num_rows)
211
        vigi_req.format_events(0, 10)
212
        vigi_req.format_history()
213
        assert_true(len(vigi_req.events) == 2 + 1, 
214
                msg = "2 evenements devraient être disponibles pour " +
215
                        "l'utilisateur 'manager' mais il y en a %d" %
216
                        (len(vigi_req.events) - 1))
217
        assert_true(vigi_req.events[1][6][0][0] != 'Error', 
218
                msg = "Probleme d'execution des plugins")
219