swr: [rasterizer] Avoid segv in thread creation on machines with non-consecutive...
[mesa.git] / src / gallium / drivers / swr / rasterizer / core / threads.cpp
1 /****************************************************************************
2 * Copyright (C) 2014-2015 Intel Corporation. All Rights Reserved.
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice (including the next
12 * paragraph) shall be included in all copies or substantial portions of the
13 * Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 * IN THE SOFTWARE.
22 ****************************************************************************/
23
24 #include <stdio.h>
25 #include <thread>
26 #include <algorithm>
27 #include <float.h>
28 #include <vector>
29 #include <utility>
30 #include <fstream>
31 #include <string>
#include <mutex>    // std::mutex / std::lock_guard / std::unique_lock are used below
32
33 #if defined(__linux__) || defined(__gnu_linux__)
34 #include <pthread.h>
35 #include <sched.h>
36 #include <unistd.h>
37 #endif
38
39 #include "common/os.h"
40 #include "context.h"
41 #include "frontend.h"
42 #include "backend.h"
43 #include "rasterizer.h"
44 #include "rdtsc_core.h"
45 #include "tilemgr.h"
46
47
48
49
50 // Processor topology: a Core records its processor group and the ids of its HW threads.
51 struct Core
52 {
53 uint32_t procGroup = 0;
54 std::vector<uint32_t> threadIds;
55 };
56
57 struct NumaNode
58 {
59 std::vector<Core> cores;
60 };
61
62 typedef std::vector<NumaNode> CPUNumaNodes;
63
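//////////////////////////////////////////////////////////////////////////
/// @brief Query the host's processor topology (NUMA nodes -> cores -> HW threads).
/// @param out_nodes - filled with one entry per NUMA node discovered.
/// @param out_numThreadsPerProcGroup - count of HW threads per processor group
///        (on Linux all parsed threads are treated as a single group).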
64 void CalculateProcessorTopology(CPUNumaNodes& out_nodes, uint32_t& out_numThreadsPerProcGroup)
65 {
66 out_nodes.clear();
67 out_numThreadsPerProcGroup = 0;
68
69 #if defined(_WIN32)
70
71 static std::mutex m;
72 std::lock_guard<std::mutex> l(m);
73
74 static SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX buffer[KNOB_MAX_NUM_THREADS];
75 DWORD bufSize = sizeof(buffer);
76
77 BOOL ret = GetLogicalProcessorInformationEx(RelationProcessorCore, buffer, &bufSize);
78 SWR_ASSERT(ret != FALSE, "Failed to get Processor Topology Information");
79
80 uint32_t count = bufSize / buffer->Size;
81 PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX pBuffer = buffer;
82
83 for (uint32_t i = 0; i < count; ++i)
84 {
85 SWR_ASSERT(pBuffer->Relationship == RelationProcessorCore);
86 for (uint32_t g = 0; g < pBuffer->Processor.GroupCount; ++g)
87 {
88 auto& gmask = pBuffer->Processor.GroupMask[g];
89 uint32_t threadId = 0;
90 uint32_t procGroup = gmask.Group;
91
92 Core* pCore = nullptr;
93
94 uint32_t numThreads = (uint32_t)_mm_popcount_sizeT(gmask.Mask);
95
96 while (BitScanForwardSizeT((unsigned long*)&threadId, gmask.Mask))
97 {
98 // clear mask
99 gmask.Mask &= ~(KAFFINITY(1) << threadId);
100
101 // Find Numa Node
102 PROCESSOR_NUMBER procNum = {};
103 procNum.Group = WORD(procGroup);
104 procNum.Number = UCHAR(threadId);
105
106 uint32_t numaId = 0;
107 ret = GetNumaProcessorNodeEx(&procNum, (PUSHORT)&numaId);
108 SWR_ASSERT(ret);
109
110 // Store data
111 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
112 auto& numaNode = out_nodes[numaId];
113
114 uint32_t coreId = 0;
115
116 if (nullptr == pCore)
117 {
118 numaNode.cores.push_back(Core());
119 pCore = &numaNode.cores.back();
120 pCore->procGroup = procGroup;
121 #if !defined(_WIN64)
122 coreId = (uint32_t)numaNode.cores.size();
123 if ((coreId * numThreads) >= 32)
124 {
125 // Windows doesn't return threadIds >= 32 for a processor group correctly
126 // when running a 32-bit application.
127 // Just save -1 as the threadId
128 threadId = uint32_t(-1);
129 }
130 #endif
131 }
132 pCore->threadIds.push_back(threadId);
133 if (procGroup == 0)
134 {
135 out_numThreadsPerProcGroup++;
136 }
137 }
138 }
139 pBuffer = PtrAdd(pBuffer, pBuffer->Size);
140 }
141
142
143 #elif defined(__linux__) || defined (__gnu_linux__)
144
145 // Parse /proc/cpuinfo to get full topology
146 std::ifstream input("/proc/cpuinfo");
147 std::string line;
148 char* c;
149 uint32_t threadId = uint32_t(-1);
150 uint32_t coreId = uint32_t(-1);
151 uint32_t numaId = uint32_t(-1);
152
153 while (std::getline(input, line))
154 {
155 if (line.find("processor") != std::string::npos)
156 {
157 if (threadId != uint32_t(-1))
158 {
159 // Save information.
160 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
161 auto& numaNode = out_nodes[numaId];
162 if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
163 auto& core = numaNode.cores[coreId];
164
165 core.procGroup = coreId;
166 core.threadIds.push_back(threadId);
167
168 out_numThreadsPerProcGroup++;
169 }
170
171 auto data_start = line.find(": ") + 2;
172 threadId = std::strtoul(&line.c_str()[data_start], &c, 10);
173 continue;
174 }
175 if (line.find("core id") != std::string::npos)
176 {
177 auto data_start = line.find(": ") + 2;
178 coreId = std::strtoul(&line.c_str()[data_start], &c, 10);
179 continue;
180 }
181 if (line.find("physical id") != std::string::npos)
182 {
183 auto data_start = line.find(": ") + 2;
184 numaId = std::strtoul(&line.c_str()[data_start], &c, 10);
185 continue;
186 }
187 }
188
189 if (threadId != uint32_t(-1))
190 {
191 // Save information.
192 if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
193 auto& numaNode = out_nodes[numaId];
194 if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
195 auto& core = numaNode.cores[coreId];
196
197 core.procGroup = coreId;
198 core.threadIds.push_back(threadId);
199 out_numThreadsPerProcGroup++;
200 }
201
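// Prune cores that ended up with no HW threads (this can happen when
// /proc/cpuinfo reports non-consecutive core or physical ids), so that
// later code never indexes an empty core and segfaults.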
202 for (uint32_t node = 0; node < out_nodes.size(); node++) {
203 auto& numaNode = out_nodes[node];
204 auto it = numaNode.cores.begin();
205 for ( ; it != numaNode.cores.end(); ) {
206 if (it->threadIds.size() == 0)
207 it = numaNode.cores.erase(it); // use the iterator returned by erase()
208 else
209 ++it;
210 }
211 }
212
213 #else
214
215 #error Unsupported platform
216
217 #endif
218 }
219
220
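//////////////////////////////////////////////////////////////////////////
/// @brief Pin the calling thread: on Linux via pthread_setaffinity_np, on
///        Windows via SetThreadGroupAffinity. When KNOB_MAX_WORKER_THREADS
///        is set, only the optional processor-group binding is applied.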
221 void bindThread(uint32_t threadId, uint32_t procGroupId = 0, bool bindProcGroup=false)
222 {
223 // When KNOB_MAX_WORKER_THREADS is set, skip binding unless proc group binding was requested.
224 if (KNOB_MAX_WORKER_THREADS && bindProcGroup == false)
225 {
226 return;
227 }
228
229 #if defined(_WIN32)
230 {
231 GROUP_AFFINITY affinity = {};
232 affinity.Group = procGroupId;
233
234 #if !defined(_WIN64)
235 if (threadId >= 32)
236 {
237 // In a 32-bit process on Windows it is impossible to bind
238 // to logical processors 32-63 within a processor group.
239 // In this case set the mask to 0 and let the system assign
240 // the processor. Hopefully it will make smart choices.
241 affinity.Mask = 0;
242 }
243 else
244 #endif
245 {
246 // If KNOB_MAX_WORKER_THREADS is set, only bind to the proc group,
247 // not the individual HW thread.
248 if (!KNOB_MAX_WORKER_THREADS)
249 {
250 affinity.Mask = KAFFINITY(1) << threadId;
251 }
252 }
253
254 SetThreadGroupAffinity(GetCurrentThread(), &affinity, nullptr);
255 }
256 #else
257 cpu_set_t cpuset;
258 pthread_t thread = pthread_self();
259 CPU_ZERO(&cpuset);
260 CPU_SET(threadId, &cpuset);
261
262 pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
263 #endif
264 }
265
266 INLINE
267 uint64_t GetEnqueuedDraw(SWR_CONTEXT *pContext)
268 {
269 return pContext->dcRing.GetHead();
270 }
271
272 INLINE
273 DRAW_CONTEXT *GetDC(SWR_CONTEXT *pContext, uint64_t drawId)
274 {
275 return &pContext->dcRing[(drawId-1) % KNOB_MAX_DRAWS_IN_FLIGHT];
276 }
277
278 // Returns true if the draw's dependency has not yet been retired.
279 INLINE
280 bool CheckDependency(SWR_CONTEXT *pContext, DRAW_CONTEXT *pDC, uint64_t lastRetiredDraw)
281 {
282 return (pDC->dependency > lastRetiredDraw);
283 }
284
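//////////////////////////////////////////////////////////////////////////
/// @brief Decrement the draw's outstanding-thread count; the last thread to
///        finish resets the arenas and tile manager and dequeues the DC.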
285 INLINE int64_t CompleteDrawContext(SWR_CONTEXT* pContext, DRAW_CONTEXT* pDC)
286 {
287 int64_t result = InterlockedDecrement64(&pDC->threadsDone);
288 SWR_ASSERT(result >= 0);
289
290 if (result == 0)
291 {
292 // Cleanup memory allocations
293 pDC->pArena->Reset(true);
294 pDC->pTileMgr->initialize();
295 if (pDC->cleanupState)
296 {
297 pDC->pState->pArena->Reset(true);
298 }
299
300 _ReadWriteBarrier();
301
302 pContext->dcRing.Dequeue(); // Remove from tail
303 }
304
305 return result;
306 }
307
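//////////////////////////////////////////////////////////////////////////
/// @brief Advance curDrawBE past draws whose work is already complete and
///        report whether an incomplete draw remains.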
308 INLINE bool FindFirstIncompleteDraw(SWR_CONTEXT* pContext, uint64_t& curDrawBE)
309 {
310 // increment our current draw id to the first incomplete draw
311 uint64_t drawEnqueued = GetEnqueuedDraw(pContext);
312 while (curDrawBE < drawEnqueued)
313 {
314 DRAW_CONTEXT *pDC = &pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT];
315
316 // If it's not compute and the FE is not done, break out of the loop.
317 if (!pDC->doneFE && !pDC->isCompute) break;
318
319 bool isWorkComplete = (pDC->isCompute) ?
320 pDC->pDispatch->isWorkComplete() : pDC->pTileMgr->isWorkComplete();
321
322 if (isWorkComplete)
323 {
324 curDrawBE++;
325 CompleteDrawContext(pContext, pDC);
326 }
327 else
328 {
329 break;
330 }
331 }
332
333 // Return true only if there is still an incomplete draw to work on.
334 return curDrawBE < drawEnqueued;
335 }
336
337 //////////////////////////////////////////////////////////////////////////
338 /// @brief If there is any BE work then go work on it.
339 /// @param pContext - pointer to SWR context.
340 /// @param workerId - The unique worker ID that is assigned to this thread.
341 /// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
342 /// has its own curDrawBE counter and this ensures that each worker processes all the
343 /// draws in order.
344 /// @param lockedTiles - This is the set of tiles locked by other threads. Each thread maintains its
345 /// own set; each time it fails to lock a macrotile because it's already locked,
346 /// it adds that tile to the lockedTiles set. As a worker begins to work
347 /// on future draws, lockedTiles ensures that it doesn't touch tiles that may
348 /// still have work pending in a previous draw. Additionally, lockedTiles is a
349 /// heuristic that can steer a worker back to the same macrotile that it had been
350 /// working on in a previous draw.
351 void WorkOnFifoBE(
352 SWR_CONTEXT *pContext,
353 uint32_t workerId,
354 uint64_t &curDrawBE,
355 TileSet& lockedTiles,
356 uint32_t numaNode,
357 uint32_t numaMask)
358 {
359 // Find the first incomplete draw that has pending work. If no such draw is found then
360 // return. FindFirstIncompleteDraw is responsible for incrementing the curDrawBE.
361 if (FindFirstIncompleteDraw(pContext, curDrawBE) == false)
362 {
363 return;
364 }
365
366 uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;
367
368 // Reset our history for locked tiles. We'll have to re-learn which tiles are locked.
369 lockedTiles.clear();
370
371 // Try to work on each draw in order of the available draws in flight.
372 // 1. If we're on curDrawBE, we can work on any macrotile that is available.
373 // 2. If we're trying to work on draws after curDrawBE, we are restricted to
374 // working on those macrotiles that are known to be complete in the prior draw to
375 // maintain order. The locked tiles provide the history that ensures this.
376 for (uint64_t i = curDrawBE; i < GetEnqueuedDraw(pContext); ++i)
377 {
378 DRAW_CONTEXT *pDC = &pContext->dcRing[i % KNOB_MAX_DRAWS_IN_FLIGHT];
379
380 if (pDC->isCompute) return; // We don't look at compute work.
381
382 // First wait for the FE to be finished with this draw. This keeps the threading model simple,
383 // but if there are lots of bubbles between draws then serializing FE and BE may
384 // need to be revisited.
385 if (!pDC->doneFE) return;
386
387 // If this draw is dependent on a previous draw then we need to bail.
388 if (CheckDependency(pContext, pDC, lastRetiredDraw))
389 {
390 return;
391 }
392
393 // Grab the list of all dirty macrotiles. A tile is dirty if it has work queued to it.
394 std::vector<uint32_t> &macroTiles = pDC->pTileMgr->getDirtyTiles();
395
396 for (uint32_t tileID : macroTiles)
397 {
398 // Only work on tiles for this numa node
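// A sketch of the mapping: (x ^ y) & numaMask spreads macrotiles across
// NUMA nodes in a checkerboard pattern, e.g. with two nodes (numaMask == 1)
// tile (0,0) goes to node 0 and tile (1,0) goes to node 1.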
399 uint32_t x, y;
400 pDC->pTileMgr->getTileIndices(tileID, x, y);
401 if (((x ^ y) & numaMask) != numaNode)
402 {
403 continue;
404 }
405
406 MacroTileQueue &tile = pDC->pTileMgr->getMacroTileQueue(tileID);
407
408 if (!tile.getNumQueued())
409 {
410 continue;
411 }
412
413 // can only work on this tile if it's not in use by another thread
414 if (lockedTiles.find(tileID) != lockedTiles.end())
415 {
416 continue;
417 }
418
419 if (tile.tryLock())
420 {
421 BE_WORK *pWork;
422
423 RDTSC_START(WorkerFoundWork);
424
425 uint32_t numWorkItems = tile.getNumQueued();
426 SWR_ASSERT(numWorkItems);
427
428 pWork = tile.peek();
429 SWR_ASSERT(pWork);
430 if (pWork->type == DRAW)
431 {
432 pContext->pHotTileMgr->InitializeHotTiles(pContext, pDC, tileID);
433 }
434
435 while ((pWork = tile.peek()) != nullptr)
436 {
437 pWork->pfnWork(pDC, workerId, tileID, &pWork->desc);
438 tile.dequeue();
439 }
440 RDTSC_STOP(WorkerFoundWork, numWorkItems, pDC->drawId);
441
442 _ReadWriteBarrier();
443
444 pDC->pTileMgr->markTileComplete(tileID);
445
446 // Optimization: If the draw is complete and we're the last one to have worked on it, then
447 // we can reset the locked list, since all draws before the next one are guaranteed to be complete.
448 if ((curDrawBE == i) && pDC->pTileMgr->isWorkComplete())
449 {
450 // We can increment the current BE and safely move to next draw since we know this draw is complete.
451 curDrawBE++;
452 CompleteDrawContext(pContext, pDC);
453
454 lastRetiredDraw++;
455
456 lockedTiles.clear();
457 break;
458 }
459 }
460 else
461 {
462 // This tile is already locked. So let's add it to our locked tiles set. This way we don't try locking this one again.
463 lockedTiles.insert(tileID);
464 }
465 }
466 }
467 }
468
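//////////////////////////////////////////////////////////////////////////
/// @brief Work on front-end (FE) work: first advance curDrawFE past draws that
///        need no FE work from this thread, then try to atomically claim an
///        unprocessed FE via the FeLock compare-exchange and run it.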
469 void WorkOnFifoFE(SWR_CONTEXT *pContext, uint32_t workerId, uint64_t &curDrawFE, uint32_t numaNode)
470 {
471 // Try to grab the next DC from the ring
472 uint64_t drawEnqueued = GetEnqueuedDraw(pContext);
473 while (curDrawFE < drawEnqueued)
474 {
475 uint32_t dcSlot = curDrawFE % KNOB_MAX_DRAWS_IN_FLIGHT;
476 DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];
477 if (pDC->isCompute || pDC->doneFE || pDC->FeLock)
478 {
479 CompleteDrawContext(pContext, pDC);
480 curDrawFE++;
481 }
482 else
483 {
484 break;
485 }
486 }
487
488 uint64_t curDraw = curDrawFE;
489 while (curDraw < drawEnqueued)
490 {
491 uint32_t dcSlot = curDraw % KNOB_MAX_DRAWS_IN_FLIGHT;
492 DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];
493
494 if (!pDC->isCompute && !pDC->FeLock)
495 {
496 uint32_t initial = InterlockedCompareExchange((volatile uint32_t*)&pDC->FeLock, 1, 0);
497 if (initial == 0)
498 {
499 // successfully grabbed the DC, now run the FE
500 pDC->FeWork.pfnWork(pContext, pDC, workerId, &pDC->FeWork.desc);
501
502 _ReadWriteBarrier();
503 pDC->doneFE = true;
504 }
505 }
506 curDraw++;
507 }
508 }
509
510 //////////////////////////////////////////////////////////////////////////
511 /// @brief If there is any compute work then go work on it.
512 /// @param pContext - pointer to SWR context.
513 /// @param workerId - The unique worker ID that is assigned to this thread.
514 /// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
515 /// has its own curDrawBE counter and this ensures that each worker processes all the
516 /// draws in order.
517 void WorkOnCompute(
518 SWR_CONTEXT *pContext,
519 uint32_t workerId,
520 uint64_t& curDrawBE)
521 {
522 if (FindFirstIncompleteDraw(pContext, curDrawBE) == false)
523 {
524 return;
525 }
526
527 uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;
528
529 DRAW_CONTEXT *pDC = &pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT];
530 if (pDC->isCompute == false) return;
531
532 // check dependencies
533 if (CheckDependency(pContext, pDC, lastRetiredDraw))
534 {
535 return;
536 }
537
538 SWR_ASSERT(pDC->pDispatch != nullptr);
539 DispatchQueue& queue = *pDC->pDispatch;
540
541 // Is there any work remaining?
542 if (queue.getNumQueued() > 0)
543 {
544 uint32_t threadGroupId = 0;
545 while (queue.getWork(threadGroupId))
546 {
547 ProcessComputeBE(pDC, workerId, threadGroupId);
548
549 queue.finishedWork();
550 }
551 }
552 }
553
554 DWORD workerThreadMain(LPVOID pData)
555 {
556 THREAD_DATA *pThreadData = (THREAD_DATA*)pData;
557 SWR_CONTEXT *pContext = pThreadData->pContext;
558 uint32_t threadId = pThreadData->threadId;
559 uint32_t workerId = pThreadData->workerId;
560
561 bindThread(threadId, pThreadData->procGroupId, pThreadData->forceBindProcGroup);
562
563 RDTSC_INIT(threadId);
564
565 uint32_t numaNode = pThreadData->numaId;
566 uint32_t numaMask = pContext->threadPool.numaMask;
567
568 // flush denormals to 0
569 _mm_setcsr(_mm_getcsr() | _MM_FLUSH_ZERO_ON | _MM_DENORMALS_ZERO_ON);
570
571 // Track tiles locked by other threads. If we try to lock a macrotile and find it's already
572 // locked, then we'll add it to this list so that we don't try to lock it again.
573 TileSet lockedTiles;
574
575 // Each worker can work on any of the queued draws as long as certain
576 // conditions are met. The data associated with a draw is guaranteed to be live as long as
577 // a worker hasn't signaled that it has moved on to the next draw once it determines there
578 // is no more work to do. The API thread will not increment the head of the DC ring until all
579 // workers have moved past the current head.
580 //
581 // The logic to determine what to work on is:
582 // 1- Try to work the FE of any draw that is queued. For now there are no dependencies
583 //    on the FE work, so any worker can grab any FE and process it in parallel. Eventually
584 //    we'll need dependency tracking to force serialization of FEs. The worker picks an FE by
585 //    atomically incrementing a counter in the SWR context and keeps trying until it reaches
586 //    the tail.
587 // 2- BE work must be done in strict order. We accomplish this today by pulling work off
588 //    the oldest draw (i.e. the head) of the dcRing. The worker can determine if there is
589 //    any work left by comparing the total # of binned work items and the total # of completed
590 //    work items. If they are equal, there is no more work to do for this draw, and
591 //    the worker can safely increment its oldestDraw counter and move on to the next draw.
592 std::unique_lock<std::mutex> lock(pContext->WaitLock, std::defer_lock);
593
594 auto threadHasWork = [&](uint64_t curDraw) { return curDraw != pContext->dcRing.GetHead(); };
595
596 uint64_t curDrawBE = 0;
597 uint64_t curDrawFE = 0;
598
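// Spin briefly looking for BE work, then block on FifosNotEmpty. The idle
// check is repeated under WaitLock so a notify that arrives between the
// unlocked check and the wait is not missed.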
599 while (pContext->threadPool.inThreadShutdown == false)
600 {
601 uint32_t loop = 0;
602 while (loop++ < KNOB_WORKER_SPIN_LOOP_COUNT && !threadHasWork(curDrawBE))
603 {
604 _mm_pause();
605 }
606
607 if (!threadHasWork(curDrawBE))
608 {
609 lock.lock();
610
611 // Re-check for work under the lock; if work arrived, skip the wait.
612 if (threadHasWork(curDrawBE))
613 {
614 lock.unlock();
615 continue;
616 }
617
618 if (pContext->threadPool.inThreadShutdown)
619 {
620 lock.unlock();
621 break;
622 }
623
624 RDTSC_START(WorkerWaitForThreadEvent);
625
626 pContext->FifosNotEmpty.wait(lock);
627 lock.unlock();
628
629 RDTSC_STOP(WorkerWaitForThreadEvent, 0, 0);
630
631 if (pContext->threadPool.inThreadShutdown)
632 {
633 break;
634 }
635 }
636
637 RDTSC_START(WorkerWorkOnFifoBE);
638 WorkOnFifoBE(pContext, workerId, curDrawBE, lockedTiles, numaNode, numaMask);
639 RDTSC_STOP(WorkerWorkOnFifoBE, 0, 0);
640
641 WorkOnCompute(pContext, workerId, curDrawBE);
642
643 WorkOnFifoFE(pContext, workerId, curDrawFE, numaNode);
644 }
645
646 return 0;
647 }
648
649 DWORD workerThreadInit(LPVOID pData)
650 {
651 #if defined(_WIN32)
652 __try
653 #endif // _WIN32
654 {
655 return workerThreadMain(pData);
656 }
657
658 #if defined(_WIN32)
659 __except(EXCEPTION_CONTINUE_SEARCH)
660 {
661 }
662
663 #endif // _WIN32
664
665 return 1;
666 }
667
668 void CreateThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
669 {
670 bindThread(0);
671
672 CPUNumaNodes nodes;
673 uint32_t numThreadsPerProcGroup = 0;
674 CalculateProcessorTopology(nodes, numThreadsPerProcGroup);
675
676 uint32_t numHWNodes = (uint32_t)nodes.size();
677 uint32_t numHWCoresPerNode = (uint32_t)nodes[0].cores.size();
678 uint32_t numHWHyperThreads = (uint32_t)nodes[0].cores[0].threadIds.size();
679
680 uint32_t numNodes = numHWNodes;
681 uint32_t numCoresPerNode = numHWCoresPerNode;
682 uint32_t numHyperThreads = numHWHyperThreads;
683
684 if (KNOB_MAX_NUMA_NODES)
685 {
686 numNodes = std::min(numNodes, KNOB_MAX_NUMA_NODES);
687 }
688
689 if (KNOB_MAX_CORES_PER_NUMA_NODE)
690 {
691 numCoresPerNode = std::min(numCoresPerNode, KNOB_MAX_CORES_PER_NUMA_NODE);
692 }
693
694 if (KNOB_MAX_THREADS_PER_CORE)
695 {
696 numHyperThreads = std::min(numHyperThreads, KNOB_MAX_THREADS_PER_CORE);
697 }
698
699 // Calculate numThreads
700 uint32_t numThreads = numNodes * numCoresPerNode * numHyperThreads;
701
702 if (KNOB_MAX_WORKER_THREADS)
703 {
704 uint32_t maxHWThreads = numHWNodes * numHWCoresPerNode * numHWHyperThreads;
705 numThreads = std::min(KNOB_MAX_WORKER_THREADS, maxHWThreads);
706 }
707
708 if (numThreads > KNOB_MAX_NUM_THREADS)
709 {
710 printf("WARNING: system thread count %u exceeds max %u, "
711 "performance will be degraded\n",
712 numThreads, KNOB_MAX_NUM_THREADS);
713 }
714
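// Reserve one HW thread for the API thread so the workers don't starve it.
// If only a single worker would remain, the code below instead tries to
// place it on a spare HW thread, or falls back to single-threaded mode.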
715 uint32_t numAPIReservedThreads = 1;
716
717
718 if (numThreads == 1)
719 {
720 // If there is only 1 worker thread, try to move it to an available
721 // HW thread. If that fails, use the API thread.
722 if (numCoresPerNode < numHWCoresPerNode)
723 {
724 numCoresPerNode++;
725 }
726 else if (numHyperThreads < numHWHyperThreads)
727 {
728 numHyperThreads++;
729 }
730 else if (numNodes < numHWNodes)
731 {
732 numNodes++;
733 }
734 else
735 {
736 pPool->numThreads = 0;
737 SET_KNOB(SINGLE_THREADED, true);
738 return;
739 }
740 }
741 else
742 {
743 // Save HW threads for the API if we can
744 if (numThreads > numAPIReservedThreads)
745 {
746 numThreads -= numAPIReservedThreads;
747 }
748 else
749 {
750 numAPIReservedThreads = 0;
751 }
752 }
753
754 pPool->numThreads = numThreads;
755 pContext->NumWorkerThreads = pPool->numThreads;
756
757 pPool->inThreadShutdown = false;
758 pPool->pThreadData = (THREAD_DATA *)malloc(pPool->numThreads * sizeof(THREAD_DATA));
759 pPool->numaMask = 0;
760
761 if (KNOB_MAX_WORKER_THREADS)
762 {
763 bool bForceBindProcGroup = (numThreads > numThreadsPerProcGroup);
764 uint32_t numProcGroups = (numThreads + numThreadsPerProcGroup - 1) / numThreadsPerProcGroup;
765 // When KNOB_MAX_WORKER_THREADS is set we don't bother to bind to specific HW threads,
766 // but Windows still requires binding to specific processor groups.
767 for (uint32_t workerId = 0; workerId < numThreads; ++workerId)
768 {
769 pPool->pThreadData[workerId].workerId = workerId;
770 pPool->pThreadData[workerId].procGroupId = workerId % numProcGroups;
771 pPool->pThreadData[workerId].threadId = 0;
772 pPool->pThreadData[workerId].numaId = 0;
773 pPool->pThreadData[workerId].pContext = pContext;
774 pPool->pThreadData[workerId].forceBindProcGroup = bForceBindProcGroup;
775 pPool->threads[workerId] = new std::thread(workerThreadInit, &pPool->pThreadData[workerId]);
776 }
777 }
778 else
779 {
780 pPool->numaMask = numNodes - 1; // Only works for 2**n numa nodes (1, 2, 4, etc.)
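// e.g. with 4 NUMA nodes numaMask == 0x3; WorkOnFifoBE later uses
// (x ^ y) & numaMask to decide which node owns a given macrotile.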
781
782 uint32_t workerId = 0;
783 for (uint32_t n = 0; n < numNodes; ++n)
784 {
785 auto& node = nodes[n];
786 if (node.cores.size() == 0)
787 {
788 continue;
789 }
790
791 uint32_t numCores = numCoresPerNode;
792 for (uint32_t c = 0; c < numCores; ++c)
793 {
794 auto& core = node.cores[c];
795 for (uint32_t t = 0; t < numHyperThreads; ++t)
796 {
797 if (numAPIReservedThreads)
798 {
799 --numAPIReservedThreads;
800 continue;
801 }
802
803 pPool->pThreadData[workerId].workerId = workerId;
804 pPool->pThreadData[workerId].procGroupId = core.procGroup;
805 pPool->pThreadData[workerId].threadId = core.threadIds[t];
806 pPool->pThreadData[workerId].numaId = n;
807 pPool->pThreadData[workerId].pContext = pContext;
808 pPool->threads[workerId] = new std::thread(workerThreadInit, &pPool->pThreadData[workerId]);
809
810 ++workerId;
811 }
812 }
813 }
814 }
815 }
816
817 void DestroyThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
818 {
819 if (!KNOB_SINGLE_THREADED)
820 {
821 // Inform threads to finish up
822 std::unique_lock<std::mutex> lock(pContext->WaitLock);
823 pPool->inThreadShutdown = true;
824 _mm_mfence();
825 pContext->FifosNotEmpty.notify_all();
826 lock.unlock();
827
828 // Wait for threads to finish and destroy them
829 for (uint32_t t = 0; t < pPool->numThreads; ++t)
830 {
831 pPool->threads[t]->join();
832 delete(pPool->threads[t]);
833 }
834
835 // Clean up data used by threads
836 free(pPool->pThreadData);
837 }
838 }