// src/gallium/drivers/swr/rasterizer/core/threads.cpp
/****************************************************************************
* Copyright (C) 2014-2015 Intel Corporation. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
****************************************************************************/

#include <stdio.h>
#include <thread>
#include <algorithm>
#include <float.h>
#include <vector>
#include <utility>
#include <fstream>
#include <string>
#include <cstdlib>  // std::strtoul, used when parsing /proc/cpuinfo

#if defined(__linux__) || defined(__gnu_linux__)
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#endif

#include "common/os.h"
#include "context.h"
#include "frontend.h"
#include "backend.h"
#include "rasterizer.h"
#include "rdtsc_core.h"
#include "tilemgr.h"

// CPU topology description: each NUMA node owns a set of physical cores, and
// each core owns the HW thread (logical processor) ids that run on it.
struct Core
{
    uint32_t procGroup = 0;
    std::vector<uint32_t> threadIds;
};

struct NumaNode
{
    std::vector<Core> cores;
};

typedef std::vector<NumaNode> CPUNumaNodes;

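//////////////////////////////////////////////////////////////////////////
/// @brief Query the machine's CPU topology (NUMA nodes, physical cores and
///        HW threads), using GetLogicalProcessorInformationEx on Windows
///        and /proc/cpuinfo on Linux.
/// @param out_nodes - filled with one entry per NUMA node discovered.
/// @param out_numThreadsPerProcGroup - receives the number of HW threads in
///        a single processor group.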
void CalculateProcessorTopology(CPUNumaNodes& out_nodes, uint32_t& out_numThreadsPerProcGroup)
{
    out_nodes.clear();
    out_numThreadsPerProcGroup = 0;

#if defined(_WIN32)

    SYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX buffer[KNOB_MAX_NUM_THREADS];
    DWORD bufSize = sizeof(buffer);

    BOOL ret = GetLogicalProcessorInformationEx(RelationProcessorCore, buffer, &bufSize);
    SWR_ASSERT(ret != FALSE, "Failed to get Processor Topology Information");

    uint32_t count = bufSize / buffer->Size;
    PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX pBuffer = buffer;

    for (uint32_t i = 0; i < count; ++i)
    {
        SWR_ASSERT(pBuffer->Relationship == RelationProcessorCore);
        for (uint32_t g = 0; g < pBuffer->Processor.GroupCount; ++g)
        {
            auto& gmask = pBuffer->Processor.GroupMask[g];
            uint32_t threadId = 0;
            uint32_t procGroup = gmask.Group;

            Core* pCore = nullptr;

            uint32_t numThreads = (uint32_t)_mm_popcount_sizeT(gmask.Mask);

            while (BitScanForwardSizeT((unsigned long*)&threadId, gmask.Mask))
            {
                // clear mask
                gmask.Mask &= ~(KAFFINITY(1) << threadId);

                // Find Numa Node
                PROCESSOR_NUMBER procNum = {};
                procNum.Group = WORD(procGroup);
                procNum.Number = UCHAR(threadId);

                uint32_t numaId = 0;
                ret = GetNumaProcessorNodeEx(&procNum, (PUSHORT)&numaId);
                SWR_ASSERT(ret);

                // Store data
                if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
                auto& numaNode = out_nodes[numaId];

                uint32_t coreId = 0;

                if (nullptr == pCore)
                {
                    numaNode.cores.push_back(Core());
                    pCore = &numaNode.cores.back();
                    pCore->procGroup = procGroup;
#if !defined(_WIN64)
                    coreId = (uint32_t)numaNode.cores.size();
                    if ((coreId * numThreads) >= 32)
                    {
                        // Windows doesn't return threadIds >= 32 for a processor group correctly
                        // when running a 32-bit application.
                        // Just save -1 as the threadId
                        threadId = uint32_t(-1);
                    }
#endif
                }
                pCore->threadIds.push_back(threadId);
                if (procGroup == 0)
                {
                    out_numThreadsPerProcGroup++;
                }
            }
        }
        pBuffer = PtrAdd(pBuffer, pBuffer->Size);
    }

#elif defined(__linux__) || defined(__gnu_linux__)

    // Parse /proc/cpuinfo to get full topology
    std::ifstream input("/proc/cpuinfo");
    std::string line;
    char* c;
    uint32_t threadId = uint32_t(-1);
    uint32_t coreId = uint32_t(-1);
    uint32_t numaId = uint32_t(-1);

    while (std::getline(input, line))
    {
        if (line.find("processor") != std::string::npos)
        {
            if (threadId != uint32_t(-1))
            {
                // Save information.
                if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
                auto& numaNode = out_nodes[numaId];
                if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
                auto& core = numaNode.cores[coreId];

                core.procGroup = coreId;
                core.threadIds.push_back(threadId);

                out_numThreadsPerProcGroup++;
            }

            auto data_start = line.find(": ") + 2;
            threadId = std::strtoul(&line.c_str()[data_start], &c, 10);
            continue;
        }
        if (line.find("core id") != std::string::npos)
        {
            auto data_start = line.find(": ") + 2;
            coreId = std::strtoul(&line.c_str()[data_start], &c, 10);
            continue;
        }
        if (line.find("physical id") != std::string::npos)
        {
            auto data_start = line.find(": ") + 2;
            numaId = std::strtoul(&line.c_str()[data_start], &c, 10);
            continue;
        }
    }

    if (threadId != uint32_t(-1))
    {
        // Save information.
        if (out_nodes.size() <= numaId) out_nodes.resize(numaId + 1);
        auto& numaNode = out_nodes[numaId];
        if (numaNode.cores.size() <= coreId) numaNode.cores.resize(coreId + 1);
        auto& core = numaNode.cores[coreId];

        core.procGroup = coreId;
        core.threadIds.push_back(threadId);
        out_numThreadsPerProcGroup++;
    }

    // Prune cores that ended up with no HW threads assigned to them.
    for (uint32_t node = 0; node < out_nodes.size(); node++) {
        auto& numaNode = out_nodes[node];
        auto it = numaNode.cores.begin();
        for ( ; it != numaNode.cores.end(); ) {
            if (it->threadIds.size() == 0)
                it = numaNode.cores.erase(it);  // erase() returns the next valid iterator
            else
                ++it;
        }
    }

#else

#error Unsupported platform

#endif
}

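//////////////////////////////////////////////////////////////////////////
/// @brief Bind the calling thread to a specific HW thread, or (on Windows)
///        optionally just to a processor group.
/// @param threadId - HW thread to bind to.
/// @param procGroupId - Windows processor group containing the HW thread.
/// @param bindProcGroup - if true, bind to the processor group even when
///        KNOB_MAX_WORKER_THREADS is set.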
void bindThread(uint32_t threadId, uint32_t procGroupId = 0, bool bindProcGroup = false)
{
    // Only bind threads when MAX_WORKER_THREADS isn't set, unless the caller
    // explicitly asks for a processor-group bind.
    if (KNOB_MAX_WORKER_THREADS && bindProcGroup == false)
    {
        return;
    }

#if defined(_WIN32)
    {
        GROUP_AFFINITY affinity = {};
        affinity.Group = procGroupId;

#if !defined(_WIN64)
        if (threadId >= 32)
        {
            // In a 32-bit process on Windows it is impossible to bind
            // to logical processors 32-63 within a processor group.
            // In this case set the mask to 0 and let the system assign
            // the processor. Hopefully it will make smart choices.
            affinity.Mask = 0;
        }
        else
#endif
        {
            // If KNOB_MAX_WORKER_THREADS is set, only bind to the proc group,
            // not the individual HW thread.
            if (!KNOB_MAX_WORKER_THREADS)
            {
                affinity.Mask = KAFFINITY(1) << threadId;
            }
        }

        SetThreadGroupAffinity(GetCurrentThread(), &affinity, nullptr);
    }
#else
    cpu_set_t cpuset;
    pthread_t thread = pthread_self();
    CPU_ZERO(&cpuset);
    CPU_SET(threadId, &cpuset);

    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
#endif
}

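//////////////////////////////////////////////////////////////////////////
/// @brief Returns the head of the draw context ring, i.e. the number of
///        draws the API thread has enqueued so far.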
INLINE
uint64_t GetEnqueuedDraw(SWR_CONTEXT *pContext)
{
    return pContext->dcRing.GetHead();
}

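//////////////////////////////////////////////////////////////////////////
/// @brief Maps a draw id to its DRAW_CONTEXT slot in the ring.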
INLINE
DRAW_CONTEXT *GetDC(SWR_CONTEXT *pContext, uint64_t drawId)
{
    return &pContext->dcRing[(drawId - 1) % KNOB_MAX_DRAWS_IN_FLIGHT];
}

// Returns true if the draw this DC depends on has not yet retired.
INLINE
bool CheckDependency(SWR_CONTEXT *pContext, DRAW_CONTEXT *pDC, uint64_t lastRetiredDraw)
{
    return (pDC->dependency > lastRetiredDraw);
}

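//////////////////////////////////////////////////////////////////////////
/// @brief Called when a thread has moved past a draw context. Decrements the
///        DC's outstanding-thread count; the last thread to finish resets the
///        DC's arenas and tile manager and dequeues it from the ring.
/// @return The number of threads that have not yet moved past this DC.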
INLINE int64_t CompleteDrawContext(SWR_CONTEXT* pContext, DRAW_CONTEXT* pDC)
{
    int64_t result = InterlockedDecrement64(&pDC->threadsDone);
    SWR_ASSERT(result >= 0);

    if (result == 0)
    {
        // Cleanup memory allocations
        pDC->pArena->Reset(true);
        pDC->pTileMgr->initialize();
        if (pDC->cleanupState)
        {
            pDC->pState->pArena->Reset(true);
        }

        _ReadWriteBarrier();

        pContext->dcRing.Dequeue(); // Remove from tail
    }

    return result;
}

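//////////////////////////////////////////////////////////////////////////
/// @brief Advances curDrawBE past any draws whose BE (or compute) work has
///        already completed, retiring each one along the way.
/// @return true if an incomplete draw remains for the caller to work on.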
INLINE bool FindFirstIncompleteDraw(SWR_CONTEXT* pContext, uint64_t& curDrawBE)
{
    // increment our current draw id to the first incomplete draw
    uint64_t drawEnqueued = GetEnqueuedDraw(pContext);
    while (curDrawBE < drawEnqueued)
    {
        DRAW_CONTEXT *pDC = &pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT];

        // If it's not compute and the FE is not done, then break out of the loop.
        if (!pDC->doneFE && !pDC->isCompute) break;

        bool isWorkComplete = (pDC->isCompute) ?
            pDC->pDispatch->isWorkComplete() : pDC->pTileMgr->isWorkComplete();

        if (isWorkComplete)
        {
            curDrawBE++;
            CompleteDrawContext(pContext, pDC);
        }
        else
        {
            break;
        }
    }

    // If there are no more incomplete draws then return false.
    return (curDrawBE >= drawEnqueued) ? false : true;
}

//////////////////////////////////////////////////////////////////////////
/// @brief If there is any BE work then go work on it.
/// @param pContext - pointer to SWR context.
/// @param workerId - The unique worker ID that is assigned to this thread.
/// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
///                    has its own curDrawBE counter and this ensures that each worker processes all the
///                    draws in order.
/// @param lockedTiles - This is the set of tiles locked by other threads. Each thread maintains its
///                      own set and each time it fails to lock a macrotile, because it's already locked,
///                      it will add that tile to the lockedTiles set. As a worker begins to work on
///                      future draws the lockedTiles set ensures that it doesn't work on tiles that may
///                      still have work pending in a previous draw. Additionally, lockedTiles is a
///                      heuristic that can steer a worker back to the same macrotile that it had been
///                      working on in a previous draw.
void WorkOnFifoBE(
    SWR_CONTEXT *pContext,
    uint32_t workerId,
    uint64_t &curDrawBE,
    TileSet& lockedTiles,
    uint32_t numaNode,
    uint32_t numaMask)
{
    // Find the first incomplete draw that has pending work. If no such draw is found then
    // return. FindFirstIncompleteDraw is responsible for incrementing curDrawBE.
    if (FindFirstIncompleteDraw(pContext, curDrawBE) == false)
    {
        return;
    }

    uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;

    // Reset our history for locked tiles. We'll have to re-learn which tiles are locked.
    lockedTiles.clear();

    // Try to work on each draw in order of the available draws in flight.
    //   1. If we're on curDrawBE, we can work on any macrotile that is available.
    //   2. If we're trying to work on draws after curDrawBE, we are restricted to
    //      working on those macrotiles that are known to be complete in the prior draw to
    //      maintain order. The locked tiles provide the history that ensures this.
    for (uint64_t i = curDrawBE; i < GetEnqueuedDraw(pContext); ++i)
    {
        DRAW_CONTEXT *pDC = &pContext->dcRing[i % KNOB_MAX_DRAWS_IN_FLIGHT];

        if (pDC->isCompute) return; // We don't look at compute work.

        // First wait for the FE to be finished with this draw. This keeps the threading model
        // simple, but if there are lots of bubbles between draws then serializing FE and BE may
        // need to be revisited.
        if (!pDC->doneFE) return;

        // If this draw is dependent on a previous draw then we need to bail.
        if (CheckDependency(pContext, pDC, lastRetiredDraw))
        {
            return;
        }

        // Grab the list of all dirty macrotiles. A tile is dirty if it has work queued to it.
        std::vector<uint32_t> &macroTiles = pDC->pTileMgr->getDirtyTiles();

        for (uint32_t tileID : macroTiles)
        {
            // Only work on tiles for this numa node
            uint32_t x, y;
            pDC->pTileMgr->getTileIndices(tileID, x, y);
            if (((x ^ y) & numaMask) != numaNode)
            {
                continue;
            }

            MacroTileQueue &tile = pDC->pTileMgr->getMacroTileQueue(tileID);

            if (!tile.getNumQueued())
            {
                continue;
            }

            // can only work on this draw if it's not in use by other threads
            if (lockedTiles.find(tileID) != lockedTiles.end())
            {
                continue;
            }

            if (tile.tryLock())
            {
                BE_WORK *pWork;

                RDTSC_START(WorkerFoundWork);

                uint32_t numWorkItems = tile.getNumQueued();
                SWR_ASSERT(numWorkItems);

                pWork = tile.peek();
                SWR_ASSERT(pWork);
                if (pWork->type == DRAW)
                {
                    pContext->pHotTileMgr->InitializeHotTiles(pContext, pDC, tileID);
                }

                while ((pWork = tile.peek()) != nullptr)
                {
                    pWork->pfnWork(pDC, workerId, tileID, &pWork->desc);
                    tile.dequeue();
                }
                RDTSC_STOP(WorkerFoundWork, numWorkItems, pDC->drawId);

                _ReadWriteBarrier();

                pDC->pTileMgr->markTileComplete(tileID);

                // Optimization: If the draw is complete and we're the last one to have worked on it then
                // we can reset the locked list as we know that all previous draws before the next are guaranteed to be complete.
                if ((curDrawBE == i) && pDC->pTileMgr->isWorkComplete())
                {
                    // We can increment the current BE and safely move to next draw since we know this draw is complete.
                    curDrawBE++;
                    CompleteDrawContext(pContext, pDC);

                    lastRetiredDraw++;

                    lockedTiles.clear();
                    break;
                }
            }
            else
            {
                // This tile is already locked. Add it to our locked tiles set so that we don't try locking it again.
                lockedTiles.insert(tileID);
            }
        }
    }
}

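//////////////////////////////////////////////////////////////////////////
/// @brief If there is any FE work then go work on it.
/// @param pContext - pointer to SWR context.
/// @param workerId - The unique worker ID that is assigned to this thread.
/// @param curDrawFE - This tracks the draw contexts whose FE work this thread
///                    has already retired or claimed.
/// @param numaNode - NUMA node this worker is bound to (currently unused here).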
void WorkOnFifoFE(SWR_CONTEXT *pContext, uint32_t workerId, uint64_t &curDrawFE, uint32_t numaNode)
{
    // Retire any draws at the front of the ring that no longer need FE work from us
    // (compute draws, draws whose FE is done, or draws another thread has already claimed).
    uint64_t drawEnqueued = GetEnqueuedDraw(pContext);
    while (curDrawFE < drawEnqueued)
    {
        uint32_t dcSlot = curDrawFE % KNOB_MAX_DRAWS_IN_FLIGHT;
        DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];
        if (pDC->isCompute || pDC->doneFE || pDC->FeLock)
        {
            CompleteDrawContext(pContext, pDC);
            curDrawFE++;
        }
        else
        {
            break;
        }
    }

    // Try to grab the next DC from the ring and run its FE work.
    uint64_t curDraw = curDrawFE;
    while (curDraw < drawEnqueued)
    {
        uint32_t dcSlot = curDraw % KNOB_MAX_DRAWS_IN_FLIGHT;
        DRAW_CONTEXT *pDC = &pContext->dcRing[dcSlot];

        if (!pDC->isCompute && !pDC->FeLock)
        {
            uint32_t initial = InterlockedCompareExchange((volatile uint32_t*)&pDC->FeLock, 1, 0);
            if (initial == 0)
            {
                // successfully grabbed the DC, now run the FE
                pDC->FeWork.pfnWork(pContext, pDC, workerId, &pDC->FeWork.desc);

                _ReadWriteBarrier();
                pDC->doneFE = true;
            }
        }
        curDraw++;
    }
}

//////////////////////////////////////////////////////////////////////////
/// @brief If there is any compute work then go work on it.
/// @param pContext - pointer to SWR context.
/// @param workerId - The unique worker ID that is assigned to this thread.
/// @param curDrawBE - This tracks the draw contexts that this thread has processed. Each worker thread
///                    has its own curDrawBE counter and this ensures that each worker processes all the
///                    draws in order.
void WorkOnCompute(
    SWR_CONTEXT *pContext,
    uint32_t workerId,
    uint64_t& curDrawBE)
{
    if (FindFirstIncompleteDraw(pContext, curDrawBE) == false)
    {
        return;
    }

    uint64_t lastRetiredDraw = pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT].drawId - 1;

    DRAW_CONTEXT *pDC = &pContext->dcRing[curDrawBE % KNOB_MAX_DRAWS_IN_FLIGHT];
    if (pDC->isCompute == false) return;

    // check dependencies
    if (CheckDependency(pContext, pDC, lastRetiredDraw))
    {
        return;
    }

    SWR_ASSERT(pDC->pDispatch != nullptr);
    DispatchQueue& queue = *pDC->pDispatch;

    // Is there any work remaining?
    if (queue.getNumQueued() > 0)
    {
        uint32_t threadGroupId = 0;
        while (queue.getWork(threadGroupId))
        {
            ProcessComputeBE(pDC, workerId, threadGroupId);

            queue.finishedWork();
        }
    }
}

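//////////////////////////////////////////////////////////////////////////
/// @brief Worker thread main loop: binds the thread, then repeatedly works
///        on the BE, compute, and FE fifos, sleeping on the FifosNotEmpty
///        condition variable when no work is available.
/// @param pData - pointer to this worker's THREAD_DATA.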
DWORD workerThreadMain(LPVOID pData)
{
    THREAD_DATA *pThreadData = (THREAD_DATA*)pData;
    SWR_CONTEXT *pContext = pThreadData->pContext;
    uint32_t threadId = pThreadData->threadId;
    uint32_t workerId = pThreadData->workerId;

    bindThread(threadId, pThreadData->procGroupId, pThreadData->forceBindProcGroup);

    RDTSC_INIT(threadId);

    uint32_t numaNode = pThreadData->numaId;
    uint32_t numaMask = pContext->threadPool.numaMask;

    // flush denormals to 0
    _mm_setcsr(_mm_getcsr() | _MM_FLUSH_ZERO_ON | _MM_DENORMALS_ZERO_ON);

    // Track tiles locked by other threads. If we try to lock a macrotile and find it's already
    // locked, then we'll add it to this list so that we don't try to lock it again.
    TileSet lockedTiles;

    // Each worker can work on any of the queued draws as long as certain conditions are met.
    // The data associated with a draw is guaranteed to stay active as long as a worker hasn't
    // signaled that it has moved on to the next draw (which it does once it determines there is
    // no more work to do). The API thread will not increment the head of the DC ring until all
    // workers have moved past the current head.
    //
    // The logic to determine what to work on is:
    //  1- Try to work on the FE of any draw that is queued. For now there are no dependencies
    //     on the FE work, so any worker can grab any FE and process it in parallel. Eventually
    //     we'll need dependency tracking to force serialization on FEs. The worker tries to
    //     claim an FE atomically and keeps trying until it reaches the tail.
    //  2- BE work must be done in strict order. We accomplish this today by pulling work off
    //     the oldest draw (i.e. the head) of the dcRing. The worker can determine if there is
    //     any work left by comparing the total # of binned work items with the total # of
    //     completed work items. If they are equal, there is no more work to do for this draw,
    //     and the worker can safely increment its oldestDraw counter and move on to the next draw.
    std::unique_lock<std::mutex> lock(pContext->WaitLock, std::defer_lock);

    auto threadHasWork = [&](uint64_t curDraw) { return curDraw != pContext->dcRing.GetHead(); };

    uint64_t curDrawBE = 0;
    uint64_t curDrawFE = 0;

    while (pContext->threadPool.inThreadShutdown == false)
    {
        uint32_t loop = 0;
        while (loop++ < KNOB_WORKER_SPIN_LOOP_COUNT && !threadHasWork(curDrawBE))
        {
            _mm_pause();
        }

        if (!threadHasWork(curDrawBE))
        {
            lock.lock();

            // check for thread idle condition again under lock
            if (threadHasWork(curDrawBE))
            {
                lock.unlock();
                continue;
            }

            if (pContext->threadPool.inThreadShutdown)
            {
                lock.unlock();
                break;
            }

            RDTSC_START(WorkerWaitForThreadEvent);

            pContext->FifosNotEmpty.wait(lock);
            lock.unlock();

            RDTSC_STOP(WorkerWaitForThreadEvent, 0, 0);

            if (pContext->threadPool.inThreadShutdown)
            {
                break;
            }
        }

        RDTSC_START(WorkerWorkOnFifoBE);
        WorkOnFifoBE(pContext, workerId, curDrawBE, lockedTiles, numaNode, numaMask);
        RDTSC_STOP(WorkerWorkOnFifoBE, 0, 0);

        WorkOnCompute(pContext, workerId, curDrawBE);

        WorkOnFifoFE(pContext, workerId, curDrawFE, numaNode);
    }

    return 0;
}

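//////////////////////////////////////////////////////////////////////////
/// @brief Worker thread entry point; simply calls workerThreadMain, wrapped
///        in a structured exception handler frame on Windows.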
DWORD workerThreadInit(LPVOID pData)
{
#if defined(_WIN32)
    __try
#endif // _WIN32
    {
        return workerThreadMain(pData);
    }

#if defined(_WIN32)
    __except(EXCEPTION_CONTINUE_SEARCH)
    {
    }
#endif // _WIN32

    return 1;
}

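//////////////////////////////////////////////////////////////////////////
/// @brief Creates the worker thread pool, sizing it from the detected CPU
///        topology (clamped by the KNOB_MAX_* knobs) and reserving HW
///        threads for the API thread where possible.
/// @param pContext - pointer to SWR context.
/// @param pPool - pointer to thread pool object to initialize.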
void CreateThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
{
    bindThread(0);

    CPUNumaNodes nodes;
    uint32_t numThreadsPerProcGroup = 0;
    CalculateProcessorTopology(nodes, numThreadsPerProcGroup);

    uint32_t numHWNodes = (uint32_t)nodes.size();
    uint32_t numHWCoresPerNode = (uint32_t)nodes[0].cores.size();
    uint32_t numHWHyperThreads = (uint32_t)nodes[0].cores[0].threadIds.size();

    uint32_t numNodes = numHWNodes;
    uint32_t numCoresPerNode = numHWCoresPerNode;
    uint32_t numHyperThreads = numHWHyperThreads;

    if (KNOB_MAX_NUMA_NODES)
    {
        numNodes = std::min(numNodes, KNOB_MAX_NUMA_NODES);
    }

    if (KNOB_MAX_CORES_PER_NUMA_NODE)
    {
        numCoresPerNode = std::min(numCoresPerNode, KNOB_MAX_CORES_PER_NUMA_NODE);
    }

    if (KNOB_MAX_THREADS_PER_CORE)
    {
        numHyperThreads = std::min(numHyperThreads, KNOB_MAX_THREADS_PER_CORE);
    }

    // Calculate numThreads
    uint32_t numThreads = numNodes * numCoresPerNode * numHyperThreads;

    if (KNOB_MAX_WORKER_THREADS)
    {
        uint32_t maxHWThreads = numHWNodes * numHWCoresPerNode * numHWHyperThreads;
        numThreads = std::min(KNOB_MAX_WORKER_THREADS, maxHWThreads);
    }

    if (numThreads > KNOB_MAX_NUM_THREADS)
    {
        printf("WARNING: system thread count %u exceeds max %u, "
               "performance will be degraded\n",
               numThreads, KNOB_MAX_NUM_THREADS);
    }

    uint32_t numAPIReservedThreads = 1;

    if (numThreads == 1)
    {
        // If there is only 1 worker thread, try to move it to an available
        // HW thread. If that fails, fall back to running single-threaded on the API thread.
        if (numCoresPerNode < numHWCoresPerNode)
        {
            numCoresPerNode++;
        }
        else if (numHyperThreads < numHWHyperThreads)
        {
            numHyperThreads++;
        }
        else if (numNodes < numHWNodes)
        {
            numNodes++;
        }
        else
        {
            pPool->numThreads = 0;
            SET_KNOB(SINGLE_THREADED, true);
            return;
        }
    }
    else
    {
        // Save HW threads for the API if we can
        if (numThreads > numAPIReservedThreads)
        {
            numThreads -= numAPIReservedThreads;
        }
        else
        {
            numAPIReservedThreads = 0;
        }
    }

    pPool->numThreads = numThreads;
    pContext->NumWorkerThreads = pPool->numThreads;

    pPool->inThreadShutdown = false;
    pPool->pThreadData = (THREAD_DATA *)malloc(pPool->numThreads * sizeof(THREAD_DATA));
    pPool->numaMask = 0;

    if (KNOB_MAX_WORKER_THREADS)
    {
        bool bForceBindProcGroup = (numThreads > numThreadsPerProcGroup);
        uint32_t numProcGroups = (numThreads + numThreadsPerProcGroup - 1) / numThreadsPerProcGroup;
        // When MAX_WORKER_THREADS is set we don't bother to bind to specific HW threads,
        // but Windows will still require binding to specific processor groups.
        for (uint32_t workerId = 0; workerId < numThreads; ++workerId)
        {
            pPool->pThreadData[workerId].workerId = workerId;
            pPool->pThreadData[workerId].procGroupId = workerId % numProcGroups;
            pPool->pThreadData[workerId].threadId = 0;
            pPool->pThreadData[workerId].numaId = 0;
            pPool->pThreadData[workerId].pContext = pContext;
            pPool->pThreadData[workerId].forceBindProcGroup = bForceBindProcGroup;
            pPool->threads[workerId] = new std::thread(workerThreadInit, &pPool->pThreadData[workerId]);
        }
    }
    else
    {
        pPool->numaMask = numNodes - 1; // Only works for 2**n numa nodes (1, 2, 4, etc.)

        uint32_t workerId = 0;
        for (uint32_t n = 0; n < numNodes; ++n)
        {
            auto& node = nodes[n];

            uint32_t numCores = numCoresPerNode;
            for (uint32_t c = 0; c < numCores; ++c)
            {
                auto& core = node.cores[c];
                for (uint32_t t = 0; t < numHyperThreads; ++t)
                {
                    if (numAPIReservedThreads)
                    {
                        // Skip the first HW thread(s); they are left for the API thread.
                        --numAPIReservedThreads;
                        continue;
                    }

                    pPool->pThreadData[workerId].workerId = workerId;
                    pPool->pThreadData[workerId].procGroupId = core.procGroup;
                    pPool->pThreadData[workerId].threadId = core.threadIds[t];
                    pPool->pThreadData[workerId].numaId = n;
                    pPool->pThreadData[workerId].pContext = pContext;
                    pPool->threads[workerId] = new std::thread(workerThreadInit, &pPool->pThreadData[workerId]);

                    ++workerId;
                }
            }
        }
    }
}

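//////////////////////////////////////////////////////////////////////////
/// @brief Destroys the worker thread pool: signals shutdown, wakes all
///        workers, joins and deletes the threads, and frees per-thread data.
/// @param pContext - pointer to SWR context.
/// @param pPool - pointer to thread pool object to destroy.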
void DestroyThreadPool(SWR_CONTEXT *pContext, THREAD_POOL *pPool)
{
    if (!KNOB_SINGLE_THREADED)
    {
        // Inform threads to finish up
        std::unique_lock<std::mutex> lock(pContext->WaitLock);
        pPool->inThreadShutdown = true;
        _mm_mfence();
        pContext->FifosNotEmpty.notify_all();
        lock.unlock();

        // Wait for threads to finish and destroy them
        for (uint32_t t = 0; t < pPool->numThreads; ++t)
        {
            pPool->threads[t]->join();
            delete(pPool->threads[t]);
        }

        // Clean up data used by threads
        free(pPool->pThreadData);
    }
}