/* ZTaskStream.hpp Author: James Russell Purpose: Manages the execution of 'Tasks' by a number of threads determined at runtime. Attempts to parallelize execution as well as possible with the provided information. Changelog 2011/04/10 - creation (jcrussell) */ #ifndef _ZTASKSTREAM_H #define _ZTASKSTREAM_H #include #include //size_t //Forward Declarations class ZTaskStream; typedef size_t ZTaskPriority; //Task Priority typedef size_t ZTaskThreadId; //Task Thread Id //Definition for immediate task execution priority #define ZTASK_PRIORITY_IMMEDIATE (0) //Default allocator buffer size for the task thread #define ZTT_DEFAULT_BUFFER_SIZE 256 //Default allocator buffer size for the task stream #define ZTS_DEFAULT_BUFFER_SIZE 1024 /* ZTask execute return values. */ enum ZTaskReturnStatus { ZTRS_SUCCESS, // Task has succeeded ZTRS_FAILURE, // Task has failed ZTRS_ENUM_SIZE }; /* ZTask thread Status enumeration. */ enum ZTaskThreadStatus { ZTTS_UNINITIALIZED, // Task thread is uninitialized ZTTS_WAITING, // Task thread is awaiting tasks ZTTS_EXECUTING, // Task thread is executing tasks ZTTS_ENUM_SIZE }; /* Tasks are the basic unit of execution for a ZTaskStream. They are added to the ZTaskStream and assigned to a thread for execution. They are then executed as TaskThreads become available. */ class ZTask { friend class ZTaskThread; friend class ZTaskStream; public: ZTaskStream* TaskStream; // The task stream that is executing this task ZTaskPriority Priority; // Priority, assigned at construction, decreased each time it is delayed to zero, which is immediate priority bool bCancelTask; // Cancel Boolean, if set to true, will prevent the task from being executed /* Parameterized c'tor (with default arguments). @param _priority - The amount of time (in ms) the task can be delayed is priority. This means the closer to zero the priority is set, the faster it will be executed. */ ZTask(ZTaskPriority _priority = ZTASK_PRIORITY_IMMEDIATE) : TaskStream(NULL), Priority(_priority), bCancelTask(false), Func(NULL), TaskArgument(NULL) { } /* Parameterized c'tor. @param _func - function to execute @param _taskArg - argument to the task @param _priority - The amount of time (in ms) the task can be delayed is priority. This means the closer to zero the priority is set, the faster it will be executed. */ ZTask(ZTaskReturnStatus (*_func)(ZTaskStream*, ZTask*, void*), void *_taskArg, ZTaskPriority _priority = ZTASK_PRIORITY_IMMEDIATE) : TaskStream(NULL), Priority(_priority), bCancelTask(false), Func(_func),TaskArgument(_taskArg) { } /* Destructor. */ virtual ~ZTask() { } /* virtual public ZTask::Execute Execute method. Defined by the task implementation. @param _arg - argument to the task (NULL by default) @return (ZTaskReturnStatus) - task return value */ virtual ZTaskReturnStatus Execute(void *_arg) { URFP(_arg); if (Func != NULL) return Func(TaskStream, this, TaskArgument); else return ZTRS_FAILURE; } /* virtual public ZTask::OnTaskFailed Handler function for task failure. No-op by default. @return (void) */ virtual void OnTaskFailed(void *_taskArgument) { URFP(_taskArgument); } protected: ZTaskReturnStatus (*Func)(ZTaskStream*, ZTask*, void*); // function for the ZTask to execute if not implemented as a subtask void* TaskArgument; // Argument passed to this task }; /* Future. Used to asynchronously execute a function that will return a value when completed. */ template class ZFuture { friend class ZTaskStream; protected: // Future Task class ZFutureTask : public ZTask { public: R (*Func)(A); // The Function A Arg; // The argument R Ret; // The return value bool Completed; // Completed flag ZEvent CompletedEvent; // Completed Event // c'tor ZFutureTask(R (*_func)(A), A _arg) : ZTask(), Func(_func), Arg(_arg), Ret(), Completed(false), CompletedEvent() { } // d'tor ~ZFutureTask() { } // subclass override virtual ZTaskReturnStatus Execute(void *_arg) { URFP(_arg); CompletedEvent.Reset(); Ret = Func(Arg); Completed = true; CompletedEvent.Notify(); return ZTRS_SUCCESS; } }; ZPtr FutureTask; // The function execute task public: /* Parameterized c'tor. @param _func - the function to execute, must return a value of type R @param _arg - argument to the function */ ZFuture(R (_func)(A), A _arg) : FutureTask(znew ZFutureTask(_func, _arg)) { } /* public ZFuture::IsComplete Returns whether or not the future has completed. @return (bool) - true if the task has completed */ bool IsComplete() { return FutureTask->Completed; } /* public ZFuture::GetValue Gets the return value from the future. @return (R) - return from the task */ R GetValue() { if (!FutureTask->Completed) FutureTask->CompletedEvent.Wait(); return FutureTask->Ret; } /* public ZFuture::GetValueRef Gets the return value from the future as a reference. @return (R&) - reference to return from the task */ R& GetValueRef() { if (!FutureTask->Completed) FutureTask->CompletedEvent.Wait(); return FutureTask->Ret; } }; /* Threads which execute tasks in the ZTaskStream. The number of operating Task Threads is assigned at construction of a ZTaskStream and can be changed. */ class ZTaskThread : public ZThread { friend class ZTaskStream; public: /* Parameterized c'tor. @param _stream - the stream giving us tasks @param _id - the id of this task */ ZTaskThread(ZTaskStream *_stream, ZTaskThreadId _id); // d'tor virtual ~ZTaskThread(); /* public ZTaskThread::PushTask() Adds a task to the task thread. @param _task - the task to add to this task thread (argument already set) @return (void) */ void PushTask(ZPtr _task); protected: // Subclass implementation of thread run, which executes tasks ZThreadReturn run(uint64_t _dt); private: ZMutex TaskLock; // Concurrency lock ZSemaphore TaskSema; // Event that is set when new tasks are available ZTaskStream* Stream; // The task stream providing us with tasks ZTaskThreadId Id; // Task thread ID, used to identify this task thread ZTaskThreadStatus ThrStatus; // Status of this task thread ZTaskPriority CurrentTaskPriority; //The priority of the currently executing task ZArray< ZPtr > Tasks; //Set of tasks to execute }; /* The task stream. Assigns tasks to task threads that execute when desired. Can also handle 'background' tasks. */ class ZTaskStream { public: /* Default c'tor. Creates a single task thread. The amount of task threads running concurrently can be changed using SetTaskThreadCount. */ ZTaskStream(); // d'tor ~ZTaskStream(); /* public ZTaskStream::ExecuteTasks Begins execution on the currently loaded set of tasks and any further tasks generated during the execution of those tasks. Returns after all tasks that will be executed this frame are done executing. Tasks that were not completed have had their priority values reduced, meaning that if they are again added to the ZTaskStream for execution repeatedly until executed, they are guaranteed to eventually execute within their priority time frame. @param _frameTime - (given in ms) is the desired amount of time that execution of all tasks should take; if any time is remaining after execution of ALL priority zero tasks, a number of extra priority > 0 tasks are executed as can be fit into the extra time. Because of this definition, frameTime is an estimate and not a guarantee - execution of priority 0 tasks can take more than frameTime. @return (void) */ void ExecuteTasks(int _frameTime); /* public ZTaskStream::GetTaskThreadCount Gets the current number of worker threads. @return (size_t) - number of task stream worker threads */ size_t GetTaskThreadCount(); /* public ZTaskStream::HaltFrame Notifies the ZTaskStream to stop assigning tasks tasks and return from the ExecuteTasks call that started the process. This does not stop the execution of tasks that have already been assigned. @return (void) */ void HaltFrame(); /* public ZTaskStream::Pause Notifies the ZTaskStream to pause task execution for all tasks. @param _pause - true to pause the stream, false to unpause @return (void) */ void PauseFrame(bool _pause); /* public ZTaskStream::PushFuture Adds a ZFuture to the task stream, which will execute the future at the soonest available opportunity. The ZSmartPointer version will hold a reference to the future until after execution. If the raw pointer version is used, the user is required to keep the pointer valid until after execution. @param _future - the future to execute @return (void) */ template void PushFuture(ZPtr< ZFuture > _future) { TaskThreads[AssignTask(_future->FutureTask.Pointer())]->PushTask(_future->FutureTask); } template void PushFuture(ZFuture* _future) { // TODO } /* public ZTaskStream::PushTask Adds a task to the task list. The task is added to the task queue in the currently executing frame phase, or if a phase is not yet executing, it will be added to the next frame phase to execute. The ZSmartPointer version will hold a reference to the task until after execution. If the raw pointer version is used, the user is required to keep the pointer valid until after execution. @param _task - the task to add @return (void) - the id of the task */ void PushTask(ZPtr _task); void PushTask(ZTask* _task); /* public ZTaskStream::PushTasks Adds a list of tasks. Identical to PushTask() but with less locking overhead. The list of tasks is not emptied (i.e _tasks.Clear() is not called). The ZSmartPointer version will hold a reference to the tasks until after execution. If the raw pointer version is used, the user is required to keep the pointer valid until after execution. @param _tasks - the tasks to add @param _count - the number of tasks @return (void) */ void PushTasks(ZArray< ZPtr >& _tasks); void PushTasks(ZTask* _tasks, size_t _count); /* public ZTaskStream::SetTaskThreadCount Sets the number of task threads that will be running. Should be greater than zero, or the call has no effect. @param _count - the number of task threads to run simultaneously @return (void) */ void SetTaskThreadCount(size_t _count); protected: ZMutex TaskStreamLock; // Lock for controlling access to the ZTaskStream objects ZSemaphore TaskStreamSema; // Semaphore for signaling events ZEvent FrameCompletedEvent; // Event for notifying the execute caller thread that a frame is completed ZEvent PausedEvent; // Event for pausing the Task Stream ZArray TaskThreads; // Task threads that run the set of tasks ZArray< ZPtr > Tasks; // The task list // Pause flag, which indicates that task execution should be paused // This is only used for debugging purposes, 'PausedEvent' has the same state // wrapped into an OS object. bool bPaused; //Halt flag, which indicates that task execution should halt bool bShouldHalt; //Assigns a task to one of the task threads ZTaskThreadId AssignTask( ZTask *_task); //Kills a given task thread void KillTaskThread(size_t _idx); }; #endif