TrinityCore
Socket.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef __SOCKET_H__
19#define __SOCKET_H__
20
21#include "Log.h"
22#include "MessageBuffer.h"
23#include <boost/asio/ip/tcp.hpp>
24#include <atomic>
25#include <memory>
26#include <queue>
27#include <type_traits>
28
29#define READ_BLOCK_SIZE 4096
30#ifdef BOOST_ASIO_HAS_IOCP
31#define TC_SOCKET_USE_IOCP
32#endif
33
61template<class T, class Stream = boost::asio::ip::tcp::socket>
62class Socket : public std::enable_shared_from_this<T>
63{
64public:
65 template<typename... Args>
66 explicit Socket(boost::asio::ip::tcp::socket&& socket, Args&&... args) : _socket(std::move(socket), std::forward<Args>(args)...),
67 _remoteAddress(_socket.remote_endpoint().address()), _remotePort(_socket.remote_endpoint().port()),
68 _closed(false), _closing(false), _isWritingAsync(false)
69 {
71 }
72
73 Socket(Socket const& other) = delete;
74 Socket(Socket&& other) = delete;
75 Socket& operator=(Socket const& other) = delete;
76 Socket& operator=(Socket&& other) = delete;
77
78 virtual ~Socket()
79 {
80 _closed = true;
81 boost::system::error_code error;
82 _socket.close(error);
83 }
84
85 virtual void Start() = 0;
86
87 virtual bool Update()
88 {
89 if (_closed)
90 return false;
91
92#ifndef TC_SOCKET_USE_IOCP
93 if (_isWritingAsync || (_writeQueue.empty() && !_closing))
94 return true;
95
96 for (; HandleQueue();)
97 ;
98#endif
99
100 return true;
101 }
102
103 boost::asio::ip::address GetRemoteIpAddress() const
104 {
105 return _remoteAddress;
106 }
107
109 {
110 return _remotePort;
111 }
112
114 {
115 if (!IsOpen())
116 return;
117
120 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
121 [self = this->shared_from_this()](boost::system::error_code const& error, size_t transferredBytes)
122 {
123 self->ReadHandlerInternal(error, transferredBytes);
124 });
125 }
126
127 void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code const&, std::size_t))
128 {
129 if (!IsOpen())
130 return;
131
134 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
135 [self = this->shared_from_this(), callback](boost::system::error_code const& error, size_t transferredBytes)
136 {
137 (self.get()->*callback)(error, transferredBytes);
138 });
139 }
140
142 {
143 _writeQueue.push(std::move(buffer));
144
145#ifdef TC_SOCKET_USE_IOCP
147#endif
148 }
149
150 bool IsOpen() const { return !_closed && !_closing; }
151
153 {
154 if (_closed.exchange(true))
155 return;
156
157 boost::system::error_code shutdownError;
158 _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
159 if (shutdownError)
160 TC_LOG_DEBUG("network", "Socket::CloseSocket: {} errored when shutting down socket: {} ({})", GetRemoteIpAddress().to_string(),
161 shutdownError.value(), shutdownError.message());
162
163 OnClose();
164 }
165
168 {
169 if (_closing.exchange(true))
170 return;
171
172 if (_writeQueue.empty())
173 CloseSocket();
174 }
175
177
178protected:
179 virtual void OnClose() { }
180
181 virtual void ReadHandler() = 0;
182
184 {
185 if (_isWritingAsync)
186 return false;
187
188 _isWritingAsync = true;
189
190#ifdef TC_SOCKET_USE_IOCP
191 MessageBuffer& buffer = _writeQueue.front();
192 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()),
193 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
194 {
195 self->WriteHandler(error, transferedBytes);
196 });
197#else
198 _socket.async_write_some(boost::asio::null_buffers(),
199 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
200 {
201 self->WriteHandlerWrapper(error, transferedBytes);
202 });
203#endif
204
205 return false;
206 }
207
208 void SetNoDelay(bool enable)
209 {
210 boost::system::error_code err;
211 _socket.set_option(boost::asio::ip::tcp::no_delay(enable), err);
212 if (err)
213 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for {} - {} ({})",
214 GetRemoteIpAddress().to_string(), err.value(), err.message());
215 }
216
218 {
219 return _socket;
220 }
221
222private:
223 void ReadHandlerInternal(boost::system::error_code const& error, size_t transferredBytes)
224 {
225 if (error)
226 {
227 CloseSocket();
228 return;
229 }
230
231 _readBuffer.WriteCompleted(transferredBytes);
232 ReadHandler();
233 }
234
235#ifdef TC_SOCKET_USE_IOCP
236
237 void WriteHandler(boost::system::error_code const& error, std::size_t transferedBytes)
238 {
239 if (!error)
240 {
241 _isWritingAsync = false;
242 _writeQueue.front().ReadCompleted(transferedBytes);
243 if (!_writeQueue.front().GetActiveSize())
244 _writeQueue.pop();
245
246 if (!_writeQueue.empty())
248 else if (_closing)
249 CloseSocket();
250 }
251 else
252 CloseSocket();
253 }
254
255#else
256
257 void WriteHandlerWrapper(boost::system::error_code const& /*error*/, std::size_t /*transferedBytes*/)
258 {
259 _isWritingAsync = false;
260 HandleQueue();
261 }
262
264 {
265 if (_writeQueue.empty())
266 return false;
267
268 MessageBuffer& queuedMessage = _writeQueue.front();
269
270 std::size_t bytesToSend = queuedMessage.GetActiveSize();
271
272 boost::system::error_code error;
273 std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
274
275 if (error)
276 {
277 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
278 return AsyncProcessQueue();
279
280 _writeQueue.pop();
281 if (_closing && _writeQueue.empty())
282 CloseSocket();
283 return false;
284 }
285 else if (bytesSent == 0)
286 {
287 _writeQueue.pop();
288 if (_closing && _writeQueue.empty())
289 CloseSocket();
290 return false;
291 }
292 else if (bytesSent < bytesToSend) // now n > 0
293 {
294 queuedMessage.ReadCompleted(bytesSent);
295 return AsyncProcessQueue();
296 }
297
298 _writeQueue.pop();
299 if (_closing && _writeQueue.empty())
300 CloseSocket();
301 return !_writeQueue.empty();
302 }
303
304#endif
305
306 Stream _socket;
307
308 boost::asio::ip::address _remoteAddress;
310
312 std::queue<MessageBuffer> _writeQueue;
313
314 std::atomic<bool> _closed;
315 std::atomic<bool> _closing;
316
318};
319
320#endif // __SOCKET_H__
uint16_t uint16
Definition: Define.h:143
#define TC_LOG_DEBUG(filterType__, message__,...)
Definition: Log.h:179
#define READ_BLOCK_SIZE
Definition: Socket.h:29
void Resize(size_type bytes)
Definition: MessageBuffer.h:51
size_type GetRemainingSpace() const
Definition: MessageBuffer.h:68
void ReadCompleted(size_type bytes)
Definition: MessageBuffer.h:62
void WriteCompleted(size_type bytes)
Definition: MessageBuffer.h:64
uint8 * GetReadPointer()
Definition: MessageBuffer.h:58
size_type GetActiveSize() const
Definition: MessageBuffer.h:66
uint8 * GetWritePointer()
Definition: MessageBuffer.h:60
void EnsureFreeSpace()
Definition: MessageBuffer.h:85
void Normalize()
Definition: MessageBuffer.h:73
Definition: Socket.h:63
MessageBuffer & GetReadBuffer()
Definition: Socket.h:176
void WriteHandlerWrapper(boost::system::error_code const &, std::size_t)
Definition: Socket.h:257
Socket & operator=(Socket &&other)=delete
bool HandleQueue()
Definition: Socket.h:263
std::atomic< bool > _closing
Definition: Socket.h:315
Stream _socket
Definition: Socket.h:306
MessageBuffer _readBuffer
Definition: Socket.h:311
Socket & operator=(Socket const &other)=delete
virtual bool Update()
Definition: Socket.h:87
bool _isWritingAsync
Definition: Socket.h:317
Socket(boost::asio::ip::tcp::socket &&socket, Args &&... args)
Definition: Socket.h:66
void AsyncReadWithCallback(void(T::*callback)(boost::system::error_code const &, std::size_t))
Definition: Socket.h:127
boost::asio::ip::address _remoteAddress
Definition: Socket.h:308
void DelayedCloseSocket()
Marks the socket for closing after write buffer becomes empty.
Definition: Socket.h:167
Socket(Socket &&other)=delete
boost::asio::ip::address GetRemoteIpAddress() const
Definition: Socket.h:103
void QueuePacket(MessageBuffer &&buffer)
Definition: Socket.h:141
bool IsOpen() const
Definition: Socket.h:150
bool AsyncProcessQueue()
Definition: Socket.h:183
void SetNoDelay(bool enable)
Definition: Socket.h:208
void AsyncRead()
Definition: Socket.h:113
void CloseSocket()
Definition: Socket.h:152
virtual ~Socket()
Definition: Socket.h:78
virtual void Start()=0
Stream & underlying_stream()
Definition: Socket.h:217
uint16 GetRemotePort() const
Definition: Socket.h:108
void ReadHandlerInternal(boost::system::error_code const &error, size_t transferredBytes)
Definition: Socket.h:223
std::queue< MessageBuffer > _writeQueue
Definition: Socket.h:312
Socket(Socket const &other)=delete
virtual void ReadHandler()=0
uint16 _remotePort
Definition: Socket.h:309
virtual void OnClose()
Definition: Socket.h:179
std::atomic< bool > _closed
Definition: Socket.h:314
STL namespace.