VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 68395

Last change on this file since 68395 was 67001, checked in by vboxsync, 8 years ago

ReCreateQueueData:deepTestGroupSort: more error diagnostics

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 66.8 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 67001 2017-05-22 09:36:53Z vboxsync $
3# pylint: disable=C0302
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2016 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.virtualbox.org. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 67001 $"
32
33
34# Standard python imports.
35import unittest;
36
37# Validation Kit imports.
38from common import utils, constants;
39from testmanager import config;
40from testmanager.core.build import BuildDataEx, BuildLogic;
41from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43from testmanager.core.globalresource import GlobalResourceLogic;
44from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46from testmanager.core.testbox import TestBoxData, TestBoxDataEx;
47from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48from testmanager.core.testcase import TestCaseLogic;
49from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50from testmanager.core.testset import TestSetData, TestSetLogic;
51
52
class ReCreateQueueData(object):
    """
    Data object for recreating a scheduling queue.

    It's mostly a storage object, but has a few data checking operations
    associated with it.
    """

    def __init__(self, oDb, idSchedGroup):
        """
        Loads the test groups, testcases and testcase argument variations of
        the given scheduling group from the database and cross-links them.
        """
        #
        # Load data from the database.
        #

        # Will extend the entries with aoTestCases and dTestCases members
        # further down.  checkForGroupDepCycles will add aidTestGroupPreReqs.
        self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);

        # aoTestCases entries are TestCaseData instance with iSchedPriority
        # and idTestGroup added for our purposes.
        # We will add oTestGroup and aoArgsVariations members to each further down.
        self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);

        # Load dependencies.
        oTestCaseLogic = TestCaseLogic(oDb);
        for oTestCase in self.aoTestCases:
            oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);

        # aoArgsVariations entries are TestCaseArgsData instance with
        # iSchedPriority and idTestGroup added for our purposes.
        # We will add oTestGroup and oTestCase members to each further down.
        self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);

        #
        # Generate global lookups.
        #

        # Generate a testcase lookup dictionary for use when working on
        # argument variations.
        self.dTestCases = dict();
        for oTestCase in self.aoTestCases:
            self.dTestCases[oTestCase.idTestCase] = oTestCase;
        assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!

        # Generate a testgroup lookup dictionary.
        self.dTestGroups = dict();
        for oTestGroup in self.aoTestGroups:
            self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
        assert len(self.dTestGroups) == len(self.aoTestGroups);

        #
        # Associate extra members with the base data.
        #
        if self.aoTestGroups:
            # Prep the test groups.
            for oTestGroup in self.aoTestGroups:
                oTestGroup.aoTestCases = list();
                oTestGroup.dTestCases  = dict();

            # Link testcases to their group, both directions.  Prep testcases for
            # argument variation association.
            oTestGroup = self.aoTestGroups[0];
            for oTestCase in self.aoTestCases:
                # Only do the dictionary lookup when the group changes.
                if oTestGroup.idTestGroup != oTestCase.idTestGroup:
                    oTestGroup = self.dTestGroups[oTestCase.idTestGroup];

                assert oTestCase.idTestCase not in oTestGroup.dTestCases;
                oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
                oTestGroup.aoTestCases.append(oTestCase);
                oTestCase.oTestGroup       = oTestGroup;
                oTestCase.aoArgsVariations = list();

            # Associate testcase argument variations with their testcases (group)
            # in both directions.
            oTestGroup = self.aoTestGroups[0];
            oTestCase  = self.aoTestCases[0] if self.aoTestCases else None;
            for oArgVariation in self.aoArgsVariations:
                if oTestGroup.idTestGroup != oArgVariation.idTestGroup:
                    oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
                if oTestCase.idTestCase != oArgVariation.idTestCase or oTestCase.idTestGroup != oArgVariation.idTestGroup:
                    oTestCase = oTestGroup.dTestCases[oArgVariation.idTestCase];

                oTestCase.aoArgsVariations.append(oArgVariation);
                oArgVariation.oTestCase  = oTestCase;
                oArgVariation.oTestGroup = oTestGroup;

        else:
            assert not self.aoTestCases;
            assert not self.aoArgsVariations;
        # done.

    @staticmethod
    def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
        """
        Appends a [sMsg + dependency chain, oObj] entry to aoErrors.
        Returns aoErrors.
        """
        sMsg += ' Dependency chain: %s' % (' -> '.join(['%s' % (idCur,) for idCur in aidChain]),);
        aoErrors.append([sMsg, oObj]);
        return aoErrors;

    def checkForGroupDepCycles(self):
        """
        Checks for testgroup dependency cycles and any missing testgroup
        dependencies.  Also sets the aidTestGroupPreReqs member of each group.

        A cycle shows up as a prerequisite chain reaching 10 entries.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            idPreReq = oTestGroup.idTestGroupPreReq;
            if idPreReq is None:
                oTestGroup.aidTestGroupPreReqs = list();
                continue;

            aidChain = [oTestGroup.idTestGroup,];
            while idPreReq is not None:
                aidChain.append(idPreReq);
                if len(aidChain) >= 10:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite chain is too long!'
                                         % (oTestGroup.idTestGroup,));
                    break;

                oDep = self.dTestGroups.get(idPreReq, None);
                if oDep is None:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite #%s is not in the scheduling group!'
                                         % (oTestGroup.idTestGroup, idPreReq,));
                    break;

                idPreReq = oDep.idTestGroupPreReq;
            # The chain minus the group itself is the (transitive) prerequisite list.
            oTestGroup.aidTestGroupPreReqs = aidChain[1:];

        return aoErrors;


    def checkForMissingTestCaseDeps(self):
        """
        Checks that testcase dependencies stays within bounds.  We do not allow
        dependencies outside a testgroup, no dependency cycles or even remotely
        long dependency chains.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            for oTestCase in oTestGroup.aoTestCases:
                if not oTestCase.aidPreReqs:
                    continue;

                # Iterative depth-first walk: aiIndexes is the stack of
                # [testcase, next prerequisite index] pairs and aidChain holds
                # the testcase IDs on the current path (same depth as aiIndexes).
                aiIndexes = [[oTestCase, 0], ];
                aidChain  = [oTestCase.idTestCase,];
                while aiIndexes:
                    (oCur, i) = aiIndexes[-1];
                    if i >= len(oCur.aidPreReqs):
                        aiIndexes.pop();
                        aidChain.pop();
                    else:
                        aiIndexes[-1][1] = i + 1; # whatever happens, we'll advance on the current level.

                        # Look at the prerequisites of the testcase on top of
                        # the stack, not just those of the starting testcase.
                        idPreReq = oCur.aidPreReqs[i];
                        oDep = oTestGroup.dTestCases.get(idPreReq, None);
                        if oDep is None:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s is not in the scheduling group!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif idPreReq in aidChain:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s creates a cycle!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif not oDep.aidPreReqs:
                            pass;
                        elif len(aidChain) >= 10:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
                        else:
                            aiIndexes.append([oDep, 0]);
                            aidChain.append(idPreReq);

        return aoErrors;

    def deepTestGroupSort(self):
        """
        Sorts the testgroups and their testcases by priority and dependencies.
        Note! Don't call this before checking for dependency cycles!

        Returns True, or None when there are no test groups.
        Raises TMExceptionBase if the database order isn't by priority.
        """
        if not self.aoTestGroups:
            return;

        #
        # ASSUMES groups as well as testcases are sorted by priority by the
        # database.  So we only have to concern ourselves with the dependency
        # sorting.
        #
        iGrpPrio = self.aoTestGroups[0].iSchedPriority;
        for iTestGroup, oTestGroup in enumerate(self.aoTestGroups):
            if oTestGroup.iSchedPriority > iGrpPrio:
                raise TMExceptionBase('Incorrectly sorted testgroups returned by database: iTestGroup=%s prio=%s %s'
                                      % ( iTestGroup, iGrpPrio,
                                          ', '.join(['(%s: %s)' % (oCur.idTestGroup, oCur.iSchedPriority)
                                                     for oCur in self.aoTestGroups]), ) );
            iGrpPrio = oTestGroup.iSchedPriority;

            if oTestGroup.aoTestCases:
                iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
                for iTestCase, oTestCase in enumerate(oTestGroup.aoTestCases):
                    if oTestCase.iSchedPriority > iTstPrio:
                        raise TMExceptionBase('Incorrectly sorted testcases returned by database: i=%s prio=%s idGrp=%s %s'
                                              % ( iTestCase, iTstPrio, oTestGroup.idTestGroup,
                                                  ', '.join(['(%s: %s)' % (oCur.idTestCase, oCur.iSchedPriority)
                                                             for oCur in oTestGroup.aoTestCases]),));
                    iTstPrio = oTestCase.iSchedPriority;

        #
        # Sort the testgroups by dependencies.
        #
        i = 0;
        while i < len(self.aoTestGroups):
            oTestGroup = self.aoTestGroups[i];
            if oTestGroup.idTestGroupPreReq is not None:
                iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
                if iPreReq > i:
                    # The prerequisite is after the current entry.  Move the
                    # current entry so that it's following it's prereq entry.
                    self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
                    self.aoTestGroups.pop(i);
                    continue; # Don't advance, a new entry now lives at index i.
                assert iPreReq < i;
            i += 1; # Advance.

        #
        # Sort the testcases by dependencies.
        # Same algorithm as above, just more prerequisites.
        #
        for oTestGroup in self.aoTestGroups:
            i = 0;
            while i < len(oTestGroup.aoTestCases):
                oTestCase = oTestGroup.aoTestCases[i];
                if oTestCase.aidPreReqs:
                    for idPreReq in oTestCase.aidPreReqs:
                        iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
                        if iPreReq > i:
                            # The prerequisite is after the current entry.  Move the
                            # current entry so that it's following it's prereq entry.
                            oTestGroup.aoTestCases.insert(iPreReq + 1, oTestCase);
                            oTestGroup.aoTestCases.pop(i);
                            i -= 1; # Don't advance.
                            break;
                        assert iPreReq < i;
                i += 1; # Advance.


        return True;
306
307
308
class SchedQueueData(ModelDataBase):
    """
    One item (row) of a scheduling queue.
    """

    ksIdAttr = 'idSchedGroup';

    ksParam_idSchedGroup        = 'SchedQueueData_idSchedGroup';
    ksParam_idItem              = 'SchedQueueData_idItem';
    ksParam_offQueue            = 'SchedQueueData_offQueue';
    ksParam_idGenTestCaseArgs   = 'SchedQueueData_idGenTestCaseArgs';
    ksParam_idTestGroup         = 'SchedQueueData_idTestGroup';
    ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
    ksParam_bmHourlySchedule    = 'SchedQueueData_bmHourlySchedule';
    ksParam_tsConfig            = 'SchedQueueData_tsConfig';
    ksParam_tsLastScheduled     = 'SchedQueueData_tsLastScheduled';
    ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
    ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';

    kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
                               'tsConfig', 'tsLastScheduled' ];


    def __init__(self):
        ModelDataBase.__init__(self);

        # Defaults: everything unset, except for the missing gang member
        # count which starts at one.
        self.idSchedGroup        = None;
        self.idItem              = None;
        self.offQueue            = None;
        self.idGenTestCaseArgs   = None;
        self.idTestGroup         = None;
        self.aidTestGroupPreReqs = None;
        self.bmHourlySchedule    = None;
        self.tsConfig            = None;
        self.tsLastScheduled     = None;
        self.idTestSetGangLeader = None;
        self.cMissingGangMembers = 1;

    def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=R0913
                       bmHourlySchedule, cMissingGangMembers,
                       idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
        """
        Sets every attribute from the given arguments.
        Returns self.
        """
        # Mandatory values.
        self.idSchedGroup        = idSchedGroup;
        self.idGenTestCaseArgs   = idGenTestCaseArgs;
        self.idTestGroup         = idTestGroup;
        self.aidTestGroupPreReqs = aidTestGroupPreReqs;
        self.bmHourlySchedule    = bmHourlySchedule;
        self.cMissingGangMembers = cMissingGangMembers;

        # Optional values.
        self.idItem              = idItem;
        self.offQueue            = offQueue;
        self.tsConfig            = tsConfig;
        self.tsLastScheduled     = tsLastScheduled;
        self.idTestSetGangLeader = idTestSetGangLeader;
        return self;

    def initFromDbRow(self, aoRow):
        """
        Sets the attributes from a database row (SELECT * FROM SchedQueues).
        Returns self.
        Raises TMExceptionBase if aoRow is None.
        """
        if aoRow is None:
            raise TMExceptionBase('SchedQueueData not found.');

        # Attribute names in the column order of the row.
        asColumns = ( 'idSchedGroup', 'idItem', 'offQueue', 'idGenTestCaseArgs', 'idTestGroup',
                      'aidTestGroupPreReqs', 'bmHourlySchedule', 'tsConfig', 'tsLastScheduled',
                      'idTestSetGangLeader', 'cMissingGangMembers', );
        for iColumn, sAttr in enumerate(asColumns):
            setattr(self, sAttr, aoRow[iColumn]);
        return self;
392
393
394
395
396
397
398class SchedulerBase(object):
399 """
400 The scheduler base class.
401
402 The scheduler classes have two functions:
403 1. Recreate the scheduling queue.
404 2. Pick the next task from the queue.
405
406 The first is scheduler specific, the latter isn't.
407 """
408
class BuildCache(object):
    """
    Build cache.

    Holds the builds fetched so far (aoEntries) and lazily pulls further
    ones from the build cursor (oCursor) as iterators run off the end.
    """

    class BuildCacheIterator(object):
        """ Build cache iterator. """
        def __init__(self, oCache):
            self.oCache = oCache;
            self.iCur   = 0;

        def __iter__(self):
            """Returns self, required by the language."""
            return self;

        def next(self):
            """Returns the next build, raises StopIteration when the end has been reached."""
            while True:
                if self.iCur >= len(self.oCache.aoEntries):
                    # Past the cached entries, try pull another one from the cursor.
                    oEntry = self.oCache.fetchFromCursor();
                    if oEntry is None:
                        raise StopIteration;
                else:
                    oEntry = self.oCache.aoEntries[self.iCur];
                self.iCur += 1;
                # Entries marked as removed are skipped.
                if not oEntry.fRemoved:
                    return oEntry;
            # end

        # The iterator protocol method is called __next__ in Python 3.
        __next__ = next;

    class BuildCacheEntry(object):
        """ Build cache entry. """

        def __init__(self, oBuild, fMaybeBlacklisted):
            self.oBuild           = oBuild;
            # None means 'unknown', it is resolved on demand by isBlacklisted.
            self._fBlacklisted    = None if fMaybeBlacklisted is True else False;
            self.fRemoved         = False;
            self._dPreReqDecisions = dict();

        def remove(self):
            """
            Marks the cache entry as removed.
            This doesn't actually remove it from the cache array, only marks
            it as removed.  It has no effect on open iterators.
            """
            self.fRemoved = True;

        def getPreReqDecision(self, sPreReqSet):
            """
            Retrieves a cached prerequisite decision.
            Returns boolean if found, None if not.
            """
            return self._dPreReqDecisions.get(sPreReqSet);

        def setPreReqDecision(self, sPreReqSet, fDecision):
            """
            Caches a prerequisite decision.
            Returns fDecision.
            """
            self._dPreReqDecisions[sPreReqSet] = fDecision;
            return fDecision;

        def isBlacklisted(self, oDb):
            """ Checks if the build is blacklisted, querying the database the first time if unknown. """
            if self._fBlacklisted is None:
                self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
            return self._fBlacklisted;


    def __init__(self):
        self.aoEntries = [];
        self.oCursor   = None;

    def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
        """
        Configures the build cursor for the cache.
        Does nothing if the cache already has entries or a cursor.
        Returns True.
        """
        if not self.aoEntries and self.oCursor is None:
            oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
            self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
        return True;

    def __iter__(self):
        """Return an iterator."""
        return self.BuildCacheIterator(self);

    def fetchFromCursor(self):
        """
        Fetches a build from the cursor and adds it to the cache.
        Returns the new cache entry, None if there is no cursor, no more
        rows or if fetching failed.
        """
        if self.oCursor is None:
            return None;

        try:
            aoRow = self.oCursor.fetchOne();
        except Exception: # Best effort: a failing fetch is treated like the end of the result set.
            return None;
        if aoRow is None:
            return None;

        oBuild = BuildDataEx().initFromDbRow(aoRow);
        # The last column is passed on as the 'maybe blacklisted' indicator.
        oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
        self.aoEntries.append(oEntry);
        return oEntry;
505
def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Initializes the scheduler state.  When no start timestamp is given, the
    current one from utils.timestampSecond() is used.
    """
    # Default the start time to 'now'.
    if tsSecStart is None:
        tsSecStart = utils.timestampSecond();

    self._oDb              = oDb;
    self._oSchedGrpData    = oSchedGrpData;
    self._iVerbosity       = iVerbosity;
    self._asMessages       = [];
    self._tsSecStart       = tsSecStart;
    self.oBuildCache       = self.BuildCache();
    self.dTestGroupMembers = dict();
514
@staticmethod
def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Creates the scheduler child class instance selected by the scheduling
    group's enmScheduler value.

    Returns the scheduler instance.  Raises the database integrity exception
    if the scheduler type isn't known.
    """
    # Reject anything but the one known scheduler type up front.
    if oSchedGrpData.enmScheduler != SchedGroupData.ksScheduler_BestEffortContinuousIntegration:
        raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
                                     % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));

    from testmanager.core.schedulerbeci import SchdulerBeci;
    return SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
529
530
531 #
532 # Misc.
533 #
534
def msgDebug(self, sText):
    """Records a debug message if the verbosity level is above 1.  Returns None."""
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('debug:' + sText);
    return None;
540
def msgInfo(self, sText):
    """Records an info message if the verbosity level is above 1.  Returns None."""
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('info: ' + sText);
    return None;
546
def dprint(self, sMsg):
    """
    Passes the message on to the database connection's dprint when scheduler
    debugging is enabled (config.g_kfSrvGlueDebugScheduler).  Returns None.
    """
    if not config.g_kfSrvGlueDebugScheduler:
        return None;
    self._oDb.dprint(sMsg);
    return None;
552
def getElapsedSecs(self):
    """ Returns how many seconds have passed since the start timestamp of this scheduling task. """
    tsSecNow = utils.timestampSecond();
    # Paranoia: never let the start timestamp be later than 'now'.
    self._tsSecStart = min(self._tsSecStart, tsSecNow);
    return tsSecNow - self._tsSecStart;
559
560
561 #
562 # Create schedule.
563 #
564
def _recreateQueueCancelGatherings(self):
    """
    Marks every gang that is currently gathering on this scheduling group's
    queue as timed out (without committing).
    Returns True.
    """
    oDb = self._oDb;
    oDb.execute('SELECT idTestSetGangLeader\n'
                'FROM SchedQueues\n'
                'WHERE idSchedGroup = %s\n'
                ' AND idTestSetGangLeader is not NULL\n'
                , (self._oSchedGrpData.idSchedGroup,));
    if oDb.getRowCount() <= 0:
        return True; # Nothing is gathering.

    oTBStatusLogic = TestBoxStatusLogic(oDb);
    for aoRow in oDb.fetchAll():
        oTBStatusLogic.updateGangStatus(aoRow[0],
                                        TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                        fCommit = False);
    return True;
582
583 def _recreateQueueItems(self, oData):
584 """
585 Returns an array of queue items (SchedQueueData).
586 Child classes must override this.
587 """
588 _ = oData;
589 return [];
590
def recreateQueueWorker(self):
    """
    Worker for recreateQueue.

    Loads and validates the scheduling group data, lets the child class
    generate the queue items, rotates them to approximately match the
    position of the current queue and then replaces the SchedQueues rows of
    the scheduling group.  Does not commit.

    Returns (aoErrors, asMessages).
    """

    #
    # Collect the necessary data and validate it.
    #
    oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
    aoErrors = oData.checkForGroupDepCycles();
    aoErrors.extend(oData.checkForMissingTestCaseDeps());
    if not aoErrors:
        oData.deepTestGroupSort();

    #
    # The creation of the scheduling queue is done by the child class.
    #
    # We will try guess where in queue we're currently at and rotate
    # the items such that we will resume execution in the approximately
    # same position.  The goal of the scheduler is to provide a 100%
    # deterministic result so that if we regenerate the queue when there
    # are no changes to the testcases, testgroups or scheduling groups
    # involved, test execution will be unchanged (save for maybe just a
    # little for gang gathering).
    #
    aoItems = list();
    if oData.aoArgsVariations:
        aoItems = self._recreateQueueItems(oData);
        self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
    if aoItems:
        self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
                          , (self._oSchedGrpData.idSchedGroup,));
        if self._oDb.getRowCount() > 0:
            offQueue = self._oDb.fetchOne()[0];
            self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
                              , (self._oSchedGrpData.idSchedGroup,));
            cItems = self._oDb.fetchOne()[0];
            # Floor division: the result is used as a slice index below and
            # must stay an integer also when '/' means true division.
            offQueueNew = (offQueue * cItems) // len(aoItems);
            if offQueueNew != 0:
                aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];

    #
    # Replace the scheduling queue.
    # Care need to be take to first timeout/abort any gangs in the
    # gathering state since these use the queue to set up the date.
    #
    self._recreateQueueCancelGatherings();
    self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
    self._oDb.insertList('INSERT INTO SchedQueues (\n'
                         ' idSchedGroup,\n'
                         ' offQueue,\n'
                         ' idGenTestCaseArgs,\n'
                         ' idTestGroup,\n'
                         ' aidTestGroupPreReqs,\n'
                         ' bmHourlySchedule,\n'
                         ' cMissingGangMembers )\n',
                         aoItems, self._formatItemForInsert);
    return (aoErrors, self._asMessages);
651
652 def _formatItemForInsert(self, oItem):
653 """
654 Used by recreateQueueWorker together with TMDatabaseConnect::insertList
655 """
656 return self._oDb.formatBindArgs('(%s,%s,%s,%s,%s,%s,%s)'
657 , ( oItem.idSchedGroup,
658 oItem.offQueue,
659 oItem.idGenTestCaseArgs,
660 oItem.idTestGroup,
661 oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
662 oItem.bmHourlySchedule,
663 oItem.cMissingGangMembers
664 ));
665
@staticmethod
def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
    """
    (Re-)creates the scheduling queue for the given group.

    Returns (aoErrors, asMessages).  The error array is empty on success,
    on failure it holds (sError, oRelatedObject) entries.  The messages are
    plain strings meant for debugging.

    Re-raises any exception after rolling back the transaction.
    """

    asWarnings = [];
    if oDb.debugIsExplainEnabled():
        asWarnings.append('Warning! Disabling SQL explain to avoid deadlocking against locked tables.');
        oDb.debugDisableExplain();

    aoErrors   = [];
    asMessages = [];
    try:
        #
        # Lock quite a few tables to avoid concurrency issues (SchedQueues)
        # and inconsistent data while working.  More data is accessed here
        # than by scheduleNewTask, so some additional tables are locked.
        #
        oDb.rollback();
        oDb.begin();
        for sLockStmt in ( 'LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE',
                           'LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE',
                           'LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE', ):
            oDb.execute(sLockStmt);

        #
        # Get the scheduler instance for the group and let it do the work.
        #
        oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
        oScheduler    = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);

        (aoErrors, asMessages) = oScheduler.recreateQueueWorker();
        if aoErrors:
            oDb.rollback();
        else:
            SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
                                         'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
            oDb.commit();

    except:
        oDb.rollback();
        raise;

    return (aoErrors, asWarnings + asMessages);
716
717
718
719 #
720 # Schedule Task.
721 #
722
def _composeGangArguments(self, idTestSet):
    """
    Composes the gang specific testdriver arguments: the member number, the
    member count and one --gang-ipv4-<n> argument per gang member.
    Returns command line string, including a leading space.
    """

    oTestSet      = TestSetData().initFromDbWithId(self._oDb, idTestSet);
    aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);

    sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
    for i, oGangMember in enumerate(aoGangMembers):
        # Append (not assign), otherwise only the last member's address survives.
        sArgs += ' --gang-ipv4-%s %s' % (i, oGangMember.ip); ## @todo IPv6

    return sArgs;
737
738
def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
    """
    Composes the EXEC command response dictionary for the testbox from the
    given test set, testcase arguments, testbox and build data.
    """
    oTestCase = oTestEx.oTestCase;

    # The script zips: defaults to the validation kit binaries, otherwise
    # the validation kit and download base URL placeholders are expanded.
    sScriptZips = oTestCase.sValidationKitZips;
    if sScriptZips is None:
        assert oValidationKitBuild;
        sScriptZips = oValidationKitBuild.sBinaries;
    elif '@VALIDATIONKIT_ZIP@' in sScriptZips:
        assert oValidationKitBuild;
        sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
    sDownloadBaseUrl = sBaseUrl + config.g_ksTmDownloadBaseUrlRel;
    sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sDownloadBaseUrl);

    # The command line: base command + arguments, with the build binaries
    # placeholder expanded and gang arguments added for gang tests.
    sCmdLine = (oTestCase.sBaseCmd + ' ' + oTestEx.sArgs).replace('@BUILD_BINARIES@', oBuild.sBinaries).strip();
    if oTestEx.cGangMembers > 1:
        sCmdLine += ' ' + self._composeGangArguments(idTestSet);

    # The timeout: argument variation value if set, else the testcase one,
    # scaled by the testbox percentage.
    cSecTimeout = oTestEx.cSecTimeout;
    if cSecTimeout is None:
        cSecTimeout = oTestCase.cSecTimeout;
    cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout / 100;

    return {
        constants.tbresp.ALL_PARAM_RESULT:           constants.tbresp.CMD_EXEC,
        constants.tbresp.EXEC_PARAM_RESULT_ID:       idTestSet,
        constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS:     sScriptZips,
        constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
        constants.tbresp.EXEC_PARAM_TIMEOUT:         cSecTimeout,
    };
770
@staticmethod
def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
    """
    Composes an EXEC response for a gang member (other than the last).
    Returns the EXEC response, raises an exception on database or input
    errors.
    """
    # Load the test set and everything it references.
    oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
    oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
    oTestEx  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTestSet.idGenTestCaseArgs);
    oBuild   = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
    if oTestSet.idBuildTestSuite is None:
        oValidationKitBuild = None;
    else:
        oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);

    # Get the scheduler of the group (as it was when the test set was
    # created) and hand the rest of the job over to it.
    oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestSet.idSchedGroup, oTestSet.tsCreated);
    assert oSchedGrpData.fEnabled   is True;
    assert oSchedGrpData.idBuildSrc is not None;
    oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity);

    return oScheduler.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
797
798
799 def _updateTask(self, oTask, tsNow):
800 """
801 Updates a gang schedule task.
802 """
803 assert oTask.cMissingGangMembers >= 1;
804 assert oTask.idTestSetGangLeader is not None;
805 assert oTask.idTestSetGangLeader >= 1;
806 if tsNow is not None:
807 self._oDb.execute('UPDATE SchedQueues\n'
808 ' SET idTestSetGangLeader = %s,\n'
809 ' cMissingGangMembers = %s,\n'
810 ' tsLastScheduled = %s\n'
811 'WHERE idItem = %s\n'
812 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
813 else:
814 self._oDb.execute('UPDATE SchedQueues\n'
815 ' SET cMissingGangMembers = %s\n'
816 'WHERE idItem = %s\n'
817 , (oTask.cMissingGangMembers, oTask.idItem,) );
818 return True;
819
820 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
821 """
822 The task has been scheduled successfully, reset it's data move it to
823 the end of the queue.
824 """
825 if cGangMembers > 1:
826 self._oDb.execute('UPDATE SchedQueues\n'
827 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
828 ' idTestSetGangLeader = NULL,\n'
829 ' cMissingGangMembers = %s\n'
830 'WHERE idItem = %s\n'
831 , (cGangMembers, oTask.idItem,) );
832 else:
833 self._oDb.execute('UPDATE SchedQueues\n'
834 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
835 ' idTestSetGangLeader = NULL,\n'
836 ' cMissingGangMembers = 1,\n'
837 ' tsLastScheduled = %s\n'
838 'WHERE idItem = %s\n'
839 , (tsNow, oTask.idItem,) );
840 return True;
841
842
843
844
845 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
846 # type: (SchedQueueData, TestCaseArgsDataEx, TestBoxData, BuildDataEx, BuildDataEx, datetime.datetime) -> int
847 """
848 Creates a test set for using the given data.
849 Will not commit, someone up the callstack will that later on.
850
851 Returns the test set ID, may raise an exception on database error.
852 """
853 # Lazy bird doesn't want to write testset.py and does it all here.
854
855 #
856 # We're getting the TestSet ID first in order to include it in the base
857 # file name (that way we can directly relate files on the disk to the
858 # test set when doing batch work), and also for idTesetSetGangLeader.
859 #
860 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
861 idTestSet = self._oDb.fetchOne()[0];
862
863 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
864 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
865
866 #
867 # Gang scheduling parameters. Changes the oTask data for updating by caller.
868 #
869 iGangMemberNo = 0;
870
871 if oTestEx.cGangMembers <= 1:
872 assert oTask.idTestSetGangLeader is None;
873 assert oTask.cMissingGangMembers <= 1;
874 elif oTask.idTestSetGangLeader is None:
875 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
876 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
877 oTask.idTestSetGangLeader = idTestSet;
878 else:
879 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
880 oTask.cMissingGangMembers -= 1;
881
882 #
883 # Do the database stuff.
884 #
885 self._oDb.execute('INSERT INTO TestSets (\n'
886 ' idTestSet,\n'
887 ' tsConfig,\n'
888 ' tsCreated,\n'
889 ' idBuild,\n'
890 ' idBuildCategory,\n'
891 ' idBuildTestSuite,\n'
892 ' idGenTestBox,\n'
893 ' idTestBox,\n'
894 ' idSchedGroup,\n'
895 ' idTestGroup,\n'
896 ' idGenTestCase,\n'
897 ' idTestCase,\n'
898 ' idGenTestCaseArgs,\n'
899 ' idTestCaseArgs,\n'
900 ' sBaseFilename,\n'
901 ' iGangMemberNo,\n'
902 ' idTestSetGangLeader )\n'
903 'VALUES ( %s,\n' # idTestSet
904 ' %s,\n' # tsConfig
905 ' %s,\n' # tsCreated
906 ' %s,\n' # idBuild
907 ' %s,\n' # idBuildCategory
908 ' %s,\n' # idBuildTestSuite
909 ' %s,\n' # idGenTestBox
910 ' %s,\n' # idTestBox
911 ' %s,\n' # idSchedGroup
912 ' %s,\n' # idTestGroup
913 ' %s,\n' # idGenTestCase
914 ' %s,\n' # idTestCase
915 ' %s,\n' # idGenTestCaseArgs
916 ' %s,\n' # idTestCaseArgs
917 ' %s,\n' # sBaseFilename
918 ' %s,\n' # iGangMemberNo
919 ' %s)\n' # idTestSetGangLeader
920 , ( idTestSet,
921 oTask.tsConfig,
922 tsNow,
923 oBuild.idBuild,
924 oBuild.idBuildCategory,
925 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
926 oTestBoxData.idGenTestBox,
927 oTestBoxData.idTestBox,
928 oTask.idSchedGroup,
929 oTask.idTestGroup,
930 oTestEx.oTestCase.idGenTestCase,
931 oTestEx.oTestCase.idTestCase,
932 oTestEx.idGenTestCaseArgs,
933 oTestEx.idTestCaseArgs,
934 sBaseFilename,
935 iGangMemberNo,
936 oTask.idTestSetGangLeader,
937 ));
938
939 self._oDb.execute('INSERT INTO TestResults (\n'
940 ' idTestResultParent,\n'
941 ' idTestSet,\n'
942 ' tsCreated,\n'
943 ' idStrName,\n'
944 ' cErrors,\n'
945 ' enmStatus,\n'
946 ' iNestingDepth)\n'
947 'VALUES ( NULL,\n' # idTestResultParent
948 ' %s,\n' # idTestSet
949 ' %s,\n' # tsCreated
950 ' 0,\n' # idStrName
951 ' 0,\n' # cErrors
952 ' \'running\'::TestStatus_T,\n'
953 ' 0)\n' # iNestingDepth
954 'RETURNING idTestResult'
955 , ( idTestSet, tsNow, ));
956 idTestResult = self._oDb.fetchOne()[0];
957
958 self._oDb.execute('UPDATE TestSets\n'
959 ' SET idTestResult = %s\n'
960 'WHERE idTestSet = %s\n'
961 , (idTestResult, idTestSet, ));
962
963 return idTestSet;
964
def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
    """
    Looks for a usable validation kit build for the given testbox.

    A build cursor is opened on the scheduling group's test suite build
    source (idBuildSrcTestSuite) for the OS and CPU architecture of the
    testbox, and the first row that is not blacklisted is returned.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    oBlacklistLogic = BuildLogic(self._oDb);
    oSrcData        = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
    oBuildCursor    = BuildSourceLogic(self._oDb).openBuildCursor(oSrcData, oTestBoxData.sOs,
                                                                  oTestBoxData.sCpuArch, tsNow);

    # Walk the cursor row by row; stop at the first acceptable build.
    cRowsLeft = oBuildCursor.getRowCount();
    while cRowsLeft > 0:
        cRowsLeft -= 1;
        oCandidate = BuildDataEx().initFromDbRow(oBuildCursor.fetchOne());
        if oBlacklistLogic.isBuildBlacklisted(oCandidate):
            continue;
        return oCandidate;

    # Every candidate was blacklisted (or there were none).
    return None;
980
def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
    """
    Tries to find a fitting build.

    The candidate builds come from self.oBuildCache, set up for the
    scheduling group's build source and the OS / CPU architecture of the
    testbox.  A candidate is accepted when:
        - it matches the build properties required by the test,
        - it is not blacklisted,
        - every prerequisite testcase has a successful or skipped test set
          on that build and none has a failed, timed out or rebooted one,
        - its files are not known to be gone.

    oTask supplies the test group prerequisites (aidTestGroupPreReqs) and
    the configuration timestamp (tsConfig) used to resolve them; oTestEx
    supplies the direct testcase prerequisites (aoTestCasePreReqs).

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """

    #
    # Gather the set of prerequisites we have and turn them into a value
    # set for use in the loop below.
    #
    # Note! We're scheduling on testcase level and ignoring argument variation
    #       selections in TestGroupMembers is intentional.
    #
    dPreReqs = {};

    # Direct prerequisites.  We assume they're all enabled as this can be
    # checked at queue creation time.
    for oPreReq in oTestEx.aoTestCasePreReqs:
        dPreReqs[oPreReq.idTestCase] = 1;

    # Testgroup dependencies from the scheduling group config.
    if oTask.aidTestGroupPreReqs is not None:
        for iTestGroup in oTask.aidTestGroupPreReqs:
            # Make sure the _active_ test group members are in the cache.
            if iTestGroup not in self.dTestGroupMembers:
                self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
                                  'FROM TestGroupMembers, TestCases\n'
                                  'WHERE TestGroupMembers.idTestGroup = %s\n'
                                  ' AND TestGroupMembers.tsExpire > %s\n'
                                  ' AND TestGroupMembers.tsEffective <= %s\n'
                                  ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
                                  ' AND TestCases.tsExpire > %s\n'
                                  ' AND TestCases.tsEffective <= %s\n'
                                  ' AND TestCases.fEnabled is TRUE\n'
                                  , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
                self.dTestGroupMembers[iTestGroup] = [aoRow[0] for aoRow in self._oDb.fetchAll()];

            # Add the testgroup members to the prerequisites.
            for idTestCase in self.dTestGroupMembers[iTestGroup]:
                dPreReqs[idTestCase] = 1;

    # Create a SQL values table out of them: '(1), (2), (3)', or '' if there are none.
    sPreReqSet = ', '.join(['(' + str(idPreReq) + ')' for idPreReq in sorted(dPreReqs)]);

    #
    # Try the builds.
    #
    self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
    for oEntry in self.oBuildCache:
        #
        # Check build requirements set by the test.
        #
        if not oTestEx.matchesBuildProps(oEntry.oBuild):
            continue;

        if oEntry.isBlacklisted(self._oDb):
            oEntry.remove();
            continue;

        #
        # Check prerequisites.  The default scheduler is satisfied if one
        # argument variation has been executed successfully.  It is not
        # satisfied if there are any failure runs.
        #
        if sPreReqSet:
            fDecision = oEntry.getPreReqDecision(sPreReqSet);
            if fDecision is None:
                # Check for missing prereqs.
                # Note! The sub-select must expose idTestCase as well, since the
                #       join condition refers to it via the TestSets alias.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
                                  'LEFT OUTER JOIN (SELECT idTestSet, idTestCase\n'
                                  ' FROM TestSets\n'
                                  ' WHERE enmStatus IN (%s, %s)\n'
                                  ' AND idBuild = %s\n'
                                  ' ) AS TestSets\n'
                                  ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
                                  'WHERE TestSets.idTestSet is NULL\n'
                                  , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
                                      oEntry.oBuild.idBuild, ));
                cMissingPreReqs = self._oDb.fetchOne()[0];
                if cMissingPreReqs > 0:
                    self.dprint('build %s is missing %u prerequisites (out of %s)'
                                % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # Check for failed prereq runs.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
                                  ' TestSets\n'
                                  'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
                                  ' AND TestSets.idBuild = %s\n'
                                  ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
                                  , ( oEntry.oBuild.idBuild,
                                      TestSetData.ksTestStatus_Failure,
                                      TestSetData.ksTestStatus_TimedOut,
                                      TestSetData.ksTestStatus_Rebooted,
                                    )
                                 );
                cFailedPreReqs = self._oDb.fetchOne()[0];
                if cFailedPreReqs > 0:
                    self.dprint('build %s is has %u prerequisite failures (out of %s)'
                                % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # Remember the positive verdict for this prerequisite set.
                oEntry.setPreReqDecision(sPreReqSet, True);
            elif not fDecision:
                # Previously rejected for this prerequisite set.
                continue;

        #
        # If we can, check if the build files still exist.
        #
        if oEntry.oBuild.areFilesStillThere() is False:
            self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
            oEntry.remove();
            continue;

        self.dprint('found oBuild=%s' % (oEntry.oBuild,));
        return oEntry.oBuild;
    return None;
1111
def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
    """
    Looks up the counterpart of the gang leader's build for this testbox.

    The lookup is delegated to BuildLogic.tryFindSameBuildForOsArch using
    the OS and CPU architecture of oTestBoxData.  idBuildSrc is accepted
    but not used by this default implementation.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    #
    # Note! Should probably check build prerequisites if we get a different
    #       build back, so that we don't use a build which hasn't passed
    #       the smoke test.
    #
    _ = idBuildSrc;
    oBuildLogic = BuildLogic(self._oDb);
    oMatchingBuild = oBuildLogic.tryFindSameBuildForOsArch(oLeaderBuild, oTestBoxData.sOs, oTestBoxData.sCpuArch);
    return oMatchingBuild;
1126
1127
def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang leader (can be a gang of one).

    Steps:
        1. Bail out if the global resources of the test are busy.
        2. Find a build, and a validation kit build if the testcase needs one.
        3. Inside a savepoint, create the test set and allocate the
           global resources; roll back to the savepoint if that fails.
        4. Either issue the EXEC response (gang of one) or a WAIT response
           (gang gathering), and update the testbox state accordingly.

    Returns the response dictionary, or None if the task cannot be
    scheduled on this testbox right now (the caller then tries the next
    task).  May raise exception on DB error.
    """

    # We don't wait for busy resources, we just try the next test.
    oTestArgsLogic = TestCaseArgsLogic(self._oDb);
    if not oTestArgsLogic.areResourcesFree(oTestEx):
        self.dprint('Cannot get global test resources!');
        return None;

    #
    # Find a matching build (this is the difficult bit).
    #
    oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
    if oBuild is None:
        self.dprint('No build!');
        return None;
    if oTestEx.oTestCase.needValidationKitBit():
        oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
        if oValidationKitBuild is None:
            self.dprint('No validation kit build!');
            return None;
    else:
        oValidationKitBuild = None;

    #
    # Create a testset, allocate the resources and update the state.
    # Note! Since resource allocation may still fail, we create a nested
    #       transaction so we can roll back. (Heed lock warning in docs!)
    #
    self._oDb.execute('SAVEPOINT tryAsLeader');
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
       is not True:
        self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
        self.dprint('Failed to allocate global resources!');
        # Must be None, not False: the caller tests 'is not None' to decide
        # whether a task was found, and the test set was just rolled back.
        return None;

    if oTestEx.cGangMembers <= 1:
        # We're alone, put the task back at the end of the queue and issue EXEC cmd.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_Testing;
    else:
        # We're missing gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
    return dResponse;
1183
def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang member.

    The gang leader's test set is identified by oTask.idTestSetGangLeader.
    The builds used by the leader are looked up and matched against the
    OS and CPU architecture of this testbox, a test set is created, and
    then either the whole gang is started (EXEC response, all members set
    to gang-testing) or this testbox is told to WAIT for more members.

    Returns response or None.  May raise exception on DB error.
    """

    #
    # The leader has chosen a build, we need to find a matching one for our platform.
    # (It's up to the scheduler decide upon how strict dependencies are to be enforced
    # upon subordinate group members.)
    #
    # Note! The leader test set ID is carried by the scheduling queue item
    #       (oTask), the same value used for updateGangStatus below.
    #
    oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTask.idTestSetGangLeader);

    oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
    oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
    if oBuild is None:
        return None;

    oValidationKitBuild = None;
    if oLeaderTestSet.idBuildTestSuite is not None:
        oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
        oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
                                                         self._oSchedGrpData.idBuildSrcTestSuite);

    #
    # Create a testset and update the state(s).
    # Note! _createTestSet decrements oTask.cMissingGangMembers for us.
    #
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    if oTask.cMissingGangMembers < 1:
        # The whole gang is there, move the task to the end of the queue
        # and update the status on the other gang members.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
        oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
    else:
        # We're still missing some gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    return dResponse;
1229
1230
def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
    """
    Worker for scheduling a new task.

    Walks the scheduling queue of this scheduler's scheduling group in
    idItem order (restricted to items whose hourly schedule, if any, has
    the current hour-of-week bit set) and tries each task on the testbox
    until one can be scheduled or config.g_kcSecMaxNewTask seconds have
    elapsed.

    Returns the response produced by _tryAsLeader / _tryAsGangMember for
    the first task that could be scheduled, None if no suitable task was
    found.  May raise exception on DB error.
    """

    #
    # Iterate the scheduler queue (fetch all to avoid having to concurrent
    # queries), trying out each task to see if the testbox can execute it.
    #
    dRejected = {}; # variations we've already checked out and rejected.
    self._oDb.execute('SELECT *\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND ( bmHourlySchedule IS NULL\n'
                      ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
                      'ORDER BY idItem ASC\n'
                      , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
    aaoRows = self._oDb.fetchAll();
    for aoRow in aaoRows:
        # Don't loop forever.
        if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
            break;

        # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
        oTask = SchedQueueData().initFromDbRow(aoRow);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
                        % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
                            oTask.tsLastScheduled, oTask.tsConfig,));

        sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
        if sRejectNm in dRejected:
            self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
            continue;
        # Recorded up front; if the attempt below succeeds we return anyway.
        dRejected[sRejectNm] = 1;

        # Fetch all the test case info (too much, but who cares right now).
        oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
                                                             tsConfigEff = oTask.tsConfig,
                                                             tsRsrcEff = oTask.tsConfig);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));

        # This shouldn't happen, but just in case it does...
        if oTestEx.oTestCase.fEnabled is not True:
            self.dprint('Testcase is not enabled!!');
            continue;

        # Check if the testbox properties matches the test.
        if not oTestEx.matchesTestBoxProps(oTestBoxData):
            self.dprint('Testbox mismatch!');
            continue;

        # Try schedule it.
        if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
            dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        elif oTask.cMissingGangMembers >= 1:
            # While a gang is gathering the number of missing members is in
            # the range 1..cGangMembers-1; the last member must be let in
            # too, so the lower bound is inclusive.
            dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        else:
            dResponse = None; # Shouldn't happen!
        if dResponse is not None:
            self.dprint('Found a task! dResponse=%s' % (dResponse,));
            return dResponse;

    # Found no suitable task.
    return None;
1297
1298 @staticmethod
1299 def _pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds):
1300 """
1301 Picks the next scheduling group for the given testbox.
1302 """
1303 if len(oTestBoxDataEx.aoInSchedGroups) == 1:
1304 oSchedGroup = oTestBoxDataEx.aoInSchedGroups[0].oSchedGroup;
1305 if oSchedGroup.fEnabled \
1306 and oSchedGroup.idBuildSrc is not None \
1307 and oSchedGroup.idSchedGroup not in dIgnoreSchedGroupIds:
1308 return (oSchedGroup, 0);
1309 iWorkItem = 0;
1310
1311 elif oTestBoxDataEx.aoInSchedGroups:
1312 # Construct priority table of currently enabled scheduling groups.
1313 aaoList1 = [];
1314 for oInGroup in oTestBoxDataEx.aoInSchedGroups:
1315 oSchedGroup = oInGroup.oSchedGroup;
1316 if oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None:
1317 iSchedPriority = oInGroup.iSchedPriority;
1318 if iSchedPriority > 31: # paranoia
1319 iSchedPriority = 31;
1320 elif iSchedPriority < 0: # paranoia
1321 iSchedPriority = 0;
1322
1323 for iSchedPriority in xrange(min(iSchedPriority, len(aaoList1))):
1324 aaoList1[iSchedPriority].append(oSchedGroup);
1325 while len(aaoList1) <= iSchedPriority:
1326 aaoList1.append([oSchedGroup,]);
1327
1328 # Flatten it into a single list, mixing the priorities a little so it doesn't
1329 # take forever before low priority stuff is executed.
1330 aoFlat = [];
1331 iLo = 0;
1332 iHi = len(aaoList1) - 1;
1333 while iHi >= iLo:
1334 aoFlat += aaoList1[iHi];
1335 if iLo < iHi:
1336 aoFlat += aaoList1[iLo];
1337 iLo += 1;
1338 iHi -= 1;
1339
1340 # Pick the next one.
1341 cLeft = len(aoFlat);
1342 while cLeft > 0:
1343 cLeft -= 1;
1344 iWorkItem += 1;
1345 if iWorkItem >= len(aoFlat) or iWorkItem < 0:
1346 iWorkItem = 0;
1347 if aoFlat[iWorkItem].idSchedGroup not in dIgnoreSchedGroupIds:
1348 return (aoFlat[iWorkItem], iWorkItem);
1349 else:
1350 iWorkItem = 0;
1351
1352 # No active group.
1353 return (None, iWorkItem);
1354
@staticmethod
def scheduleNewTask(oDb, oTestBoxData, iWorkItem, sBaseUrl, iVerbosity = 0):
    # type: (TMDatabaseConnection, TestBoxData, int, str, int) -> None
    """
    Schedules a new task for a testbox.

    Runs in its own transaction: any pending one is rolled back, the
    testbox status row and the scheduling queue rows of the testbox's
    scheduling groups are locked, and the scheduling groups are tried one
    after the other (see _pickSchedGroup) until a scheduler produces a
    response, no group is left, or config.g_kcSecMaxNewTask is exceeded.

    iWorkItem is the position to continue from in the scheduling group
    rotation; the updated position is stored via
    TestBoxStatusLogic.updateWorkItem.

    Returns the response from the scheduler worker, None if there is
    nothing to do or the testbox is disabled / its generation ID changed.
    Exceptions are re-raised after rolling back the transaction.
    """
    oTBStatusLogic = TestBoxStatusLogic(oDb);

    try:
        #
        # To avoid concurrency issues in SchedQueues we lock all the rows
        # related to our scheduling queue.  Also, since this is a very
        # expensive operation we lock the testbox status row to fend off
        # repeated retries by faulty testbox scripts.
        #
        # Note! The testbox ID is passed as a bound parameter, like in the
        #       other queries in this file, rather than formatted into the
        #       SQL text.
        #
        tsSecStart = utils.timestampSecond();
        oDb.rollback();
        oDb.begin();
        oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
                    , (oTestBoxData.idTestBox,));
        oDb.execute('SELECT SchedQueues.idSchedGroup\n'
                    ' FROM SchedQueues, TestBoxesInSchedGroups\n'
                    'WHERE TestBoxesInSchedGroups.idTestBox = %s\n'
                    ' AND TestBoxesInSchedGroups.tsExpire = \'infinity\'::TIMESTAMP\n'
                    ' AND TestBoxesInSchedGroups.idSchedGroup = SchedQueues.idSchedGroup\n'
                    ' FOR UPDATE'
                    , (oTestBoxData.idTestBox,));

        # We need the current timestamp.
        tsNow = oDb.getCurrentTimestamp();

        # Re-read the testbox data with scheduling group relations.
        oTestBoxDataEx = TestBoxDataEx().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
        if    oTestBoxDataEx.fEnabled \
          and oTestBoxDataEx.idGenTestBox == oTestBoxData.idGenTestBox:

            # We may have to skip scheduling groups that are out of work (e.g. 'No build').
            iInitialWorkItem = iWorkItem;
            dIgnoreSchedGroupIds = {};
            while True:
                # Now, pick the scheduling group.
                (oSchedGroup, iWorkItem) = SchedulerBase._pickSchedGroup(oTestBoxDataEx, iWorkItem, dIgnoreSchedGroupIds);
                if oSchedGroup is None:
                    break;
                assert oSchedGroup.fEnabled and oSchedGroup.idBuildSrc is not None;

                # Instantiate the specified scheduler and let it do the rest.
                oScheduler = SchedulerBase._instantiate(oDb, oSchedGroup, iVerbosity, tsSecStart);
                dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataEx, tsNow, sBaseUrl);
                if dResponse is not None:
                    oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
                    oDb.commit();
                    return dResponse;

                # Check out the next work item?
                if oScheduler.getElapsedSecs() > config.g_kcSecMaxNewTask:
                    break;
                dIgnoreSchedGroupIds[oSchedGroup.idSchedGroup] = oSchedGroup;

            # No luck, but best if we update the work item if we've made progress.
            # Note! In case of a config.g_kcSecMaxNewTask timeout, this may accidentally skip
            #       a work item with actually work to do.  But that's a small price to pay.
            if iWorkItem != iInitialWorkItem:
                oTBStatusLogic.updateWorkItem(oTestBoxDataEx.idTestBox, iWorkItem);
                oDb.commit();
                return None;
    except:
        # Release the locks before passing the problem on to the caller.
        oDb.rollback();
        raise;

    # Not enabled or nothing to update, rollback and return no task.
    oDb.rollback();
    return None;
1428
@staticmethod
def tryCancelGangGathering(oDb, oStatusData):
    """
    Attempts to abort the gang gathering the given testbox takes part in.

    Under an exclusive lock on TestBoxStatuses and SchedQueues the testbox
    status is re-read.  If it is still gathering, the whole gang is put in
    the gang-gathering-timed-out state and the scheduling queue item is
    given a new idItem with its gang leader cleared and the missing member
    count reset to the full gang size.

    Returns True if successfully cancelled (or already timed out).
    Returns False if not (someone raced us to the SchedQueue table).
    Exceptions are re-raised after rolling back the transaction.

    Note! oStatusData is re-initialized.
    """
    assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;

    sSelectTaskQuery = 'SELECT *\n' \
                       'FROM SchedQueues\n' \
                       'WHERE idTestSetGangLeader = %s\n';
    sRequeueQuery    = 'UPDATE SchedQueues\n' \
                       ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n' \
                       ' idTestSetGangLeader = NULL,\n' \
                       ' cMissingGangMembers = %s\n' \
                       'WHERE idItem = %s\n';
    try:
        #
        # We're racing both scheduleNewTask and other callers of this
        # method, so take out table locks before touching anything.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Refresh the status now that we hold the locks and see where we stand.
        #
        oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
        sCurState = oStatusData.enmState;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGathering:
            #
            # The test set leads us to the gang leader; flag the whole gang as timed out.
            #
            oGangTestSet = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
            idLeader = oGangTestSet.idTestSetGangLeader;

            TestBoxStatusLogic(oDb).updateGangStatus(idLeader,
                                                     TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                                     fCommit = False);

            #
            # Requeue the scheduling queue item at the end.
            #
            oDb.execute(sSelectTaskQuery, (idLeader,) );
            oQueueItem = SchedQueueData().initFromDbRow(oDb.fetchOne());
            oTestArgs  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oQueueItem.idGenTestCaseArgs);

            oDb.execute(sRequeueQuery, (oTestArgs.cGangMembers, oQueueItem.idItem,) );

            oDb.commit();
            return True;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
            # Somebody else already timed the gang out; nothing left to change.
            oDb.rollback();
            return True;
    except:
        oDb.rollback();
        raise;

    # The testbox is in some other state now, rollback and report failure.
    oDb.rollback();
    return False;
1495
1496
1497#
1498# Unit testing.
1499#
1500
1501# pylint: disable=C0111
class SchedQueueDataTestCase(ModelDataBaseTestCase):
    """ ModelDataBaseTestCase specialization using SchedQueueData samples. """

    def setUp(self):
        """ Sets up the sample list: one default constructed SchedQueueData. """
        aoSamples = [];
        aoSamples.append(SchedQueueData());
        self.aoSamples = aoSamples;
1505
if __name__ == '__main__':
    # Run the test cases of this module when it is executed as a script.
    unittest.main();
    # not reached (unittest.main() exits the interpreter by default).
1509
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette