VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/useraccount.py@ 106061

Last change on this file since 106061 was 106061, checked in by vboxsync, 4 months ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 10.9 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: useraccount.py 106061 2024-09-16 14:03:52Z vboxsync $
3
4"""
5Test Manager - User DB records management.
6"""
7
8__copyright__ = \
9"""
10Copyright (C) 2012-2024 Oracle and/or its affiliates.
11
12This file is part of VirtualBox base platform packages, as
13available from https://www.virtualbox.org.
14
15This program is free software; you can redistribute it and/or
16modify it under the terms of the GNU General Public License
17as published by the Free Software Foundation, in version 3 of the
18License.
19
20This program is distributed in the hope that it will be useful, but
21WITHOUT ANY WARRANTY; without even the implied warranty of
22MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23General Public License for more details.
24
25You should have received a copy of the GNU General Public License
26along with this program; if not, see <https://www.gnu.org/licenses>.
27
28The contents of this file may alternatively be used under the terms
29of the Common Development and Distribution License Version 1.0
30(CDDL), a copy of it is provided in the "COPYING.CDDL" file included
31in the VirtualBox distribution, in which case the provisions of the
32CDDL are applicable instead of those of the GPL.
33
34You may elect to license modified versions of this file under the
35terms and conditions of either the GPL or the CDDL or both.
36
37SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
38"""
39__version__ = "$Revision: 106061 $"
40
41
42# Standard python imports.
43import unittest;
44
45# Validation Kit imports.
46from testmanager import config;
47from testmanager.core.base import ModelDataBase, ModelLogicBase, ModelDataBaseTestCase, TMTooManyRows, TMRowNotFound;
48
49
class UserAccountData(ModelDataBase):
    """
    User account data.

    Holds the column values of a single row from the Users table.
    """

    ksIdAttr = 'uid'

    # Parameter name constants, one per data attribute.
    ksParam_uid = 'UserAccount_uid'
    ksParam_tsExpire = 'UserAccount_tsExpire'
    ksParam_tsEffective = 'UserAccount_tsEffective'
    ksParam_uidAuthor = 'UserAccount_uidAuthor'
    ksParam_sLoginName = 'UserAccount_sLoginName'
    ksParam_sUsername = 'UserAccount_sUsername'
    ksParam_sEmail = 'UserAccount_sEmail'
    ksParam_sFullName = 'UserAccount_sFullName'
    ksParam_fReadOnly = 'UserAccount_fReadOnly'

    # Consumed by ModelDataBase; presumably the attributes that may be None
    # when the data is validated (confirm against the base class).
    kasAllowNullAttributes = ['uid', 'tsEffective', 'tsExpire', 'uidAuthor']

    def __init__(self):
        """Creates an empty record with every attribute set to None."""
        ModelDataBase.__init__(self)
        self.uid = None
        self.tsEffective = None
        self.tsExpire = None
        self.uidAuthor = None
        self.sUsername = None
        self.sEmail = None
        self.sFullName = None
        self.sLoginName = None
        self.fReadOnly = None

    def initFromDbRow(self, aoRow):
        """
        Initializes the object from a Users table row.

        The row is read positionally: uid, tsEffective, tsExpire, uidAuthor,
        sUsername, sEmail, sFullName, sLoginName, fReadOnly.

        Returns self.  Raises TMRowNotFound if the row is None.
        """
        if aoRow is None:
            raise TMRowNotFound('User not found.')

        # Attribute names in the same order as the row columns they are read from.
        asColumnAttrs = ('uid', 'tsEffective', 'tsExpire', 'uidAuthor', 'sUsername',
                         'sEmail', 'sFullName', 'sLoginName', 'fReadOnly')
        for iColumn, sAttr in enumerate(asColumnAttrs):
            setattr(self, sAttr, aoRow[iColumn])
        return self

    def initFromDbWithId(self, oDb, uid, tsNow = None, sPeriodBack = None):
        """
        Initializes the object from the database, looking the user up by uid.

        The query is built by formatSimpleNowAndPeriodQuery from the given
        tsNow and sPeriodBack values.

        Returns self.  Raises TMRowNotFound if no row matches.
        """
        sQuery = self.formatSimpleNowAndPeriodQuery(oDb,
                                                    'SELECT *\n'
                                                    'FROM Users\n'
                                                    'WHERE uid = %s\n'
                                                    , ( uid, ), tsNow, sPeriodBack)
        oDb.execute(sQuery)

        aoRow = oDb.fetchOne()
        if aoRow is None:
            # Raised here rather than in initFromDbRow so the message names the lookup.
            raise TMRowNotFound('uid=%s not found (tsNow=%s sPeriodBack=%s)' % (uid, tsNow, sPeriodBack,))
        return self.initFromDbRow(aoRow)

    def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb):
        """
        Validates and converts one attribute value.

        The email attribute goes through ModelDataBase.validateEmail, and the
        login name is lowercased first when login names are configured as
        case insensitive.  Everything else is left to the base class.
        """
        # Custom handling of the email field.
        if sAttr == 'sEmail':
            return ModelDataBase.validateEmail(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull)

        # Automatically lowercase the login name if we're supposed to do case
        # insensitive matching.  (The feature assumes lower case in DB.)
        if sAttr == 'sLoginName' and oValue is not None:
            if config.g_kfLoginNameCaseInsensitive:
                oValue = oValue.lower()

        return ModelDataBase._validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb)
127
128
class UserAccountLogic(ModelLogicBase):
    """
    User account logic (for the Users table).
    """

    def __init__(self, oDb):
        """Stores the database connection; the lookup cache is fetched on first use."""
        ModelLogicBase.__init__(self, oDb)
        self.dCache = None

    def fetchForListing(self, iStart, cMaxRows, tsNow, aiSortColumns = None):
        """
        Fetches user accounts.

        With tsNow set to None only rows with tsExpire = 'infinity' are
        returned, otherwise the rows in effect at tsNow.  The aiSortColumns
        argument is ignored; rows are ordered by sUsername, descending.

        Returns an array (list) of UserAccountData items, empty list if none.
        Raises exception on error.
        """
        _ = aiSortColumns
        if tsNow is None:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (cMaxRows, iStart,))
        else:
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE tsExpire > %s\n'
                              ' AND tsEffective <= %s\n'
                              'ORDER BY sUsername DESC\n'
                              'LIMIT %s OFFSET %s\n'
                              , (tsNow, tsNow, cMaxRows, iStart,))

        # One fetchOne() call per row reported by the connection.
        return [UserAccountData().initFromDbRow(self._oDb.fetchOne())
                for _ in range(self._oDb.getRowCount())]

    def addEntry(self, oData, uidAuthor, fCommit = False):
        """
        Adds a user account entry to the DB via the UserAccountLogic_addEntry
        procedure, then commits if fCommit is set.

        Returns True.
        """
        self._oDb.callProc('UserAccountLogic_addEntry',
                           (uidAuthor, oData.sUsername, oData.sEmail, oData.sFullName, oData.sLoginName, oData.fReadOnly))
        self._oDb.maybeCommit(fCommit)
        return True

    def editEntry(self, oData, uidAuthor, fCommit = False):
        """
        Modifies the user account identified by oData.uid via the
        UserAccountLogic_editEntry procedure, then commits if fCommit is set.

        Returns True.
        """
        self._oDb.callProc('UserAccountLogic_editEntry',
                           (uidAuthor, oData.uid, oData.sUsername, oData.sEmail,
                            oData.sFullName, oData.sLoginName, oData.fReadOnly))
        self._oDb.maybeCommit(fCommit)
        return True

    def removeEntry(self, uidAuthor, uid, fCascade = False, fCommit = False):
        """
        Deletes a user account via the UserAccountLogic_delEntry procedure,
        then commits if fCommit is set.  The fCascade argument is ignored.

        Returns True.
        """
        _ = fCascade
        self._oDb.callProc('UserAccountLogic_delEntry', (uidAuthor, uid))
        self._oDb.maybeCommit(fCommit)
        return True

    def _getByField(self, sField, sValue):
        """
        Gets the current (tsExpire = 'infinity') user account row by the
        value of one of its columns.

        sField is spliced into the SQL text as a column name, so it must only
        ever be a trusted, hard-coded identifier.  sValue is passed as a
        regular query parameter.

        Returns the raw database row, or an empty list if there is no match.
        Raises TMTooManyRows if more than one row matches.
        """
        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE tsExpire = \'infinity\'::TIMESTAMP\n'
                          ' AND ' + sField + ' = %s'
                          , (sValue,))

        aRows = self._oDb.fetchAll()
        if len(aRows) > 1:
            raise TMTooManyRows('Found more than one user account with the same credentials. Database structure is corrupted.')

        # Callers get the empty list (not None) when nothing matched.
        return aRows[0] if aRows else []

    def getById(self, idUserId):
        """
        Gets user account information by ID.

        Returns the raw database row, or an empty list if not found.
        """
        return self._getByField('uid', idUserId)

    def tryFetchAccountByLoginName(self, sLoginName):
        """
        Tries to get user account information by login name.

        The name is lowercased before the lookup when login names are
        configured as case insensitive.  A None login name is passed to the
        query unchanged instead of failing on the lower() call.

        Returns UserAccountData if found, None if not.
        Raises exception on DB error.
        """
        # Same None guard as UserAccountData._validateAndConvertAttribute.
        if sLoginName is not None and config.g_kfLoginNameCaseInsensitive:
            sLoginName = sLoginName.lower()

        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE sLoginName = %s\n'
                          ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                          , (sLoginName, ))

        cRows = self._oDb.getRowCount()
        if cRows == 1:
            return UserAccountData().initFromDbRow(self._oDb.fetchOne())
        if cRows != 0:
            # More than one current row for a login name means the table is inconsistent.
            raise self._oDb.integrityException('%u rows in Users with sLoginName="%s"'
                                               % (cRows, sLoginName))
        return None

    def cachedLookup(self, uid):
        """
        Looks up the current UserAccountData object for uid via an object cache.

        On a cache miss the current row is queried; if there is none, the row
        with the latest tsExpire is used instead.  Only successful lookups are
        added to the cache.

        Returns a shared UserAccountData object.  None if not found.
        Raises exception on DB error.
        """
        if self.dCache is None:
            self.dCache = self._oDb.getCache('UserAccount')

        oUser = self.dCache.get(uid, None)
        if oUser is not None:
            return oUser

        self._oDb.execute('SELECT *\n'
                          'FROM Users\n'
                          'WHERE uid = %s\n'
                          ' AND tsExpire = \'infinity\'::TIMESTAMP\n'
                          , (uid, ))
        cRows = self._oDb.getRowCount()
        if cRows > 1:
            raise self._oDb.integrityException('%s infinity rows for %s' % (cRows, uid))

        if cRows == 0:
            # Maybe it was deleted, try get the last entry.
            self._oDb.execute('SELECT *\n'
                              'FROM Users\n'
                              'WHERE uid = %s\n'
                              'ORDER BY tsExpire DESC\n'
                              'LIMIT 1\n'
                              , (uid, ))
            cRows = self._oDb.getRowCount()

        if cRows == 1:
            oUser = UserAccountData().initFromDbRow(self._oDb.fetchOne())
            self.dCache[uid] = oUser
        return oUser

    def resolveChangeLogAuthors(self, aoEntries):
        """
        Given an array of ChangeLogEntry instances, set sAuthor to whatever
        uidAuthor resolves to.  Entries whose author cannot be found are left
        untouched.

        Returns aoEntries.
        Raises exception on DB error.
        """
        for oEntry in aoEntries:
            oAuthor = self.cachedLookup(oEntry.uidAuthor)
            if oAuthor is not None:
                oEntry.sAuthor = oAuthor.sUsername
        return aoEntries
288
289
290#
291# Unit testing.
292#
293
294# pylint: disable=missing-docstring
class UserAccountDataTestCase(ModelDataBaseTestCase):
    def setUp(self):
        # Sample objects for the inherited test case; presumably exercised by
        # the generic ModelDataBaseTestCase tests (confirm against the base class).
        self.aoSamples = [UserAccountData()]
298
if __name__ == '__main__':
    # Run the unit tests when executed as a script; unittest.main() exits the
    # interpreter by default, so nothing after this call is reached.
    unittest.main()
302
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette