VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 98103

Last change on this file since 98103 was 98103, checked in by vboxsync, 2 years ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 69.2 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 98103 2023-01-17 14:15:46Z vboxsync $
3# pylint: disable=too-many-lines
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2023 Oracle and/or its affiliates.
13
14This file is part of VirtualBox base platform packages, as
15available from https://www.virtualbox.org.
16
17This program is free software; you can redistribute it and/or
18modify it under the terms of the GNU General Public License
19as published by the Free Software Foundation, in version 3 of the
20License.
21
22This program is distributed in the hope that it will be useful, but
23WITHOUT ANY WARRANTY; without even the implied warranty of
24MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25General Public License for more details.
26
27You should have received a copy of the GNU General Public License
28along with this program; if not, see <https://www.gnu.org/licenses>.
29
30The contents of this file may alternatively be used under the terms
31of the Common Development and Distribution License Version 1.0
32(CDDL), a copy of it is provided in the "COPYING.CDDL" file included
33in the VirtualBox distribution, in which case the provisions of the
34CDDL are applicable instead of those of the GPL.
35
36You may elect to license modified versions of this file under the
37terms and conditions of either the GPL or the CDDL or both.
38
39SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
40"""
41__version__ = "$Revision: 98103 $"
42
43
44# Standard python imports.
45import sys;
46import unittest;
47
48# Validation Kit imports.
49from common import utils, constants;
50from testmanager import config;
51from testmanager.core.build import BuildDataEx, BuildLogic;
52from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
53from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
54from testmanager.core.globalresource import GlobalResourceLogic;
55from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
56from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
57from testmanager.core.testbox import TestBoxData, TestBoxDataEx;
58from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
59from testmanager.core.testcase import TestCaseLogic;
60from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
61from testmanager.core.testset import TestSetData, TestSetLogic;
62
63# Python 3 hacks:
64if sys.version_info[0] >= 3:
65 xrange = range; # pylint: disable=redefined-builtin,invalid-name
66
67
68
69class ReCreateQueueData(object):
70 """
71 Data object for recreating a scheduling queue.
72
73 It's mostly a storage object, but has a few data checking operation
74 associated with it.
75 """
76
77 def __init__(self, oDb, idSchedGroup):
78 #
79 # Load data from the database.
80 #
81 oSchedGroupLogic = SchedGroupLogic(oDb);
82 self.oSchedGroup = oSchedGroupLogic.cachedLookup(idSchedGroup);
83
84 # Will extend the entries with aoTestCases and dTestCases members
85 # further down (SchedGroupMemberDataEx). checkForGroupDepCycles
86 # will add aidTestGroupPreReqs.
87 self.aoTestGroups = oSchedGroupLogic.getMembers(idSchedGroup);
88
89 # aoTestCases entries are TestCaseData instance with iSchedPriority
90 # and idTestGroup added for our purposes.
91 # We will add oTestGroup and aoArgsVariations members to each further down.
92 self.aoTestCases = oSchedGroupLogic.getTestCasesForGroup(idSchedGroup, cMax = 4096);
93
94 # Load dependencies.
95 oTestCaseLogic = TestCaseLogic(oDb)
96 for oTestCase in self.aoTestCases:
97 oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);
98
99 # aoTestCases entries are TestCaseArgsData instance with iSchedPriority
100 # and idTestGroup added for our purposes.
101 # We will add oTestGroup and oTestCase members to each further down.
102 self.aoArgsVariations = oSchedGroupLogic.getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);
103
104 #
105 # Generate global lookups.
106 #
107
108 # Generate a testcase lookup dictionary for use when working on
109 # argument variations.
110 self.dTestCases = {};
111 for oTestCase in self.aoTestCases:
112 self.dTestCases[oTestCase.idTestCase] = oTestCase;
113 assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!
114
115 # Generate a testgroup lookup dictionary.
116 self.dTestGroups = {};
117 for oTestGroup in self.aoTestGroups:
118 self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
119 assert len(self.dTestGroups) == len(self.aoTestGroups);
120
121 #
122 # Associate extra members with the base data.
123 #
124 if self.aoTestGroups:
125 # Prep the test groups.
126 for oTestGroup in self.aoTestGroups:
127 oTestGroup.aoTestCases = [];
128 oTestGroup.dTestCases = {};
129
130 # Link testcases to their group, both directions. Prep testcases for
131 # argument varation association.
132 oTestGroup = self.aoTestGroups[0];
133 for oTestCase in self.aoTestCases:
134 if oTestGroup.idTestGroup != oTestCase.idTestGroup:
135 oTestGroup = self.dTestGroups[oTestCase.idTestGroup];
136
137 assert oTestCase.idTestCase not in oTestGroup.dTestCases;
138 oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
139 oTestGroup.aoTestCases.append(oTestCase);
140 oTestCase.oTestGroup = oTestGroup;
141 oTestCase.aoArgsVariations = [];
142
143 # Associate testcase argument variations with their testcases (group)
144 # in both directions.
145 oTestGroup = self.aoTestGroups[0];
146 oTestCase = self.aoTestCases[0] if self.aoTestCases else None;
147 for oArgVariation in self.aoArgsVariations:
148 if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
149 oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
150 if oTestCase.idTestCase != oArgVariation.idTestCase or oTestCase.idTestGroup != oArgVariation.idTestGroup:
151 oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];
152
153 oTestCase.aoArgsVariations.append(oArgVariation);
154 oArgVariation.oTestCase = oTestCase;
155 oArgVariation.oTestGroup = oTestGroup;
156
157 else:
158 assert not self.aoTestCases;
159 assert not self.aoArgsVariations;
160 # done.
161
162 @staticmethod
163 def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
164 """ Returns a chain of IDs error entry. """
165
166 sMsg += ' Dependency chain: %s' % (aidChain[0],);
167 for i in range(1, len(aidChain)):
168 sMsg += ' -> %s' % (aidChain[i],);
169
170 aoErrors.append([sMsg, oObj]);
171 return aoErrors;
172
173 def checkForGroupDepCycles(self):
174 """
175 Checks for testgroup depencency cycles and any missing testgroup
176 dependencies.
177 Returns array of errors (see SchedulderBase.recreateQueue()).
178 """
179 aoErrors = [];
180 for oTestGroup in self.aoTestGroups:
181 idPreReq = oTestGroup.idTestGroupPreReq;
182 if idPreReq is None:
183 oTestGroup.aidTestGroupPreReqs = [];
184 continue;
185
186 aidChain = [oTestGroup.idTestGroup,];
187 while idPreReq is not None:
188 aidChain.append(idPreReq);
189 if len(aidChain) >= 10:
190 self._addPreReqError(aoErrors, aidChain, oTestGroup,
191 'TestGroup #%s prerequisite chain is too long!'
192 % (oTestGroup.idTestGroup,));
193 break;
194
195 oDep = self.dTestGroups.get(idPreReq, None);
196 if oDep is None:
197 self._addPreReqError(aoErrors, aidChain, oTestGroup,
198 'TestGroup #%s prerequisite #%s is not in the scheduling group!'
199 % (oTestGroup.idTestGroup, idPreReq,));
200 break;
201
202 idPreReq = oDep.idTestGroupPreReq;
203 oTestGroup.aidTestGroupPreReqs = aidChain[1:];
204
205 return aoErrors;
206
207
208 def checkForMissingTestCaseDeps(self):
209 """
210 Checks that testcase dependencies stays within bounds. We do not allow
211 dependencies outside a testgroup, no dependency cycles or even remotely
212 long dependency chains.
213
214 Returns array of errors (see SchedulderBase.recreateQueue()).
215 """
216 aoErrors = [];
217 for oTestGroup in self.aoTestGroups:
218 for oTestCase in oTestGroup.aoTestCases:
219 if not oTestCase.aidPreReqs:
220 continue;
221
222 # Stupid recursion code using special stack(s).
223 aiIndexes = [[oTestCase, 0], ];
224 aidChain = [oTestCase.idTestGroup,];
225 while aiIndexes:
226 (oCur, i) = aiIndexes[-1];
227 if i >= len(oCur.aidPreReqs):
228 aiIndexes.pop();
229 aidChain.pop();
230 else:
231 aiIndexes[-1][1] = i + 1; # whatever happens, we'll advance on the current level.
232
233 idPreReq = oTestCase.aidPreReqs[i];
234 oDep = oTestGroup.dTestCases.get(idPreReq, None);
235 if oDep is None:
236 self._addPreReqError(aoErrors, aidChain, oTestCase,
237 'TestCase #%s prerequisite #%s is not in the scheduling group!'
238 % (oTestCase.idTestCase, idPreReq));
239 elif idPreReq in aidChain:
240 self._addPreReqError(aoErrors, aidChain, oTestCase,
241 'TestCase #%s prerequisite #%s creates a cycle!'
242 % (oTestCase.idTestCase, idPreReq));
243 elif not oDep.aiPreReqs:
244 pass;
245 elif len(aidChain) >= 10:
246 self._addPreReqError(aoErrors, aidChain, oTestCase,
247 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
248 else:
249 aiIndexes.append([oDep, 0]);
250 aidChain.append(idPreReq);
251
252 return aoErrors;
253
254 def deepTestGroupSort(self):
255 """
256 Sorts the testgroups and their testcases by priority and dependencies.
257 Note! Don't call this before checking for dependency cycles!
258 """
259 if not self.aoTestGroups:
260 return;
261
262 #
263 # ASSUMES groups as well as testcases are sorted by priority by the
264 # database. So we only have to concern ourselves with the dependency
265 # sorting.
266 #
267 iGrpPrio = self.aoTestGroups[0].iSchedPriority;
268 for iTestGroup, oTestGroup in enumerate(self.aoTestGroups):
269 if oTestGroup.iSchedPriority > iGrpPrio:
270 raise TMExceptionBase('Incorrectly sorted testgroups returned by database: iTestGroup=%s prio=%s %s'
271 % ( iTestGroup, iGrpPrio,
272 ', '.join(['(%s: %s)' % (oCur.idTestGroup, oCur.iSchedPriority)
273 for oCur in self.aoTestGroups]), ) );
274 iGrpPrio = oTestGroup.iSchedPriority;
275
276 if oTestGroup.aoTestCases:
277 iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
278 for iTestCase, oTestCase in enumerate(oTestGroup.aoTestCases):
279 if oTestCase.iSchedPriority > iTstPrio:
280 raise TMExceptionBase('Incorrectly sorted testcases returned by database: i=%s prio=%s idGrp=%s %s'
281 % ( iTestCase, iTstPrio, oTestGroup.idTestGroup,
282 ', '.join(['(%s: %s)' % (oCur.idTestCase, oCur.iSchedPriority)
283 for oCur in oTestGroup.aoTestCases]),));
284
285 #
286 # Sort the testgroups by dependencies.
287 #
288 i = 0;
289 while i < len(self.aoTestGroups):
290 oTestGroup = self.aoTestGroups[i];
291 if oTestGroup.idTestGroupPreReq is not None:
292 iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
293 if iPreReq > i:
294 # The prerequisite is after the current entry. Move the
295 # current entry so that it's following it's prereq entry.
296 self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
297 self.aoTestGroups.pop(i);
298 continue;
299 assert iPreReq < i;
300 i += 1; # Advance.
301
302 #
303 # Sort the testcases by dependencies.
304 # Same algorithm as above, just more prerequisites.
305 #
306 for oTestGroup in self.aoTestGroups:
307 i = 0;
308 while i < len(oTestGroup.aoTestCases):
309 oTestCase = oTestGroup.aoTestCases[i];
310 if oTestCase.aidPreReqs:
311 for idPreReq in oTestCase.aidPreReqs:
312 iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
313 if iPreReq > i:
314 # The prerequisite is after the current entry. Move the
315 # current entry so that it's following it's prereq entry.
316 oTestGroup.aoTestGroups.insert(iPreReq + 1, oTestCase);
317 oTestGroup.aoTestGroups.pop(i);
318 i -= 1; # Don't advance.
319 break;
320 assert iPreReq < i;
321 i += 1; # Advance.
322
323
324
325class SchedQueueData(ModelDataBase):
326 """
327 Scheduling queue data item.
328 """
329
330 ksIdAttr = 'idSchedGroup';
331
332 ksParam_idSchedGroup = 'SchedQueueData_idSchedGroup';
333 ksParam_idItem = 'SchedQueueData_idItem';
334 ksParam_offQueue = 'SchedQueueData_offQueue';
335 ksParam_idGenTestCaseArgs = 'SchedQueueData_idGenTestCaseArgs';
336 ksParam_idTestGroup = 'SchedQueueData_idTestGroup';
337 ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
338 ksParam_bmHourlySchedule = 'SchedQueueData_bmHourlySchedule';
339 ksParam_tsConfig = 'SchedQueueData_tsConfig';
340 ksParam_tsLastScheduled = 'SchedQueueData_tsLastScheduled';
341 ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
342 ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';
343
344 kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
345 'tsConfig', 'tsLastScheduled' ];
346
347
348 def __init__(self):
349 ModelDataBase.__init__(self);
350
351 #
352 # Initialize with defaults.
353 # See the database for explanations of each of these fields.
354 #
355 self.idSchedGroup = None;
356 self.idItem = None;
357 self.offQueue = None;
358 self.idGenTestCaseArgs = None;
359 self.idTestGroup = None;
360 self.aidTestGroupPreReqs = None;
361 self.bmHourlySchedule = None;
362 self.tsConfig = None;
363 self.tsLastScheduled = None;
364 self.idTestSetGangLeader = None;
365 self.cMissingGangMembers = 1;
366
367 def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=too-many-arguments
368 bmHourlySchedule, cMissingGangMembers,
369 idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
370 """
371 Reinitialize with all attributes potentially given as inputs.
372 Return self.
373 """
374 self.idSchedGroup = idSchedGroup;
375 self.idItem = idItem;
376 self.offQueue = offQueue;
377 self.idGenTestCaseArgs = idGenTestCaseArgs;
378 self.idTestGroup = idTestGroup;
379 self.aidTestGroupPreReqs = aidTestGroupPreReqs;
380 self.bmHourlySchedule = bmHourlySchedule;
381 self.tsConfig = tsConfig;
382 self.tsLastScheduled = tsLastScheduled;
383 self.idTestSetGangLeader = idTestSetGangLeader;
384 self.cMissingGangMembers = cMissingGangMembers;
385 return self;
386
387 def initFromDbRow(self, aoRow):
388 """
389 Initialize from database row (SELECT * FROM SchedQueues).
390 Returns self.
391 Raises exception if no row is specfied.
392 """
393 if aoRow is None:
394 raise TMExceptionBase('SchedQueueData not found.');
395
396 self.idSchedGroup = aoRow[0];
397 self.idItem = aoRow[1];
398 self.offQueue = aoRow[2];
399 self.idGenTestCaseArgs = aoRow[3];
400 self.idTestGroup = aoRow[4];
401 self.aidTestGroupPreReqs = aoRow[5];
402 self.bmHourlySchedule = aoRow[6];
403 self.tsConfig = aoRow[7];
404 self.tsLastScheduled = aoRow[8];
405 self.idTestSetGangLeader = aoRow[9];
406 self.cMissingGangMembers = aoRow[10];
407 return self;
408
409
410
411
412
413
414class SchedulerBase(object):
415 """
416 The scheduler base class.
417
418 The scheduler classes have two functions:
419 1. Recreate the scheduling queue.
420 2. Pick the next task from the queue.
421
422 The first is scheduler specific, the latter isn't.
423 """
424
425 class BuildCache(object):
426 """ Build cache. """
427
428 class BuildCacheIterator(object):
429 """ Build class iterator. """
430 def __init__(self, oCache):
431 self.oCache = oCache;
432 self.iCur = 0;
433
434 def __iter__(self):
435 """Returns self, required by the language."""
436 return self;
437
438 def __next__(self):
439 """Returns the next build, raises StopIteration when the end has been reached."""
440 while True:
441 if self.iCur >= len(self.oCache.aoEntries):
442 oEntry = self.oCache.fetchFromCursor();
443 if oEntry is None:
444 raise StopIteration;
445 else:
446 oEntry = self.oCache.aoEntries[self.iCur];
447 self.iCur += 1;
448 if not oEntry.fRemoved:
449 return oEntry;
450 return None; # not reached, but make pylint happy (for now).
451
452 def next(self):
453 """ For python 2.x. """
454 return self.__next__();
455
456 class BuildCacheEntry(object):
457 """ Build cache entry. """
458
459 def __init__(self, oBuild, fMaybeBlacklisted):
460 self.oBuild = oBuild;
461 self._fBlacklisted = None if fMaybeBlacklisted is True else False;
462 self.fRemoved = False;
463 self._dPreReqDecisions = {};
464
465 def remove(self):
466 """
467 Marks the cache entry as removed.
468 This doesn't actually remove it from the cache array, only marks
469 it as removed. It has no effect on open iterators.
470 """
471 self.fRemoved = True;
472
473 def getPreReqDecision(self, sPreReqSet):
474 """
475 Retrieves a cached prerequisite decision.
476 Returns boolean if found, None if not.
477 """
478 return self._dPreReqDecisions.get(sPreReqSet);
479
480 def setPreReqDecision(self, sPreReqSet, fDecision):
481 """
482 Caches a prerequistie decision.
483 """
484 self._dPreReqDecisions[sPreReqSet] = fDecision;
485 return fDecision;
486
487 def isBlacklisted(self, oDb):
488 """ Checks if the build is blacklisted. """
489 if self._fBlacklisted is None:
490 self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
491 return self._fBlacklisted;
492
493
494 def __init__(self):
495 self.aoEntries = [];
496 self.oCursor = None;
497
498 def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
499 """ Configures the build cursor for the cache. """
500 if not self.aoEntries and self.oCursor is None:
501 oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
502 self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
503 return True;
504
505 def __iter__(self):
506 """Return an iterator."""
507 return self.BuildCacheIterator(self);
508
509 def fetchFromCursor(self):
510 """ Fetches a build from the cursor and adds it to the cache."""
511 if self.oCursor is None:
512 return None;
513
514 try:
515 aoRow = self.oCursor.fetchOne();
516 except:
517 return None;
518 if aoRow is None:
519 return None;
520
521 oBuild = BuildDataEx().initFromDbRow(aoRow);
522 oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
523 self.aoEntries.append(oEntry);
524 return oEntry;
525
526 def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
527 self._oDb = oDb;
528 self._oSchedGrpData = oSchedGrpData;
529 self._iVerbosity = iVerbosity;
530 self._asMessages = [];
531 self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
532 self.oBuildCache = self.BuildCache();
533 self.dTestGroupMembers = {};
534
535 @staticmethod
536 def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
537 """
538 Instantiate the scheduler specified by the scheduling group.
539 Returns scheduler child class instance. May raise exception if
540 the input is invalid.
541 """
542 if oSchedGrpData.enmScheduler == SchedGroupData.ksScheduler_BestEffortContinuousIntegration:
543 from testmanager.core.schedulerbeci import SchdulerBeci;
544 oScheduler = SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
545 else:
546 raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
547 % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));
548 return oScheduler;
549
550
551 #
552 # Misc.
553 #
554
555 def msgDebug(self, sText):
556 """Debug printing."""
557 if self._iVerbosity > 1:
558 self._asMessages.append('debug:' + sText);
559 return None;
560
561 def msgInfo(self, sText):
562 """Info printing."""
563 if self._iVerbosity > 1:
564 self._asMessages.append('info: ' + sText);
565 return None;
566
567 def dprint(self, sMsg):
568 """Prints a debug message to the srv glue log (see config.py). """
569 if config.g_kfSrvGlueDebugScheduler:
570 self._oDb.dprint(sMsg);
571 return None;
572
573 def getElapsedSecs(self):
574 """ Returns the number of seconds this scheduling task has been running. """
575 tsSecNow = utils.timestampSecond();
576 if tsSecNow < self._tsSecStart: # paranoia
577 self._tsSecStart = tsSecNow;
578 return tsSecNow - self._tsSecStart;
579
580
581 #
582 # Create schedule.
583 #
584
585 def _recreateQueueCancelGatherings(self):
586 """
587 Cancels all pending gang gatherings on the current queue.
588 """
589 self._oDb.execute('SELECT idTestSetGangLeader\n'
590 'FROM SchedQueues\n'
591 'WHERE idSchedGroup = %s\n'
592 ' AND idTestSetGangLeader is not NULL\n'
593 , (self._oSchedGrpData.idSchedGroup,));
594 if self._oDb.getRowCount() > 0:
595 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
596 for aoRow in self._oDb.fetchAll():
597 idTestSetGangLeader = aoRow[0];
598 oTBStatusLogic.updateGangStatus(idTestSetGangLeader,
599 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
600 fCommit = False);
601 return True;
602
603 def _recreateQueueItems(self, oData):
604 """
605 Returns an array of queue items (SchedQueueData).
606 Child classes must override this.
607 """
608 _ = oData;
609 return [];
610
611 def recreateQueueWorker(self):
612 """
613 Worker for recreateQueue.
614 """
615
616 #
617 # Collect the necessary data and validate it.
618 #
619 oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
620 aoErrors = oData.checkForGroupDepCycles();
621 aoErrors.extend(oData.checkForMissingTestCaseDeps());
622 if not aoErrors:
623 oData.deepTestGroupSort();
624
625 #
626 # The creation of the scheduling queue is done by the child class.
627 #
628 # We will try guess where in queue we're currently at and rotate
629 # the items such that we will resume execution in the approximately
630 # same position. The goal of the scheduler is to provide a 100%
631 # deterministic result so that if we regenerate the queue when there
632 # are no changes to the testcases, testgroups or scheduling groups
633 # involved, test execution will be unchanged (save for maybe just a
634 # little for gang gathering).
635 #
636 aoItems = [];
637 if not oData.oSchedGroup.fEnabled:
638 self.msgInfo('Disabled.');
639 elif not oData.aoArgsVariations:
640 self.msgInfo('Found no test case argument variations.');
641 else:
642 aoItems = self._recreateQueueItems(oData);
643 self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
644 #for i in range(len(aoItems)):
645 # self.msgDebug('aoItems[%2d]=%s' % (i, aoItems[i]));
646 if aoItems:
647 self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
648 , (self._oSchedGrpData.idSchedGroup,));
649 if self._oDb.getRowCount() > 0:
650 offQueue = self._oDb.fetchOne()[0];
651 self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
652 , (self._oSchedGrpData.idSchedGroup,));
653 cItems = self._oDb.fetchOne()[0];
654 offQueueNew = (offQueue * cItems) // len(aoItems);
655 if offQueueNew != 0:
656 aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];
657
658 #
659 # Replace the scheduling queue.
660 # Care need to be take to first timeout/abort any gangs in the
661 # gathering state since these use the queue to set up the date.
662 #
663 self._recreateQueueCancelGatherings();
664 self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
665 if aoItems:
666 self._oDb.insertList('INSERT INTO SchedQueues (\n'
667 ' idSchedGroup,\n'
668 ' offQueue,\n'
669 ' idGenTestCaseArgs,\n'
670 ' idTestGroup,\n'
671 ' aidTestGroupPreReqs,\n'
672 ' bmHourlySchedule,\n'
673 ' cMissingGangMembers )\n',
674 aoItems, self._formatItemForInsert);
675 return (aoErrors, self._asMessages);
676
677 def _formatItemForInsert(self, oItem):
678 """
679 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
680 """
681 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
682 , ( oItem.idSchedGroup,
683 oItem.offQueue,
684 oItem.idGenTestCaseArgs,
685 oItem.idTestGroup,
686 oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
687 oItem.bmHourlySchedule,
688 oItem.cMissingGangMembers
689 ));
690
691 @staticmethod
692 def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
693 """
694 (Re-)creates the scheduling queue for the given group.
695
696 Returns (asMessages, asMessages). On success the array with the error
697 will be empty, on failure it will contain (sError, oRelatedObject)
698 entries. The messages is for debugging and are simple strings.
699
700 Raises exception database error.
701 """
702
703 aoExtraMsgs = [];
704 if oDb.debugIsExplainEnabled():
705 aoExtraMsgs += ['Warning! Disabling SQL explain to avoid deadlocking against locked tables.'];
706 oDb.debugDisableExplain();
707
708 aoErrors = [];
709 asMessages = [];
710 try:
711 #
712 # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
713 # we lock quite a few tables while doing this work. We access more
714 # data than scheduleNewTask so we lock some additional tables.
715 #
716 oDb.rollback();
717 oDb.begin();
718 oDb.execute('LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE');
719 oDb.execute('LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE');
720 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
721
722 #
723 # Instantiate the scheduler and call the worker function.
724 #
725 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
726 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
727
728 (aoErrors, asMessages) = oScheduler.recreateQueueWorker();
729 if not aoErrors:
730 SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
731 'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
732 oDb.commit();
733 else:
734 oDb.rollback();
735
736 except:
737 oDb.rollback();
738 raise;
739
740 return (aoErrors, aoExtraMsgs + asMessages);
741
742
743 @staticmethod
744 def cleanUpOrphanedQueues(oDb):
745 """
746 Removes orphan scheduling queues from the SchedQueues table.
747
748 Queues becomes orphaned when the scheduling group they belongs to has been deleted.
749
750 Returns number of orphaned queues.
751 Raises exception database error.
752 """
753 cRet = 0;
754 try:
755 oDb.rollback();
756 oDb.begin();
757 oDb.execute('''
758SELECT SchedQueues.idSchedGroup
759FROM SchedQueues
760 LEFT OUTER JOIN SchedGroups
761 ON SchedGroups.idSchedGroup = SchedQueues.idSchedGroup
762 AND SchedGroups.tsExpire = 'infinity'::TIMESTAMP
763WHERE SchedGroups.idSchedGroup is NULL
764GROUP BY SchedQueues.idSchedGroup''');
765 aaoOrphanRows = oDb.fetchAll();
766 cRet = len(aaoOrphanRows);
767 if cRet > 0:
768 oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup IN (%s)'
769 % (','.join([str(aoRow[0]) for aoRow in aaoOrphanRows]),));
770 oDb.commit();
771 except:
772 oDb.rollback();
773 raise;
774 return cRet;
775
776
777 #
778 # Schedule Task.
779 #
780
781 def _composeGangArguments(self, idTestSet):
782 """
783 Composes the gang specific testdriver arguments.
784 Returns command line string, including a leading space.
785 """
786
787 oTestSet = TestSetData().initFromDbWithId(self._oDb, idTestSet);
788 aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);
789
790 sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
791 for i, _ in enumerate(aoGangMembers):
792 sArgs = ' --gang-ipv4-%s %s' % (i, aoGangMembers[i].ip); ## @todo IPv6
793
794 return sArgs;
795
796
797 def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
798 """
799 Given all the bits of data, compose an EXEC command response to the testbox.
800 """
801 sScriptZips = oTestEx.oTestCase.sValidationKitZips;
802 if sScriptZips is None or sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
803 assert oValidationKitBuild;
804 if sScriptZips is None:
805 sScriptZips = oValidationKitBuild.sBinaries;
806 else:
807 sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
808 sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);
809
810 sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
811 sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
812 sCmdLine = sCmdLine.strip();
813 if oTestEx.cGangMembers > 1:
814 sCmdLine += ' ' + self._composeGangArguments(idTestSet);
815
816 cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
817 cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout // 100;
818
819 dResponse = \
820 {
821 constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_EXEC,
822 constants.tbresp.EXEC_PARAM_RESULT_ID: idTestSet,
823 constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS: sScriptZips,
824 constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
825 constants.tbresp.EXEC_PARAM_TIMEOUT: cSecTimeout,
826 };
827 return dResponse;
828
829 @staticmethod
830 def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
831 """
832 Composes an EXEC response for a gang member (other than the last).
833 Returns a EXEC response or raises an exception (DB/input error).
834 """
835 #
836 # Gather the necessary data.
837 #
838 oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
839 oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
840 oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(oDb, oTestSet.idGenTestCaseArgs,
841 tsConfigEff = oTestSet.tsConfig,
842 tsRsrcEff = oTestSet.tsConfig);
843 oBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
844 oValidationKitBuild = None;
845 if oTestSet.idBuildTestSuite is not None:
846 oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);
847
848 #
849 # Instantiate the specified scheduler and let it do the rest.
850 #
851 oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestSet.idSchedGroup, oTestSet.tsCreated);
852 assert oSchedGrpData.fEnabled is True;
853 assert oSchedGrpData.idBuildSrc is not None;
854 oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
855
856 return oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
857
858
859 def _updateTask(self, oTask, tsNow):
860 """
861 Updates a gang schedule task.
862 """
863 assert oTask.cMissingGangMembers >= 1;
864 assert oTask.idTestSetGangLeader is not None;
865 assert oTask.idTestSetGangLeader >= 1;
866 if tsNow is not None:
867 self._oDb.execute('UPDATE SchedQueues\n'
868 ' SET idTestSetGangLeader = %s,\n'
869 ' cMissingGangMembers = %s,\n'
870 ' tsLastScheduled = %s\n'
871 'WHERE idItem = %s\n'
872 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
873 else:
874 self._oDb.execute('UPDATE SchedQueues\n'
875 ' SET cMissingGangMembers = %s\n'
876 'WHERE idItem = %s\n'
877 , (oTask.cMissingGangMembers, oTask.idItem,) );
878 return True;
879
880 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
881 """
882 The task has been scheduled successfully, reset it's data move it to
883 the end of the queue.
884 """
885 if cGangMembers > 1:
886 self._oDb.execute('UPDATE SchedQueues\n'
887 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
888 ' idTestSetGangLeader = NULL,\n'
889 ' cMissingGangMembers = %s\n'
890 'WHERE idItem = %s\n'
891 , (cGangMembers, oTask.idItem,) );
892 else:
893 self._oDb.execute('UPDATE SchedQueues\n'
894 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
895 ' idTestSetGangLeader = NULL,\n'
896 ' cMissingGangMembers = 1,\n'
897 ' tsLastScheduled = %s\n'
898 'WHERE idItem = %s\n'
899 , (tsNow, oTask.idItem,) );
900 return True;
901
902
903
904
905 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
906 # type: (SchedQueueData, TestCaseArgsDataEx, TestBoxData, BuildDataEx, BuildDataEx, datetime.datetime) -> int
907 """
908 Creates a test set for using the given data.
909 Will not commit, someone up the callstack will that later on.
910
911 Returns the test set ID, may raise an exception on database error.
912 """
913 # Lazy bird doesn't want to write testset.py and does it all here.
914
915 #
916 # We're getting the TestSet ID first in order to include it in the base
917 # file name (that way we can directly relate files on the disk to the
918 # test set when doing batch work), and also for idTesetSetGangLeader.
919 #
920 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
921 idTestSet = self._oDb.fetchOne()[0];
922
923 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
924 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour // 6) * 6, idTestSet);
925
926 #
927 # Gang scheduling parameters. Changes the oTask data for updating by caller.
928 #
929 iGangMemberNo = 0;
930
931 if oTestEx.cGangMembers <= 1:
932 assert oTask.idTestSetGangLeader is None;
933 assert oTask.cMissingGangMembers <= 1;
934 elif oTask.idTestSetGangLeader is None:
935 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
936 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
937 oTask.idTestSetGangLeader = idTestSet;
938 else:
939 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
940 oTask.cMissingGangMembers -= 1;
941
942 #
943 # Do the database stuff.
944 #
945 self._oDb.execute('INSERT INTO TestSets (\n'
946 ' idTestSet,\n'
947 ' tsConfig,\n'
948 ' tsCreated,\n'
949 ' idBuild,\n'
950 ' idBuildCategory,\n'
951 ' idBuildTestSuite,\n'
952 ' idGenTestBox,\n'
953 ' idTestBox,\n'
954 ' idSchedGroup,\n'
955 ' idTestGroup,\n'
956 ' idGenTestCase,\n'
957 ' idTestCase,\n'
958 ' idGenTestCaseArgs,\n'
959 ' idTestCaseArgs,\n'
960 ' sBaseFilename,\n'
961 ' iGangMemberNo,\n'
962 ' idTestSetGangLeader )\n'
963 'VALUES ( %s,\n' # idTestSet
964 ' %s,\n' # tsConfig
965 ' %s,\n' # tsCreated
966 ' %s,\n' # idBuild
967 ' %s,\n' # idBuildCategory
968 ' %s,\n' # idBuildTestSuite
969 ' %s,\n' # idGenTestBox
970 ' %s,\n' # idTestBox
971 ' %s,\n' # idSchedGroup
972 ' %s,\n' # idTestGroup
973 ' %s,\n' # idGenTestCase
974 ' %s,\n' # idTestCase
975 ' %s,\n' # idGenTestCaseArgs
976 ' %s,\n' # idTestCaseArgs
977 ' %s,\n' # sBaseFilename
978 ' %s,\n' # iGangMemberNo
979 ' %s)\n' # idTestSetGangLeader
980 , ( idTestSet,
981 oTask.tsConfig,
982 tsNow,
983 oBuild.idBuild,
984 oBuild.idBuildCategory,
985 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
986 oTestBoxData.idGenTestBox,
987 oTestBoxData.idTestBox,
988 oTask.idSchedGroup,
989 oTask.idTestGroup,
990 oTestEx.oTestCase.idGenTestCase,
991 oTestEx.oTestCase.idTestCase,
992 oTestEx.idGenTestCaseArgs,
993 oTestEx.idTestCaseArgs,
994 sBaseFilename,
995 iGangMemberNo,
996 oTask.idTestSetGangLeader,
997 ));
998
999 self._oDb.execute('INSERT INTO TestResults (\n'
1000 ' idTestResultParent,\n'
1001 ' idTestSet,\n'
1002 ' tsCreated,\n'
1003 ' idStrName,\n'
1004 ' cErrors,\n'
1005 ' enmStatus,\n'
1006 ' iNestingDepth)\n'
1007 'VALUES ( NULL,\n' # idTestResultParent
1008 ' %s,\n' # idTestSet
1009 ' %s,\n' # tsCreated
1010 ' 0,\n' # idStrName
1011 ' 0,\n' # cErrors
1012 ' \'running\'::TestStatus_T,\n'
1013 ' 0)\n' # iNestingDepth
1014 'RETURNING idTestResult'
1015 , ( idTestSet, tsNow, ));
1016 idTestResult = self._oDb.fetchOne()[0];
1017
1018 self._oDb.execute('UPDATE TestSets\n'
1019 ' SET idTestResult = %s\n'
1020 'WHERE idTestSet = %s\n'
1021 , (idTestResult, idTestSet, ));
1022
1023 return idTestSet;
1024
1025 def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
1026 """
1027 Tries to find the most recent validation kit build suitable for the given testbox.
1028 Returns BuildDataEx or None. Raise exception on database error.
1029
1030 Can be overridden by child classes to change the default build requirements.
1031 """
1032 oBuildLogic = BuildLogic(self._oDb);
1033 oBuildSource = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
1034 oCursor = BuildSourceLogic(self._oDb).openBuildCursor(oBuildSource, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
1035 for _ in range(oCursor.getRowCount()):
1036 oBuild = BuildDataEx().initFromDbRow(oCursor.fetchOne());
1037 if not oBuildLogic.isBuildBlacklisted(oBuild):
1038 return oBuild;
1039 return None;
1040
1041 def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
1042 """
1043 Tries to find a fitting build.
1044 Returns BuildDataEx or None. Raise exception on database error.
1045
1046 Can be overridden by child classes to change the default build requirements.
1047 """
1048
1049 #
1050 # Gather the set of prerequisites we have and turn them into a value
1051 # set for use in the loop below.
1052 #
1053 # Note! We're scheduling on testcase level and ignoring argument variation
1054 # selections in TestGroupMembers is intentional.
1055 #
1056 dPreReqs = {};
1057
1058 # Direct prerequisites. We assume they're all enabled as this can be
1059 # checked at queue creation time.
1060 for oPreReq in oTestEx.aoTestCasePreReqs:
1061 dPreReqs[oPreReq.idTestCase] = 1;
1062
1063 # Testgroup dependencies from the scheduling group config.
1064 if oTask.aidTestGroupPreReqs is not None:
1065 for iTestGroup in oTask.aidTestGroupPreReqs:
1066 # Make sure the _active_ test group members are in the cache.
1067 if iTestGroup not in self.dTestGroupMembers:
1068 self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
1069 'FROM TestGroupMembers, TestCases\n'
1070 'WHERE TestGroupMembers.idTestGroup = %s\n'
1071 ' AND TestGroupMembers.tsExpire > %s\n'
1072 ' AND TestGroupMembers.tsEffective <= %s\n'
1073 ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
1074 ' AND TestCases.tsExpire > %s\n'
1075 ' AND TestCases.tsEffective <= %s\n'
1076 ' AND TestCases.fEnabled is TRUE\n'
1077 , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
1078 aidTestCases = [];
1079 for aoRow in self._oDb.fetchAll():
1080 aidTestCases.append(aoRow[0]);
1081 self.dTestGroupMembers[iTestGroup] = aidTestCases;
1082
1083 # Add the testgroup members to the prerequisites.
1084 for idTestCase in self.dTestGroupMembers[iTestGroup]:
1085 dPreReqs[idTestCase] = 1;
1086
1087 # Create a SQL values table out of them.
1088 sPreReqSet = ''
1089 if dPreReqs:
1090 for idPreReq in sorted(dPreReqs):
1091 sPreReqSet += ', (' + str(idPreReq) + ')';
1092 sPreReqSet = sPreReqSet[2:]; # drop the leading ', '.
1093
1094 #
1095 # Try the builds.
1096 #
1097 self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
1098 for oEntry in self.oBuildCache:
1099 #
1100 # Check build requirements set by the test.
1101 #
1102 if not oTestEx.matchesBuildProps(oEntry.oBuild):
1103 continue;
1104
1105 if oEntry.isBlacklisted(self._oDb):
1106 oEntry.remove();
1107 continue;
1108
1109 #
1110 # Check prerequisites. The default scheduler is satisfied if one
1111 # argument variation has been executed successfully. It is not
1112 # satisfied if there are any failure runs.
1113 #
1114 if sPreReqSet:
1115 fDecision = oEntry.getPreReqDecision(sPreReqSet);
1116 if fDecision is None:
1117 # Check for missing prereqs.
1118 self._oDb.execute('SELECT COUNT(*)\n'
1119 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
1120 'LEFT OUTER JOIN (SELECT idTestSet\n'
1121 ' FROM TestSets\n'
1122 ' WHERE enmStatus IN (%s, %s)\n'
1123 ' AND idBuild = %s\n'
1124 ' ) AS TestSets\n'
1125 ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
1126 'WHERE TestSets.idTestSet is NULL\n'
1127 , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
1128 oEntry.oBuild.idBuild, ));
1129 cMissingPreReqs = self._oDb.fetchOne()[0];
1130 if cMissingPreReqs > 0:
1131 self.dprint('build %s is missing %u prerequisites (out of %s)'
1132 % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
1133 oEntry.setPreReqDecision(sPreReqSet, False);
1134 continue;
1135
1136 # Check for failed prereq runs.
1137 self._oDb.execute('SELECT COUNT(*)\n'
1138 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
1139 ' TestSets\n'
1140 'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
1141 ' AND TestSets.idBuild = %s\n'
1142 ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
1143 , ( oEntry.oBuild.idBuild,
1144 TestSetData.ksTestStatus_Failure,
1145 TestSetData.ksTestStatus_TimedOut,
1146 TestSetData.ksTestStatus_Rebooted,
1147 )
1148 );
1149 cFailedPreReqs = self._oDb.fetchOne()[0];
1150 if cFailedPreReqs > 0:
1151 self.dprint('build %s is has %u prerequisite failures (out of %s)'
1152 % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
1153 oEntry.setPreReqDecision(sPreReqSet, False);
1154 continue;
1155
1156 oEntry.setPreReqDecision(sPreReqSet, True);
1157 elif not fDecision:
1158 continue;
1159
1160 #
1161 # If we can, check if the build files still exist.
1162 #
1163 if oEntry.oBuild.areFilesStillThere() is False:
1164 self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
1165 oEntry.remove();
1166 continue;
1167
1168 self.dprint('found oBuild=%s' % (oEntry.oBuild,));
1169 return oEntry.oBuild;
1170 return None;
1171
1172 def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
1173 """
1174 Tries to find a matching build for gang scheduling.
1175 Returns BuildDataEx or None. Raise exception on database error.
1176
1177 Can be overridden by child classes to change the default build requirements.
1178 """
1179 #
1180 # Note! Should probably check build prerequisites if we get a different
1181 # build back, so that we don't use a build which hasn't passed
1182 # the smoke test.
1183 #
1184 _ = idBuildSrc;
1185 return BuildLogic(self._oDb).tryFindSameBuildForOsArch(oLeaderBuild, oTestBoxData.sOs, oTestBoxData.sCpuArch);
1186
1187
1188 def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1189 """
1190 Try schedule the task as a gang leader (can be a gang of one).
1191 Returns response or None. May raise exception on DB error.
1192 """
1193
1194 # We don't wait for busy resources, we just try the next test.
1195 oTestArgsLogic = TestCaseArgsLogic(self._oDb);
1196 if not oTestArgsLogic.areResourcesFree(oTestEx):
1197 self.dprint('Cannot get global test resources!');
1198 return None;
1199
1200 #
1201 # Find a matching build (this is the difficult bit).
1202 #
1203 oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
1204 if oBuild is None:
1205 self.dprint('No build!');
1206 return None;
1207 if oTestEx.oTestCase.needValidationKitBit():
1208 oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
1209 if oValidationKitBuild is None:
1210 self.dprint('No validation kit build!');
1211 return None;
1212 else:
1213 oValidationKitBuild = None;
1214
1215 #
1216 # Create a testset, allocate the resources and update the state.
1217 # Note! Since resource allocation may still fail, we create a nested
1218 # transaction so we can roll back. (Heed lock warning in docs!)
1219 #
1220 self._oDb.execute('SAVEPOINT tryAsLeader');
1221 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1222
1223 if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
1224 is not True:
1225 self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
1226 self.dprint('Failed to allocate global resources!');
1227 return False;
1228
1229 if oTestEx.cGangMembers <= 1:
1230 # We're alone, put the task back at the end of the queue and issue EXEC cmd.
1231 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1232 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1233 sTBState = TestBoxStatusData.ksTestBoxState_Testing;
1234 else:
1235 # We're missing gang members, issue WAIT cmd.
1236 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1237 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1238 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1239
1240 TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1241 self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
1242 return dResponse;
1243
1244 def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
1245 """
1246 Try schedule the task as a gang member.
1247 Returns response or None. May raise exception on DB error.
1248 """
1249
1250 #
1251 # The leader has choosen a build, we need to find a matching one for our platform.
1252 # (It's up to the scheduler decide upon how strict dependencies are to be enforced
1253 # upon subordinate group members.)
1254 #
1255 oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTestBoxData.idTestSetGangLeader);
1256
1257 oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
1258 oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
1259 if oBuild is None:
1260 return None;
1261
1262 oValidationKitBuild = None;
1263 if oLeaderTestSet.idBuildTestSuite is not None:
1264 oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
1265 oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
1266 self._oSchedGrpData.idBuildSrcTestSuite);
1267
1268 #
1269 # Create a testset and update the state(s).
1270 #
1271 idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);
1272
1273 oTBStatusLogic = TestBoxStatusLogic(self._oDb);
1274 if oTask.cMissingGangMembers < 1:
1275 # The whole gang is there, move the task to the end of the queue
1276 # and update the status on the other gang members.
1277 self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
1278 dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
1279 sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
1280 oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
1281 else:
1282 # We're still missing some gang members, issue WAIT cmd.
1283 self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
1284 dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
1285 sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;
1286
1287 oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
1288 return dResponse;
1289
1290
1291 def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
1292 """
1293 Worker for schduling a new task.
1294 """
1295
1296 #
1297 # Iterate the scheduler queue (fetch all to avoid having to concurrent
1298 # queries), trying out each task to see if the testbox can execute it.
1299 #
1300 dRejected = {}; # variations we've already checked out and rejected.
1301 self._oDb.execute('SELECT *\n'
1302 'FROM SchedQueues\n'
1303 'WHERE idSchedGroup = %s\n'
1304 ' AND ( bmHourlySchedule IS NULL\n'
1305 ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
1306 'ORDER BY idItem ASC\n'
1307 , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
1308 aaoRows = self._oDb.fetchAll();
1309 for aoRow in aaoRows:
1310 # Don't loop forever.
1311 if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
1312 break;
1313
1314 # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
1315 oTask = SchedQueueData().initFromDbRow(aoRow);
1316 if config.g_kfSrvGlueDebugScheduler:
1317 self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
1318 % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
1319 oTask.tsLastScheduled, oTask.tsConfig,));
1320
1321 sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
1322 if sRejectNm in dRejected:
1323 self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
1324 continue;
1325 dRejected[sRejectNm] = 1;
1326
1327 # Fetch all the test case info (too much, but who cares right now).
1328 oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
1329 tsConfigEff = oTask.tsConfig,
1330 tsRsrcEff = oTask.tsConfig);
1331 if config.g_kfSrvGlueDebugScheduler:
1332 self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));
1333
1334 # This shouldn't happen, but just in case it does...
1335 if oTestEx.oTestCase.fEnabled is not True:
1336 self.dprint('Testcase is not enabled!!');
1337 continue;
1338
1339 # Check if the testbox properties matches the test.
1340 if not oTestEx.matchesTestBoxProps(oTestBoxData):
1341 self.dprint('Testbox mismatch!');
1342 continue;
1343
1344 # Try schedule it.
1345 if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
1346 dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1347 elif oTask.cMissingGangMembers > 1:
1348 dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
1349 else:
1350 dResponse = None; # Shouldn't happen!
1351 if dResponse is not None:
1352 self.dprint('Found a task! dResponse=%s' % (dResponse,));
1353 return dResponse;
1354
1355 # Found no suitable task.
1356 return None;
1357
1358 @staticmethod
1359 def _pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds):
1360 """
1361 Picks the next scheduling group for the given testbox.
1362 """
1363 if len(oTestBoxDataEx.aoInSchedGroups) == 1:
1364 oSchedGroup = oTestBoxDataEx.aoInSchedGroups[0].oSchedGroup;
1365 if oSchedGroup.fEnabled \
1366 and oSchedGroup.idBuildSrc is not None \
1367 and oSchedGroup.idSchedGroup not in dIgnoreSchedGroupIds:
1368 return (oSchedGroup, 0);
1369 iWorkItem = 0;
1370
1371 elif oTestBoxDataEx.aoInSchedGroups:
1372 # Construct priority table of currently enabled scheduling groups.
1373 aaoList1 = [];
1374 for oInGroup in oTestBoxDataEx.aoInSchedGroups:
1375 oSchedGroup = oInGroup.oSchedGroup;
1376 if oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None:
1377 iSchedPriority = oInGroup.iSchedPriority;
1378 if iSchedPriority > 31: # paranoia
1379 iSchedPriority = 31;
1380 elif iSchedPriority < 0: # paranoia
1381 iSchedPriority = 0;
1382
1383 for iSchedPriority in xrange(min(iSchedPriority, len(aaoList1))):
1384 aaoList1[iSchedPriority].append(oSchedGroup);
1385 while len(aaoList1) <= iSchedPriority:
1386 aaoList1.append([oSchedGroup,]);
1387
1388 # Flatten it into a single list, mixing the priorities a little so it doesn't
1389 # take forever before low priority stuff is executed.
1390 aoFlat = [];
1391 iLo = 0;
1392 iHi = len(aaoList1) - 1;
1393 while iHi >= iLo:
1394 aoFlat += aaoList1[iHi];
1395 if iLo < iHi:
1396 aoFlat += aaoList1[iLo];
1397 iLo += 1;
1398 iHi -= 1;
1399
1400 # Pick the next one.
1401 cLeft = len(aoFlat);
1402 while cLeft > 0:
1403 cLeft -= 1;
1404 iWorkItem += 1;
1405 if iWorkItem >= len(aoFlat) or iWorkItem < 0:
1406 iWorkItem = 0;
1407 if aoFlat[iWorkItem].idSchedGroup not in dIgnoreSchedGroupIds:
1408 return (aoFlat[iWorkItem], iWorkItem);
1409 else:
1410 iWorkItem = 0;
1411
1412 # No active group.
1413 return (None, iWorkItem);
1414
1415 @staticmethod
1416 def scheduleNewTask(oDb, oTestBoxData, iWorkItem, sBaseUrl, iVerbosity = 0):
1417 # type: (TMDatabaseConnection, TestBoxData, int, str, int) -> None
1418 """
1419 Schedules a new task for a testbox.
1420 """
1421 oTBStatusLogic = TestBoxStatusLogic(oDb);
1422
1423 try:
1424 #
1425 # To avoid concurrency issues in SchedQueues we lock all the rows
1426 # related to our scheduling queue. Also, since this is a very
1427 # expensive operation we lock the testbox status row to fend of
1428 # repeated retires by faulty testbox scripts.
1429 #
1430 tsSecStart = utils.timestampSecond();
1431 oDb.rollback();
1432 oDb.begin();
1433 oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
1434 % (oTestBoxData.idTestBox,));
1435 oDb.execute('SELECT SchedQueues.idSchedGroup\n'
1436 ' FROM SchedQueues, TestBoxesInSchedGroups\n'
1437 'WHERE TestBoxesInSchedGroups.idTestBox = %s\n'
1438 ' AND TestBoxesInSchedGroups.tsExpire = \'infinity\'::TIMESTAMP\n'
1439 ' AND TestBoxesInSchedGroups.idSchedGroup = SchedQueues.idSchedGroup\n'
1440 ' FOR UPDATE'
1441 % (oTestBoxData.idTestBox,));
1442
1443 # We need the current timestamp.
1444 tsNow = oDb.getCurrentTimestamp();
1445
1446 # Re-read the testbox data with scheduling group relations.
1447 oTestBoxDataEx = TestBoxDataEx().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
1448 if oTestBoxDataEx.fEnabled \
1449 and oTestBoxDataEx.idGenTestBox == oTestBoxData.idGenTestBox:
1450
1451 # We may have to skip scheduling groups that are out of work (e.g. 'No build').
1452 iInitialWorkItem = iWorkItem;
1453 dIgnoreSchedGroupIds = {};
1454 while True:
1455 # Now, pick the scheduling group.
1456 (oSchedGroup, iWorkItem) = SchedulerBase._pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds);
1457 if oSchedGroup is None:
1458 break;
1459 assert oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None;
1460
1461 # Instantiate the specified scheduler and let it do the rest.
1462 oScheduler = SchedulerBase._instantiate(oDb, oSchedGroup, iVerbosity, tsSecStart);
1463 dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataEx, tsNow, sBaseUrl);
1464 if dResponse is not None:
1465 oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
1466 oDb.commit();
1467 return dResponse;
1468
1469 # Check out the next work item?
1470 if oScheduler.getElapsedSecs() > config.g_kcSecMaxNewTask:
1471 break;
1472 dIgnoreSchedGroupIds[oSchedGroup.idSchedGroup] = oSchedGroup;
1473
1474 # No luck, but best if we update the work item if we've made progress.
1475 # Note! In case of a config.g_kcSecMaxNewTask timeout, this may accidentally skip
1476 # a work item with actually work to do. But that's a small price to pay.
1477 if iWorkItem != iInitialWorkItem:
1478 oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
1479 oDb.commit();
1480 return None;
1481 except:
1482 oDb.rollback();
1483 raise;
1484
1485 # Not enabled, rollback and return no task.
1486 oDb.rollback();
1487 return None;
1488
1489 @staticmethod
1490 def tryCancelGangGathering(oDb, oStatusData):
1491 """
1492 Try canceling a gang gathering.
1493
1494 Returns True if successfully cancelled.
1495 Returns False if not (someone raced us to the SchedQueue table).
1496
1497 Note! oStatusData is re-initialized.
1498 """
1499 assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
1500 try:
1501 #
1502 # Lock the tables we're updating so we don't run into concurrency
1503 # issues (we're racing both scheduleNewTask and other callers of
1504 # this method).
1505 #
1506 oDb.rollback();
1507 oDb.begin();
1508 oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');
1509
1510 #
1511 # Re-read the testbox data and check that we're still in the same state.
1512 #
1513 oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
1514 if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering:
1515 #
1516 # Get the leader thru the test set and change the state of the whole gang.
1517 #
1518 oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
1519
1520 oTBStatusLogic = TestBoxStatusLogic(oDb);
1521 oTBStatusLogic.updateGangStatus(oTestSetData.idTestSetGangLeader,
1522 TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
1523 fCommit = False);
1524
1525 #
1526 # Move the scheduling queue item to the end.
1527 #
1528 oDb.execute('SELECT *\n'
1529 'FROM SchedQueues\n'
1530 'WHERE idTestSetGangLeader = %s\n'
1531 , (oTestSetData.idTestSetGangLeader,) );
1532 oTask = SchedQueueData().initFromDbRow(oDb.fetchOne());
1533 oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(oDb, oTask.idGenTestCaseArgs,
1534 tsConfigEff = oTask.tsConfig,
1535 tsRsrcEff = oTask.tsConfig);
1536 oDb.execute('UPDATE SchedQueues\n'
1537 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
1538 ' idTestSetGangLeader = NULL,\n'
1539 ' cMissingGangMembers = %s\n'
1540 'WHERE idItem = %s\n'
1541 , (oTestEx.cGangMembers, oTask.idItem,) );
1542
1543 oDb.commit();
1544 return True;
1545
1546 if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
1547 oDb.rollback();
1548 return True;
1549 except:
1550 oDb.rollback();
1551 raise;
1552
1553 # Not enabled, rollback and return no task.
1554 oDb.rollback();
1555 return False;
1556
1557
1558#
1559# Unit testing.
1560#
1561
1562# pylint: disable=missing-docstring
1563class SchedQueueDataTestCase(ModelDataBaseTestCase):
1564 def setUp(self):
1565 self.aoSamples = [SchedQueueData(),];
1566
1567if __name__ == '__main__':
1568 unittest.main();
1569 # not reached.
1570
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette