1 /****************************************************************************
2 * Copyright (C) 2014-2015 Intel Corporation. All Rights Reserved.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice (including the next
12 * paragraph) shall be included in all copies or substantial portions of the
13 * Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 * IN THE SOFTWARE.
22 ****************************************************************************/
23
24 #include <stdio.h>
25 #include <thread>
26 #include <algorithm>
27 #include <float.h>
28 #include <vector>
29 #include <utility>
30 #include <fstream>
31 #include <string>
32
33 #if defined(__linux__) || defined(__gnu_linux__)
34 #include <pthread.h>
35 #include <sched.h>
36 #include <unistd.h>
37 #endif
38
39 #include "common/os.h"
40 #include "context.h"
41 #include "frontend.h"
42 #include "backend.h"
43 #include "rasterizer.h"
44 #include "rdtsc_core.h"
45 #include "tilemgr.h"
46
47
48
49
50 // A physical core: the processor group it belongs to and the HW thread (logical processor) ids it owns
51 struct Core
52 {
53 uint32_t procGroup = 0;
54 std::vector<uint32_t> threadIds;
55 };
56
57 struct NumaNode
58 {
59 std::vector<Core> cores;
60 };
61
62 typedef std::vector<NumaNode> CPUNumaNodes;
63
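//////////////////////////////////////////////////////////////////////////
/// @brief Queries the host CPU topology (NUMA nodes -> physical cores -> HW threads).
/// @param out_nodes - receives the per-NUMA-node core/thread layout.
/// @param out_numThreadsPerProcGroup - receives the number of HW threads per processor group.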
64 void CalculateProcessorTopology(CPUNumaNodes& out_nodes, uint32_t& out_numThreadsPerProcGroup)
65 {
66 out_nodes.clear();
67 out_numThreadsPerProcGroup = 0;
68
69 #if defined(_WIN32)
70
71 static std::mutex m;
72 std::lock_guard<std::mutex> l(m);
73
74 static SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX buffer[KNOB_MAX_NUM_THREADS];
75 DWORD bufSize = sizeof(buffer);
76
77 BOOL ret = GetLogicalProcessorInformationEx(RelationProcessorCore, buffer, &bufSize);
78 SWR_ASSERT(ret != FALSE, "Failed to get Processor Topology Information");
79
80 uint32_t count = bufSize / buffer->Size;
81 PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX pBuffer = buffer;
82
83 for (uint32_t i = 0; i < count; ++i)
84 {
85 SWR_ASSERT(pBuffer->Relationship == RelationProcessorCore);
86 for (uint32_t g = 0; g < pBuffer->Processor.GroupCount; ++g)
87 {
88 auto& gmask = pBuffer->Processor.GroupMask[g];
89 uint32_t threadId = 0;
90 uint32_t procGroup = gmask.Group;
91
92 Core* pCore = nullptr;
93
94 uint32_t numThreads = (uint32_t)_mm_popcount_sizeT(gmask.Mask);
95
96 while (BitScanForwardSizeT((unsigned long*)&threadId, gmask.Mask))
97 {
98 // clear mask
99 gmask.Mask &= ~(KAFFINITY(1) << threadId);
100
101 // Find Numa Node
102 PROCESSOR_NUMBER procNum = {};
103 procNum.Group = WORD(procGroup);
104 procNum.Number = UCHAR(threadId);
105
106 uint32_t numaId = 0;
107 ret = GetNumaProcessorNodeEx(&procNum, (PUSHORT)&numaId);
108 SWR_ASSERT(ret);
109
110 // Store data
111 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
112 auto& numaNode = out_nodes[numaId];
113
114 uint32_t coreId = 0;
115
116 if (nullptr == pCore)
117 {
118 numaNode.cores.push_back(Core());
119 pCore = &numaNode.cores.back();
120 pCore->procGroup = procGroup;
121 #if !defined(_WIN64)
122 coreId = (uint32_t)numaNode.cores.size();
123 if ((coreId * numThreads) >= 32)
124 {
125 // Windows doesn't return threadIds >= 32 for a processor group correctly
126 // when running a 32-bit application.
127 // Just save -1 as the threadId
128 threadId = uint32_t(-1);
129 }
130 #endif
131 }
132 pCore->threadIds.push_back(threadId);
133 if (procGroup == 0)
134 {
135 out_numThreadsPerProcGroup++;
136 }
137 }
138 }
139 pBuffer = PtrAdd(pBuffer, pBuffer->Size);
140 }
141
142
143 #elif defined(__linux__) || defined (__gnu_linux__)
144
145 // Parse /proc/cpuinfo to get full topology
146 std::ifstream input("/proc/cpuinfo");
147 std::string line;
148 char* c;
149 uint32_t threadId = uint32_t(-1);
150 uint32_t coreId = uint32_t(-1);
151 uint32_t numaId = uint32_t(-1);
152
153 while (std::getline(input, line))
154 {
155 if (line.find("processor") != std::string::npos)
156 {
157 if (threadId != uint32_t(-1))
158 {
159 // Save information.
160 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
161 auto& numaNode = out_nodes[numaId];
162 if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
163 auto& core = numaNode.cores[coreId];
164
165 core.procGroup = coreId;
166 core.threadIds.push_back(threadId);
167
168 out_numThreadsPerProcGroup++;
169 }
170
171 auto data_start = line.find(": ") + 2;
172 threadId = std::strtoul(&line.c_str()[data_start], &c, 10);
173 continue;
174 }
175 if (line.find("core id") != std::string::npos)
176 {
177 auto data_start = line.find(": ") + 2;
178 coreId = std::strtoul(&line.c_str()[data_start], &c, 10);
179 continue;
180 }
181 if (line.find("physical id") != std::string::npos)
182 {
183 auto data_start = line.find(": ") + 2;
184 numaId = std::strtoul(&line.c_str()[data_start], &c, 10);
185 continue;
186 }
187 }
188
189 if (threadId != uint32_t(-1))
190 {
191 // Save information.
192 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
193 auto& numaNode = out_nodes[numaId];
194 if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
195 auto& core = numaNode.cores[coreId];
196
197 core.procGroup = coreId;
198 core.threadIds.push_back(threadId);
199 out_numThreadsPerProcGroup++;
200 }
201
202 for (uint32_t node = 0; node < out_nodes.size(); node++) {
203 auto& numaNode = out_nodes[node];
204 auto it = numaNode.cores.begin();
205 for ( ; it != numaNode.cores.end(); ) {
206 if (it->threadIds.size() == 0)
207                 it = numaNode.cores.erase(it); // erase() invalidates 'it'; continue from the returned iterator
208 else
209 ++it;
210 }
211 }
212
213 #else
214
215 #error Unsupported platform
216
217 #endif
218 }
219
220
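// Bind the calling thread to a specific HW thread within a processor group.
// When KNOB_MAX_WORKER_THREADS is set, at most the processor group affinity is applied (see below).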
221 void bindThread(uint32_t threadId, uint32_t procGroupId = 0, bool bindProcGroup=false)
222 {
223 // Only bind threads when MAX_WORKER_THREADS isn't set.
224 if (KNOB_MAX_WORKER_THREADS && bindProcGroup == false)
225 {
226 return;
227 }
228
229 #if defined(_WIN32)
230 {
231 GROUP_AFFINITY affinity = {};
232 affinity.Group = procGroupId;
233
234 #if !defined(_WIN64)
235 if (threadId >= 32)
236 {
237 // In a 32-bit process on Windows it is impossible to bind
238 // to logical processors 32-63 within a processor group.
239 // In this case set the mask to 0 and let the system assign
240 // the processor. Hopefully it will make smart choices.
241 affinity.Mask = 0;
242 }
243 else
244 #endif
245 {
246 // If KNOB_MAX_WORKER_THREADS is set, only bind to the proc group,
247             // not the individual HW thread.
248 if (!KNOB_MAX_WORKER_THREADS)
249 {
250 affinity.Mask = KAFFINITY(1) << threadId;
251 }
252 }
253
254 SetThreadGroupAffinity(GetCurrentThread(), &affinity, nullptr);
255 }
256 #else
257 cpu_set_t cpuset;
258 pthread_t thread = pthread_self();
259 CPU_ZERO(&cpuset);
260 CPU_SET(threadId, &cpuset);
261
262 pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
263 #endif
264 }
265
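// Returns the head of the draw context ring, i.e. the number of draws the API thread has enqueued so far.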
266 INLINE
267 uint64_t GetEnqueuedDraw(SWR_CONTEXT *pContext)
268 {
269 return pContext->dcRing.GetHead();
270 }
271
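// Draw ids are 1-based, hence the (drawId - 1) when indexing into the draw context ring.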
272 INLINE
273 DRAW_CONTEXT *GetDC(SWR_CONTEXT *pContext, uint64_t drawId)
274 {
275 return &pContext->dcRing[(drawId-1) % KNOB_MAX_DRAWS_IN_FLIGHT];
276 }
277
278 // returns true if dependency not met
279 INLINE
280 bool CheckDependency(SWR_CONTEXT *pContext, DRAW_CONTEXT *pDC, uint64_t lastRetiredDraw)
281 {
282 return (pDC->dependency > lastRetiredDraw);
283 }
284
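// Called by each worker when it has finished with a draw. The last worker to finish
// (threadsDone reaches 0) frees the draw's allocations and dequeues it from the tail of the ring.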
285 INLINE int64_t CompleteDrawContext(SWR_CONTEXT* pContext, DRAW_CONTEXT* pDC)
286 {
287 int64_t result = InterlockedDecrement64(&pDC->threadsDone);
288 SWR_ASSERT(result >= 0);
289
290 if (result == 0)
291 {
292 // Cleanup memory allocations
293 pDC->pArena->Reset(true);
294 if (!pDC->isCompute)
295 {
296 pDC->pTileMgr->initialize();
297 }
298 if (pDC->cleanupState)
299 {
300 pDC->pState->pArena->Reset(true);
301 }
302
303 _ReadWriteBarrier();
304
305 pContext->dcRing.Dequeue(); // Remove from tail
306 }
307
308 return result;
309 }
310
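// Advances curDrawBE past any draws whose work is already complete and reports whether
// an incomplete draw remains, along with the current number of enqueued draws.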
311 INLINE bool FindFirstIncompleteDraw(SWR_CONTEXT* pContext, uint64_t& curDrawBE, uint64_t& drawEnqueued)
312 {
313 // increment our current draw id to the first incomplete draw
314 drawEnqueued = GetEnqueuedDraw(pContext);
315 while (curDrawBE < drawEnqueued)
316 {
317 DRAW_CONTEXT *pDC = &pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT];
318
319         // If it's not compute and the FE is not done, then break out of the loop.
320 if (!pDC->doneFE && !pDC->isCompute) break;
321
322 bool isWorkComplete = pDC->isCompute ?
323 pDC->pDispatch->isWorkComplete() :
324 pDC->pTileMgr->isWorkComplete();
325
326 if (isWorkComplete)
327 {
328 curDrawBE++;
329 CompleteDrawContext(pContext, pDC);
330 }
331 else
332 {
333 break;
334 }
335 }
336
337 // If there are no more incomplete draws then return false.
338     return curDrawBE < drawEnqueued;
339 }
340
341 //////////////////////////////////////////////////////////////////////////
342 /// @brief If there is any BE work then go work on it.
343 /// @param pContext - pointer to SWR context.
344 /// @param workerId - The unique worker ID that is assigned to this thread.
345 /// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
346 /// has its own curDrawBE counter and this ensures that each worker processes all the
347 /// draws in order.
348 /// @param lockedTiles - This is the set of tiles locked by other threads. Each thread maintains its
349 ///                     own set; each time it fails to lock a macrotile because it's already locked,
350 ///                     it adds that tile to the lockedTiles set. As a worker begins to work
351 ///                     on future draws, the lockedTiles set ensures that it doesn't work on tiles that may
352 ///                     still have work pending in a previous draw. Additionally, lockedTiles is a
353 ///                     heuristic that can steer a worker back to the same macrotile that it had been
354 /// working on in a previous draw.
355 void WorkOnFifoBE(
356 SWR_CONTEXT *pContext,
357 uint32_t workerId,
358 uint64_t &curDrawBE,
359 TileSet& lockedTiles,
360 uint32_t numaNode,
361 uint32_t numaMask)
362 {
363 // Find the first incomplete draw that has pending work. If no such draw is found then
364 // return. FindFirstIncompleteDraw is responsible for incrementing the curDrawBE.
365 uint64_t drawEnqueued = 0;
366 if (FindFirstIncompleteDraw(pContext, curDrawBE, drawEnqueued) == false)
367 {
368 return;
369 }
370
371 uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;
372
373 // Reset our history for locked tiles. We'll have to re-learn which tiles are locked.
374 lockedTiles.clear();
375
376 // Try to work on each draw in order of the available draws in flight.
377 // 1. If we're on curDrawBE, we can work on any macrotile that is available.
378 // 2. If we're trying to work on draws after curDrawBE, we are restricted to
379 // working on those macrotiles that are known to be complete in the prior draw to
380     //      maintain order. The locked tiles set provides the history that ensures this.
381 for (uint64_t i = curDrawBE; i < drawEnqueued; ++i)
382 {
383 DRAW_CONTEXT *pDC = &pContext->dcRing[i % KNOB_MAX_DRAWS_IN_FLIGHT];
384
385 if (pDC->isCompute) return; // We don't look at compute work.
386
387 // First wait for FE to be finished with this draw. This keeps threading model simple
388 // but if there are lots of bubbles between draws then serializing FE and BE may
389 // need to be revisited.
390 if (!pDC->doneFE) return;
391
392 // If this draw is dependent on a previous draw then we need to bail.
393 if (CheckDependency(pContext, pDC, lastRetiredDraw))
394 {
395 return;
396 }
397
398 // Grab the list of all dirty macrotiles. A tile is dirty if it has work queued to it.
399 std::vector<uint32_t> &macroTiles = pDC->pTileMgr->getDirtyTiles();
400
401 for (uint32_t tileID : macroTiles)
402 {
403             // Only work on tiles for this numa node
404 uint32_t x, y;
405 pDC->pTileMgr->getTileIndices(tileID, x, y);
406 if (((x ^ y) & numaMask) != numaNode)
407 {
408 continue;
409 }
410
411 MacroTileQueue &tile = pDC->pTileMgr->getMacroTileQueue(tileID);
412
413 if (!tile.getNumQueued())
414 {
415 continue;
416 }
417
418 // can only work on this draw if it's not in use by other threads
419 if (lockedTiles.find(tileID) != lockedTiles.end())
420 {
421 continue;
422 }
423
424 if (tile.tryLock())
425 {
426 BE_WORK *pWork;
427
428 RDTSC_START(WorkerFoundWork);
429
430 uint32_t numWorkItems = tile.getNumQueued();
431 SWR_ASSERT(numWorkItems);
432
433 pWork = tile.peek();
434 SWR_ASSERT(pWork);
435 if (pWork->type == DRAW)
436 {
437 pContext->pHotTileMgr->InitializeHotTiles(pContext, pDC, tileID);
438 }
439
440 while ((pWork = tile.peek()) != nullptr)
441 {
442 pWork->pfnWork(pDC, workerId, tileID, &pWork->desc);
443 tile.dequeue();
444 }
445 RDTSC_STOP(WorkerFoundWork, numWorkItems, pDC->drawId);
446
447 _ReadWriteBarrier();
448
449 pDC->pTileMgr->markTileComplete(tileID);
450
451 // Optimization: If the draw is complete and we're the last one to have worked on it then
452                 // we can reset the locked list, since we know that all draws before the next one are guaranteed to be complete.
453 if ((curDrawBE == i) && pDC->pTileMgr->isWorkComplete())
454 {
455 // We can increment the current BE and safely move to next draw since we know this draw is complete.
456 curDrawBE++;
457 CompleteDrawContext(pContext, pDC);
458
459 lastRetiredDraw++;
460
461 lockedTiles.clear();
462 break;
463 }
464 }
465 else
466 {
467 // This tile is already locked. So let's add it to our locked tiles set. This way we don't try locking this one again.
468 lockedTiles.insert(tileID);
469 }
470 }
471 }
472 }
473
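//////////////////////////////////////////////////////////////////////////
/// @brief If there is any FE work then go work on it.
/// @param pContext - pointer to SWR context.
/// @param workerId - The unique worker ID that is assigned to this thread.
/// @param curDrawFE - This tracks the first draw whose FE work may still be outstanding.
///                    Each worker thread has its own curDrawFE counter.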
474 void WorkOnFifoFE(SWR_CONTEXT *pContext, uint32_t workerId, uint64_t &curDrawFE)
475 {
476 // Try to grab the next DC from the ring
477 uint64_t drawEnqueued = GetEnqueuedDraw(pContext);
478 while (curDrawFE < drawEnqueued)
479 {
480 uint32_t dcSlot = curDrawFE % KNOB_MAX_DRAWS_IN_FLIGHT;
481 DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];
482 if (pDC->isCompute || pDC->doneFE || pDC->FeLock)
483 {
484 CompleteDrawContext(pContext, pDC);
485 curDrawFE++;
486 }
487 else
488 {
489 break;
490 }
491 }
492
493 uint64_t curDraw = curDrawFE;
494 while (curDraw < drawEnqueued)
495 {
496 uint32_t dcSlot = curDraw % KNOB_MAX_DRAWS_IN_FLIGHT;
497 DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];
498
499 if (!pDC->isCompute && !pDC->FeLock)
500 {
501 uint32_t initial = InterlockedCompareExchange((volatile uint32_t*)&pDC->FeLock, 1, 0);
502 if (initial == 0)
503 {
504 // successfully grabbed the DC, now run the FE
505 pDC->FeWork.pfnWork(pContext, pDC, workerId, &pDC->FeWork.desc);
506
507 _ReadWriteBarrier();
508 pDC->doneFE = true;
509 }
510 }
511 curDraw++;
512 }
513 }
514
515 //////////////////////////////////////////////////////////////////////////
516 /// @brief If there is any compute work then go work on it.
517 /// @param pContext - pointer to SWR context.
518 /// @param workerId - The unique worker ID that is assigned to this thread.
519 /// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
520 /// has its own curDrawBE counter and this ensures that each worker processes all the
521 /// draws in order.
522 void WorkOnCompute(
523 SWR_CONTEXT *pContext,
524 uint32_t workerId,
525 uint64_t& curDrawBE)
526 {
527 uint64_t drawEnqueued = 0;
528 if (FindFirstIncompleteDraw(pContext, curDrawBE, drawEnqueued) == false)
529 {
530 return;
531 }
532
533 uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;
534
535     for (uint64_t i = curDrawBE; i < drawEnqueued; ++i)
536 {
537 DRAW_CONTEXT *pDC = &pContext->dcRing[i % KNOB_MAX_DRAWS_IN_FLIGHT];
538 if (pDC->isCompute == false) return;
539
540 // check dependencies
541 if (CheckDependency(pContext, pDC, lastRetiredDraw))
542 {
543 return;
544 }
545
546 SWR_ASSERT(pDC->pDispatch != nullptr);
547 DispatchQueue& queue = *pDC->pDispatch;
548
549 // Is there any work remaining?
550 if (queue.getNumQueued() > 0)
551 {
552 void* pSpillFillBuffer = nullptr;
553 uint32_t threadGroupId = 0;
554 while (queue.getWork(threadGroupId))
555 {
556 ProcessComputeBE(pDC, workerId, threadGroupId, pSpillFillBuffer);
557
558 queue.finishedWork();
559 }
560 }
561 }
562 }
563
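// Worker thread main loop. The IsFEThread / IsBEThread template parameters select whether
// this worker picks up front-end work, back-end (and compute) work, or both.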
564 template<bool IsFEThread, bool IsBEThread>
565 DWORD workerThreadMain(LPVOID pData)
566 {
567 THREAD_DATA *pThreadData = (THREAD_DATA*)pData;
568 SWR_CONTEXT *pContext = pThreadData->pContext;
569 uint32_t threadId = pThreadData->threadId;
570 uint32_t workerId = pThreadData->workerId;
571
572 bindThread(threadId, pThreadData->procGroupId, pThreadData->forceBindProcGroup);
573
574 RDTSC_INIT(threadId);
575
576 uint32_t numaNode = pThreadData->numaId;
577 uint32_t numaMask = pContext->threadPool.numaMask;
578
579 // flush denormals to 0
580 _mm_setcsr(_mm_getcsr() | _MM_FLUSH_ZERO_ON | _MM_DENORMALS_ZERO_ON);
581
582 // Track tiles locked by other threads. If we try to lock a macrotile and find its already
583 // locked then we'll add it to this list so that we don't try and lock it again.
584 TileSet lockedTiles;
585
586 // each worker has the ability to work on any of the queued draws as long as certain
587 // conditions are met. the data associated
588 // with a draw is guaranteed to be active as long as a worker hasn't signaled that he
589 // has moved on to the next draw when he determines there is no more work to do. The api
590 // thread will not increment the head of the dc ring until all workers have moved past the
591 // current head.
592 // the logic to determine what to work on is:
593 // 1- try to work on the FE any draw that is queued. For now there are no dependencies
594 // on the FE work, so any worker can grab any FE and process in parallel. Eventually
595 // we'll need dependency tracking to force serialization on FEs. The worker will try
596 // to pick an FE by atomically incrementing a counter in the swr context. he'll keep
597 // trying until he reaches the tail.
598 // 2- BE work must be done in strict order. we accomplish this today by pulling work off
599 // the oldest draw (ie the head) of the dcRing. the worker can determine if there is
600 // any work left by comparing the total # of binned work items and the total # of completed
601 // work items. If they are equal, then there is no more work to do for this draw, and
602 // the worker can safely increment its oldestDraw counter and move on to the next draw.
603 std::unique_lock<std::mutex> lock(pContext->WaitLock, std::defer_lock);
604
605 auto threadHasWork = [&](uint64_t curDraw) { return curDraw != pContext->dcRing.GetHead(); };
606
607 uint64_t curDrawBE = 0;
608 uint64_t curDrawFE = 0;
609
610 while (pContext->threadPool.inThreadShutdown == false)
611 {
612 uint32_t loop = 0;
613 while (loop++ < KNOB_WORKER_SPIN_LOOP_COUNT && !threadHasWork(curDrawBE))
614 {
615 _mm_pause();
616 }
617
618 if (!threadHasWork(curDrawBE))
619 {
620 lock.lock();
621
622 // check for thread idle condition again under lock
623 if (threadHasWork(curDrawBE))
624 {
625 lock.unlock();
626 continue;
627 }
628
629 if (pContext->threadPool.inThreadShutdown)
630 {
631 lock.unlock();
632 break;
633 }
634
635 RDTSC_START(WorkerWaitForThreadEvent);
636
637 pContext->FifosNotEmpty.wait(lock);
638 lock.unlock();
639
640 RDTSC_STOP(WorkerWaitForThreadEvent, 0, 0);
641
642 if (pContext->threadPool.inThreadShutdown)
643 {
644 break;
645 }
646 }
647
648 if (IsBEThread)
649 {
650 RDTSC_START(WorkerWorkOnFifoBE);
651 WorkOnFifoBE(pContext, workerId, curDrawBE, lockedTiles, numaNode, numaMask);
652 RDTSC_STOP(WorkerWorkOnFifoBE, 0, 0);
653
654 WorkOnCompute(pContext, workerId, curDrawBE);
655 }
656
657 if (IsFEThread)
658 {
659 WorkOnFifoFE(pContext, workerId, curDrawFE);
660
661 if (!IsBEThread)
662 {
663 curDrawBE = curDrawFE;
664 }
665 }
666 }
667
668 return 0;
669 }
670 template<> DWORD workerThreadMain<false, false>(LPVOID) = delete;
671
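// Thread entry point. On Windows this runs workerThreadMain inside an SEH __try block;
// EXCEPTION_CONTINUE_SEARCH passes any exception on to outer handlers (e.g. a debugger).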
672 template <bool IsFEThread, bool IsBEThread>
673 DWORD workerThreadInit(LPVOID pData)
674 {
675 #if defined(_WIN32)
676 __try
677 #endif // _WIN32
678 {
679 return workerThreadMain<IsFEThread, IsBEThread>(pData);
680 }
681
682 #if defined(_WIN32)
683 __except(EXCEPTION_CONTINUE_SEARCH)
684 {
685 }
686
687 #endif // _WIN32
688
689 return 1;
690 }
691 template<> DWORD workerThreadInit<false, false>(LPVOID pData) = delete;
692
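//////////////////////////////////////////////////////////////////////////
/// @brief Creates the worker thread pool based on the HW topology and the thread knobs,
///        reserving a HW thread for the API thread where possible.
/// @param pContext - pointer to SWR context.
/// @param pPool - pointer to thread pool object to initialize.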
693 void CreateThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
694 {
695 bindThread(0);
696
697 CPUNumaNodes nodes;
698 uint32_t numThreadsPerProcGroup = 0;
699 CalculateProcessorTopology(nodes, numThreadsPerProcGroup);
700
701 uint32_t numHWNodes = (uint32_t)nodes.size();
702 uint32_t numHWCoresPerNode = (uint32_t)nodes[0].cores.size();
703 uint32_t numHWHyperThreads = (uint32_t)nodes[0].cores[0].threadIds.size();
704
705 uint32_t numNodes = numHWNodes;
706 uint32_t numCoresPerNode = numHWCoresPerNode;
707 uint32_t numHyperThreads = numHWHyperThreads;
708
709 if (KNOB_MAX_WORKER_THREADS)
710 {
711 SET_KNOB(HYPERTHREADED_FE, false);
712 }
713
714 if (KNOB_HYPERTHREADED_FE)
715 {
716 SET_KNOB(MAX_THREADS_PER_CORE, 0);
717 }
718
719 if (KNOB_MAX_NUMA_NODES)
720 {
721 numNodes = std::min(numNodes, KNOB_MAX_NUMA_NODES);
722 }
723
724 if (KNOB_MAX_CORES_PER_NUMA_NODE)
725 {
726 numCoresPerNode = std::min(numCoresPerNode, KNOB_MAX_CORES_PER_NUMA_NODE);
727 }
728
729 if (KNOB_MAX_THREADS_PER_CORE)
730 {
731 numHyperThreads = std::min(numHyperThreads, KNOB_MAX_THREADS_PER_CORE);
732 }
733
734 if (numHyperThreads < 2)
735 {
736 SET_KNOB(HYPERTHREADED_FE, false);
737 }
738
739 // Calculate numThreads
740 uint32_t numThreads = numNodes * numCoresPerNode * numHyperThreads;
741
742 if (KNOB_MAX_WORKER_THREADS)
743 {
744 uint32_t maxHWThreads = numHWNodes * numHWCoresPerNode * numHWHyperThreads;
745 numThreads = std::min(KNOB_MAX_WORKER_THREADS, maxHWThreads);
746 }
747
748 if (numThreads > KNOB_MAX_NUM_THREADS)
749 {
750 printf("WARNING: system thread count %u exceeds max %u, "
751 "performance will be degraded\n",
752 numThreads, KNOB_MAX_NUM_THREADS);
753 }
754
755 uint32_t numAPIReservedThreads = 1;
756
757
758 if (numThreads == 1)
759 {
760         // If there is only 1 worker thread, try to move it to an available
761 // HW thread. If that fails, use the API thread.
762 if (numCoresPerNode < numHWCoresPerNode)
763 {
764 numCoresPerNode++;
765 }
766 else if (numHyperThreads < numHWHyperThreads)
767 {
768 numHyperThreads++;
769 }
770 else if (numNodes < numHWNodes)
771 {
772 numNodes++;
773 }
774 else
775 {
776 pPool->numThreads = 0;
777 SET_KNOB(SINGLE_THREADED, true);
778 return;
779 }
780 }
781 else
782 {
783 // Save HW threads for the API if we can
784 if (numThreads > numAPIReservedThreads)
785 {
786 numThreads -= numAPIReservedThreads;
787 }
788 else
789 {
790 numAPIReservedThreads = 0;
791 }
792 }
793
794 pPool->numThreads = numThreads;
795 pContext->NumWorkerThreads = pPool->numThreads;
796
797 pPool->inThreadShutdown = false;
798 pPool->pThreadData = (THREAD_DATA *)malloc(pPool->numThreads * sizeof(THREAD_DATA));
799 pPool->numaMask = 0;
800
801 if (KNOB_MAX_WORKER_THREADS)
802 {
803 bool bForceBindProcGroup = (numThreads > numThreadsPerProcGroup);
804 uint32_t numProcGroups = (numThreads + numThreadsPerProcGroup - 1) / numThreadsPerProcGroup;
805         // When MAX_WORKER_THREADS is set we don't bother to bind to specific HW threads,
806         // but Windows still requires binding to specific processor groups.
807 for (uint32_t workerId = 0; workerId < numThreads; ++workerId)
808 {
809 pPool->pThreadData[workerId].workerId = workerId;
810 pPool->pThreadData[workerId].procGroupId = workerId % numProcGroups;
811 pPool->pThreadData[workerId].threadId = 0;
812 pPool->pThreadData[workerId].numaId = 0;
813 pPool->pThreadData[workerId].coreId = 0;
814 pPool->pThreadData[workerId].htId = 0;
815 pPool->pThreadData[workerId].pContext = pContext;
816 pPool->pThreadData[workerId].forceBindProcGroup = bForceBindProcGroup;
817 pPool->threads[workerId] = new std::thread(workerThreadInit<true, true>, &pPool->pThreadData[workerId]);
818
819 pContext->NumBEThreads++;
820 pContext->NumFEThreads++;
821 }
822 }
823 else
824 {
825 pPool->numaMask = numNodes - 1; // Only works for 2**n numa nodes (1, 2, 4, etc.)
826
827 uint32_t workerId = 0;
828 for (uint32_t n = 0; n < numNodes; ++n)
829 {
830 auto& node = nodes[n];
831 if (node.cores.size() == 0)
832 {
833 continue;
834 }
835
836 uint32_t numCores = numCoresPerNode;
837 for (uint32_t c = 0; c < numCores; ++c)
838 {
839 auto& core = node.cores[c];
840 for (uint32_t t = 0; t < numHyperThreads; ++t)
841 {
842 if (numAPIReservedThreads)
843 {
844 --numAPIReservedThreads;
845 continue;
846 }
847
848 pPool->pThreadData[workerId].workerId = workerId;
849 pPool->pThreadData[workerId].procGroupId = core.procGroup;
850 pPool->pThreadData[workerId].threadId = core.threadIds[t];
851 pPool->pThreadData[workerId].numaId = n;
852 pPool->pThreadData[workerId].coreId = c;
853 pPool->pThreadData[workerId].htId = t;
854 pPool->pThreadData[workerId].pContext = pContext;
855
856 if (KNOB_HYPERTHREADED_FE)
857 {
858 if (t == 0)
859 {
860 pContext->NumBEThreads++;
861 pPool->threads[workerId] = new std::thread(workerThreadInit<false, true>, &pPool->pThreadData[workerId]);
862 }
863 else
864 {
865 pContext->NumFEThreads++;
866 pPool->threads[workerId] = new std::thread(workerThreadInit<true, false>, &pPool->pThreadData[workerId]);
867 }
868 }
869 else
870 {
871 pPool->threads[workerId] = new std::thread(workerThreadInit<true, true>, &pPool->pThreadData[workerId]);
872 pContext->NumBEThreads++;
873 pContext->NumFEThreads++;
874 }
875
876 ++workerId;
877 }
878 }
879 }
880 }
881 }
882
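//////////////////////////////////////////////////////////////////////////
/// @brief Signals the workers to shut down, joins and deletes the threads, and frees pool data.
/// @param pContext - pointer to SWR context.
/// @param pPool - pointer to thread pool object to destroy.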
883 void DestroyThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
884 {
885 if (!KNOB_SINGLE_THREADED)
886 {
887 // Inform threads to finish up
888 std::unique_lock<std::mutex> lock(pContext->WaitLock);
889 pPool->inThreadShutdown = true;
890 _mm_mfence();
891 pContext->FifosNotEmpty.notify_all();
892 lock.unlock();
893
894 // Wait for threads to finish and destroy them
895 for (uint32_t t = 0; t < pPool->numThreads; ++t)
896 {
897 pPool->threads[t]->join();
898 delete(pPool->threads[t]);
899 }
900
901 // Clean up data used by threads
902 free(pPool->pThreadData);
903 }
904 }