Project

General

Profile

Statistics
| Branch: | Tag: | Revision:

vigiboard / vigiboard / tests / functional / test_event_table.py @ 5dbfa80d

History | View | Annotate | Download (5.67 KB)

1
# -*- coding: utf-8 -*-
2
# vim:set expandtab tabstop=4 shiftwidth=4:
3
"""
4
Test du tableau d'événements de Vigiboard
5
"""
6

    
7
from nose.tools import assert_true, assert_equal
8
from datetime import datetime
9
import transaction
10

    
11
from vigilo.models.configure import DBSession
12
from vigilo.models import Event, CorrEvent, \
13
                            Permission, StateName, \
14
                            Host, HostGroup, LowLevelService
15
from vigiboard.tests import TestController
16

    
17
def populate_DB():
18
    """ Peuple la base de données. """
19

    
20
    # On ajoute des noms d'états.
21
    DBSession.add(StateName(statename=u'OK', order=0))
22
    DBSession.add(StateName(statename=u'WARNING', order=2))
23
    DBSession.flush()
24
    transaction.commit()
25

    
26
    # On ajoute les groupes et leurs dépendances
27
    hosteditors = HostGroup(name=u'editorsgroup')
28
    DBSession.add(hosteditors)
29
    hostmanagers = HostGroup(name=u'managersgroup', parent=hosteditors)
30
    DBSession.add(hostmanagers)
31
    DBSession.flush()
32

    
33
    manage_perm = Permission.by_permission_name(u'manage')
34
    edit_perm = Permission.by_permission_name(u'edit')
35

    
36
    hostmanagers.permissions.append(manage_perm)
37
    hosteditors.permissions.append(edit_perm)
38
    DBSession.flush()
39

    
40
    # Création des hôtes de test.
41
    host_template = {
42
        'checkhostcmd': u'halt',
43
        'snmpcommunity': u'public',
44
        'hosttpl': u'/dev/null',
45
        'mainip': u'192.168.1.1',
46
        'snmpport': 42,
47
        'weight': 42,
48
    }
49

    
50
    managerhost = Host(name=u'managerhost', **host_template)
51
    editorhost = Host(name=u'editorhost', **host_template)
52
    DBSession.add(managerhost)
53
    DBSession.add(editorhost)
54

    
55
    # Affectation des hôtes aux groupes.
56
    hosteditors.hosts.append(editorhost)
57
    hostmanagers.hosts.append(managerhost)
58
    DBSession.flush()
59

    
60
    # Création des services techniques de test.
61
    service_template = {
62
        'command': u'halt',
63
        'op_dep': u'+',
64
        'weight': 42,
65
    }
66

    
67
    service1 = LowLevelService(
68
        host=managerhost,
69
        servicename=u'managerservice',
70
        **service_template
71
    )
72

    
73
    service2 = LowLevelService(
74
        host=editorhost,
75
        servicename=u'managerservice',
76
        **service_template
77
    )
78

    
79
    service3 = LowLevelService(
80
        host=managerhost,
81
        servicename=u'editorservice',
82
        **service_template
83
    )
84

    
85
    service4 = LowLevelService(
86
        host=editorhost,
87
        servicename=u'editorservice',
88
        **service_template
89
    )
90

    
91
    DBSession.add(service1)
92
    DBSession.add(service2)
93
    DBSession.add(service3)
94
    DBSession.add(service4)
95
    DBSession.flush()
96

    
97
    # Ajout des événements eux-mêmes
98
    event_template = {
99
        'message': u'foo',
100
        'current_state': StateName.statename_to_value(u'WARNING'),
101
    }
102

    
103
    event1 = Event(supitem=service1, **event_template)
104
    event2 = Event(supitem=service2, **event_template)
105
    event3 = Event(supitem=service3, **event_template)
106
    event4 = Event(supitem=service4, **event_template)
107
    event5 = Event(supitem=editorhost, **event_template)
108
    event6 = Event(supitem=managerhost, **event_template)
109

    
110
    DBSession.add(event1)
111
    DBSession.add(event2)
112
    DBSession.add(event3)
113
    DBSession.add(event4)
114
    DBSession.add(event5)
115
    DBSession.add(event6)
116
    DBSession.flush()
117

    
118
    # Ajout des événements corrélés
119
    aggregate_template = {
120
        'timestamp_active': datetime.now(),
121
        'priority': 1,
122
        'status': u'None',
123
    }
124
    aggregate1 = CorrEvent(
125
        idcause=event1.idevent, **aggregate_template)
126
    aggregate2 = CorrEvent(
127
        idcause=event4.idevent, **aggregate_template)
128
    aggregate3 = CorrEvent(
129
        idcause=event5.idevent, **aggregate_template)
130
    aggregate4 = CorrEvent(
131
        idcause=event6.idevent, **aggregate_template)
132

    
133
    aggregate1.events.append(event1)
134
    aggregate1.events.append(event3)
135
    aggregate2.events.append(event4)
136
    aggregate2.events.append(event2)
137
    aggregate3.events.append(event5)
138
    aggregate4.events.append(event6)
139
    DBSession.add(aggregate1)
140
    DBSession.add(aggregate2)
141
    DBSession.add(aggregate3)
142
    DBSession.add(aggregate4)
143
    DBSession.flush()
144
    transaction.commit()
145

    
146
class TestEventTable(TestController):
147
    """
148
    Test du tableau d'événements de Vigiboard
149
    """
150

    
151
    def test_event_table(self):
152
        """
153
        Test du tableau d'évènements de la page d'accueil de Vigiboard.
154
        """
155

    
156
        populate_DB()
157

    
158
        ### 1er cas : L'utilisateur utilisé pour
159
        # se connecter à Vigiboard est 'editor'.
160
        environ = {'REMOTE_USER': 'editor'}
161
        response = self.app.get('/', extra_environ=environ)
162

    
163
        # Il doit y avoir 2 lignes de résultats.
164
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
165
        print "There are %d rows in the result set" % len(rows)
166
        assert_equal(len(rows), 2)
167

    
168
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
169
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
170
        print "There are %d columns in the result set" % len(cols)
171
        assert_true(len(cols) > 1)
172

    
173
        ### 2nd cas : L'utilisateur utilisé pour
174
        # se connecter à Vigiboard est 'manager'.
175
        environ = {'REMOTE_USER': 'manager'}
176
        response = self.app.get('/', extra_environ=environ)
177

    
178
        # Il doit y avoir 4 lignes de résultats.
179
        rows = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr')
180
        print "There are %d rows in the result set" % len(rows)
181
        assert_equal(len(rows), 4)
182

    
183
        # Il doit y avoir plusieurs colonnes dans la ligne de résultats.
184
        cols = response.lxml.xpath('//table[@class="vigitable"]/tbody/tr/td')
185
        print "There are %d columns in the result set" % len(cols)
186
        assert_true(len(cols) > 1)
187