TrinityCore
MPSCQueue.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef MPSCQueue_h__
19#define MPSCQueue_h__
20
21#include <array>
22#include <atomic>
23#include <new>
24
25namespace Trinity
26{
27namespace Impl
28{
// C++ implementation of Dmitry Vyukov's lock free MPSC queue
// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
//
// Node-based FIFO of T* pointers. Every Enqueue allocates one internal Node;
// the queue always holds one extra "stub" node whose Data is never handed out.
// Per the MPSC design, Enqueue may be called by several producers while
// Dequeue (and destruction) is expected to run on a single consumer.
// Elements still queued at destruction time are deleted by the queue.
template<typename T>
class MPSCQueueNonIntrusive
{
public:
    // Starts with a single stub node; _head and _tail both point at it.
    MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
    {
    }

    MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete;
    MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete;

    // Drains and deletes every element still queued, then frees the remaining stub node.
    ~MPSCQueueNonIntrusive()
    {
        T* output;
        while (Dequeue(output))
            delete output;

        // after draining, the only node left is the current stub, which _head points to
        Node* front = _head.load(std::memory_order_relaxed);
        delete front;
    }

    // Appends input to the queue. The pointer is stored as-is (not copied).
    void Enqueue(T* input)
    {
        Node* node = new Node(input);
        // publish the node as the new head first, then link it behind the previous head;
        // between these two steps the consumer sees a null Next and treats the queue as empty
        Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
        prevHead->Next.store(node, std::memory_order_release);
    }

    // Pops the oldest element into result.
    // Returns false (leaving result untouched) when no linked element is available.
    bool Dequeue(T*& result)
    {
        Node* tail = _tail.load(std::memory_order_relaxed);
        Node* next = tail->Next.load(std::memory_order_acquire);
        if (!next)
            return false;

        result = next->Data;
        // next becomes the new stub node; the old stub is no longer reachable and can be freed
        _tail.store(next, std::memory_order_release);
        delete tail;
        return true;
    }

private:
    struct Node
    {
        Node() = default;
        explicit Node(T* data) : Data(data)
        {
        }

        T* Data = nullptr;
        std::atomic<Node*> Next{ nullptr };
    };

    std::atomic<Node*> _head; // most recently enqueued node (producer side)
    std::atomic<Node*> _tail; // current stub node (consumer side)
};
90
// C++ implementation of Dmitry Vyukov's lock free MPSC queue
// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
//
// Intrusive FIFO of T* pointers: the link to the next element is stored in the
// element itself, in the std::atomic<T*> member selected by IntrusiveLink, so
// Enqueue performs no allocation. Per the MPSC design, Enqueue may be called by
// several producers while Dequeue (and destruction) is expected to run on a
// single consumer. Elements still queued at destruction time are deleted.
template<typename T, std::atomic<T*> T::* IntrusiveLink>
class MPSCQueueIntrusive
{
    using Atomic = std::atomic<T*>;

public:
    MPSCQueueIntrusive() : _dummy(), _dummyPtr(reinterpret_cast<T*>(_dummy.data())), _head(_dummyPtr), _tail(_dummyPtr)
    {
        // _dummy is constructed from raw byte array and is intentionally left uninitialized (it might not be default constructible)
        // so we init only its IntrusiveLink here
        Atomic* dummyNext = new (&(_dummyPtr->*IntrusiveLink)) Atomic();
        dummyNext->store(nullptr, std::memory_order_relaxed);
    }

    MPSCQueueIntrusive(MPSCQueueIntrusive const&) = delete;
    MPSCQueueIntrusive& operator=(MPSCQueueIntrusive const&) = delete;

    // Drains and deletes every element still queued, then destroys the stub's link.
    ~MPSCQueueIntrusive()
    {
        T* output;
        while (Dequeue(output))
            delete output;

        // destruct our dummy atomic
        (_dummyPtr->*IntrusiveLink).~Atomic();
    }

    // Appends input to the queue; its IntrusiveLink member is overwritten.
    void Enqueue(T* input)
    {
        (input->*IntrusiveLink).store(nullptr, std::memory_order_release);
        // publish input as the new head first, then link it behind the previous head
        T* prevHead = _head.exchange(input, std::memory_order_acq_rel);
        (prevHead->*IntrusiveLink).store(input, std::memory_order_release);
    }

    // Pops the oldest element into result.
    // Returns false (leaving result untouched) when no element can be taken right now.
    bool Dequeue(T*& result)
    {
        T* tail = _tail.load(std::memory_order_relaxed);
        T* next = (tail->*IntrusiveLink).load(std::memory_order_acquire);
        if (tail == _dummyPtr)
        {
            // the stub is at the front: skip over it, it is never returned to the caller
            if (!next)
                return false;

            _tail.store(next, std::memory_order_release);
            tail = next;
            next = (next->*IntrusiveLink).load(std::memory_order_acquire);
        }

        if (next)
        {
            _tail.store(next, std::memory_order_release);
            result = tail;
            return true;
        }

        // tail has no successor yet: if it is not the head, a producer has swapped _head
        // but has not linked its element yet, so nothing can be taken now
        T* head = _head.load(std::memory_order_acquire);
        if (tail != head)
            return false;

        // tail is the last element; re-insert the stub behind it so tail gets a successor
        // and can be detached from the queue
        Enqueue(_dummyPtr);
        next = (tail->*IntrusiveLink).load(std::memory_order_acquire);
        if (next)
        {
            _tail.store(next, std::memory_order_release);
            result = tail;
            return true;
        }
        return false;
    }

private:
    // declaration order matters: the constructor initializes _dummyPtr from _dummy,
    // and _head/_tail from _dummyPtr
    alignas(T) std::array<std::byte, sizeof(T)> _dummy; // raw storage for the stub element
    T* _dummyPtr;                                       // _dummy viewed as T*; only its IntrusiveLink is ever constructed
    Atomic _head;                                       // most recently enqueued element (producer side)
    Atomic _tail;                                       // next element to examine (consumer side)
};
169}
170}
171
172template<typename T, std::atomic<T*> T::* IntrusiveLink = nullptr>
173using MPSCQueue = std::conditional_t<IntrusiveLink != nullptr, Trinity::Impl::MPSCQueueIntrusive<T, IntrusiveLink>, Trinity::Impl::MPSCQueueNonIntrusive<T>>;
174
175#endif // MPSCQueue_h__
// Symbol index (documentation cross-references; line numbers refer to MPSCQueue.h):
// std::conditional_t< IntrusiveLink != nullptr, Trinity::Impl::MPSCQueueIntrusive< T, IntrusiveLink >, Trinity::Impl::MPSCQueueNonIntrusive< T > > MPSCQueue
//   Definition: MPSCQueue.h:173
// std::array< std::byte, sizeof(T)> _dummy
//   Definition: MPSCQueue.h:161
// MPSCQueueIntrusive(MPSCQueueIntrusive const &)=delete
// MPSCQueueIntrusive & operator=(MPSCQueueIntrusive const &)=delete
// std::atomic< T * > Atomic
//   Definition: MPSCQueue.h:96
// MPSCQueueNonIntrusive & operator=(MPSCQueueNonIntrusive const &)=delete
// std::atomic< Node * > _head
//   Definition: MPSCQueue.h:84
// std::atomic< Node * > _tail
//   Definition: MPSCQueue.h:85
// MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const &)=delete
// std: STL namespace.