thread (lx_thread.hpp)

From The Foundry MODO SDK wiki
Jump to: navigation, search
There are security restrictions on this page


Contents

Multithreading

This module provides a very simple interface for creating and managing lightweight threads. Although they can be grouped for ease of access, all the threads in an application reside in the same process context and share the same set of thread resources. The express purpose of the threads is to permit dividing the processing task of the application into parallel chunks that can be spread across multiple processors, if available.

Threading Interfaces

ILxThreadService service provides a simple interface into basic threading constructs and, more importantly, a way to synchronize with the host process and other threads.

(1) SDK: Declarations
 #define LXu_THREADSERVICE       "0A9D5B42-1DA6-42A4-8FC4-01FCCE939AC4"
 #define LXa_THREADSERVICE       "threadservice" 
 #define LXu_THREADMUTEX         "7624F6B7-83FD-424F-A68E-0EDD089167CB"
 #define LXa_THREADMUTEX         "threadmutex" 
 #define LXu_THREADJOB           "DE892B0B-A791-4FA5-B85D-46E8CACB695B"
 #define LXa_THREADJOB           "threadjob" 
 #define LXu_THREADGROUP         "54A9DD48-3AFC-435F-9F17-2EEB6FB46FBA"
 #define LXa_THREADGROUP         "threadgroup" 
 #define LXu_WORKLIST            "74568CA9-92C9-4C73-9851-E9169934629A"
 #define LXa_WORKLIST            "worklist" 
 #define LXu_THREADSLOT          "365E4616-0FB9-478E-993D-D35282F4C326" 
 #define LXu_THREADSLOTCLIENT    "D24835B6-518B-4E33-8A70-E53038C29BB7" 
 #define LXu_SHAREDWORK          "4D414F97-35A4-4B26-84FE-0E740C79722C" 
 #define LXu_THREADRANGEWORKER   "612153FE-572F-4CD6-8033-B905762C3106"
 #define LXa_THREADRANGEWORKER   "threadrangeworker"

Worklist

A worklist is a opaque container for units of work. Typically elements are processed one at a time, and can be split into new spawned lists.

(2) SDK: ILxWorkList interface
         LXxMETHOD ( LxResult,
 IsEmpty) (
         LXtObjectID              self);
 
         LXxMETHOD ( void *,
 Next) (
         LXtObjectID              self);
 
         LXxMETHOD ( LxResult,
 Split) (
         LXtObjectID              self,
         unsigned                 mode,
         void                   **ppvObj);
 
         LXxMETHOD ( void,
 Clear) (
         LXtObjectID              self);

(3) SDK: Declarations
 #define LXiWLSPLIT_NONE  0
 #define LXiWLSPLIT_ONE   1
 #define LXiWLSPLIT_HALF  2

(4) User Class: WorkList method
         bool
 Split (
         CLxLoc_WorkList         &wlist,
         unsigned                 mode = LXiWLSPLIT_NONE)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CLxLoc_WorkList::Split (mode, &obj)))
                 return false;
 
         return wlist.take (obj);
 }

Synchronization

Mutexes are used for synchronization of threads. Only a single thread may be "in" a mutex at a time. And other thread calling Enter will block until the owning thread has released the mutex with a Leave call. NOTE: Calling Enter from a thread that already has the mutex will result in deadlock.

Critical sections are closely related to mutexes with the one (significant) difference that a thread *can* re-enter a critical section it owns. These are usually not necessary and have a greater performance cost than mutexes. Critical sections share the THREADMUTEX interface.

(5) SDK: ILxThreadMutex interface
         LXxMETHOD ( void,
 Enter) (
         LXtObjectID              self);
 
         LXxMETHOD ( void,
 Leave) (
         LXtObjectID              self);

(6) User Class: ThreadMutex method

The thread service allows you to create mutexes.

(7) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 CreateMutex) (
         LXtObjectID              self,
         void                   **ppvObj);

(8) User Service Class: ThreadService method
         bool
 NewMutex (
         CLxLoc_ThreadMutex      &mux)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CreateMutex (&obj)))
                 return false;
 
         return mux.take (obj);
 }

... and critical sections.

(9) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 CreateCS) (
         LXtObjectID              self,
         void                   **ppvObj);

(10) User Service Class: ThreadService method
         bool
 NewCritSec (
         CLxLoc_ThreadMutex      &cs)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CreateCS (&obj)))
                 return false;
 
         return cs.take (obj);
 }

We also provide a couple of simple classes for self-allocating locks and automatically scoped enter and leave.

(11) User Class: additional
 class CLxMutexLock : public CLxUser_ThreadMutex
 {
     public:
         CLxMutexLock ()
         {
                 CLxUser_ThreadService ts;
                 ts.NewMutex (*this);
         }
 };
 
 class CLxCritSecLock : public CLxUser_ThreadMutex
 {
     public:
         CLxCritSecLock ()
         {
                 CLxUser_ThreadService ts;
                 ts.NewCritSec (*this);
         }
 };
 
 class CLxArmLockedMutex : public CLxArm
 {
     public:
         CLxUser_ThreadMutex     &lock;
 
         CLxArmLockedMutex (CLxUser_ThreadMutex &mux) : lock (mux)
         {
                 lock.Enter ();
         }
 
         ~CLxArmLockedMutex ()
         {
                 if (armed)
                         lock.Leave ();
         }
 };

Jobs and Groups

Clients specify the work they wish to perform by creating a Job. The job is a very simple object that has a single method - Execute. Jobs can be added to other ThreadService objects which can in turn control their threaded execution.

(12) SDK: ILxThreadJob interface
         LXxMETHOD ( void,
 Execute) (
         LXtObjectID              self);

A more useful higher level construct than the Job is the thread group. Thread Groups allow the client launch several threads at once. Each thread in the group gets its own ThreadFunc and data. Usually, all the threads in the group will have the same ThreadFunc but different pieces of data. They may run either synchronously or asynchronously.

(13) SDK: ILxThreadGroup interface
         LXxMETHOD ( void,
 AddJob) (
         LXtObjectID              self,
         LXtObjectID              job);
 
         LXxMETHOD ( unsigned,
 NumJobs) (
         LXtObjectID              self);
 
         LXxMETHOD ( void,
 Clear) (
         LXtObjectID              self);
 
         LXxMETHOD ( void,
 Execute) (
         LXtObjectID              self);
 
         LXxMETHOD ( void,
 Wait) (
         LXtObjectID              self);
 
         LXxMETHOD ( LxResult,
 Running) (
         LXtObjectID              self);
 
         LXxMETHOD ( void,
 Kill) (
         LXtObjectID              self);

(14) User Class: ThreadGroup method

The thread service interface allows new groups to be created.

(15) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult, 
 CreateGroup) (
         LXtObjectID              self,
         void                   **ppvObj);

(16) User Service Class: ThreadService method
         bool
 NewGroup (
         CLxLoc_ThreadGroup      &tg)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CreateGroup (&obj)))
                 return false;
 
         return tg.take (obj);
 }

There are also a couple of simple methods to determine the state of the threading systems.

(17) SDK: ILxThreadService interface
         LXxMETHOD ( unsigned int,
 NumProcs) (
         LXtObjectID              self);
 
         LXxMETHOD ( unsigned int,
 IsMainThread) (
         LXtObjectID              self);

Thread Slots

Slots are a way to store data locally for each thread.

(18) SDK: ILxThreadSlot interface
         LXxMETHOD ( LxResult,
 Set) (
         LXtObjectID              self,
         void                    *value);
 
         LXxMETHOD ( LxResult,
 Get) (
         LXtObjectID              self,
         void                   **value);
 
         LXxMETHOD ( LxResult,
 Clear) (
         LXtObjectID              self);

(19) User Class: ThreadSlot method

Defining a slot can use a client class for allocating and freeing the data.

(20) SDK: ILxThreadSlotClient interface
         LXxMETHOD ( LxResult,
 Alloc) (
         LXtObjectID              self,
         void                   **value);
 
         LXxMETHOD ( LxResult,
 Free) (
         LXtObjectID              self,
         void                    *value);

Finally this method allows allocating a new thread slot.

(21) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult, 
 CreateSlot) (
         LXtObjectID              self,
         size_t                   size,
         LXtObjectID              client,
         void                    **ppvObj);

(22) User Service Class: ThreadService method
         bool
 NewSlot (
         CLxLoc_ThreadSlot       &ts,
         size_t                   size)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CreateSlot (size, 0, &obj)))
                 return false;
 
         return ts.take (obj);
 }
 
         bool
 NewSlot (
         CLxLoc_ThreadSlot       &ts,
         ILxUnknownID             client)
 {
         LXtObjectID              obj;
 
         if (LXx_FAIL (CreateSlot (0, client, &obj)))
                 return false;
 
         return ts.take (obj);
 }

Shared Work

Shared work represents a batch of work that can be done in parallel.

Evaluate
Process one unit of work and return a result code. An error will terminate the whole process. If there is no more work left to do return LXe_FALSE.
Spawn
Create a new shared work object of the same type. The new object should have no work.
Share
Move work from this object to the other. This is typically only one unit of work, but if the work units are especially small, more than one at a time can be moved to reduce contention. When there is no work left return LXe_FALSE.

(23) SDK: ILxSharedWork interface
         LXxMETHOD ( LxResult,
 Evaluate) (
         LXtObjectID              self);
 
         LXxMETHOD ( LxResult,
 Spawn) (
         LXtObjectID              self,
         void                   **ppvObj);
 
         LXxMETHOD ( LxResult,
 Share) (
         LXtObjectID              self,
         LXtObjectID              other,
         unsigned int             split);

You start with a single one of these objects that contains all the work. You then call the ProcesShared() method in the service. This will spawn enough of the shared work objects to populate the available computing resources. Each one will then process all the work it has, getting more from the main shared work object when they are empty. When all the work is done the sub-objects are destroyed.

(24) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult, 
 ProcessShared) (
         LXtObjectID              self,
         LXtObjectID              shared);

Range Worker

A range worker performs processing over a range of indices (such as scanlines in an image).

(25) SDK: ILxThreadRangeWorker interface
         LXxMETHOD ( LxResult,
 Execute) (
         LXtObjectID              self,
         int                      index,
         void                    *sharedData);

Multi-processing over a range of indices (such as scanlines in an image).

(26) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 ProcessRange) (
         LXtObjectID              self,
         void                    *data,
         int                      startIndex,
         int                      endIndex,
         LXtObjectID              rangeWorker);

External Thread Initialization

If a plugin creates its own threads, using some external library (like pthreads or OpenMP), it needs to initialize itself before it can call Nexus functions. This function allows that. If the thread has already initialized itself, this function will do nothing.

(27) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 InitThread) (
         LXtObjectID               self);

Once the thread has finished executing Nexus code, it needs to free the Nexus specific thread-data, so it must call CleanupThread, or there will be a memory leak.

(28) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 CleanupThread) (
         LXtObjectID               self);

Note that it's actually okay for a single thread to call Init, then Cleanup, then Init, etc. The only thing necessary is that there is a Cleanup for every Init, that all calls into Nexus come after an Init and before the subsequent Cleanup. Nested Init/Cleanup calls will actually work correctly, where only the last Cleanup will fire (the thread reference counts).

Waterfall

A waterfall is a model of threaded computation that combines parallel and sequential elements. The metaphor is of a waterfall consisting of a set of pools at different levels. Water fills the first set of pools in parallel, but the pools at the next level don't start to fill until the first level is completely full.

Likewise the waterfall object presents a parade of work units in stages. The work at each stage has to be complete before the next stage can begin. The object manages not only the work list but the processing, which it does by allowing multiple copies of itself to work on the same list.

(29) SDK: Declarations
 #define LXu_WATERFALL           "2B845B2A-06BE-4C90-8E50-58F7FBEEC25E"
 #define LXa_WATERFALL           "waterfall" 
 #define LXiWFALL_DONE            0
 #define LXiWFALL_HASWORK         1
 #define LXiWFALL_NEXT_WORK       2
 #define LXiWFALL_NEXT_STAGE      3

Spawn() creates a new waterfall object taking the first work in the current stage. If there's no more work in this stage this should fail.

(30) SDK: ILxWaterfall interface
         LXxMETHOD ( LxResult,
 Spawn) (
         LXtObjectID              self,
         void                   **ppvObj);

Query the state of the waterfall object. This returns one of the LXiWFALL state values.

DONE
indicates that the last work in the last stage has been completed.
HASWORK
means that this waterfall object has work ready to be processed.
NEXT_WORK, NEXT_STAGE
if the waterfall object has no work it can indicate that there is more work in this stage, or that the current stage is complete but there is more work in the next stage.

(31) SDK: ILxWaterfall interface
         LXxMETHOD ( unsigned,
 State) (
         LXtObjectID              self);

If this object has work loaded it can be discharged with this method. This is the only method that will be called concurrently; all other methods will be called in one thread at a time and so don't need to protect themselves.

(32) SDK: ILxWaterfall interface
         LXxMETHOD ( LxResult,
 ProcessWork) (
         LXtObjectID              self);

If there is work available in the current stage this is used to take one unit of work and load it into this object.

(33) SDK: ILxWaterfall interface
         LXxMETHOD ( LxResult,
 GetWork) (
         LXtObjectID              self);

If there is no work in the current stage this is called to advance the waterfall to the next stage. All the work in the previous stage will have been completely processed.

(34) SDK: ILxWaterfall interface
         LXxMETHOD ( LxResult,
 Advance) (
         LXtObjectID              self);

This method takes a waterfall object and processes all the work it contains. New instances will be spawned to fill out the given number of threads, or one for each processor is the thread count is zero.

(35) SDK: ILxThreadService interface
         LXxMETHOD ( LxResult,
 ProcessWaterfall) (
         LXtObjectID              self,
         LXtObjectID              waterfall,
         unsigned                 threads);

Atomic Operations

This section defines functions that are assurably atomic, meaning that there are no possible race conditions if multiple threads call them on the same data at once.

All values needs to be aligned to their size (i.e. 32-bit ints needs to be 32-bit aligned, and 64-bit ints need to be 64-bit aligned). If they're not aligned, the functions do nothing. All functions return the result of the increment or decrement.

These functions atomically increment and decrement an address-aligned integer.

(36) SDK: ILxThreadService interface
         LXxMETHOD ( int,
 AtomicIncrement) (
         LXtObjectID              self,
         volatile int            *addr);
 
         LXxMETHOD ( int,
 AtomicDecrement) (
         LXtObjectID              self,
         volatile int            *addr);

These functions are the same but they allow you to add and subtract any number. They return the result of the addition or subtraction.

(37) SDK: ILxThreadService interface
         LXxMETHOD ( int,
 AtomicIntegerAdd) (
         LXtObjectID              self,
         volatile int            *addr,
         int                      val);
 
         LXxMETHOD ( int,
 AtomicIntegerSubtract) (
         LXtObjectID              self,
         volatile int            *addr,
         int                      val);