VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/useraccount.py@ 72539

Last change on this file since 72539 was 69111, checked in by vboxsync, 7 years ago

(C) year

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 10.6 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: useraccount.py 69111 2017-10-17 14:26:02Z vboxsync $
3
4"""
5Test Manager - User DB records management.
6"""
7
8__copyright__ = \
9"""
10Copyright (C) 2012-2017 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 69111 $"
30
31
32# Standard python imports.
33import unittest;
34
35# Validation Kit imports.
36from testmanager import config;
37from testmanager.core.base import ModelDataBase, ModelLogicBase, ModelDataBaseTestCase, TMTooManyRows, TMRowNotFound;
38
39
class UserAccountData(ModelDataBase):
    """
    User account data.

    Holds the column values of a single row selected from the Users table.
    """

    ksIdAttr = 'uid'

    # Parameter names, one per data attribute.
    ksParam_uid = 'UserAccount_uid'
    ksParam_tsExpire = 'UserAccount_tsExpire'
    ksParam_tsEffective = 'UserAccount_tsEffective'
    ksParam_uidAuthor = 'UserAccount_uidAuthor'
    ksParam_sLoginName = 'UserAccount_sLoginName'
    ksParam_sUsername = 'UserAccount_sUsername'
    ksParam_sEmail = 'UserAccount_sEmail'
    ksParam_sFullName = 'UserAccount_sFullName'
    ksParam_fReadOnly = 'UserAccount_fReadOnly'

    kasAllowNullAttributes = ['uid', 'tsEffective', 'tsExpire', 'uidAuthor']

    def __init__(self):
        """Creates an empty record with every data attribute set to None."""
        ModelDataBase.__init__(self)
        self.uid = None
        self.tsEffective = None
        self.tsExpire = None
        self.uidAuthor = None
        self.sUsername = None
        self.sEmail = None
        self.sFullName = None
        self.sLoginName = None
        self.fReadOnly = None

    def initFromDbRow(self, aoRow):
        """
        Initializes the object from a database table row.

        The row is read positionally: index 0 is uid and index 8 is fReadOnly.

        Returns self.  Raises TMRowNotFound if the row is None.
        """
        if aoRow is None:
            raise TMRowNotFound('User not found.')

        # Attribute names listed in the order of the row columns.
        asColumnAttrs = ('uid', 'tsEffective', 'tsExpire', 'uidAuthor', 'sUsername',
                         'sEmail', 'sFullName', 'sLoginName', 'fReadOnly')
        for iColumn, sAttrName in enumerate(asColumnAttrs):
            setattr(self, sAttrName, aoRow[iColumn])
        return self

    def initFromDbWithId(self, oDb, uid, tsNow = None, sPeriodBack = None):
        """
        Initializes the object from the Users row with the given uid.

        tsNow and sPeriodBack are handed to formatSimpleNowAndPeriodQuery
        together with the base query.

        Returns self.  Raises TMRowNotFound if no row was fetched.
        """
        sQuery = self.formatSimpleNowAndPeriodQuery(oDb,
                                                    'SELECT *\n'
                                                    'FROM Users\n'
                                                    'WHERE uid = %s\n'
                                                    , ( uid, ), tsNow, sPeriodBack)
        oDb.execute(sQuery)

        aoFetched = oDb.fetchOne()
        if aoFetched is None:
            raise TMRowNotFound('uid=%s not found (tsNow=%s sPeriodBack=%s)' % (uid, tsNow, sPeriodBack,))
        return self.initFromDbRow(aoFetched)

    def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb):
        """
        Validates and converts one attribute value.

        The email attribute goes through ModelDataBase.validateEmail, the
        login name is lower-cased first when case insensitive matching is
        configured, and everything else is left to the base class.
        """
        # The email field has its own validator.
        if sAttr == 'sEmail':
            return ModelDataBase.validateEmail(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull)

        # Case insensitive login name matching assumes lower case in the DB,
        # so lower-case the value before the generic validation sees it.
        fLowerCaseIt = sAttr == 'sLoginName' and oValue is not None and config.g_kfLoginNameCaseInsensitive
        if fLowerCaseIt:
            oValue = oValue.lower()

        return ModelDataBase._validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb)
117
118
class UserAccountLogic(ModelLogicBase):
    """
    User account logic (for the Users table).
    """

    def __init__(self, oDb):
        """Binds the logic object to the database connection oDb."""
        ModelLogicBase.__init__(self, oDb)
        # Object cache used by cachedLookup; fetched from the DB object on first use.
        self.dCache = None;

    def fetchForListing(self, iStart, cMaxRows, tsNow, aiSortColumns = None):
        """
        Fetches user accounts, ordered by sUsername descending.

        iStart is the row offset and cMaxRows the row limit.  When tsNow is
        None only rows with tsExpire = infinity are selected, otherwise the
        rows with tsEffective <= tsNow < tsExpire.  aiSortColumns is ignored.

        Returns an array (list) of UserAccountData items, empty list if none.
        Raises exception on error.
        """
        _ = aiSortColumns;
        if tsNow is None:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (cMaxRows, iStart,));
        else:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire > %s\n'
                              ' AND tsEffective <= %s\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (tsNow, tsNow, cMaxRows, iStart,));

        # One fetchOne() per row reported by the query.
        return [UserAccountData().initFromDbRow(self._oDb.fetchOne())
                for _ in range(self._oDb.getRowCount())];

    def addEntry(self, oData, uidAuthor, fCommit = False):
        """
        Add user account entry to the DB.

        Calls UserAccountLogic_addEntry with the author and the fields of
        oData, then commits if fCommit is set.  Returns True.
        """
        self._oDb.callProc('UserAccountLogic_addEntry',
                           (uidAuthor, oData.sUsername, oData.sEmail, oData.sFullName, oData.sLoginName, oData.fReadOnly));
        self._oDb.maybeCommit(fCommit);
        return True;

    def editEntry(self, oData, uidAuthor, fCommit = False):
        """
        Modify user account.

        Calls UserAccountLogic_editEntry with the author, oData.uid and the
        other fields of oData, then commits if fCommit is set.  Returns True.
        """
        self._oDb.callProc('UserAccountLogic_editEntry',
                           ( uidAuthor, oData.uid, oData.sUsername, oData.sEmail,
                             oData.sFullName, oData.sLoginName, oData.fReadOnly));
        self._oDb.maybeCommit(fCommit);
        return True;

    def removeEntry(self, uidAuthor, uid, fCascade = False, fCommit = False):
        """
        Delete user account.

        Calls UserAccountLogic_delEntry with the author and uid, then commits
        if fCommit is set.  fCascade is accepted but ignored.  Returns True.
        """
        self._oDb.callProc('UserAccountLogic_delEntry', (uidAuthor, uid));
        self._oDb.maybeCommit(fCommit);
        _ = fCascade;
        return True;

    def _getByField(self, sField, sValue):
        """
        Get the current (tsExpire = infinity) user account row by field value.

        Returns the row as fetched from the database (not a UserAccountData
        object), or an empty list if there is no match.
        Raises TMTooManyRows if more than one row matches.
        """
        # NOTE: sField is concatenated into the SQL text, so it must always be
        #       a trusted column name and never user input.  The value is
        #       passed as a bound parameter.
        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                          ' AND ' + sField + ' = %s'
                          , (sValue,))

        aRows = self._oDb.fetchAll()
        if len(aRows) > 1:
            raise TMTooManyRows('Found more than one user account with the same credentials. Database structure is corrupted.')

        return aRows[0] if aRows else []

    def getById(self, idUserId):
        """
        Get user account information by ID.

        Returns the database row, or an empty list if not found.
        """
        return self._getByField('uid', idUserId)

    def tryFetchAccountByLoginName(self, sLoginName):
        """
        Try get user account information by login name.

        Returns UserAccountData if found, None if not.
        Raises exception on DB error.
        """
        # Without the None check a missing login name raised AttributeError
        # here, but only in case insensitive mode.  With it the query simply
        # runs with a None parameter, exactly as in case sensitive mode.
        if sLoginName is not None and config.g_kfLoginNameCaseInsensitive:
            sLoginName = sLoginName.lower();

        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE sLoginName = %s\n'
                          ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                          , (sLoginName, ));
        if self._oDb.getRowCount() != 1:
            if self._oDb.getRowCount() != 0:
                raise self._oDb.integrityException('%u rows in Users with sLoginName="%s"'
                                                   % (self._oDb.getRowCount(), sLoginName));
            return None;
        return UserAccountData().initFromDbRow(self._oDb.fetchOne());

    def cachedLookup(self, uid):
        """
        Looks up the current UserAccountData object for uid via an object cache.

        Returns a shared UserAccountData object.  None if not found.
        Raises exception on DB error.
        """
        if self.dCache is None:
            self.dCache = self._oDb.getCache('UserAccount');

        oUser = self.dCache.get(uid, None);
        if oUser is None:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE uid = %s\n'
                              ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                              , (uid, ));
            if self._oDb.getRowCount() == 0:
                # Maybe it was deleted, try get the last entry.
                self._oDb.execute('SELECT *\n'
                                  'FROM Users\n'
                                  'WHERE uid = %s\n'
                                  'ORDER BY tsExpire DESC\n'
                                  'LIMIT 1\n'
                                  , (uid, ));
            elif self._oDb.getRowCount() > 1:
                raise self._oDb.integrityException('%s infinity rows for %s' % (self._oDb.getRowCount(), uid));

            # Only hits are cached; a miss is looked up again on the next call.
            if self._oDb.getRowCount() == 1:
                oUser = UserAccountData().initFromDbRow(self._oDb.fetchOne());
                self.dCache[uid] = oUser;
        return oUser;

    def resolveChangeLogAuthors(self, aoEntries):
        """
        Given an array of ChangeLogEntry instances, set sAuthor to whatever
        uidAuthor resolves to.  Entries whose author cannot be found are
        left untouched.

        Returns aoEntries.
        Raises exception on DB error.
        """
        for oEntry in aoEntries:
            oUser = self.cachedLookup(oEntry.uidAuthor)
            if oUser is not None:
                oEntry.sAuthor = oUser.sUsername;
        return aoEntries;
278
279
280#
281# Unit testing.
282#
283
284# pylint: disable=C0111
class UserAccountDataTestCase(ModelDataBaseTestCase):
    """
    Test case supplying a default-constructed UserAccountData sample.

    NOTE(review): aoSamples is presumably consumed by the tests inherited
    from ModelDataBaseTestCase - confirm against the base class.
    """

    def setUp(self):
        oSample = UserAccountData()
        self.aoSamples = [oSample]
288
# Run the unit tests when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()
    # not reached.
292
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette