Here's the final code for those interested:
class LocklessMailbox{public: // // Construct and initialize the read and write stacks // LocklessMailbox() { std::auto_ptr wh(new Node); std::auto_ptr rh(new Node); WriteHead = wh.get(); WriteHead->Next = NULL; WriteHead->Payload = NULL; ReadHead = rh.get(); ReadHead->Next = NULL; ReadHead->Payload = NULL; wh.release(); rh.release(); } // // Clean up the read, write, and cache stacks, freeing any // remaining messages from each stack. // ~LocklessMailbox() { Release(); }public: // // Register a message from a producer thread. Any number of threads // may call this function. // void AddMessage(MessageInfo* info) { Node* msgnode = new Node; msgnode->Payload = info; Node* oldval; Node* newval; Node* retval; do { msgnode->Next = WriteHead; __asm mfence; oldval = msgnode->Next; newval = msgnode; Node** fieldaddr = &(WriteHead); _asm { mfence mov eax, oldval mov ecx, newval mov edx, dword ptr [fieldaddr] lock cmpxchg dword ptr[edx], ecx mov retval, eax mfence } } while(retval != oldval); } // // Retrieve a message from the mailbox. // IMPORTANT: only ONE consumer thread (per mailbox) should call this function // MessageInfo* GetMessage() { if(!PendingReads.empty()) { Node* readnode = PendingReads.top(); MessageInfo* payload = readnode->Payload; delete readnode; PendingReads.pop(); return payload; } if(ReadHead->Next == NULL) SwapReadAndWrite(); Node* n = ReadHead; while(ReadHead->Next) { PendingReads.push(n); n = n->Next; ReadHead = n; } if(PendingReads.empty()) return NULL; Node* readnode = PendingReads.top(); MessageInfo* payload = readnode->Payload; delete readnode; PendingReads.pop(); return payload; }private: // // Internal helper for swapping the read/write stacks // See class comment for details // void SwapReadAndWrite() { Node* oldval; Node* newval; Node* retval; Node* swappedreadhead; do { swappedreadhead = WriteHead; __asm mfence; oldval = swappedreadhead; newval = ReadHead; Node** fieldaddr = &(WriteHead); _asm { mfence mov eax, oldval mov ecx, newval mov edx, dword ptr [fieldaddr] lock cmpxchg dword ptr[edx], ecx mov retval, eax mfence } } while(retval != oldval); ReadHead = swappedreadhead; } // // Release the mailbox and free any remaining messages // void Release() { for(std::stack::container_type::iterator iter = PendingReads.c.begin(); iter != PendingReads.c.end(); ++iter) { delete (*iter)->Payload; delete *iter; } Node* n = WriteHead; while(n) { delete n->Payload; Node* nextn = n->Next; delete n; n = nextn; } n = ReadHead; while(n) { delete n->Payload; Node* nextn = n->Next; delete n; n = nextn; } }private: struct Node { Node* Next; MessageInfo* Payload; }; Node* WriteHead; Node* ReadHead; std::stack PendingReads;};
(And if you happen to catch a bug, let me know; I'm banking pretty heavily on this code.)
So that takes care of all the big-ticket items on my list for the GDC release; all I have left to do is some thorough code cleanup, some documentation, and then put together the sales pitch documents.
Overall I'm very happy with this release; as usual it's a huge step beyond the previous release, and I already have a sketched-out roadmap for R7 that'll be even cooler.
Right... back to work [grin]
In looking at your code, it seems to me that each thread would have it's own LocklessMailbox? Pretty nice class setup here. I've actually been getting started on a multithreaded game engine recently and have been trying to figure out how to set up a messaging system across all threads that didn't slow the rest of the engine down. My ideas always centered around having a single Messaging component for all threads to access. That, along with my lack of understanding of basic threading concepts, is probably why I wasn't able to write a good messaging class.
Keep up the good work, and congrats on the GDC 2009 release of Epoch!
-Wynter Woods