TrinityCore
Loading...
Searching...
No Matches
Socket.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef TRINITYCORE_SOCKET_H
19#define TRINITYCORE_SOCKET_H
20
21#include "Concepts.h"
22#include "IpAddress.h"
23#include "Log.h"
24#include "MessageBuffer.h"
26#include <boost/asio/compose.hpp>
27#include <boost/asio/io_context.hpp>
28#include <boost/asio/ip/tcp.hpp>
29#include <atomic>
30#include <memory>
31#include <queue>
32#include <type_traits>
33
34#ifdef BOOST_ASIO_HAS_IOCP
35#define TC_SOCKET_USE_IOCP
36#endif
37
38namespace Trinity::Net
39{
40using IoContextTcpSocket = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::executor_type>;
41
42namespace Impl::Operations
43{
44template <typename Socket>
45struct Connect;
46}
47
49{
51 Stop
52};
53
54inline boost::asio::mutable_buffer PrepareReadBuffer(MessageBuffer& readBuffer)
55{
56 readBuffer.Normalize();
57 readBuffer.EnsureFreeSpace();
58 return boost::asio::buffer(readBuffer.GetWritePointer(), readBuffer.GetRemainingSpace());
59}
60
61template <typename SocketType>
63{
65 {
66 return this->Socket->ReadHandler();
67 }
68
69 SocketType* Socket;
70};
71
72template <typename AsyncReadObjectType, typename ReadHandlerObjectType = AsyncReadObjectType>
74{
75 explicit ReadConnectionInitializer(AsyncReadObjectType* socket) : Socket(socket), ReadCallback({ .Socket = socket }) { }
76 explicit ReadConnectionInitializer(AsyncReadObjectType* socket, ReadHandlerObjectType* callbackSocket) : Socket(socket), ReadCallback({ .Socket = callbackSocket }) { }
77
78 void Start() override
79 {
80 Socket->AsyncRead(std::move(ReadCallback));
81
82 this->InvokeNext();
83 }
84
85 AsyncReadObjectType* Socket;
87};
88
125template<class Stream = IoContextTcpSocket>
126class Socket : public std::enable_shared_from_this<Socket<Stream>>
127{
128public:
129 template<typename... Args>
130 explicit Socket(IoContextTcpSocket&& socket, Args&&... args) : _socket(std::move(socket), std::forward<Args>(args)...),
132 {
133 }
134
135 template<typename... Args>
136 explicit Socket(boost::asio::io_context& context, Args&&... args) : _socket(context, std::forward<Args>(args)...), _openState(OpenState_Closed)
137 {
138 }
139
140 Socket(Socket const& other) = delete;
141 Socket(Socket&& other) = delete;
142 Socket& operator=(Socket const& other) = delete;
143 Socket& operator=(Socket&& other) = delete;
144
145 virtual ~Socket()
146 {
148 boost::system::error_code error;
149 _socket.close(error);
150 }
151
152 virtual void Start() { }
153
154 template <BOOST_ASIO_COMPLETION_TOKEN_FOR(void(boost::system::error_code, boost::asio::ip::tcp::endpoint)) Callback>
155 decltype(auto) Connect(boost::asio::ip::tcp::endpoint const& endpoint, Callback&& callback)
156 {
158 return boost::asio::async_compose<Callback, void(boost::system::error_code, boost::asio::ip::tcp::endpoint), Impl::Operations::Connect<Socket>>(
159 Impl::Operations::Connect<Socket>(this->shared_from_this(), endpoint), callback, this->underlying_stream());
160 }
161
162 template <BOOST_ASIO_COMPLETION_TOKEN_FOR(void(boost::system::error_code, boost::asio::ip::tcp::endpoint)) Callback>
163 decltype(auto) Connect(std::vector<boost::asio::ip::tcp::endpoint> const& endpoints, Callback&& callback)
164 {
166 return boost::asio::async_compose<Callback, void(boost::system::error_code, boost::asio::ip::tcp::endpoint), Impl::Operations::Connect<Socket>>(
167 Impl::Operations::Connect<Socket>(this->shared_from_this(), endpoints), callback, this->underlying_stream());
168 }
169
170 virtual bool Update()
171 {
173 return false;
174
175#ifndef TC_SOCKET_USE_IOCP
177 return true;
178
179 for (; HandleQueue();)
180 ;
181#endif
182
183 return true;
184 }
185
186 boost::asio::ip::address const& GetRemoteIpAddress() const
187 {
189 }
190
192 {
193 return _remoteEndpoint.Port;
194 }
195
196 void SetRemoteEndpoint(boost::asio::ip::tcp::endpoint const& endpoint)
197 {
198 _remoteEndpoint = endpoint;
199 }
200
201 template <invocable_r<SocketReadCallbackResult> Callback>
202 void AsyncRead(Callback&& callback)
203 {
204 if (!IsOpen())
205 return;
206
207 _socket.async_read_some(PrepareReadBuffer(_readBuffer),
208 [self = this->shared_from_this(), callback = std::forward<Callback>(callback)](boost::system::error_code const& error, size_t transferredBytes) mutable
209 {
210 if (self->ReadHandlerInternal(error, transferredBytes))
212 self->AsyncRead(std::forward<Callback>(callback));
213 });
214 }
215
217 {
218 _writeQueue.push(std::move(buffer));
219
220#ifdef TC_SOCKET_USE_IOCP
222#endif
223 }
224
225 bool IsOpen() const { return _openState == OpenState_Open; }
226
228 {
230 return;
231
232 boost::system::error_code shutdownError;
233 _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
234 if (shutdownError)
235 TC_LOG_DEBUG("network", "Socket::CloseSocket: {} errored when shutting down socket: {} ({})", GetRemoteIpAddress(),
236 shutdownError.value(), shutdownError.message());
237
238 this->OnClose();
239 }
240
243 {
244 uint8 oldState = OpenState_Open;
245 if (!_openState.compare_exchange_strong(oldState, OpenState_Closing))
246 return;
247
248 if (_writeQueue.empty())
249 CloseSocket();
250 }
251
253
255 {
256 return _socket;
257 }
258
259protected:
260 virtual void OnClose() { }
261
263
265 {
266 if (_isWritingAsync)
267 return false;
268
269 _isWritingAsync = true;
270
271#ifdef TC_SOCKET_USE_IOCP
272 MessageBuffer& buffer = _writeQueue.front();
273 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()),
274 [self = this->shared_from_this()](boost::system::error_code const& error, std::size_t transferedBytes)
275 {
276 self->WriteHandler(error, transferedBytes);
277 });
278#else
279 _socket.async_wait(boost::asio::socket_base::wait_type::wait_write,
280 [self = this->shared_from_this()](boost::system::error_code const& error)
281 {
282 self->WriteHandlerWrapper(error);
283 });
284#endif
285
286 return false;
287 }
288
289 void SetNoDelay(bool enable)
290 {
291 boost::system::error_code err;
292 _socket.set_option(boost::asio::ip::tcp::no_delay(enable), err);
293 if (err)
294 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for {} - {} ({})",
295 GetRemoteIpAddress(), err.value(), err.message());
296 }
297
298private:
299 bool ReadHandlerInternal(boost::system::error_code const& error, size_t transferredBytes)
300 {
301 if (error)
302 {
303 CloseSocket();
304 return false;
305 }
306
307 _readBuffer.WriteCompleted(transferredBytes);
308 return IsOpen();
309 }
310
312 {
313 _writeQueue.pop();
314 if (_openState == OpenState_Closing && _writeQueue.empty())
315 CloseSocket();
316 }
317
318#ifdef TC_SOCKET_USE_IOCP
319
320 void WriteHandler(boost::system::error_code const& error, std::size_t transferedBytes)
321 {
322 if (!error)
323 {
324 _isWritingAsync = false;
325 _writeQueue.front().ReadCompleted(transferedBytes);
326 if (!_writeQueue.front().GetActiveSize())
328
329 if (!_writeQueue.empty())
331 }
332 else
333 CloseSocket();
334 }
335
336#else
337
338 void WriteHandlerWrapper(boost::system::error_code const& /*error*/)
339 {
340 _isWritingAsync = false;
341 HandleQueue();
342 }
343
345 {
346 if (_writeQueue.empty())
347 return false;
348
349 MessageBuffer& queuedMessage = _writeQueue.front();
350
351 std::size_t bytesToSend = queuedMessage.GetActiveSize();
352
353 boost::system::error_code error;
354 std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
355
356 if (error)
357 {
358 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
359 return AsyncProcessQueue();
360
362 return false;
363 }
364 else if (bytesSent == 0)
365 {
367 return false;
368 }
369 else if (bytesSent < bytesToSend) // now n > 0
370 {
371 queuedMessage.ReadCompleted(bytesSent);
372 return AsyncProcessQueue();
373 }
374
376 return !_writeQueue.empty();
377 }
378
379#endif
380
381 Stream _socket;
382
383 struct Endpoint
384 {
385 Endpoint() : Address(), Port(0) { }
386 explicit(false) Endpoint(boost::asio::ip::tcp_endpoint const& endpoint) : Address(endpoint.address()), Port(endpoint.port()) { }
387
388 boost::asio::ip::address Address;
391
393 std::queue<MessageBuffer> _writeQueue;
394
395 // Socket open state "enum" (not enum to enable integral std::atomic api)
396 static constexpr uint8 OpenState_Open = 0x0;
397 static constexpr uint8 OpenState_Closing = 0x1;
398 static constexpr uint8 OpenState_Closed = 0x2;
399
400 std::atomic<uint8> _openState;
401
402 bool _isWritingAsync = false;
403};
404
405namespace Impl::Operations
406{
408{
409 explicit ConnectState(std::shared_ptr<void> const& socketRef, boost::asio::ip::tcp::endpoint const& endpoint)
410 : SocketRef(socketRef), Endpoints(1, endpoint), Index(-1) { }
411
412 explicit ConnectState(std::shared_ptr<void> const& socketRef, std::vector<boost::asio::ip::tcp::endpoint> const& endpoints)
413 : SocketRef(socketRef), Endpoints(endpoints), Index(-1) { }
414
415 std::weak_ptr<void> SocketRef;
416 std::vector<boost::asio::ip::tcp::endpoint> Endpoints;
417 std::ptrdiff_t Index;
418};
419
420template <typename Socket>
422{
423 explicit Connect(std::shared_ptr<Socket> const& socketRef, boost::asio::ip::tcp::endpoint const& endpoint)
424 : State(std::make_shared<ConnectState>(std::move(socketRef), endpoint)) { }
425
426 explicit Connect(std::shared_ptr<Socket> const& socketRef, std::vector<boost::asio::ip::tcp::endpoint> const& endpoints)
427 : State(std::make_shared<ConnectState>(std::move(socketRef), endpoints)) { }
428
429 std::shared_ptr<ConnectState> State;
430
431 template <typename Handler>
432 void operator()(Handler& handler, boost::system::error_code error = {})
433 {
434 std::shared_ptr<Socket> socket = static_pointer_cast<Socket>(State->SocketRef.lock());
435 if (!socket)
436 {
437 error = boost::asio::error::operation_aborted;
438 handler.complete(error, boost::asio::ip::tcp::endpoint());
439 return;
440 }
441
442 bool isFirst = State->Index < 0;
443
444 if (std::max(State->Index, std::ptrdiff_t(0)) >= std::ssize(State->Endpoints))
445 {
446 Connect::HandleError(socket.get(), "failed to connect to any of specified endpoints");
447 error = boost::asio::error::not_found;
448 handler.complete(error, boost::asio::ip::tcp::endpoint());
449 return;
450 }
451
452 if (!isFirst && !socket->underlying_stream().is_open())
453 {
454 Connect::HandleError(socket.get(), "socket closed");
455 error = boost::asio::error::operation_aborted;
456 handler.complete(error, boost::asio::ip::tcp::endpoint());
457 return;
458 }
459
460 if (!error && !isFirst)
461 {
462 socket->SetRemoteEndpoint(State->Endpoints[State->Index]);
463 handler.complete(error, State->Endpoints[State->Index]);
464 }
465 else
466 {
467#if BOOST_VERSION >= 107700
468 if (handler.get_cancellation_state().cancelled() != boost::asio::cancellation_type::none)
469 {
470 Connect::HandleError(socket.get(), "connect cancelled");
471 error = boost::asio::error::operation_aborted;
472 handler.complete(error, boost::asio::ip::tcp::endpoint());
473 return;
474 }
475#endif
476
477 socket->underlying_stream().close(error);
478 socket->underlying_stream().async_connect(State->Endpoints[++State->Index], std::move(handler));
479 }
480 }
481
482 static void HandleError(Socket* self, std::string_view message)
483 {
484 TC_LOG_DEBUG("network", "Socket::Connect: {}", message);
485 self->CloseSocket();
486 }
487};
488}
489}
490
491#endif // TRINITYCORE_SOCKET_H
uint8_t uint8
Definition Define.h:156
uint16_t uint16
Definition Define.h:155
#define TC_LOG_DEBUG(filterType__, message__,...)
Definition Log.h:181
size_type GetRemainingSpace() const
void ReadCompleted(size_type bytes)
void WriteCompleted(size_type bytes)
uint8 * GetReadPointer()
size_type GetActiveSize() const
uint8 * GetWritePointer()
void EnsureFreeSpace()
uint16 GetRemotePort() const
Definition Socket.h:191
decltype(auto) Connect(std::vector< boost::asio::ip::tcp::endpoint > const &endpoints, Callback &&callback)
Definition Socket.h:163
static constexpr uint8 OpenState_Closed
Definition Socket.h:398
std::atomic< uint8 > _openState
Definition Socket.h:400
void QueuedBufferWriteDone()
Definition Socket.h:311
Socket(Socket const &other)=delete
bool ReadHandlerInternal(boost::system::error_code const &error, size_t transferredBytes)
Definition Socket.h:299
void SetNoDelay(bool enable)
Definition Socket.h:289
Socket(IoContextTcpSocket &&socket, Args &&... args)
Definition Socket.h:130
std::queue< MessageBuffer > _writeQueue
Definition Socket.h:393
virtual SocketReadCallbackResult ReadHandler()
Definition Socket.h:262
Socket & operator=(Socket const &other)=delete
bool AsyncProcessQueue()
Definition Socket.h:264
bool IsOpen() const
Definition Socket.h:225
Stream & underlying_stream()
Definition Socket.h:254
boost::asio::ip::address const & GetRemoteIpAddress() const
Definition Socket.h:186
static constexpr uint8 OpenState_Closing
Transition to Closed state after sending all queued data.
Definition Socket.h:397
static constexpr uint8 OpenState_Open
Definition Socket.h:396
virtual void OnClose()
Definition Socket.h:260
virtual bool Update()
Definition Socket.h:170
Socket(boost::asio::io_context &context, Args &&... args)
Definition Socket.h:136
void AsyncRead(Callback &&callback)
Definition Socket.h:202
void QueuePacket(MessageBuffer &&buffer)
Definition Socket.h:216
void DelayedCloseSocket()
Marks the socket for closing after write buffer becomes empty.
Definition Socket.h:242
MessageBuffer _readBuffer
Definition Socket.h:392
void WriteHandlerWrapper(boost::system::error_code const &)
Definition Socket.h:338
virtual void Start()
Definition Socket.h:152
decltype(auto) Connect(boost::asio::ip::tcp::endpoint const &endpoint, Callback &&callback)
Definition Socket.h:155
void SetRemoteEndpoint(boost::asio::ip::tcp::endpoint const &endpoint)
Definition Socket.h:196
Socket(Socket &&other)=delete
Socket & operator=(Socket &&other)=delete
struct Trinity::Net::Socket::Endpoint _remoteEndpoint
virtual ~Socket()
Definition Socket.h:145
MessageBuffer & GetReadBuffer()
Definition Socket.h:252
SocketReadCallbackResult
Definition Socket.h:49
boost::asio::basic_stream_socket< boost::asio::ip::tcp, boost::asio::io_context::executor_type > IoContextTcpSocket
Definition Socket.h:40
boost::asio::mutable_buffer PrepareReadBuffer(MessageBuffer &readBuffer)
Definition Socket.h:54
STL namespace.
ConnectState(std::shared_ptr< void > const &socketRef, boost::asio::ip::tcp::endpoint const &endpoint)
Definition Socket.h:409
ConnectState(std::shared_ptr< void > const &socketRef, std::vector< boost::asio::ip::tcp::endpoint > const &endpoints)
Definition Socket.h:412
std::vector< boost::asio::ip::tcp::endpoint > Endpoints
Definition Socket.h:416
void operator()(Handler &handler, boost::system::error_code error={})
Definition Socket.h:432
std::shared_ptr< ConnectState > State
Definition Socket.h:429
Connect(std::shared_ptr< Socket > const &socketRef, boost::asio::ip::tcp::endpoint const &endpoint)
Definition Socket.h:423
static void HandleError(Socket *self, std::string_view message)
Definition Socket.h:482
Connect(std::shared_ptr< Socket > const &socketRef, std::vector< boost::asio::ip::tcp::endpoint > const &endpoints)
Definition Socket.h:426
SocketReadCallbackResult operator()() const
Definition Socket.h:64
AsyncReadObjectType * Socket
Definition Socket.h:85
InvokeReadHandlerCallback< ReadHandlerObjectType > ReadCallback
Definition Socket.h:86
ReadConnectionInitializer(AsyncReadObjectType *socket, ReadHandlerObjectType *callbackSocket)
Definition Socket.h:76
ReadConnectionInitializer(AsyncReadObjectType *socket)
Definition Socket.h:75
boost::asio::ip::address Address
Definition Socket.h:388