361 lines
8.2 KiB
C++
361 lines
8.2 KiB
C++
#include <ZUtil/ZTaskStream.hpp>
|
|
|
|
#include <ZUtil/ZAlloc.hpp>
|
|
|
|
/////////////////
|
|
/* ZTaskThread */
|
|
/////////////////
|
|
|
|
/*
 * One iteration of the task thread: waits for a task to be signaled, pops it
 * from this thread's queue and executes it unless it was canceled.
 *
 * @param _dt unused
 * @return ZTR_TERMINATE if the semaphore was signaled but the queue is empty
 *         (this is how KillTaskThread asks the thread to stop), ZTR_LOOP otherwise
 */
ZThreadReturn ZTaskThread::run( uint64_t _dt )
{
	URFP(_dt);

	//Poll the semaphore first; only when nothing is pending do we flag
	//ourselves as waiting and fall back to a blocking wait
	const bool taskReady = TaskSema.Wait(0);

	if (!taskReady) {
		this->ThrStatus = ZTTS_WAITING;
		TaskSema.Wait();
	}

	//The semaphore has been decremented, so a task should be queued for us
	TaskLock.Acquire();

	//Signaled with an empty queue: either a logic error or a shutdown request.
	//Either way, leave gracefully.
	if (Tasks.Size() == 0) {
		TaskLock.Release();
		return ZTR_TERMINATE;
	}

	//We have work: publish our status, grab the task and remember its
	//priority, all while still holding the queue lock
	ThrStatus = ZTTS_EXECUTING;

	ZPtr<ZTask> job = Tasks.PopBack();

	CurrentTaskPriority = job->Priority;

	TaskLock.Release();

	//A canceled task is skipped; its cancel flag is cleared
	if (job->bCancelTask) {
		job->bCancelTask = false;
		return ZTR_LOOP;
	}

	//Run the task and notify it if it reports failure
	if (job->Execute(job->TaskArgument) == ZTRS_FAILURE) {
		job->OnTaskFailed(job->TaskArgument);
	}

	return ZTR_LOOP;
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Constructs a task thread owned by the given stream.
 *
 * @param _stream the task stream this thread belongs to
 * @param _id     the id assigned to this thread by the stream
 *
 * The thread starts in the ZTTS_UNINITIALIZED state with a zero-count task
 * semaphore (no tasks pending).
 */
ZTaskThread::ZTaskThread( ZTaskStream *_stream, ZTaskThreadId _id )
	: TaskLock(), TaskSema(0), Stream(_stream), Id(_id), ThrStatus(ZTTS_UNINITIALIZED)
{
	//ZTaskStream::ExecuteTasks() reads CurrentTaskPriority of any thread whose
	//status is not ZTTS_WAITING, which includes a thread that has not yet run
	//a task. Give it a defined value so that read is never of an
	//uninitialized member. (Assigned here rather than in the initializer list
	//so the member declaration order does not matter.)
	CurrentTaskPriority = 0;
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Destructor. Performs no explicit cleanup of its own.
 */
ZTaskThread::~ZTaskThread()
{
	//Nothing to do here
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Queues a task on this thread and signals the thread that work is available.
 *
 * @param _task the task to enqueue
 */
void ZTaskThread::PushTask( ZPtr<ZTask> _task )
{
	//The queue is only touched while TaskLock is held
	TaskLock.Acquire();
	Tasks.PushBack(_task);
	TaskLock.Release();

	//Signal after unlocking: exactly one more task is now available
	TaskSema.Post();
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/////////////////
|
|
/* ZTaskStream */
|
|
/////////////////
|
|
|
|
/*
 * Constructs a task stream with a default-sized task buffer, in the unpaused
 * state, and with a single task thread.
 */
ZTaskStream::ZTaskStream()
	: TaskStreamLock(),
	  TaskStreamSema(0),
	  FrameCompletedEvent(),
	  PausedEvent(),
	  Tasks(ZTS_DEFAULT_BUFFER_SIZE),
	  bPaused(false),
	  bShouldHalt(false)
{
	//Signal the paused event up front so ExecuteTasks() does not block on it
	//(i.e. the stream starts out unpaused)
	PausedEvent.Notify();

	//Start with one task thread
	SetTaskThreadCount(1);
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Destructor. Requests a thread count of zero, which shuts down every task
 * thread the stream currently has.
 */
ZTaskStream::~ZTaskStream()
{
	const size_t noThreads = 0;

	SetTaskThreadCount(noThreads);
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Adds a single task to the stream so that a later ExecuteTasks() call can
 * schedule it.
 *
 * @param _task the task to enqueue
 */
void ZTaskStream::PushTask( ZPtr<ZTask> _task )
{
	//Modify the pending-task queue under the stream lock
	TaskStreamLock.Acquire();
	Tasks.PushBack(_task);
	TaskStreamLock.Release();

	//One more task is waiting to be scheduled
	TaskStreamSema.Post();
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Raw-pointer overload of PushTask().
 *
 * NOTE: not implemented. The body is empty, so the task passed in is NOT
 * queued and nothing is signaled.
 *
 * @param _task ignored
 */
void ZTaskStream::PushTask( ZTask* _task )
{
	//Parameter intentionally unused until this overload is implemented
	//(URFP is the macro this file applies to its other unused parameters)
	URFP(_task);
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Adds every task in the given array to the stream so that a later
 * ExecuteTasks() call can schedule them.
 *
 * Each task is queued exactly once and the scheduling semaphore is posted
 * exactly once per task, keeping the semaphore count equal to the number of
 * queued tasks (ExecuteTasks() pops one task per successful semaphore wait).
 *
 * The previous implementation both bulk-appended the array and then pushed
 * every element again in the loop, which queued each task twice while only
 * signaling it once.
 *
 * @param _tasks the tasks to enqueue; the array itself is not modified
 */
void ZTaskStream::PushTasks( ZArray< ZPtr<ZTask> >& _tasks )
{
	const size_t count = _tasks.Size();

	TaskStreamLock.Acquire();

	for (size_t i = 0; i < count; i++) {
		Tasks.PushBack(_tasks[i]);
		TaskStreamSema.Post(); // 1 new task to schedule
	}

	TaskStreamLock.Release();
}
|
|
|
|
/*
 * Raw-array overload of PushTasks().
 *
 * NOTE: not implemented. The body does nothing, so none of the tasks passed
 * in are queued.
 *
 * @param _tasks ignored
 * @param _count ignored
 */
void ZTaskStream::PushTasks( ZTask* _tasks, size_t _count )
{
	// TODO

	//Parameters intentionally unused until this overload is implemented
	//(URFP is the macro this file applies to its other unused parameters)
	URFP(_tasks);
	URFP(_count);
}
|
|
|
|
/******************************************************************************/
|
|
|
|
void ZTaskStream::ExecuteTasks(int _frameTime)
|
|
{
|
|
ZPtr<ZTask> curTask;
|
|
ZArray< ZPtr<ZTask> > tmp;
|
|
ZArray< ZPtr<ZTask> > delayed;
|
|
|
|
//Wait for the paused event to be signaled.
|
|
//When it isn't, this code blocks.
|
|
PausedEvent.Wait();
|
|
|
|
uint64_t timeStart = ZConcurrency::GetTicks();
|
|
uint64_t timeEnd = timeStart + _frameTime;
|
|
uint64_t timeNow = timeStart;
|
|
|
|
//Schedule tasks until timer elapsed
|
|
for (;;) {
|
|
//If we need to halt, do so
|
|
if (bShouldHalt) {
|
|
bShouldHalt = false;
|
|
return;
|
|
}
|
|
|
|
//Check if we've run out of time
|
|
timeNow = ZConcurrency::GetTicks();
|
|
|
|
if (timeEnd < timeNow) {
|
|
break;
|
|
}
|
|
|
|
//Empty task queue into a temporary array so we minimize time held with lock
|
|
TaskStreamLock.Acquire();
|
|
|
|
while(TaskStreamSema.Wait(0)) {
|
|
tmp.PushBack(Tasks.PopBack());
|
|
}
|
|
|
|
TaskStreamLock.Release();
|
|
|
|
//Check each task
|
|
while (tmp.Size() > 0) {
|
|
//Start assigning tasks to threads
|
|
curTask = tmp.PopBack();
|
|
|
|
//If our task can be delayed
|
|
if (curTask->Priority > 0) {
|
|
size_t newPriority;
|
|
uint64_t tickDelta = ZConcurrency::GetTicks() - timeStart;
|
|
if (tickDelta > curTask->Priority) {
|
|
newPriority = 0;
|
|
} else {
|
|
newPriority = curTask->Priority - (size_t)tickDelta;
|
|
}
|
|
|
|
curTask->Priority = newPriority;
|
|
|
|
delayed.PushBack(curTask); //delay the current task until after priority zero tasks are executed
|
|
} else {
|
|
//Assign to be executed immediately
|
|
TaskThreads[AssignTask(curTask.Pointer())]->PushTask(curTask);
|
|
}
|
|
}
|
|
|
|
//Check what priority tasks are executing
|
|
bool execP0 = false; //Priority 0 tasks are executing
|
|
bool execPN = false; //Priority N > 0 tasks are executing
|
|
|
|
for (ZArray<ZTaskThread*>::Iterator itr = TaskThreads.Begin(); itr != TaskThreads.End(); itr++) {
|
|
ZTaskThread* thr = *itr;
|
|
|
|
if(thr->ThrStatus != ZTTS_WAITING) {
|
|
if(thr->CurrentTaskPriority == 0) {
|
|
execP0 = true;
|
|
} else {
|
|
execPN = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
//No priority 0 tasks executing -> schedule lower priority stuff now
|
|
if (!execP0) {
|
|
//At least one delayed task to assign?
|
|
if(delayed.Size() != 0) {
|
|
//Schedule a delayed task now
|
|
ZPtr<ZTask> task = delayed.PopBack();
|
|
TaskThreads[AssignTask(task.Pointer())]->PushTask(task);
|
|
execPN = true;
|
|
}
|
|
}
|
|
|
|
//No priority N tasks executing -> wait for something to do
|
|
if(!execPN) {
|
|
//Check if we've run out of time
|
|
timeNow = ZConcurrency::GetTicks();
|
|
|
|
if (timeEnd < timeNow) {
|
|
break;
|
|
}
|
|
|
|
//Compute remaining time, and wait for up to that amount or
|
|
//the scheduler wait period, whichever is less.
|
|
uint64_t timeLeft = timeEnd - timeNow;
|
|
|
|
bool newTasks = TaskStreamSema.Wait((uint32_t)timeLeft);
|
|
|
|
if (newTasks) {
|
|
//Calling WaitSemaphore() actually decreased the count but since we won't
|
|
//duplicate code here, we'll just re-increment it because next loop
|
|
//iteration will do another WaitSemaphore() that won't block.
|
|
TaskStreamSema.Post();
|
|
} else {
|
|
//Timeout -> Time to exit
|
|
break;
|
|
}
|
|
}
|
|
} //main scheduling loop
|
|
|
|
// return delayed tasks to the task stream
|
|
PushTasks(delayed);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
|
|
size_t ZTaskStream::GetTaskThreadCount()
|
|
{
|
|
return TaskThreads.Size();
|
|
}
|
|
|
|
/******************************************************************************/
|
|
|
|
void ZTaskStream::HaltFrame()
|
|
{
|
|
bShouldHalt = true;
|
|
}
|
|
|
|
/******************************************************************************/
|
|
|
|
void ZTaskStream::PauseFrame( bool _pause )
|
|
{
|
|
if (_pause)
|
|
{
|
|
this->bPaused = true;
|
|
PausedEvent.Reset();
|
|
}
|
|
else
|
|
{
|
|
this->bPaused = false;
|
|
PausedEvent.Notify();
|
|
}
|
|
}
|
|
|
|
/******************************************************************************/
|
|
|
|
void ZTaskStream::SetTaskThreadCount( size_t _count )
|
|
{
|
|
size_t i, j;
|
|
|
|
TaskStreamLock.Acquire();
|
|
|
|
//If there are n task threads and the user wants n + k threads, there is no point in killing all n
|
|
//threads just to create n + k more all over again. Similarly, if the user has n threads but wants
|
|
//n - k (> 0), then only k threads need to be shut down.
|
|
if (_count > TaskThreads.Size())
|
|
{
|
|
//In with the new
|
|
for (i = TaskThreads.Size(); i < _count; ++i)
|
|
{
|
|
ZTaskThread* thr = znew ZTaskThread(this, i);
|
|
TaskThreads.PushBack(thr);
|
|
thr->StartThread();
|
|
while(!thr->ThreadInitialized())
|
|
thr->WaitInitialized();
|
|
}
|
|
|
|
}
|
|
else if (_count < TaskThreads.Size())
|
|
{
|
|
//KILLAMANJARO!
|
|
size_t killCount = TaskThreads.Size() - _count;
|
|
|
|
//Out with the old
|
|
for (i = TaskThreads.Size() - 1, j = 0; j < killCount; --i, ++j)
|
|
KillTaskThread(i);
|
|
}
|
|
|
|
TaskStreamLock.Release();
|
|
}
|
|
|
|
/******************************************************************************/
|
|
|
|
/*
 * Chooses the task thread that should run the given task.
 *
 * Threads are picked in rotation using a counter that is shared by every call
 * to this function (function-local static). The task itself does not
 * influence the choice.
 *
 * The counter is an unsigned size_t: it wraps around with well-defined
 * behavior, whereas the previous signed int counter would eventually overflow
 * (undefined behavior) and was mixed with the unsigned thread count in the
 * modulo.
 *
 * Precondition: there is at least one task thread (the modulo divides by
 * TaskThreads.Size()).
 *
 * @param _task unused
 * @return the Id of the selected task thread
 */
ZTaskThreadId ZTaskStream::AssignTask( ZTask *_task )
{
	URFP(_task);

	static size_t next = 0;

	const size_t taskThread = next++ % TaskThreads.Size();

	return TaskThreads[taskThread]->Id;
}
|
|
|
|
/******************************************************************************/
|
|
|
|
void ZTaskStream::KillTaskThread( size_t _idx )
|
|
{
|
|
//Post a task that does not exist, task thread will shut down
|
|
TaskThreads[_idx]->TaskSema.Post();
|
|
TaskThreads[_idx]->ShutdownThread();
|
|
zdelete TaskThreads[_idx];
|
|
}
|