VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 61478

Last change on this file since 61478 was 61474, checked in by vboxsync, 9 years ago

testmanager: Preparing TestSets for TestBoxes belonging to more than one scheduling group by storing the idSchedGroup in TestSets as well (this has a slight speed-up effect on grouping results by scheduling group too, of course). The idSchedGroup column in TestBoxes will be changed into an M:N table some time later. Also corrected a typo regarding orphaned tests.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 62.4 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 61474 2016-06-05 21:02:01Z vboxsync $
3# pylint: disable=C0302
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2015 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.virtualbox.org. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 61474 $"
32
33
34# Standard python imports.
35import unittest;
36
37# Validation Kit imports.
38from common import utils, constants;
39from testmanager import config;
40from testmanager.core.build import BuildDataEx, BuildLogic;
41from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43from testmanager.core.globalresource import GlobalResourceLogic;
44from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46from testmanager.core.testbox import TestBoxData;
47from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48from testmanager.core.testcase import TestCaseLogic;
49from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50from testmanager.core.testset import TestSetData, TestSetLogic;
51
52
class ReCreateQueueData(object):
    """
    Data object for recreating a scheduling queue.

    It's mostly a storage object, but has a few data checking operations
    associated with it.

    Members set up by the constructor:
        aoTestGroups     - The scheduling group members (test groups).
        aoTestCases      - The testcases of all the test groups.
        aoArgsVariations - The testcase argument variations.
        dTestCases       - idTestCase -> testcase lookup dictionary.
        dTestGroups      - idTestGroup -> test group lookup dictionary.
    """

    def __init__(self, oDb, idSchedGroup):
        #
        # Load data from the database.
        #

        # Will extend the entries with aoTestCases and dTestCases members
        # further down.  checkForGroupDepCycles will add aidTestGroupPreReqs.
        self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);

        # aoTestCases entries are TestCaseData instances with iSchedPriority
        # and idTestGroup added for our purposes.
        # We will add oTestGroup and aoArgsVariations members to each further down.
        self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);

        # Load dependencies.
        oTestCaseLogic = TestCaseLogic(oDb)
        for oTestCase in self.aoTestCases:
            oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);

        # aoArgsVariations entries are TestCaseArgsData instances with
        # iSchedPriority and idTestGroup added for our purposes.
        # We will add oTestGroup and oTestCase members to each further down.
        self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);

        #
        # Generate global lookups.
        #

        # Generate a testcase lookup dictionary for use when working on
        # argument variations.
        self.dTestCases = dict();
        for oTestCase in self.aoTestCases:
            self.dTestCases[oTestCase.idTestCase] = oTestCase;
        assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!

        # Generate a testgroup lookup dictionary.
        self.dTestGroups = dict();
        for oTestGroup in self.aoTestGroups:
            self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
        assert len(self.dTestGroups) == len(self.aoTestGroups);

        #
        # Associate extra members with the base data.
        #
        if len(self.aoTestGroups) > 0:
            # Prep the test groups.
            for oTestGroup in self.aoTestGroups:
                oTestGroup.aoTestCases = list();
                oTestGroup.dTestCases  = dict();

            # Link testcases to their group, both directions.  Prep testcases
            # for argument variation association.
            oTestGroup = self.aoTestGroups[0];
            for oTestCase in self.aoTestCases:
                if oTestGroup.idTestGroup != oTestCase.idTestGroup:
                    oTestGroup = self.dTestGroups[oTestCase.idTestGroup];

                assert oTestCase.idTestCase not in oTestGroup.dTestCases;
                oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
                oTestGroup.aoTestCases.append(oTestCase);
                oTestCase.oTestGroup       = oTestGroup;
                oTestCase.aoArgsVariations = list();

            # Associate testcase argument variations with their testcases
            # (group) in both directions.
            oTestGroup = self.aoTestGroups[0];
            oTestCase  = self.aoTestCases[0] if len(self.aoTestCases) > 0 else None;
            for oArgVariation in self.aoArgsVariations:
                if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
                    oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
                if oTestCase.idTestCase != oArgVariation.idTestCase  or  oTestCase.idTestGroup != oArgVariation.idTestGroup:
                    oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];

                oTestCase.aoArgsVariations.append(oArgVariation);
                oArgVariation.oTestCase  = oTestCase;
                oArgVariation.oTestGroup = oTestGroup;

        else:
            assert len(self.aoTestCases) == 0;
            assert len(self.aoArgsVariations) == 0;
        # done.

    @staticmethod
    def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
        """
        Appends a [sMsg, oObj] error entry to aoErrors, with the dependency
        chain (aidChain, must not be empty) appended to the message.
        Returns aoErrors.
        """
        sMsg += ' Dependency chain: %s' % (aidChain[0],);
        for idLink in aidChain[1:]:
            sMsg += ' -> %s' % (idLink,);

        aoErrors.append([sMsg, oObj]);
        return aoErrors;

    def checkForGroupDepCycles(self):
        """
        Checks for testgroup dependency cycles and any missing testgroup
        dependencies.  Sets aidTestGroupPreReqs on each test group.

        A cycle is reported as a too long prerequisite chain.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            idPreReq = oTestGroup.idTestGroupPreReq;
            if idPreReq is None:
                oTestGroup.aidTestGroupPreReqs = list();
                continue;

            aidChain = [oTestGroup.idTestGroup,];
            while idPreReq is not None:
                aidChain.append(idPreReq);
                if len(aidChain) >= 10:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite chain is too long!'
                                         % (oTestGroup.idTestGroup,));
                    break;

                oDep = self.dTestGroups.get(idPreReq, None);
                if oDep is None:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite #%s is not in the scheduling group!'
                                         % (oTestGroup.idTestGroup, idPreReq,));
                    break;

                idPreReq = oDep.idTestGroupPreReq;
            oTestGroup.aidTestGroupPreReqs = aidChain[1:];

        return aoErrors;


    def checkForMissingTestCaseDeps(self):
        """
        Checks that testcase dependencies stay within bounds.  We do not allow
        dependencies outside a testgroup, no dependency cycles or even remotely
        long dependency chains.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            for oTestCase in oTestGroup.aoTestCases:
                if len(oTestCase.aidPreReqs) == 0:
                    continue;

                # Iterative depth first walk.  aoStack holds mutable
                # [testcase, next prerequisite index] pairs (lists, since the
                # index is updated in place) and aidChain holds the testcase
                # IDs along the current path.  The two are pushed and popped
                # together.
                aoStack  = [[oTestCase, 0],];
                aidChain = [oTestCase.idTestCase,];
                while len(aoStack) > 0:
                    (oCur, i) = aoStack[-1];
                    if i >= len(oCur.aidPreReqs):
                        aoStack.pop();
                        aidChain.pop();
                    else:
                        aoStack[-1][1] = i + 1; # whatever happens, we'll advance on the current level.

                        idPreReq = oCur.aidPreReqs[i];
                        oDep = oTestGroup.dTestCases.get(idPreReq, None);
                        if oDep is None:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s is not in the scheduling group!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif idPreReq in aidChain:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s creates a cycle!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif len(oDep.aidPreReqs) == 0:
                            pass;
                        elif len(aidChain) >= 10:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
                        else:
                            aoStack.append([oDep, 0]);
                            aidChain.append(idPreReq);

        return aoErrors;

    def deepTestGroupSort(self):
        """
        Sorts the testgroups and their testcases by priority and dependencies.
        Note! Don't call this before checking for dependency cycles!

        Returns True, or None if there are no test groups.
        Raises TMExceptionBase if the database returned the data unsorted.
        """
        if len(self.aoTestGroups) == 0:
            return;

        #
        # ASSUMES groups as well as testcases are sorted by priority by the
        # database.  So we only have to concern ourselves with the dependency
        # sorting.
        #
        iGrpPrio = self.aoTestGroups[0].iSchedPriority;
        for oTestGroup in self.aoTestGroups:
            if oTestGroup.iSchedPriority > iGrpPrio:
                raise TMExceptionBase('Incorrectly sorted testgroups returned by database.');
            iGrpPrio = oTestGroup.iSchedPriority;

            if len(oTestGroup.aoTestCases) > 0:
                iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
                for oTestCase in oTestGroup.aoTestCases:
                    if oTestCase.iSchedPriority > iTstPrio:
                        raise TMExceptionBase('Incorrectly sorted testcases returned by database.');
                    iTstPrio = oTestCase.iSchedPriority;

        #
        # Sort the testgroups by dependencies.
        #
        i = 0;
        while i < len(self.aoTestGroups):
            oTestGroup = self.aoTestGroups[i];
            if oTestGroup.idTestGroupPreReq is not None:
                iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
                if iPreReq > i:
                    # The prerequisite is after the current entry.  Move the
                    # current entry so that it's following its prereq entry.
                    # (Insert first, as iPreReq > i the pop index is unaffected.)
                    self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
                    self.aoTestGroups.pop(i);
                    continue; # Don't advance, a new entry now occupies slot i.
                assert iPreReq < i;
            i += 1; # Advance.

        #
        # Sort the testcases by dependencies.
        # Same algorithm as above, just more prerequisites.
        #
        for oTestGroup in self.aoTestGroups:
            i = 0;
            while i < len(oTestGroup.aoTestCases):
                oTestCase = oTestGroup.aoTestCases[i];
                if len(oTestCase.aidPreReqs) > 0:
                    for idPreReq in oTestCase.aidPreReqs:
                        iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
                        if iPreReq > i:
                            # The prerequisite is after the current entry.  Move the
                            # current entry so that it's following its prereq entry.
                            oTestGroup.aoTestCases.insert(iPreReq + 1, oTestCase);
                            oTestGroup.aoTestCases.pop(i);
                            i -= 1; # Don't advance.
                            break;
                        assert iPreReq < i;
                i += 1; # Advance.


        return True;
300
301
302
class SchedQueueData(ModelDataBase):
    """
    Scheduling queue data item (one SchedQueues row).
    """

    ksIdAttr = 'idSchedGroup';

    ksParam_idSchedGroup        = 'SchedQueueData_idSchedGroup';
    ksParam_idItem              = 'SchedQueueData_idItem';
    ksParam_offQueue            = 'SchedQueueData_offQueue';
    ksParam_idGenTestCaseArgs   = 'SchedQueueData_idGenTestCaseArgs';
    ksParam_idTestGroup         = 'SchedQueueData_idTestGroup';
    ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
    ksParam_bmHourlySchedule    = 'SchedQueueData_bmHourlySchedule';
    ksParam_tsConfig            = 'SchedQueueData_tsConfig';
    ksParam_tsLastScheduled     = 'SchedQueueData_tsLastScheduled';
    ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
    ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';

    kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
                               'tsConfig', 'tsLastScheduled' ];


    def __init__(self):
        ModelDataBase.__init__(self);

        #
        # Every attribute starts out as None, except for the missing gang
        # member count which starts at one.
        # See the database for explanations of each of these fields.
        #
        self.idSchedGroup        = None;
        self.idItem              = None;
        self.offQueue            = None;
        self.idGenTestCaseArgs   = None;
        self.idTestGroup         = None;
        self.aidTestGroupPreReqs = None;
        self.bmHourlySchedule    = None;
        self.tsConfig            = None;
        self.tsLastScheduled     = None;
        self.idTestSetGangLeader = None;
        self.cMissingGangMembers = 1;

    def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs,     # pylint: disable=R0913
                       bmHourlySchedule, cMissingGangMembers,
                       idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
        """
        Overwrites every attribute with the given values; the optional ones
        default to None.
        Returns self.
        """
        # Mandatory values first.
        self.idSchedGroup        = idSchedGroup;
        self.idGenTestCaseArgs   = idGenTestCaseArgs;
        self.idTestGroup         = idTestGroup;
        self.aidTestGroupPreReqs = aidTestGroupPreReqs;
        self.bmHourlySchedule    = bmHourlySchedule;
        self.cMissingGangMembers = cMissingGangMembers;

        # Then the optional ones.
        self.idItem              = idItem;
        self.offQueue            = offQueue;
        self.tsConfig            = tsConfig;
        self.tsLastScheduled     = tsLastScheduled;
        self.idTestSetGangLeader = idTestSetGangLeader;
        return self;

    def initFromDbRow(self, aoRow):
        """
        Loads the attributes from a database row (SELECT * FROM SchedQueues).
        Returns self.
        Raises TMExceptionBase if no row is specified.
        """
        if aoRow is None:
            raise TMExceptionBase('SchedQueueData not found.');

        # Attribute names in table column order.
        asColumns = ( 'idSchedGroup', 'idItem', 'offQueue', 'idGenTestCaseArgs', 'idTestGroup',
                      'aidTestGroupPreReqs', 'bmHourlySchedule', 'tsConfig', 'tsLastScheduled',
                      'idTestSetGangLeader', 'cMissingGangMembers', );
        for iColumn, sAttr in enumerate(asColumns):
            setattr(self, sAttr, aoRow[iColumn]);
        return self;
386
387
388
389
390
391
392class SchedulerBase(object):
393 """
394 The scheduler base class.
395
396 The scheduler classes have two functions:
397 1. Recreate the scheduling queue.
398 2. Pick the next task from the queue.
399
400 The first is scheduler specific, the latter isn't.
401 """
402
class BuildCache(object):
    """
    Build cache.

    Builds are fetched on demand from the cursor set up by setupSource() and
    kept in aoEntries, so that iterating the cache again starts with the
    entries already fetched before going back to the cursor.
    """

    class BuildCacheIterator(object):
        """ Build cache iterator. """
        def __init__(self, oCache):
            self.oCache = oCache;
            self.iCur   = 0;

        def __iter__(self):
            """Returns self, required by the language."""
            return self;

        def __next__(self):
            """Returns the next build, raises StopIteration when the end has been reached."""
            while True:
                if self.iCur >= len(self.oCache.aoEntries):
                    # Ran out of cached entries, try to get another one from the cursor.
                    oEntry = self.oCache.fetchFromCursor();
                    if oEntry is None:
                        raise StopIteration;
                else:
                    oEntry = self.oCache.aoEntries[self.iCur];
                self.iCur += 1;
                if not oEntry.fRemoved:
                    return oEntry;
            # end

        # Python 2 spells the iterator protocol method 'next'.
        next = __next__;

    class BuildCacheEntry(object):
        """ Build cache entry. """

        def __init__(self, oBuild, fMaybeBlacklisted):
            self.oBuild           = oBuild;
            # None means 'not known yet', see isBlacklisted().
            self._fBlacklisted    = None if fMaybeBlacklisted is True else False;
            self.fRemoved         = False;
            self._dPreReqDecisions = dict();

        def remove(self):
            """
            Marks the cache entry as removed.
            This doesn't actually remove it from the cache array, only marks
            it as removed.  It has no effect on open iterators.
            """
            self.fRemoved = True;

        def getPreReqDecision(self, sPreReqSet):
            """
            Retrieves a cached prerequisite decision.
            Returns boolean if found, None if not.
            """
            return self._dPreReqDecisions.get(sPreReqSet);

        def setPreReqDecision(self, sPreReqSet, fDecision):
            """
            Caches a prerequisite decision.
            Returns fDecision.
            """
            self._dPreReqDecisions[sPreReqSet] = fDecision;
            return fDecision;

        def isBlacklisted(self, oDb):
            """ Checks if the build is blacklisted, asking the database the first time if necessary. """
            if self._fBlacklisted is None:
                self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
            return self._fBlacklisted;


    def __init__(self):
        self.aoEntries = [];
        self.oCursor   = None;

    def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
        """
        Configures the build cursor for the cache.
        Does nothing if the cache already has entries or a cursor.
        Returns True.
        """
        if len(self.aoEntries) == 0 and self.oCursor is None:
            oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
            self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
        return True;

    def __iter__(self):
        """Return an iterator."""
        return self.BuildCacheIterator(self);

    def fetchFromCursor(self):
        """
        Fetches a build from the cursor and adds it to the cache.
        Returns the new cache entry, None if there is no cursor, the fetch
        failed, or there are no more rows.
        """
        if self.oCursor is None:
            return None;

        # Best effort: a failing fetch is treated like the end of the data.
        try:
            aoRow = self.oCursor.fetchOne();
        except Exception: # pylint: disable=W0703
            return None;
        if aoRow is None:
            return None;

        # The last column is passed on as the 'maybe blacklisted' indicator.
        oBuild = BuildDataEx().initFromDbRow(aoRow);
        oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
        self.aoEntries.append(oEntry);
        return oEntry;
499
def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Stores the database connection, scheduling group data and verbosity
    level, and sets up empty message, build cache and test group member
    containers.  The start time defaults to the current time.
    """
    if tsSecStart is None:
        tsSecStart = utils.timestampSecond();

    self._oDb              = oDb;
    self._oSchedGrpData    = oSchedGrpData;
    self._iVerbosity       = iVerbosity;
    self._asMessages       = list();
    self._tsSecStart       = tsSecStart;
    self.oBuildCache       = self.BuildCache();
    self.dTestGroupMembers = {};
508
@staticmethod
def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Creates the scheduler child class instance that the scheduling group
    data asks for.

    Returns the scheduler instance.  Raises the exception produced by
    oDb.integrityException() if the scheduler type isn't recognized.
    """
    # Reject anything but the one scheduler type handled here.
    if oSchedGrpData.enmScheduler != SchedGroupData.ksScheduler_BestEffortContinousItegration:
        raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
                                     % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));

    # Imported here rather than at the top of the file.
    from testmanager.core.schedulerbeci import SchdulerBeci;
    return SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
523
524
525 #
526 # Misc.
527 #
528
def msgDebug(self, sText):
    """Records a debug message if the verbosity level is above 1.  Returns None."""
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('debug:' + sText);
    return None;
534
def msgInfo(self, sText):
    """Records an info message if the verbosity level is above 1.  Returns None."""
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('info: ' + sText);
    return None;
540
def dprint(self, sMsg):
    """
    Passes a debug message on to the database connection's dprint() when
    config.g_kfSrvGlueDebugScheduler is set (see config.py).  Returns None.
    """
    if not config.g_kfSrvGlueDebugScheduler:
        return None;
    self._oDb.dprint(sMsg);
    return None;
546
def getElapsedSecs(self):
    """ Returns how many seconds have passed since the start timestamp of this scheduling task. """
    tsSecNow = utils.timestampSecond();
    # Paranoia: should the clock have gone backwards, restart from now.
    self._tsSecStart = min(self._tsSecStart, tsSecNow);
    return tsSecNow - self._tsSecStart;
553
554
555 #
556 # Create schedule.
557 #
558
def _recreateQueueCancelGatherings(self):
    """
    Times out all pending gang gatherings on the current queue.
    Does not commit.  Returns True.
    """
    oDb = self._oDb;
    oDb.execute('SELECT idTestSetGangLeader\n'
                'FROM SchedQueues\n'
                'WHERE idSchedGroup = %s\n'
                ' AND idTestSetGangLeader is not NULL\n'
                , (self._oSchedGrpData.idSchedGroup,));
    if not oDb.getRowCount() > 0:
        return True;

    # Each row holds the gang leader test set ID of one gathering in progress.
    oTBStatusLogic = TestBoxStatusLogic(oDb);
    for aoRow in oDb.fetchAll():
        oTBStatusLogic.updateGangStatus(aoRow[0],
                                        TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                        fCommit = False);
    return True;
576
577 def _recreateQueueItems(self, oData):
578 """
579 Returns an array of queue items (SchedQueueData).
580 Child classes must override this.
581 """
582 _ = oData;
583 return [];
584
def recreateQueueWorker(self):
    """
    Worker for recreateQueue.

    Loads and validates the scheduling group data, lets the child class
    generate the queue items and then replaces the SchedQueues content for
    the scheduling group.  Does not commit.

    Returns (aoErrors, asMessages).  The queue is replaced even if there
    are errors; recreateQueue() rolls the transaction back in that case.
    """

    #
    # Collect the necessary data and validate it.
    #
    oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
    aoErrors = oData.checkForGroupDepCycles();
    aoErrors.extend(oData.checkForMissingTestCaseDeps());
    if len(aoErrors) == 0:
        oData.deepTestGroupSort();

    #
    # The creation of the scheduling queue is done by the child class.
    #
    # We will try guess where in queue we're currently at and rotate
    # the items such that we will resume execution in the approximately
    # same position.  The goal of the scheduler is to provide a 100%
    # deterministic result so that if we regenerate the queue when there
    # are no changes to the testcases, testgroups or scheduling groups
    # involved, test execution will be unchanged (save for maybe just a
    # little for gang gathering).
    #
    aoItems = list();
    if len(oData.aoArgsVariations) > 0:
        aoItems = self._recreateQueueItems(oData);
        self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
    if len(aoItems) > 0:
        self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
                          , (self._oSchedGrpData.idSchedGroup,));
        if self._oDb.getRowCount() > 0:
            offQueue = self._oDb.fetchOne()[0];
            self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
                              , (self._oSchedGrpData.idSchedGroup,));
            cItems = self._oDb.fetchOne()[0];
            # Floor division: the result is used as a slice index and must
            # stay an integer regardless of the python version.
            offQueueNew = (offQueue * cItems) // len(aoItems);
            if offQueueNew != 0:
                aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];

    #
    # Replace the scheduling queue.
    # Care needs to be taken to first timeout/abort any gangs in the
    # gathering state since these use the queue to set up the data.
    #
    self._recreateQueueCancelGatherings();
    self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
    self._oDb.insertList('INSERT INTO SchedQueues (\n'
                         ' idSchedGroup,\n'
                         ' offQueue,\n'
                         ' idGenTestCaseArgs,\n'
                         ' idTestGroup,\n'
                         ' aidTestGroupPreReqs,\n'
                         ' bmHourlySchedule,\n'
                         ' cMissingGangMembers )\n',
                         aoItems, self._formatItemForInsert);
    return (aoErrors, self._asMessages);
645
646 def _formatItemForInsert(self, oItem):
647 """
648 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
649 """
650 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
651 , ( oItem.idSchedGroup,
652 oItem.offQueue,
653 oItem.idGenTestCaseArgs,
654 oItem.idTestGroup,
655 oItem.aidTestGroupPreReqs if len(oItem.aidTestGroupPreReqs) > 0 else None,
656 oItem.bmHourlySchedule,
657 oItem.cMissingGangMembers
658 ));
659
@staticmethod
def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
    """
    (Re-)creates the scheduling queue for the given group.

    Returns (aoErrors, asMessages).  On success the error array will be
    empty, on failure it will contain (sError, oRelatedObject) entries.
    The messages are for debugging and are simple strings.

    The work is committed when there are no errors and rolled back
    otherwise.  Exceptions are re-raised after rolling back.
    """

    aoExtraMsgs = list();
    if oDb.debugIsExplainEnabled():
        aoExtraMsgs.append('Warning! Disabling SQL explain to avoid deadlocking against locked tables.');
        oDb.debugDisableExplain();

    aoErrors   = list();
    asMessages = list();
    try:
        #
        # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
        # we lock quite a few tables while doing this work.  We access more
        # data than scheduleNewTask so we lock some additional tables.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE');
        oDb.execute('LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE');
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Let the scheduler of the group do the actual work.
        #
        oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
        oScheduler    = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);
        (aoErrors, asMessages) = oScheduler.recreateQueueWorker();

        #
        # Only keep the result if it is free of errors.
        #
        if len(aoErrors) != 0:
            oDb.rollback();
        else:
            SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
                                         'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
            oDb.commit();

    except:
        oDb.rollback();
        raise;

    return (aoErrors, aoExtraMsgs + asMessages);
710
711
712
713 #
714 # Schedule Task.
715 #
716
def _composeGangArguments(self, idTestSet):
    """
    Composes the gang specific testdriver arguments: the gang member
    number of the test set, the gang size, and one IPv4 argument per gang
    member.
    Returns command line string, including a leading space.
    """

    oTestSet      = TestSetData().initFromDbWithId(self._oDb, idTestSet);
    aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);

    sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
    for i, oGangMember in enumerate(aoGangMembers):
        # Append: each member contributes its own argument.
        sArgs += ' --gang-ipv4-%s %s' % (i, oGangMember.ip); ## @todo IPv6

    return sArgs;
731
732
def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
    """
    Given all the bits of data, compose an EXEC command response to the testbox.

    Returns a dictionary with the command, the test set ID, the script
    zips, the command line and the timeout in seconds (scaled by the
    testbox timeout percentage).
    """
    # The script zips default to the validation kit binaries; the
    # @VALIDATIONKIT_ZIP@ and @DOWNLOAD_BASE_URL@ placeholders get expanded.
    sScriptZips = oTestEx.oTestCase.sValidationKitZips;
    if sScriptZips is None  or  sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
        assert oValidationKitBuild;
        if sScriptZips is None:
            sScriptZips = oValidationKitBuild.sBinaries;
        else:
            sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
    sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);

    sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
    sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
    sCmdLine = sCmdLine.strip();
    if oTestEx.cGangMembers > 1:
        sCmdLine += ' ' + self._composeGangArguments(idTestSet);

    # The argument variation timeout takes precedence over the testcase one.
    # Floor division keeps the scaled timeout a whole number of seconds
    # regardless of the python version.
    cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
    cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout // 100;

    dResponse = \
    {
        constants.tbresp.ALL_PARAM_RESULT:              constants.tbresp.CMD_EXEC,
        constants.tbresp.EXEC_PARAM_RESULT_ID:          idTestSet,
        constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS:        sScriptZips,
        constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE:    sCmdLine,
        constants.tbresp.EXEC_PARAM_TIMEOUT:            cSecTimeout,
    };
    return dResponse;
764
@staticmethod
def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
    """
    Composes an EXEC response for a gang member (other than the last).
    Returns an EXEC response or raises an exception (DB/input error).
    """
    #
    # Load the test set and everything it refers to.
    #
    oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
    oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
    oTestEx  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTestSet.idGenTestCaseArgs);
    oBuild   = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
    if oTestSet.idBuildTestSuite is None:
        oValidationKitBuild = None;
    else:
        oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);

    #
    # The scheduler of the testbox' scheduling group (as configured when
    # the test set was created) composes the actual response.
    #
    oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBox.idSchedGroup, oTestSet.tsCreated);
    assert oSchedGrpData.fEnabled   is True;
    assert oSchedGrpData.idBuildSrc is not None;
    oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);

    return oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
791
792
793 def _updateTask(self, oTask, tsNow):
794 """
795 Updates a gang schedule task.
796 """
797 assert oTask.cMissingGangMembers >= 1;
798 assert oTask.idTestSetGangLeader is not None;
799 assert oTask.idTestSetGangLeader >= 1;
800 if tsNow is not None:
801 self._oDb.execute('UPDATE SchedQueues\n'
802 ' SET idTestSetGangLeader = %s,\n'
803 ' cMissingGangMembers = %s,\n'
804 ' tsLastScheduled = %s\n'
805 'WHERE idItem = %s\n'
806 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
807 else:
808 self._oDb.execute('UPDATE SchedQueues\n'
809 ' SET cMissingGangMembers = %s\n'
810 'WHERE idItem = %s\n'
811 , (oTask.cMissingGangMembers, oTask.idItem,) );
812 return True;
813
814 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
815 """
816 The task has been scheduled successfully, reset it's data move it to
817 the end of the queue.
818 """
819 if cGangMembers > 1:
820 self._oDb.execute('UPDATE SchedQueues\n'
821 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
822 ' idTestSetGangLeader = NULL,\n'
823 ' cMissingGangMembers = %s\n'
824 'WHERE idItem = %s\n'
825 , (cGangMembers, oTask.idItem,) );
826 else:
827 self._oDb.execute('UPDATE SchedQueues\n'
828 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
829 ' idTestSetGangLeader = NULL,\n'
830 ' cMissingGangMembers = 1,\n'
831 ' tsLastScheduled = %s\n'
832 'WHERE idItem = %s\n'
833 , (tsNow, oTask.idItem,) );
834 return True;
835
836
837
838
839 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
840 """
841 Creates a test set for using the given data.
842 Will not commit, someone up the callstack will that later on.
843 Returns the test set ID, may raise an exception on database error.
844 """
845 # Lazy bird doesn't want to write testset.py and does it all here.
846
847 #
848 # We're getting the TestSet ID first in order to include it in the base
849 # file name (that way we can directly relate files on the disk to the
850 # test set when doing batch work), and also for idTesetSetGangLeader.
851 #
852 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
853 idTestSet = self._oDb.fetchOne()[0];
854
855 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
856 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
857
858 #
859 # Gang scheduling parameters. Changes the oTask data for updating by caller.
860 #
861 iGangMemberNo = 0;
862
863 if oTestEx.cGangMembers <= 1:
864 assert oTask.idTestSetGangLeader is None;
865 assert oTask.cMissingGangMembers <= 1;
866 elif oTask.idTestSetGangLeader is None:
867 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
868 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
869 oTask.idTestSetGangLeader = idTestSet;
870 else:
871 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
872 oTask.cMissingGangMembers -= 1;
873
874 #
875 # Do the database stuff.
876 #
877 self._oDb.execute('INSERT INTO TestSets (\n'
878 ' idTestSet,\n'
879 ' tsConfig,\n'
880 ' tsCreated,\n'
881 ' idBuild,\n'
882 ' idBuildCategory,\n'
883 ' idBuildTestSuite,\n'
884 ' idGenTestBox,\n'
885 ' idTestBox,\n'
886 ' idSchedGroup,\n'
887 ' idTestGroup,\n'
888 ' idGenTestCase,\n'
889 ' idTestCase,\n'
890 ' idGenTestCaseArgs,\n'
891 ' idTestCaseArgs,\n'
892 ' sBaseFilename,\n'
893 ' iGangMemberNo,\n'
894 ' idTestSetGangLeader )\n'
895 'VALUES ( %s,\n' # idTestSet
896 ' %s,\n' # tsConfig
897 ' %s,\n' # tsCreated
898 ' %s,\n' # idBuild
899 ' %s,\n' # idBuildCategory
900 ' %s,\n' # idBuildTestSuite
901 ' %s,\n' # idGenTestBox
902 ' %s,\n' # idTestBox
903 ' %s,\n' # idSchedGroup
904 ' %s,\n' # idTestGroup
905 ' %s,\n' # idGenTestCase
906 ' %s,\n' # idTestCase
907 ' %s,\n' # idGenTestCaseArgs
908 ' %s,\n' # idTestCaseArgs
909 ' %s,\n' # sBaseFilename
910 ' %s,\n' # iGangMemberNo
911 ' %s)\n' # idTestSetGangLeader
912 , ( idTestSet,
913 oTask.tsConfig,
914 tsNow,
915 oBuild.idBuild,
916 oBuild.idBuildCategory,
917 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
918 oTestBoxData.idGenTestBox,
919 oTestBoxData.idTestBox,
920 oTestBoxData.idSchedGroup,
921 oTask.idTestGroup,
922 oTestEx.oTestCase.idGenTestCase,
923 oTestEx.oTestCase.idTestCase,
924 oTestEx.idGenTestCaseArgs,
925 oTestEx.idTestCaseArgs,
926 sBaseFilename,
927 iGangMemberNo,
928 oTask.idTestSetGangLeader,
929 ));
930
931 self._oDb.execute('INSERT INTO TestResults (\n'
932 ' idTestResultParent,\n'
933 ' idTestSet,\n'
934 ' tsCreated,\n'
935 ' idStrName,\n'
936 ' cErrors,\n'
937 ' enmStatus,\n'
938 ' iNestingDepth)\n'
939 'VALUES ( NULL,\n' # idTestResultParent
940 ' %s,\n' # idTestSet
941 ' %s,\n' # tsCreated
942 ' 0,\n' # idStrName
943 ' 0,\n' # cErrors
944 ' \'running\'::TestStatus_T,\n'
945 ' 0)\n' # iNestingDepth
946 'RETURNING idTestResult'
947 , ( idTestSet, tsNow, ));
948 idTestResult = self._oDb.fetchOne()[0];
949
950 self._oDb.execute('UPDATE TestSets\n'
951 ' SET idTestResult = %s\n'
952 'WHERE idTestSet = %s\n'
953 , (idTestResult, idTestSet, ));
954
955 return idTestSet;
956
def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
    """
    Looks for the most recent validation kit build the given testbox can use.

    The candidates are read from a build cursor opened on the scheduling
    group's validation kit build source (idBuildSrcTestSuite) for the OS and
    CPU architecture of the testbox.  The first candidate which isn't
    blacklisted is picked.

    Returns BuildDataEx, or None if no usable build was found.
    Raises exception on database error.

    Child classes can override this to change the default build requirements.
    """
    oBlacklistChecker = BuildLogic(self._oDb);
    oKitSource        = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
    oCandidates       = BuildSourceLogic(self._oDb).openBuildCursor(oKitSource, oTestBoxData.sOs,
                                                                    oTestBoxData.sCpuArch, tsNow);

    # Fetch one row at a time so we stop reading at the first usable build.
    cRowsLeft = oCandidates.getRowCount();
    while cRowsLeft > 0:
        cRowsLeft -= 1;
        oCandidate = BuildDataEx().initFromDbRow(oCandidates.fetchOne());
        if oBlacklistChecker.isBuildBlacklisted(oCandidate):
            continue;
        return oCandidate;
    return None;
972
def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
    """
    Tries to find a fitting build.

    The builds of the scheduling group's build source are tried in the order
    self.oBuildCache yields them.  A build is accepted when it matches the
    build properties required by the test, isn't blacklisted, satisfies all
    prerequisites and its files aren't reported as gone.

    The prerequisites are the direct testcase prerequisites
    (oTestEx.aoTestCasePreReqs) plus the enabled members, as of
    oTask.tsConfig, of the test groups in oTask.aidTestGroupPreReqs.  A
    prerequisite is satisfied for a build when there is at least one
    successful or skipped test set for it and no failed, timed out or
    rebooted ones.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """

    #
    # Gather the set of prerequisites we have and turn them into a value
    # set for use in the loop below.
    #
    # Note! We're scheduling on testcase level and ignoring argument variation
    #       selections in TestGroupMembers is intentional.
    #

    # Direct prerequisites.  We assume they're all enabled as this can be
    # checked at queue creation time.
    setPreReqs = set([oPreReq.idTestCase for oPreReq in oTestEx.aoTestCasePreReqs]);

    # Testgroup dependencies from the scheduling group config.
    if oTask.aidTestGroupPreReqs is not None:
        for iTestGroup in oTask.aidTestGroupPreReqs:
            # Make sure the _active_ test group members are in the cache.
            if iTestGroup not in self.dTestGroupMembers:
                self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
                                  'FROM TestGroupMembers, TestCases\n'
                                  'WHERE TestGroupMembers.idTestGroup = %s\n'
                                  ' AND TestGroupMembers.tsExpire > %s\n'
                                  ' AND TestGroupMembers.tsEffective <= %s\n'
                                  ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
                                  ' AND TestCases.tsExpire > %s\n'
                                  ' AND TestCases.tsEffective <= %s\n'
                                  ' AND TestCases.fEnabled is TRUE\n'
                                  , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
                self.dTestGroupMembers[iTestGroup] = [aoRow[0] for aoRow in self._oDb.fetchAll()];

            # Add the testgroup members to the prerequisites.
            setPreReqs.update(self.dTestGroupMembers[iTestGroup]);

    # Create a SQL values table out of them, e.g. '(1), (7), (42)'.  It also
    # serves as the key for the cached per-build prerequisite decisions.
    sPreReqSet = ', '.join(['(' + str(idPreReq) + ')' for idPreReq in sorted(setPreReqs)]);

    #
    # Try the builds.
    #
    self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
    for oEntry in self.oBuildCache:
        #
        # Check build requirements set by the test.
        #
        if not oTestEx.matchesBuildProps(oEntry.oBuild):
            continue;

        if oEntry.isBlacklisted(self._oDb):
            oEntry.remove();
            continue;

        #
        # Check prerequisites.  The default scheduler is satisfied if one
        # argument variation has been executed successfully.  It is not
        # satisfied if there are any failure runs.
        #
        if sPreReqSet:
            fDecision = oEntry.getPreReqDecision(sPreReqSet);
            if fDecision is None:
                # Check for missing prereqs.
                # Note! The sub-select must return idTestCase too, since the
                #       ON clause below refers to the derived table and not
                #       to the real TestSets table.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
                                  'LEFT OUTER JOIN (SELECT idTestSet, idTestCase\n'
                                  ' FROM TestSets\n'
                                  ' WHERE enmStatus IN (%s, %s)\n'
                                  ' AND idBuild = %s\n'
                                  ' ) AS TestSets\n'
                                  ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
                                  'WHERE TestSets.idTestSet is NULL\n'
                                  , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
                                      oEntry.oBuild.idBuild, ));
                cMissingPreReqs = self._oDb.fetchOne()[0];
                if cMissingPreReqs > 0:
                    self.dprint('build %s is missing %u prerequisites (out of %s)'
                                % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # Check for failed prereq runs.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
                                  ' TestSets\n'
                                  'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
                                  ' AND TestSets.idBuild = %s\n'
                                  ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
                                  , ( oEntry.oBuild.idBuild,
                                      TestSetData.ksTestStatus_Failure,
                                      TestSetData.ksTestStatus_TimedOut,
                                      TestSetData.ksTestStatus_Rebooted,
                                    )
                                 );
                cFailedPreReqs = self._oDb.fetchOne()[0];
                if cFailedPreReqs > 0:
                    self.dprint('build %s is has %u prerequisite failures (out of %s)'
                                % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                oEntry.setPreReqDecision(sPreReqSet, True);
            elif not fDecision:
                # Cached negative decision from an earlier task.
                continue;

        #
        # If we can, check if the build files still exist.  Only an explicit
        # False rejects the build; any other answer lets it through.
        #
        if oEntry.oBuild.areFilesStillThere() is False:
            self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
            oEntry.remove();
            continue;

        self.dprint('found oBuild=%s' % (oEntry.oBuild,));
        return oEntry.oBuild;
    return None;
1103
def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
    """
    Gang scheduling: Tries to find the build corresponding to the gang
    leader's build for the OS and CPU architecture of the given testbox.

    The idBuildSrc parameter is not used by this default implementation.

    Returns BuildDataEx or None.  Raise exception on database error.

    Child classes can override this to change the default build requirements.
    """
    #
    # Note! Should probably check build prerequisites if we get a different
    #       build back, so that we don't use a build which hasn't passed
    #       the smoke test.
    #
    _ = idBuildSrc;
    oBuildLogic = BuildLogic(self._oDb);
    return oBuildLogic.tryFindSameBuildForOsArch(oLeaderBuild, oTestBoxData.sOs, oTestBoxData.sCpuArch);
1118
1119
def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang leader (can be a gang of one).

    For a gang of one the task is moved to the end of the queue and an EXEC
    response is composed.  For a real gang the task is updated and a WAIT
    response is returned while the remaining members gather.

    Returns the response dictionary, or None if the task cannot be scheduled
    on this testbox right now (resources busy, no build, no validation kit
    build, or resource allocation failed).  May raise exception on DB error.
    """

    # We don't wait for busy resources, we just try the next test.
    oTestArgsLogic = TestCaseArgsLogic(self._oDb);
    if not oTestArgsLogic.areResourcesFree(oTestEx):
        self.dprint('Cannot get global test resources!');
        return None;

    #
    # Find a matching build (this is the difficult bit).
    #
    oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
    if oBuild is None:
        self.dprint('No build!');
        return None;
    if oTestEx.oTestCase.needValidationKitBit():
        oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
        if oValidationKitBuild is None:
            self.dprint('No validation kit build!');
            return None;
    else:
        oValidationKitBuild = None;

    #
    # Create a testset, allocate the resources and update the state.
    # Note! Since resource allocation may still fail, we create a nested
    #       transaction so we can roll back. (Heed lock warning in docs!)
    #
    self._oDb.execute('SAVEPOINT tryAsLeader');
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
       is not True:
        # Undo the test set creation.  We must return None here and not
        # False, as the callers treat anything that 'is not None' as a
        # response to send to the testbox.
        self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
        self.dprint('Failed to allocate global resources!');
        return None;

    if oTestEx.cGangMembers <= 1:
        # We're alone, put the task back at the end of the queue and issue EXEC cmd.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_Testing;
    else:
        # We're missing gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
    return dResponse;
1175
def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang member.

    The gang leader is identified by oTask.idTestSetGangLeader, which the
    caller has checked to be set.  If this testbox completes the gang, the
    task is moved to the end of the queue, the whole gang is switched to the
    gang testing state and an EXEC response is composed.  Otherwise a WAIT
    response is returned.

    Returns the response dictionary, or None if no build matching the
    leader's could be found.  May raise exception on DB error.
    """

    #
    # The leader has chosen a build, we need to find a matching one for our platform.
    # (It's up to the scheduler decide upon how strict dependencies are to be enforced
    # upon subordinate group members.)
    #
    # Note! The leader test set ID comes from the scheduling queue item, just
    #       like everywhere else in the gang scheduling code.
    #
    oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTask.idTestSetGangLeader);

    oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
    oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
    if oBuild is None:
        return None;

    oValidationKitBuild = None;
    if oLeaderTestSet.idBuildTestSuite is not None:
        oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
        oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
                                                         self._oSchedGrpData.idBuildSrcTestSuite);

    #
    # Create a testset and update the state(s).
    # Note! _createTestSet updates oTask.cMissingGangMembers for us.
    #
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    if oTask.cMissingGangMembers < 1:
        # The whole gang is there, move the task to the end of the queue
        # and update the status on the other gang members.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
        oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
    else:
        # We're still missing some gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    return dResponse;
1221
1222
def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
    """
    Worker for scheduling a new task.

    Walks the scheduling queue of the scheduling group in idItem order,
    restricted to the items allowed in the current hour of the week, and
    tries each task on the testbox until one can be scheduled or
    config.g_kcSecMaxNewTask seconds have elapsed.

    Returns the response of the first task that could be scheduled, or None
    if no suitable task was found.  May raise exception on DB error.
    """

    #
    # Iterate the scheduler queue (fetch all to avoid having to concurrent
    # queries), trying out each task to see if the testbox can execute it.
    #
    dRejected = {}; # variations we've already checked out and rejected.
    self._oDb.execute('SELECT *\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND ( bmHourlySchedule IS NULL\n'
                      ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
                      'ORDER BY idItem ASC\n'
                      , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
    aaoRows = self._oDb.fetchAll();
    for aoRow in aaoRows:
        # Don't loop forever.
        if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
            break;

        # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
        oTask = SchedQueueData().initFromDbRow(aoRow);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
                        % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
                            oTask.tsLastScheduled, oTask.tsConfig,));

        sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
        if sRejectNm in dRejected:
            self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
            continue;
        dRejected[sRejectNm] = 1;

        # Fetch all the test case info (too much, but who cares right now).
        oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
                                                             tsConfigEff = oTask.tsConfig,
                                                             tsRsrcEff = oTask.tsConfig);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));

        # This shouldn't happen, but just in case it does...
        if oTestEx.oTestCase.fEnabled is not True:
            self.dprint('Testcase is not enabled!!');
            continue;

        # Check if the testbox properties matches the test.
        if not oTestEx.matchesTestBoxProps(oTestBoxData):
            self.dprint('Testbox mismatch!');
            continue;

        # Try schedule it.
        if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
            dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        elif oTask.cMissingGangMembers >= 1:
            # A gathering gang which still lacks members.  This must include
            # the case of exactly one missing member, otherwise the last
            # member could never join and the gang would never be completed.
            dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        else:
            dResponse = None; # Shouldn't happen!
        if dResponse is not None:
            self.dprint('Found a task! dResponse=%s' % (dResponse,));
            return dResponse;

    # Found no suitable task.
    return None;
1289
@staticmethod
def scheduleNewTask(oDb, oTestBoxData, sBaseUrl, iVerbosity = 0):
    """
    Schedules a new task.

    Starts a fresh transaction, locks the testbox status row and the
    scheduling queue rows of the testbox's scheduling group, re-reads the
    testbox and scheduling group data and, if both are enabled and the
    group has a build source, lets the scheduler instance do the rest.

    Returns the response from the scheduler (transaction committed), or None
    if nothing was scheduled (transaction rolled back).  On any exception the
    transaction is rolled back and the exception re-raised.
    """
    try:
        #
        # To avoid concurrency issues in SchedQueues we lock all the rows
        # related to our scheduling queue.  Also, since this is a very
        # expensive operation we lock the testbox status row to fend off
        # repeated retries by a faulty testbox script.
        #
        # Note! The IDs are passed as bind arguments rather than being
        #       formatted into the statements, same as all other queries.
        #
        tsSecStart = utils.timestampSecond();
        oDb.rollback();
        oDb.begin();
        oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
                    , (oTestBoxData.idTestBox,));
        oDb.execute('SELECT idSchedGroup FROM SchedQueues WHERE idSchedGroup = %s FOR UPDATE'
                    , (oTestBoxData.idSchedGroup,));

        # We need the current timestamp.
        tsNow = oDb.getCurrentTimestamp();

        # Re-read the testbox data ...
        oTestBoxDataCur = TestBoxData().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
        if    oTestBoxDataCur.fEnabled \
          and oTestBoxDataCur.idGenTestBox == oTestBoxData.idGenTestBox \
          and oTestBoxDataCur.idSchedGroup == oTestBoxData.idSchedGroup: # (paranoia wrt idSchedGroup)

            # ... and schedule group data.
            oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBoxDataCur.idSchedGroup, tsNow);
            if oSchedGrpData.fEnabled and oSchedGrpData.idBuildSrc is not None:

                #
                # Instantiate the specified scheduler and let it do the rest.
                #
                oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity, tsSecStart);
                dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataCur, tsNow, sBaseUrl);
                if dResponse is not None:
                    oDb.commit();
                    return dResponse;
    except:
        # Deliberately catching everything: roll back, then pass it on.
        oDb.rollback();
        raise;

    # Not enabled or nothing to schedule, rollback and return no task.
    oDb.rollback();
    return None;
1338
@staticmethod
def tryCancelGangGathering(oDb, oStatusData):
    """
    Tries to cancel a gang gathering.

    On success the whole gang is put in the gang-gathering-timed-out state
    and the scheduling queue item is given a new idItem, has its gang leader
    cleared and its missing member count reset to the gang size.

    Returns True if cancelled, either by us or because the testbox was
    already found in the timed out state.
    Returns False if the testbox is in some other state (someone raced us to
    the SchedQueue table).

    Note! oStatusData is re-initialized from the database.
    """
    assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
    try:
        #
        # We're racing both scheduleNewTask and other callers of this
        # method, so start a fresh transaction and lock the two tables we
        # are going to update.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Refresh the status now that we hold the locks and see where we are.
        #
        oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);

        if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering:
            #
            # Still gathering.  The test set gives us the gang leader, which
            # is what identifies the gang in both tables.
            #
            idGangLeader = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet).idTestSetGangLeader;

            TestBoxStatusLogic(oDb).updateGangStatus(idGangLeader,
                                                     TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                                     fCommit = False);

            #
            # Requeue the scheduling queue item with a fresh item ID and
            # reset its gang state.
            #
            oDb.execute('SELECT *\n'
                        'FROM SchedQueues\n'
                        'WHERE idTestSetGangLeader = %s\n'
                        , (idGangLeader,) );
            oQueueItem = SchedQueueData().initFromDbRow(oDb.fetchOne());
            oTestCaseArgs = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oQueueItem.idGenTestCaseArgs);

            oDb.execute('UPDATE SchedQueues\n'
                        ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                        ' idTestSetGangLeader = NULL,\n'
                        ' cMissingGangMembers = %s\n'
                        'WHERE idItem = %s\n'
                        , (oTestCaseArgs.cGangMembers, oQueueItem.idItem,) );

            oDb.commit();
            return True;

        if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
            # Already timed out, nothing for us to change.
            oDb.rollback();
            return True;
    except:
        oDb.rollback();
        raise;

    # The testbox is in some other state, rollback and report failure.
    oDb.rollback();
    return False;
1405
1406
1407#
1408# Unit testing.
1409#
1410
1411# pylint: disable=C0111
class SchedQueueDataTestCase(ModelDataBaseTestCase):
    """ ModelDataBaseTestCase subclass using SchedQueueData as sample data. """
    def setUp(self):
        """ Sets up a single default constructed SchedQueueData sample. """
        oSample = SchedQueueData();
        self.aoSamples = [oSample,];
1415
if __name__ == '__main__':
    # Run the unit tests when executed as a script.  unittest.main() exits
    # the process when done, so nothing after the call is reached.
    unittest.main();
1419
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette