#include #include ///////////////// /* ZTaskThread */ ///////////////// ZThreadReturn ZTaskThread::run( uint64_t _dt ) { URFP(_dt); ZPtr task; //Check if a task is available. If it isn't set our status as waiting, then do an full wait if(!TaskSema.Wait(0)) { this->ThrStatus = ZTTS_WAITING; TaskSema.Wait(); } //At this point, we've decremented the semaphore. There should be a task waiting TaskLock.Acquire(); //No tasks -> a) Logic error b) time to exit. Gracefully handle this if(Tasks.Size() == 0) { TaskLock.Release(); return ZTR_TERMINATE; } //Ok, we're executing tasks ThrStatus = ZTTS_EXECUTING; task = Tasks.PopBack(); //Save the priority of the task that is executing CurrentTaskPriority = task->Priority; TaskLock.Release(); //If the task has not been canceled if (!task->bCancelTask) { //Execute it and check task return status if (task->Execute(task->TaskArgument) == ZTRS_FAILURE) task->OnTaskFailed(task->TaskArgument); } else { //Reset the cancel flag task->bCancelTask = false; } return ZTR_LOOP; } /******************************************************************************/ ZTaskThread::ZTaskThread( ZTaskStream *_stream, ZTaskThreadId _id ) : TaskLock(), TaskSema(0), Stream(_stream), Id(_id), ThrStatus(ZTTS_UNINITIALIZED) { } /******************************************************************************/ ZTaskThread::~ZTaskThread() { } /******************************************************************************/ void ZTaskThread::PushTask( ZPtr _task ) { //Add a task while the mutex is locked TaskLock.Acquire(); Tasks.PushBack(_task); TaskLock.Release(); //1 new task available. TaskSema.Post(); } /******************************************************************************/ ///////////////// /* ZTaskStream */ ///////////////// ZTaskStream::ZTaskStream() : TaskStreamLock(), TaskStreamSema(0), FrameCompletedEvent(), PausedEvent(), Tasks(ZTS_DEFAULT_BUFFER_SIZE), bPaused(false), bShouldHalt(false) { //Initially unpaused PausedEvent.Notify(); SetTaskThreadCount(1); } /******************************************************************************/ ZTaskStream::~ZTaskStream() { SetTaskThreadCount(0); } /******************************************************************************/ void ZTaskStream::PushTask( ZPtr _task ) { TaskStreamLock.Acquire(); Tasks.PushBack(_task); TaskStreamLock.Release(); TaskStreamSema.Post(); // 1 new task to schedule } /******************************************************************************/ void ZTaskStream::PushTask( ZTask* _task ) { } /******************************************************************************/ void ZTaskStream::PushTasks( ZArray< ZPtr >& _tasks ) { size_t count = _tasks.Size(); TaskStreamLock.Acquire(); ZArrayAlgo::Append(Tasks, _tasks); for (size_t i = 0; i < count; i++) { Tasks.PushBack( _tasks[i]); TaskStreamSema.Post(); // 1 new task to schedule } TaskStreamLock.Release(); } void ZTaskStream::PushTasks( ZTask* _tasks, size_t _count ) { // TODO } /******************************************************************************/ void ZTaskStream::ExecuteTasks(int _frameTime) { ZPtr curTask; ZArray< ZPtr > tmp; ZArray< ZPtr > delayed; //Wait for the paused event to be signaled. //When it isn't, this code blocks. PausedEvent.Wait(); uint64_t timeStart = ZConcurrency::GetTicks(); uint64_t timeEnd = timeStart + _frameTime; uint64_t timeNow = timeStart; //Schedule tasks until timer elapsed for (;;) { //If we need to halt, do so if (bShouldHalt) { bShouldHalt = false; return; } //Check if we've run out of time timeNow = ZConcurrency::GetTicks(); if (timeEnd < timeNow) { break; } //Empty task queue into a temporary array so we minimize time held with lock TaskStreamLock.Acquire(); while(TaskStreamSema.Wait(0)) { tmp.PushBack(Tasks.PopBack()); } TaskStreamLock.Release(); //Check each task while (tmp.Size() > 0) { //Start assigning tasks to threads curTask = tmp.PopBack(); //If our task can be delayed if (curTask->Priority > 0) { size_t newPriority; uint64_t tickDelta = ZConcurrency::GetTicks() - timeStart; if (tickDelta > curTask->Priority) { newPriority = 0; } else { newPriority = curTask->Priority - (size_t)tickDelta; } curTask->Priority = newPriority; delayed.PushBack(curTask); //delay the current task until after priority zero tasks are executed } else { //Assign to be executed immediately TaskThreads[AssignTask(curTask.Pointer())]->PushTask(curTask); } } //Check what priority tasks are executing bool execP0 = false; //Priority 0 tasks are executing bool execPN = false; //Priority N > 0 tasks are executing for (ZArray::Iterator itr = TaskThreads.Begin(); itr != TaskThreads.End(); itr++) { ZTaskThread* thr = *itr; if(thr->ThrStatus != ZTTS_WAITING) { if(thr->CurrentTaskPriority == 0) { execP0 = true; } else { execPN = true; } } } //No priority 0 tasks executing -> schedule lower priority stuff now if (!execP0) { //At least one delayed task to assign? if(delayed.Size() != 0) { //Schedule a delayed task now ZPtr task = delayed.PopBack(); TaskThreads[AssignTask(task.Pointer())]->PushTask(task); execPN = true; } } //No priority N tasks executing -> wait for something to do if(!execPN) { //Check if we've run out of time timeNow = ZConcurrency::GetTicks(); if (timeEnd < timeNow) { break; } //Compute remaining time, and wait for up to that amount or //the scheduler wait period, whichever is less. uint64_t timeLeft = timeEnd - timeNow; bool newTasks = TaskStreamSema.Wait((uint32_t)timeLeft); if (newTasks) { //Calling WaitSemaphore() actually decreased the count but since we won't //duplicate code here, we'll just re-increment it because next loop //iteration will do another WaitSemaphore() that won't block. TaskStreamSema.Post(); } else { //Timeout -> Time to exit break; } } } //main scheduling loop // return delayed tasks to the task stream PushTasks(delayed); } /******************************************************************************/ size_t ZTaskStream::GetTaskThreadCount() { return TaskThreads.Size(); } /******************************************************************************/ void ZTaskStream::HaltFrame() { bShouldHalt = true; } /******************************************************************************/ void ZTaskStream::PauseFrame( bool _pause ) { if (_pause) { this->bPaused = true; PausedEvent.Reset(); } else { this->bPaused = false; PausedEvent.Notify(); } } /******************************************************************************/ void ZTaskStream::SetTaskThreadCount( size_t _count ) { size_t i, j; TaskStreamLock.Acquire(); //If there are n task threads and the user wants n + k threads, there is no point in killing all n //threads just to create n + k more all over again. Similarly, if the user has n threads but wants //n - k (> 0), then only k threads need to be shut down. if (_count > TaskThreads.Size()) { //In with the new for (i = TaskThreads.Size(); i < _count; ++i) { ZTaskThread* thr = znew ZTaskThread(this, i); TaskThreads.PushBack(thr); thr->StartThread(); while(!thr->ThreadInitialized()) thr->WaitInitialized(); } } else if (_count < TaskThreads.Size()) { //KILLAMANJARO! size_t killCount = TaskThreads.Size() - _count; //Out with the old for (i = TaskThreads.Size() - 1, j = 0; j < killCount; --i, ++j) KillTaskThread(i); } TaskStreamLock.Release(); } /******************************************************************************/ ZTaskThreadId ZTaskStream::AssignTask( ZTask *_task ) { URFP(_task); static int next = 0; int taskThread = next++ % TaskThreads.Size(); return TaskThreads[taskThread]->Id; } /******************************************************************************/ void ZTaskStream::KillTaskThread( size_t _idx ) { //Post a task that does not exist, task thread will shut down TaskThreads[_idx]->TaskSema.Post(); TaskThreads[_idx]->ShutdownThread(); zdelete TaskThreads[_idx]; }