Changeset 39517 in vbox for trunk/src/VBox/Runtime/common/misc/reqpool.cpp
- Timestamp: Dec 2, 2011 3:58:27 PM (13 years ago)
- File: 1 edited
Legend:
- Unmodified (no prefix)
- Added (+)
- Removed (-)
trunk/src/VBox/Runtime/common/misc/reqpool.cpp (r39510 → r39517)
     /** The submit timestamp of the pending request. */
     uint64_t                uPendingNanoTs;
+    /** The submit timestamp of the request processing. */
+    uint64_t                uProcessingNanoTs;
     /** When this CPU went idle the last time. */
     uint64_t                uIdleNanoTs;
…
     RTCPUID                 idLastCpu;

-    /** The thread handle. */
-    RTTHREAD                hThread;
-
     /** The submitter will put an incoming request here when scheduling an idle
      * thread. */
…
     PRTREQINT volatile      pPendingReq;

+    /** The thread handle. */
+    RTTHREAD                hThread;
+    /** Nanosecond timestamp representing the birth time of the thread. */
+    uint64_t                uBirthNanoTs;
+    /** Pointer to the request thread pool instance the thread is associated
+     * with. */
+    struct RTREQPOOLINT    *pPool;
 } RTREQPOOLTHREAD;
 /** Pointer to a worker thread. */
+typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
+
+/**
+ * Request thread pool instance data.
+ */
 typedef struct RTREQPOOLINT
 {
…
     uint32_t                u32Magic;

+    /** The worker thread type. */
+    RTTHREADTYPE            enmThreadType;
     /** The current number of worker threads. */
     uint32_t                cCurThreads;
…
      * considered for retirement. */
     uint32_t                cMsMinIdle;
+    /** The maximum number of milliseconds to push back a submitter before
+     * creating a new worker thread once the threshold has been reached. */
+    uint32_t                cMsMaxPushBack;
+    /** The minimum number of milliseconds to push back a submitter before
+     * creating a new worker thread once the threshold has been reached. */
+    uint32_t                cMsMinPushBack;
+    /** The current submitter push back in milliseconds.
+     * This is recalculated when worker threads come and go. */
+    uint32_t                cMsCurPushBack;

     /** Statistics: The total number of threads created. */
…
     RTLISTANCHOR            WorkerThreads;

+    /** Event semaphore that submitters block on when pushing back. */
+    RTSEMEVENT              hPushBackEvt;
+
     /** Critical section serializing access to members of this structure. */
     RTCRITSECT              CritSect;

+    /** Destruction indicator. The worker threads check this in their loop. */
+    bool volatile           fDestructing;
+
     /** Reference counter. */
     uint32_t volatile       cRefs;
+    /** Number of threads pushing back. */
+    uint32_t volatile       cPushingBack;
+    /** The number of idle threads or threads in the process of becoming
+     * idle.  This is increased before the to-be-idle thread tries to enter
+     * the critical section and add itself to the list. */
+    uint32_t volatile       cIdleThreads;
     /** Linked list of idle threads. */
     RTLISTANCHOR            IdleThreads;

+    /** Head of the request FIFO. */
+    PRTREQINT               pPendingRequests;
+    /** Where to insert the next request. */
+    PRTREQINT              *ppPendingRequests;

 } RTREQPOOLINT;
…
 typedef RTREQPOOLINT *PRTREQPOOLINT;
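The new pPendingRequests / ppPendingRequests pair is a singly linked FIFO with a tail pointer: the head points at the oldest queued request, and the tail pointer points at the pNext field of the last request (or back at the head field while the queue is empty), so both appending and removing the head are O(1). Below is a minimal standalone sketch of that pattern; the MYREQ and MYQUEUE names are invented for illustration and are not part of the changeset, which links RTREQINT structures directly.

#include <stddef.h>

/* Hypothetical stand-in for RTREQINT; only the pNext link matters here. */
typedef struct MYREQ
{
    struct MYREQ *pNext;
    int           iPayload;
} MYREQ;

typedef struct MYQUEUE
{
    MYREQ  *pHead;      /* oldest request, NULL when empty */
    MYREQ **ppTail;     /* where to link in the next request;
                           points at pHead while the queue is empty */
} MYQUEUE;

static void QueueInit(MYQUEUE *pQueue)
{
    pQueue->pHead  = NULL;
    pQueue->ppTail = &pQueue->pHead;
}

/* O(1) append, same shape as the queueing code in rtReqPoolSubmit. */
static void QueueAppend(MYQUEUE *pQueue, MYREQ *pReq)
{
    pReq->pNext     = NULL;
    *pQueue->ppTail = pReq;
    pQueue->ppTail  = &pReq->pNext;
}

/* O(1) removal from the front, same shape as the worker loop. */
static MYREQ *QueueRemoveHead(MYQUEUE *pQueue)
{
    MYREQ *pReq = pQueue->pHead;
    if (pReq)
    {
        pQueue->pHead = pReq->pNext;
        if (!pReq->pNext)               /* queue became empty, reset the tail */
            pQueue->ppTail = &pQueue->pHead;
    }
    return pReq;
}

The same reset-the-tail-pointer step shows up in rtReqPoolThreadProc when the last pending request is taken off the queue.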
The remainder of the diff is newly added code:

static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
{
    uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
    uint32_t const cSteps   = pPool->cMaxThreads - pPool->cThreadsThreshold;
    uint32_t const iStep    = pPool->cCurThreads - pPool->cThreadsThreshold;

    uint32_t cMsCurPushBack;
    if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
    cMsCurPushBack += pPool->cMsMinPushBack;

    pPool->cMsCurPushBack = cMsCurPushBack;
}


static void rtReqPoolThreadProcessRequest(PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
{
    /*
     * Update thread state.
     */
    pThread->uProcessingNanoTs = RTTimeNanoTS();
    pThread->uPendingNanoTs    = pReq->uSubmitNanoTs;
    pThread->pPendingReq       = pReq;
    Assert(pReq->u32Magic == RTREQ_MAGIC);

    /*
     * Do the actual processing.
     */
    /** @todo */

    /*
     * Update thread statistics and state.
     */
    uint64_t const uNsTsEnd = RTTimeNanoTS();
    pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
    pThread->cNsTotalReqQueued     += uNsTsEnd - pThread->uPendingNanoTs;
    pThread->cReqProcessed++;
}


static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
{
    PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
    PRTREQPOOLINT    pPool   = pThread->pPool;

    /*
     * The work loop.
     */
    uint64_t cPrevReqProcessed = 0;
    while (!pPool->fDestructing)
    {
        /*
         * Pending work?
         */
        PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
        if (pReq)
            rtReqPoolThreadProcessRequest(pThread, pReq);
        else
        {
            ASMAtomicIncU32(&pPool->cIdleThreads);
            RTCritSectEnter(&pPool->CritSect);

            /* Recheck the todo request pointer after entering the critsect. */
            pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
            if (!pReq)
            {
                /* Any pending requests in the queue? */
                pReq = pPool->pPendingRequests;
                if (pReq)
                {
                    pPool->pPendingRequests = pReq->pNext;
                    if (pReq->pNext == NULL)
                        pPool->ppPendingRequests = &pPool->pPendingRequests;
                }
            }

            if (pReq)
            {
                /*
                 * Un-idle ourselves and process the request.
                 */
                if (!RTListIsEmpty(&pThread->IdleNode))
                {
                    RTListNodeRemove(&pThread->IdleNode);
                    RTListInit(&pThread->IdleNode);
                }
                ASMAtomicDecU32(&pPool->cIdleThreads);
                RTCritSectLeave(&pPool->CritSect);

                rtReqPoolThreadProcessRequest(pThread, pReq);
            }
            else
            {
                /*
                 * Nothing to do, go idle.
                 */
                if (cPrevReqProcessed != pThread->cReqProcessed)
                {
                    cPrevReqProcessed = pThread->cReqProcessed;
                    pThread->uIdleNanoTs = RTTimeNanoTS();
                }

                if (RTListIsEmpty(&pThread->IdleNode))
                    RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
                RTThreadUserReset(hThreadSelf);

                RTCritSectLeave(&pPool->CritSect);

                RTThreadUserWait(hThreadSelf, 0);
            }
        }
    }

    /*
     * Clean up on the way out.
     */
    RTCritSectEnter(&pPool->CritSect);

    /** @todo .... */

    rtReqPoolRecalcPushBack(pPool);

    RTCritSectLeave(&pPool->CritSect);

    return VINF_SUCCESS;
}
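rtReqPoolRecalcPushBack linearly interpolates the current push-back interval between cMsMinPushBack and cMsMaxPushBack according to how far the current thread count has grown beyond cThreadsThreshold; the 64-bit branch scales the range by RT_NS_1MS before dividing so the per-step increment is not truncated to zero when the millisecond range is small relative to the number of steps. The self-contained sketch below reuses the same arithmetic to print the resulting intervals for one made-up configuration (none of these numbers come from the changeset):

#include <stdint.h>
#include <stdio.h>

#define MY_NS_1MS UINT32_C(1000000)   /* stand-in for RT_NS_1MS */

/* Reimplementation of the interpolation in rtReqPoolRecalcPushBack,
 * for experimenting with the numbers outside of IPRT. */
static uint32_t myCalcPushBack(uint32_t cMsMinPushBack, uint32_t cMsMaxPushBack,
                               uint32_t cThreadsThreshold, uint32_t cMaxThreads,
                               uint32_t cCurThreads)
{
    uint32_t const cMsRange = cMsMaxPushBack - cMsMinPushBack;
    uint32_t const cSteps   = cMaxThreads - cThreadsThreshold;
    uint32_t const iStep    = cCurThreads - cThreadsThreshold;

    uint32_t cMsCurPushBack;
    if ((cMsRange >> 2) >= cSteps)
        cMsCurPushBack = cMsRange / cSteps * iStep;
    else
        cMsCurPushBack = (uint32_t)((uint64_t)cMsRange * MY_NS_1MS / cSteps * iStep / MY_NS_1MS);
    return cMsCurPushBack + cMsMinPushBack;
}

int main(void)
{
    /* Made-up configuration: push back 1..1000 ms between 8 and 64 threads. */
    for (uint32_t cCurThreads = 8; cCurThreads <= 64; cCurThreads += 8)
        printf("%2u threads -> %4u ms push back\n",
               cCurThreads, myCalcPushBack(1, 1000, 8, 64, cCurThreads));
    return 0;
}

With this configuration the push back grows from 1 ms at the threshold to roughly 950 ms as the pool approaches its maximum size.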
DECLHIDDEN(int) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
{
    /*
     * Prepare the request.
     */
    pReq->uSubmitNanoTs = RTTimeNanoTS();

    RTCritSectEnter(&pPool->CritSect);

    /*
     * Try schedule the request to any currently idle thread.
     */
    PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
    if (pThread)
    {
        /** @todo CPU affinity... */
        ASMAtomicWritePtr(&pThread->pTodoReq, pReq);

        RTListNodeRemove(&pThread->IdleNode);
        RTListInit(&pThread->IdleNode);
        ASMAtomicDecU32(&pPool->cIdleThreads);

        RTThreadUserSignal(pThread->hThread);

        RTCritSectLeave(&pPool->CritSect);
        return VINF_SUCCESS;
    }
    Assert(RTListIsEmpty(&pPool->IdleThreads));

    /*
     * Put the request in the pending queue.
     */
    pReq->pNext = NULL;
    *pPool->ppPendingRequests = pReq;
    pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;

    /*
     * If there is an incoming worker thread already or we've reached the
     * maximum number of worker threads, we're done.
     */
    if (   pPool->cIdleThreads > 0
        || pPool->cCurThreads >= pPool->cMaxThreads)
    {
        RTCritSectLeave(&pPool->CritSect);
        return VINF_SUCCESS;
    }

    /*
     * Push back before creating a new worker thread.
     */
    if (   pPool->cCurThreads > pPool->cThreadsThreshold
        && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack)
    {
        uint32_t const cMsTimeout = pPool->cMsCurPushBack;
        pPool->cPushingBack++;
        RTCritSectLeave(&pPool->CritSect);

        /** @todo this is everything but perfect... it makes wake up order
         *        assumptions.  A better solution would be having a lazily
         *        allocated push back event on each request. */
        int rc = RTSemEventWait(pPool->hPushBackEvt, cMsTimeout);

        RTCritSectEnter(&pPool->CritSect);
        pPool->cPushingBack--;
    }

    /*
     * Create a new thread for processing the request, or should we wait?
     */
    pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
    if (pThread)
    {
        pThread->uBirthNanoTs = RTTimeNanoTS();
        pThread->pPool        = pPool;
        pThread->idLastCpu    = NIL_RTCPUID;
        pThread->hThread      = NIL_RTTHREAD;
        RTListInit(&pThread->IdleNode);
        RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
        pPool->cCurThreads++;
        pPool->cThreadsCreated++;

        static uint32_t s_idThread = 0;
        int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
                                 pPool->enmThreadType, RTTHREADFLAGS_WAITABLE, "REQPT%02u", ++s_idThread);
        if (RT_SUCCESS(rc))
            pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
        else
        {
            pPool->cCurThreads--;
            RTListNodeRemove(&pThread->ListNode);
            RTMemFree(pThread);
        }
    }
    RTCritSectLeave(&pPool->CritSect);

    return VINF_SUCCESS;
}
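The idle-thread path in rtReqPoolSubmit hands the request directly to a specific worker: it writes the request into that thread's pTodoReq with an atomic store, removes the thread from the idle list, and wakes it through the thread's user event (RTThreadUserSignal / RTThreadUserWait), while the worker re-checks pTodoReq after taking the critical section to close the race with a submitter that signalled it just before it went to sleep. The following is a rough, self-contained illustration of the same direct-handoff idea using POSIX threads and C11 atomics instead of the IPRT primitives; all names (WORKER, pvTodo, submitToIdleWorker) are invented for the example and do not exist in the changeset. Build with -pthread.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct WORKER
{
    pthread_t        hThread;
    pthread_mutex_t  Mtx;
    pthread_cond_t   Cond;
    void *_Atomic    pvTodo;        /* request handed directly to this worker */
    int              fShutdown;     /* protected by Mtx */
} WORKER;

static WORKER g_Worker;

static void *workerProc(void *pvArg)
{
    WORKER *pWorker = (WORKER *)pvArg;
    for (;;)
    {
        /* Grab directly assigned work first, like pTodoReq in the pool. */
        void *pvReq = atomic_exchange(&pWorker->pvTodo, NULL);
        if (pvReq)
        {
            printf("processing %s\n", (const char *)pvReq);
            continue;
        }

        /* Nothing to do: block until a submitter signals us. */
        pthread_mutex_lock(&pWorker->Mtx);
        while (!atomic_load(&pWorker->pvTodo) && !pWorker->fShutdown)
            pthread_cond_wait(&pWorker->Cond, &pWorker->Mtx);
        int fStop = pWorker->fShutdown && !atomic_load(&pWorker->pvTodo);
        pthread_mutex_unlock(&pWorker->Mtx);
        if (fStop)
            return NULL;
    }
}

/* Submitter side: hand the request to an idle worker and wake it up. */
static void submitToIdleWorker(WORKER *pWorker, void *pvReq)
{
    atomic_store(&pWorker->pvTodo, pvReq);
    pthread_mutex_lock(&pWorker->Mtx);
    pthread_cond_signal(&pWorker->Cond);
    pthread_mutex_unlock(&pWorker->Mtx);
}

int main(void)
{
    pthread_mutex_init(&g_Worker.Mtx, NULL);
    pthread_cond_init(&g_Worker.Cond, NULL);
    pthread_create(&g_Worker.hThread, NULL, workerProc, &g_Worker);

    submitToIdleWorker(&g_Worker, "request #1");

    /* Shut the worker down and wait for it to exit. */
    pthread_mutex_lock(&g_Worker.Mtx);
    g_Worker.fShutdown = 1;
    pthread_cond_signal(&g_Worker.Cond);
    pthread_mutex_unlock(&g_Worker.Mtx);
    pthread_join(g_Worker.hThread, NULL);
    return 0;
}

The overwrite hazard on pvTodo does not arise in the real pool because a submitter only performs the direct handoff to a thread it has just taken off the idle list; once a thread is busy, new requests go through the pending FIFO instead.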