TrinityCore
Socket.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef __SOCKET_H__
19#define __SOCKET_H__
20
21#include "Log.h"
22#include "MessageBuffer.h"
23#include <boost/asio/ip/tcp.hpp>
24#include <atomic>
25#include <memory>
26#include <queue>
27#include <type_traits>
28
29#define READ_BLOCK_SIZE 4096
30#ifdef BOOST_ASIO_HAS_IOCP
31#define TC_SOCKET_USE_IOCP
32#endif
33
61template<class T, class Stream = boost::asio::ip::tcp::socket>
62class Socket : public std::enable_shared_from_this<T>
63{
64public:
65 template<typename... Args>
66 explicit Socket(boost::asio::ip::tcp::socket&& socket, Args&&... args) : _socket(std::move(socket), std::forward<Args>(args)...),
67 _remoteAddress(_socket.remote_endpoint().address()), _remotePort(_socket.remote_endpoint().port()),
68 _closed(false), _closing(false), _isWritingAsync(false)
69 {
71 }
72
73 Socket(Socket const& other) = delete;
74 Socket(Socket&& other) = delete;
75 Socket& operator=(Socket const& other) = delete;
76 Socket& operator=(Socket&& other) = delete;
77
78 virtual ~Socket()
79 {
80 _closed = true;
81 boost::system::error_code error;
82 _socket.close(error);
83 }
84
85 virtual void Start() = 0;
86
87 virtual bool Update()
88 {
89 if (_closed)
90 return false;
91
92#ifndef TC_SOCKET_USE_IOCP
93 if (_isWritingAsync || (_writeQueue.empty() && !_closing))
94 return true;
95
96 for (; HandleQueue();)
97 ;
98#endif
99
100 return true;
101 }
102
103 boost::asio::ip::address GetRemoteIpAddress() const
104 {
105 return _remoteAddress;
106 }
107
109 {
110 return _remotePort;
111 }
112
114 {
115 if (!IsOpen())
116 return;
117
120 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
121 [self = this->shared_from_this()](boost::system::error_code const& error, size_t transferredBytes)
122 {
123 self->ReadHandlerInternal(error, transferredBytes);
124 });
125 }
126
127 void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code const&, std::size_t))
128 {
129 if (!IsOpen())
130 return;
131
134 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
135 [self = this->shared_from_this(), callback](boost::system::error_code const& error, size_t transferredBytes)
136 {
137 (self.get()->*callback)(error, transferredBytes);
138 });
139 }
140
142 {
143 _writeQueue.push(std::move(buffer));
144
145#ifdef TC_SOCKET_USE_IOCP
147#endif
148 }
149
150 bool IsOpen() const { return !_closed && !_closing; }
151
153 {
154 if (_closed.exchange(true))
155 return;
156
157 boost::system::error_code shutdownError;
158 _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
159 if (shutdownError)
160 TC_LOG_DEBUG("network", "Socket::CloseSocket: {} errored when shutting down socket: {} ({})", GetRemoteIpAddress().to_string(),
161 shutdownError.value(), shutdownError.message());
162
163 OnClose();
164 }
165
167 void DelayedCloseSocket() { _closing = true; }
168
170
171protected:
172 virtual void OnClose() { }
173
174 virtual void ReadHandler() = 0;
175
177 {
178 if (_isWritingAsync)
179 return false;
180
181 _isWritingAsync = true;
182
183#ifdef TC_SOCKET_USE_IOCP
184 MessageBuffer& buffer = _writeQueue.front();
185 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()),
186 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
187 {
188 self->WriteHandler(error, transferedBytes);
189 });
190#else
191 _socket.async_write_some(boost::asio::null_buffers(),
192 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
193 {
194 self->WriteHandlerWrapper(error, transferedBytes);
195 });
196#endif
197
198 return false;
199 }
200
201 void SetNoDelay(bool enable)
202 {
203 boost::system::error_code err;
204 _socket.set_option(boost::asio::ip::tcp::no_delay(enable), err);
205 if (err)
206 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for {} - {} ({})",
207 GetRemoteIpAddress().to_string(), err.value(), err.message());
208 }
209
211 {
212 return _socket;
213 }
214
215private:
216 void ReadHandlerInternal(boost::system::error_code const& error, size_t transferredBytes)
217 {
218 if (error)
219 {
220 CloseSocket();
221 return;
222 }
223
224 _readBuffer.WriteCompleted(transferredBytes);
225 ReadHandler();
226 }
227
228#ifdef TC_SOCKET_USE_IOCP
229
230 void WriteHandler(boost::system::error_code const& error, std::size_t transferedBytes)
231 {
232 if (!error)
233 {
234 _isWritingAsync = false;
235 _writeQueue.front().ReadCompleted(transferedBytes);
236 if (!_writeQueue.front().GetActiveSize())
237 _writeQueue.pop();
238
239 if (!_writeQueue.empty())
241 else if (_closing)
242 CloseSocket();
243 }
244 else
245 CloseSocket();
246 }
247
248#else
249
250 void WriteHandlerWrapper(boost::system::error_code const& /*error*/, std::size_t /*transferedBytes*/)
251 {
252 _isWritingAsync = false;
253 HandleQueue();
254 }
255
257 {
258 if (_writeQueue.empty())
259 return false;
260
261 MessageBuffer& queuedMessage = _writeQueue.front();
262
263 std::size_t bytesToSend = queuedMessage.GetActiveSize();
264
265 boost::system::error_code error;
266 std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
267
268 if (error)
269 {
270 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
271 return AsyncProcessQueue();
272
273 _writeQueue.pop();
274 if (_closing && _writeQueue.empty())
275 CloseSocket();
276 return false;
277 }
278 else if (bytesSent == 0)
279 {
280 _writeQueue.pop();
281 if (_closing && _writeQueue.empty())
282 CloseSocket();
283 return false;
284 }
285 else if (bytesSent < bytesToSend) // now n > 0
286 {
287 queuedMessage.ReadCompleted(bytesSent);
288 return AsyncProcessQueue();
289 }
290
291 _writeQueue.pop();
292 if (_closing && _writeQueue.empty())
293 CloseSocket();
294 return !_writeQueue.empty();
295 }
296
297#endif
298
299 Stream _socket;
300
301 boost::asio::ip::address _remoteAddress;
303
305 std::queue<MessageBuffer> _writeQueue;
306
307 std::atomic<bool> _closed;
308 std::atomic<bool> _closing;
309
311};
312
313#endif // __SOCKET_H__
uint16_t uint16
Definition: Define.h:143
#define TC_LOG_DEBUG(filterType__,...)
Definition: Log.h:156
#define READ_BLOCK_SIZE
Definition: Socket.h:29
void Resize(size_type bytes)
Definition: MessageBuffer.h:52
size_type GetRemainingSpace() const
Definition: MessageBuffer.h:69
void ReadCompleted(size_type bytes)
Definition: MessageBuffer.h:63
void WriteCompleted(size_type bytes)
Definition: MessageBuffer.h:65
uint8 * GetReadPointer()
Definition: MessageBuffer.h:59
size_type GetActiveSize() const
Definition: MessageBuffer.h:67
uint8 * GetWritePointer()
Definition: MessageBuffer.h:61
void EnsureFreeSpace()
Definition: MessageBuffer.h:86
void Normalize()
Definition: MessageBuffer.h:74
Definition: Socket.h:63
MessageBuffer & GetReadBuffer()
Definition: Socket.h:169
void WriteHandlerWrapper(boost::system::error_code const &, std::size_t)
Definition: Socket.h:250
Socket & operator=(Socket &&other)=delete
bool HandleQueue()
Definition: Socket.h:256
std::atomic< bool > _closing
Definition: Socket.h:308
Stream _socket
Definition: Socket.h:299
MessageBuffer _readBuffer
Definition: Socket.h:304
Socket & operator=(Socket const &other)=delete
virtual bool Update()
Definition: Socket.h:87
bool _isWritingAsync
Definition: Socket.h:310
Socket(boost::asio::ip::tcp::socket &&socket, Args &&... args)
Definition: Socket.h:66
void AsyncReadWithCallback(void(T::*callback)(boost::system::error_code const &, std::size_t))
Definition: Socket.h:127
boost::asio::ip::address _remoteAddress
Definition: Socket.h:301
void DelayedCloseSocket()
Marks the socket for closing after write buffer becomes empty.
Definition: Socket.h:167
Socket(Socket &&other)=delete
boost::asio::ip::address GetRemoteIpAddress() const
Definition: Socket.h:103
void QueuePacket(MessageBuffer &&buffer)
Definition: Socket.h:141
bool IsOpen() const
Definition: Socket.h:150
bool AsyncProcessQueue()
Definition: Socket.h:176
void SetNoDelay(bool enable)
Definition: Socket.h:201
void AsyncRead()
Definition: Socket.h:113
void CloseSocket()
Definition: Socket.h:152
virtual ~Socket()
Definition: Socket.h:78
virtual void Start()=0
Stream & underlying_stream()
Definition: Socket.h:210
uint16 GetRemotePort() const
Definition: Socket.h:108
void ReadHandlerInternal(boost::system::error_code const &error, size_t transferredBytes)
Definition: Socket.h:216
std::queue< MessageBuffer > _writeQueue
Definition: Socket.h:305
Socket(Socket const &other)=delete
virtual void ReadHandler()=0
uint16 _remotePort
Definition: Socket.h:302
virtual void OnClose()
Definition: Socket.h:172
std::atomic< bool > _closed
Definition: Socket.h:307
STL namespace.