Initial commit
This commit is contained in:
360
ZUtil/ZTaskStream.cpp
Normal file
360
ZUtil/ZTaskStream.cpp
Normal file
@@ -0,0 +1,360 @@
|
||||
#include <ZUtil/ZTaskStream.hpp>
|
||||
|
||||
#include <ZUtil/ZAlloc.hpp>
|
||||
|
||||
/////////////////
|
||||
/* ZTaskThread */
|
||||
/////////////////
|
||||
|
||||
ZThreadReturn ZTaskThread::run( uint64_t _dt )
|
||||
{
|
||||
URFP(_dt);
|
||||
|
||||
ZPtr<ZTask> task;
|
||||
|
||||
//Check if a task is available. If it isn't set our status as waiting, then do an full wait
|
||||
if(!TaskSema.Wait(0)) {
|
||||
this->ThrStatus = ZTTS_WAITING;
|
||||
TaskSema.Wait();
|
||||
}
|
||||
|
||||
//At this point, we've decremented the semaphore. There should be a task waiting
|
||||
TaskLock.Acquire();
|
||||
|
||||
//No tasks -> a) Logic error b) time to exit. Gracefully handle this
|
||||
if(Tasks.Size() == 0) {
|
||||
TaskLock.Release();
|
||||
return ZTR_TERMINATE;
|
||||
}
|
||||
|
||||
//Ok, we're executing tasks
|
||||
ThrStatus = ZTTS_EXECUTING;
|
||||
|
||||
task = Tasks.PopBack();
|
||||
|
||||
//Save the priority of the task that is executing
|
||||
CurrentTaskPriority = task->Priority;
|
||||
|
||||
TaskLock.Release();
|
||||
|
||||
//If the task has not been canceled
|
||||
if (!task->bCancelTask) {
|
||||
//Execute it and check task return status
|
||||
if (task->Execute(task->TaskArgument) == ZTRS_FAILURE)
|
||||
task->OnTaskFailed(task->TaskArgument);
|
||||
} else {
|
||||
//Reset the cancel flag
|
||||
task->bCancelTask = false;
|
||||
}
|
||||
|
||||
return ZTR_LOOP;
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
ZTaskThread::ZTaskThread( ZTaskStream *_stream, ZTaskThreadId _id )
|
||||
: TaskLock(), TaskSema(0), Stream(_stream), Id(_id), ThrStatus(ZTTS_UNINITIALIZED)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
ZTaskThread::~ZTaskThread()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskThread::PushTask( ZPtr<ZTask> _task )
|
||||
{
|
||||
//Add a task while the mutex is locked
|
||||
TaskLock.Acquire();
|
||||
|
||||
Tasks.PushBack(_task);
|
||||
|
||||
TaskLock.Release();
|
||||
|
||||
//1 new task available.
|
||||
TaskSema.Post();
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
/////////////////
|
||||
/* ZTaskStream */
|
||||
/////////////////
|
||||
|
||||
ZTaskStream::ZTaskStream()
|
||||
: TaskStreamLock(), TaskStreamSema(0),
|
||||
FrameCompletedEvent(), PausedEvent(),
|
||||
Tasks(ZTS_DEFAULT_BUFFER_SIZE), bPaused(false), bShouldHalt(false)
|
||||
{
|
||||
//Initially unpaused
|
||||
PausedEvent.Notify();
|
||||
|
||||
SetTaskThreadCount(1);
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
ZTaskStream::~ZTaskStream()
|
||||
{
|
||||
SetTaskThreadCount(0);
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::PushTask( ZPtr<ZTask> _task )
|
||||
{
|
||||
TaskStreamLock.Acquire();
|
||||
|
||||
Tasks.PushBack(_task);
|
||||
|
||||
TaskStreamLock.Release();
|
||||
|
||||
TaskStreamSema.Post(); // 1 new task to schedule
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::PushTask( ZTask* _task )
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::PushTasks( ZArray< ZPtr<ZTask> >& _tasks )
|
||||
{
|
||||
size_t count = _tasks.Size();
|
||||
|
||||
TaskStreamLock.Acquire();
|
||||
|
||||
ZArrayAlgo::Append(Tasks, _tasks);
|
||||
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
Tasks.PushBack( _tasks[i]);
|
||||
TaskStreamSema.Post(); // 1 new task to schedule
|
||||
}
|
||||
|
||||
TaskStreamLock.Release();
|
||||
}
|
||||
|
||||
void ZTaskStream::PushTasks( ZTask* _tasks, size_t _count )
|
||||
{
|
||||
// TODO
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::ExecuteTasks(int _frameTime)
|
||||
{
|
||||
ZPtr<ZTask> curTask;
|
||||
ZArray< ZPtr<ZTask> > tmp;
|
||||
ZArray< ZPtr<ZTask> > delayed;
|
||||
|
||||
//Wait for the paused event to be signaled.
|
||||
//When it isn't, this code blocks.
|
||||
PausedEvent.Wait();
|
||||
|
||||
uint64_t timeStart = ZConcurrency::GetTicks();
|
||||
uint64_t timeEnd = timeStart + _frameTime;
|
||||
uint64_t timeNow = timeStart;
|
||||
|
||||
//Schedule tasks until timer elapsed
|
||||
for (;;) {
|
||||
//If we need to halt, do so
|
||||
if (bShouldHalt) {
|
||||
bShouldHalt = false;
|
||||
return;
|
||||
}
|
||||
|
||||
//Check if we've run out of time
|
||||
timeNow = ZConcurrency::GetTicks();
|
||||
|
||||
if (timeEnd < timeNow) {
|
||||
break;
|
||||
}
|
||||
|
||||
//Empty task queue into a temporary array so we minimize time held with lock
|
||||
TaskStreamLock.Acquire();
|
||||
|
||||
while(TaskStreamSema.Wait(0)) {
|
||||
tmp.PushBack(Tasks.PopBack());
|
||||
}
|
||||
|
||||
TaskStreamLock.Release();
|
||||
|
||||
//Check each task
|
||||
while (tmp.Size() > 0) {
|
||||
//Start assigning tasks to threads
|
||||
curTask = tmp.PopBack();
|
||||
|
||||
//If our task can be delayed
|
||||
if (curTask->Priority > 0) {
|
||||
size_t newPriority;
|
||||
uint64_t tickDelta = ZConcurrency::GetTicks() - timeStart;
|
||||
if (tickDelta > curTask->Priority) {
|
||||
newPriority = 0;
|
||||
} else {
|
||||
newPriority = curTask->Priority - (size_t)tickDelta;
|
||||
}
|
||||
|
||||
curTask->Priority = newPriority;
|
||||
|
||||
delayed.PushBack(curTask); //delay the current task until after priority zero tasks are executed
|
||||
} else {
|
||||
//Assign to be executed immediately
|
||||
TaskThreads[AssignTask(curTask.Pointer())]->PushTask(curTask);
|
||||
}
|
||||
}
|
||||
|
||||
//Check what priority tasks are executing
|
||||
bool execP0 = false; //Priority 0 tasks are executing
|
||||
bool execPN = false; //Priority N > 0 tasks are executing
|
||||
|
||||
for (ZArray<ZTaskThread*>::Iterator itr = TaskThreads.Begin(); itr != TaskThreads.End(); itr++) {
|
||||
ZTaskThread* thr = *itr;
|
||||
|
||||
if(thr->ThrStatus != ZTTS_WAITING) {
|
||||
if(thr->CurrentTaskPriority == 0) {
|
||||
execP0 = true;
|
||||
} else {
|
||||
execPN = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//No priority 0 tasks executing -> schedule lower priority stuff now
|
||||
if (!execP0) {
|
||||
//At least one delayed task to assign?
|
||||
if(delayed.Size() != 0) {
|
||||
//Schedule a delayed task now
|
||||
ZPtr<ZTask> task = delayed.PopBack();
|
||||
TaskThreads[AssignTask(task.Pointer())]->PushTask(task);
|
||||
execPN = true;
|
||||
}
|
||||
}
|
||||
|
||||
//No priority N tasks executing -> wait for something to do
|
||||
if(!execPN) {
|
||||
//Check if we've run out of time
|
||||
timeNow = ZConcurrency::GetTicks();
|
||||
|
||||
if (timeEnd < timeNow) {
|
||||
break;
|
||||
}
|
||||
|
||||
//Compute remaining time, and wait for up to that amount or
|
||||
//the scheduler wait period, whichever is less.
|
||||
uint64_t timeLeft = timeEnd - timeNow;
|
||||
|
||||
bool newTasks = TaskStreamSema.Wait((uint32_t)timeLeft);
|
||||
|
||||
if (newTasks) {
|
||||
//Calling WaitSemaphore() actually decreased the count but since we won't
|
||||
//duplicate code here, we'll just re-increment it because next loop
|
||||
//iteration will do another WaitSemaphore() that won't block.
|
||||
TaskStreamSema.Post();
|
||||
} else {
|
||||
//Timeout -> Time to exit
|
||||
break;
|
||||
}
|
||||
}
|
||||
} //main scheduling loop
|
||||
|
||||
// return delayed tasks to the task stream
|
||||
PushTasks(delayed);
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
size_t ZTaskStream::GetTaskThreadCount()
|
||||
{
|
||||
return TaskThreads.Size();
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::HaltFrame()
|
||||
{
|
||||
bShouldHalt = true;
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::PauseFrame( bool _pause )
|
||||
{
|
||||
if (_pause)
|
||||
{
|
||||
this->bPaused = true;
|
||||
PausedEvent.Reset();
|
||||
}
|
||||
else
|
||||
{
|
||||
this->bPaused = false;
|
||||
PausedEvent.Notify();
|
||||
}
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::SetTaskThreadCount( size_t _count )
|
||||
{
|
||||
size_t i, j;
|
||||
|
||||
TaskStreamLock.Acquire();
|
||||
|
||||
//If there are n task threads and the user wants n + k threads, there is no point in killing all n
|
||||
//threads just to create n + k more all over again. Similarly, if the user has n threads but wants
|
||||
//n - k (> 0), then only k threads need to be shut down.
|
||||
if (_count > TaskThreads.Size())
|
||||
{
|
||||
//In with the new
|
||||
for (i = TaskThreads.Size(); i < _count; ++i)
|
||||
{
|
||||
ZTaskThread* thr = znew ZTaskThread(this, i);
|
||||
TaskThreads.PushBack(thr);
|
||||
thr->StartThread();
|
||||
while(!thr->ThreadInitialized())
|
||||
thr->WaitInitialized();
|
||||
}
|
||||
|
||||
}
|
||||
else if (_count < TaskThreads.Size())
|
||||
{
|
||||
//KILLAMANJARO!
|
||||
size_t killCount = TaskThreads.Size() - _count;
|
||||
|
||||
//Out with the old
|
||||
for (i = TaskThreads.Size() - 1, j = 0; j < killCount; --i, ++j)
|
||||
KillTaskThread(i);
|
||||
}
|
||||
|
||||
TaskStreamLock.Release();
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
ZTaskThreadId ZTaskStream::AssignTask( ZTask *_task )
|
||||
{
|
||||
URFP(_task);
|
||||
|
||||
static int next = 0;
|
||||
|
||||
int taskThread = next++ % TaskThreads.Size();
|
||||
|
||||
return TaskThreads[taskThread]->Id;
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
void ZTaskStream::KillTaskThread( size_t _idx )
|
||||
{
|
||||
//Post a task that does not exist, task thread will shut down
|
||||
TaskThreads[_idx]->TaskSema.Post();
|
||||
TaskThreads[_idx]->ShutdownThread();
|
||||
zdelete TaskThreads[_idx];
|
||||
}
|
||||
Reference in New Issue
Block a user