TrinityCore
Loading...
Searching...
No Matches
DatabaseWorkerPool.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include "DatabaseWorkerPool.h"
19#include "AdhocStatement.h"
20#include "Common.h"
21#include "Errors.h"
22#include "IoContext.h"
27#include "Log.h"
29#include "PreparedStatement.h"
31#include "QueryCallback.h"
32#include "QueryHolder.h"
33#include "QueryResult.h"
34#include "Transaction.h"
35#include "MySQLWorkaround.h"
36#include <boost/asio/use_future.hpp>
37#include <mysqld_error.h>
38#include <utility>
39#ifdef TRINITY_DEBUG
40#include <boost/stacktrace.hpp>
41#endif
42
43static consteval uint32 ParseVersionString(std::string_view chars)
44{
45 uint32 result = 0;
46 uint32 partialResult = 0;
47 uint32 multiplier = 10000;
48 for (std::size_t i = 0; i < chars.length(); ++i)
49 {
50 char c = chars[i];
51 if (c == '.')
52 {
53 if (multiplier < 100)
54 throw "Too many . characters in version string";
55
56 result += partialResult * multiplier;
57 multiplier /= 100;
58 partialResult = 0;
59 }
60 else if (c >= '0' && c <= '9')
61 {
62 partialResult *= 10;
63 partialResult += c - '0';
64 }
65 else
66 throw "Invalid input character";
67 }
68
69 result += partialResult * multiplier;
70
71 return result;
72}
73
74namespace
75{
76#ifdef TRINITY_DEBUG
77template<typename Database>
78thread_local bool WarnSyncQueries = false;
79#endif
80}
81
82template<typename T>
84{
85 explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
86 {
87 ++_pool->_queueSize;
88 }
89
90 QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
91 QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
92
94 {
95 if (this != &other)
96 {
97 if (_pool != other._pool)
98 {
99 if (_pool)
100 --_pool->_queueSize;
101 if (other._pool)
102 ++other._pool->_queueSize;
103 }
104 _pool = other._pool;
105 }
106 return *this;
107 }
109 {
110 if (this != &other)
111 {
112 if (_pool != other._pool)
113 {
114 if (_pool)
115 --_pool->_queueSize;
116 }
117 _pool = std::exchange(other._pool, nullptr);
118 }
119 return *this;
120 }
121
123 {
124 if (_pool)
125 --_pool->_queueSize;
126 }
127
128private:
130};
131
132template <class T>
134 : _async_threads(0), _synch_threads(0)
135{
136 // We only need check compiled version match on Windows
137 // because on other platforms ABI compatibility is ensured by SOVERSION
138 // and Windows MySQL releases don't even have abi-version-like component in their dll file name
139#if TRINITY_PLATFORM == TRINITY_PLATFORM_WINDOWS
140#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
141#define TRINITY_COMPILED_CLIENT_VERSION MARIADB_PACKAGE_VERSION_ID
142#else
143#define TRINITY_COMPILED_CLIENT_VERSION MYSQL_VERSION_ID
144#endif
145 WPFatal(mysql_get_client_version() == TRINITY_COMPILED_CLIENT_VERSION, "Used " TRINITY_MYSQL_FLAVOR " library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), TRINITY_COMPILED_CLIENT_VERSION);
146#undef TRINITY_COMPILED_CLIENT_VERSION
147#endif
148}
149
150template <class T>
154
155template <class T>
156void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
157 uint8 const asyncThreads, uint8 const synchThreads)
158{
159 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
160
161 _async_threads = asyncThreads;
162 _synch_threads = synchThreads;
163}
164
165template <class T>
167{
168 WPFatal(_connectionInfo.get(), "Connection info was not set!");
169
170 TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
171 "Asynchronous connections: {}, synchronous connections: {}.",
172 GetDatabaseName(), _async_threads, _synch_threads);
173
174 _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
175
176 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
177
178 if (error)
179 return error;
180
181 error = OpenConnections(IDX_SYNCH, _synch_threads);
182
183 if (error)
184 return error;
185
186 for (std::unique_ptr<T> const& connection : _connections[IDX_ASYNC])
187 connection->StartWorkerThread(_ioContext.get());
188
189 TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
190 "{} total connections running.", GetDatabaseName(),
191 (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
192
193 return 0;
194}
195
196template <class T>
198{
199 TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
200
201 if (_ioContext)
202 _ioContext->stop();
203
205 _connections[IDX_ASYNC].clear();
206
207 _ioContext.reset();
208
209 TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
210 "Proceeding with synchronous connections.",
211 GetDatabaseName());
212
217 _connections[IDX_SYNCH].clear();
218
219 TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
220}
221
222template <class T>
224{
225 for (auto& connections : _connections)
226 {
227 for (auto& connection : connections)
228 {
229 connection->LockIfReady();
230 if (!connection->PrepareStatements())
231 {
232 connection->Unlock();
233 Close();
234 return false;
235 }
236 else
237 connection->Unlock();
238
239 size_t const preparedSize = connection->m_stmts.size();
240 if (_preparedStatementSize.size() < preparedSize)
241 _preparedStatementSize.resize(preparedSize);
242
243 for (size_t i = 0; i < preparedSize; ++i)
244 {
245 // already set by another connection
246 // (each connection only has prepared statements of it's own type sync/async)
247 if (_preparedStatementSize[i] > 0)
248 continue;
249
250 if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
251 {
252 uint32 const paramCount = stmt->GetParameterCount();
253
254 // TC only supports uint8 indices.
255 ASSERT(paramCount < std::numeric_limits<uint8>::max());
256
257 _preparedStatementSize[i] = static_cast<uint8>(paramCount);
258 }
259 }
260 }
261 }
262
263 return true;
264}
265
266template <class T>
267QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
268{
269 if (!connection)
270 connection = GetFreeConnection();
271
272 QueryResult result = BasicStatementTask::Query(connection, sql);
273 connection->Unlock();
274
275 return result;
276}
277
278template <class T>
280{
281 T* connection = GetFreeConnection();
283 connection->Unlock();
284
286 delete stmt;
287
288 return ret;
289}
290
291template <class T>
293{
294 std::future<QueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
295 {
296 T* conn = GetAsyncConnectionForCurrentThread();
297 return BasicStatementTask::Query(conn, sql.c_str());
298 }));
299 return QueryCallback(std::move(result));
300}
301
302template <class T>
304{
305 std::future<PreparedQueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
306 {
307 T* conn = GetAsyncConnectionForCurrentThread();
308 return PreparedStatementTask::Query(conn, stmt.get());
309 }));
310 return QueryCallback(std::move(result));
311}
312
313template <class T>
315{
316 std::future<void> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
317 {
318 T* conn = GetAsyncConnectionForCurrentThread();
319 SQLQueryHolderTask::Execute(conn, holder.get());
320 }));
321 return { std::move(holder), std::move(result) };
322}
323
324template <class T>
326{
327 return std::make_shared<Transaction<T>>();
328}
329
330template <class T>
332{
333#ifdef TRINITY_DEBUG
337 switch (transaction->GetSize())
338 {
339 case 0:
340 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
341 return;
342 case 1:
343 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
344 break;
345 default:
346 break;
347 }
348#endif // TRINITY_DEBUG
349
350 boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
351 {
352 T* conn = GetAsyncConnectionForCurrentThread();
353 TransactionTask::Execute(conn, transaction);
354 });
355}
356
357template <class T>
359{
360#ifdef TRINITY_DEBUG
364 switch (transaction->GetSize())
365 {
366 case 0:
367 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
368 break;
369 case 1:
370 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
371 break;
372 default:
373 break;
374 }
375#endif // TRINITY_DEBUG
376
377 std::future<bool> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
378 {
379 T* conn = GetAsyncConnectionForCurrentThread();
380 return TransactionTask::Execute(conn, transaction);
381 }));
382 return TransactionCallback(std::move(result));
383}
384
385template <class T>
387{
388 T* connection = GetFreeConnection();
389 int errorCode = connection->ExecuteTransaction(transaction);
390 if (!errorCode)
391 {
392 connection->Unlock(); // OK, operation succesful
393 return;
394 }
395
398 if (errorCode == ER_LOCK_DEADLOCK)
399 {
400 //todo: handle multiple sync threads deadlocking in a similar way as async threads
401 uint8 loopBreaker = 5;
402 for (uint8 i = 0; i < loopBreaker; ++i)
403 {
404 if (!connection->ExecuteTransaction(transaction))
405 break;
406 }
407 }
408
410 transaction->Cleanup();
411
412 connection->Unlock();
413}
414
415template <class T>
417{
418 return new PreparedStatement<T>(index, _preparedStatementSize[index]);
419}
420
421template <class T>
423{
424 if (str.empty())
425 return;
426
427 char* buf = new char[str.size() * 2 + 1];
428 EscapeString(buf, str.c_str(), uint32(str.size()));
429 str = buf;
430 delete[] buf;
431}
432
433template <class T>
435{
437 for (auto& connection : _connections[IDX_SYNCH])
438 {
439 if (connection->LockIfReady())
440 {
441 connection->Ping();
442 connection->Unlock();
443 }
444 }
445
449 auto const count = _connections[IDX_ASYNC].size();
450 for (uint8 i = 0; i < count; ++i)
451 {
452 boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
453 {
454 T* conn = GetAsyncConnectionForCurrentThread();
455 conn->Ping();
456 });
457 }
458}
459
#ifdef TRINITY_DEBUG
/**
 * Turns the synchronous query warning on or off (TRINITY_DEBUG builds only).
 *
 * The flag is a thread_local variable template instantiated per database type,
 * so the setting applies to the calling thread and to database type T only.
 *
 * @param warn new value of the flag
 */
template <class T>
void DatabaseWorkerPool<T>::WarnAboutSyncQueries([[maybe_unused]] bool warn)
{
    WarnSyncQueries<T> = warn;
}
#endif // TRINITY_DEBUG
467
468template <class T>
470{
471 for (uint8 i = 0; i < numConnections; ++i)
472 {
473 // Create the connection
474 constexpr std::array<ConnectionFlags, IDX_SIZE> flags = { { CONNECTION_ASYNC, CONNECTION_SYNCH } };
475
476 std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo, flags[type]);
477
478 if (uint32 error = connection->Open())
479 {
480 // Failed to open a connection or invalid version, abort and cleanup
481 _connections[type].clear();
482 return error;
483 }
484 else if (uint32 serverVersion = connection->GetServerVersion(); serverVersion < ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION))
485 {
486 TC_LOG_ERROR("sql.driver", "TrinityCore does not support " TRINITY_MYSQL_FLAVOR " versions below " TRINITY_REQUIRED_MYSQL_VERSION " (found id {}, need id >= {}), please update your " TRINITY_MYSQL_FLAVOR " server", serverVersion, ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION));
487 return 1;
488 }
489 else
490 {
491 _connections[type].push_back(std::move(connection));
492 }
493 }
494
495 // Everything is fine
496 return 0;
497}
498
499template <class T>
500unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
501{
502 if (!to || !from || !length)
503 return 0;
504
505 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
506}
507
508template <class T>
510{
511 return _queueSize;
512}
513
514template <class T>
516{
517#ifdef TRINITY_DEBUG
518 if (WarnSyncQueries<T>)
519 {
520 TC_LOG_WARN("sql.performances", "Sync query at:\n{}", boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
521 }
522#endif
523
524 uint8 i = 0;
525 auto const num_cons = _connections[IDX_SYNCH].size();
526 T* connection = nullptr;
528 for (;;)
529 {
530 connection = _connections[IDX_SYNCH][i++ % num_cons].get();
532 if (connection->LockIfReady())
533 break;
534 }
535
536 return connection;
537}
538
539template <class T>
541{
542 std::thread::id id = std::this_thread::get_id();
543 for (auto&& connection : _connections[IDX_ASYNC])
544 if (connection->GetWorkerThreadId() == id)
545 return connection.get();
546
547 return nullptr;
548}
549
550template <class T>
552{
553 return _connectionInfo->database.c_str();
554}
555
556template <class T>
558{
559 if (!sql)
560 return;
561
562 boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
563 {
564 T* conn = GetAsyncConnectionForCurrentThread();
565 BasicStatementTask::Execute(conn, sql.c_str());
566 });
567}
568
569template <class T>
571{
572 boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
573 {
574 T* conn = GetAsyncConnectionForCurrentThread();
575 PreparedStatementTask::Execute(conn, stmt.get());
576 });
577}
578
579template <class T>
581{
582 if (!sql)
583 return;
584
585 T* connection = GetFreeConnection();
586 BasicStatementTask::Execute(connection, sql);
587 connection->Unlock();
588}
589
590template <class T>
592{
593 T* connection = GetFreeConnection();
594 PreparedStatementTask::Execute(connection, stmt);
595 connection->Unlock();
596
598 delete stmt;
599}
600
601template <class T>
603{
604 if (!trans)
605 Execute(sql);
606 else
607 trans->Append(sql);
608}
609
610template <class T>
612{
613 if (!trans)
614 Execute(stmt);
615 else
616 trans->Append(stmt);
617}
618
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
static consteval uint32 ParseVersionString(std::string_view chars)
#define TRINITY_COMPILED_CLIENT_VERSION
uint8_t uint8
Definition Define.h:156
#define TC_DATABASE_API
Definition Define.h:111
uint32_t uint32
Definition Define.h:154
uint16 flags
#define WPFatal(cond,...)
Definition Errors.h:69
#define ASSERT
Definition Errors.h:80
#define TC_LOG_DEBUG(filterType__, message__,...)
Definition Log.h:181
#define TC_LOG_ERROR(filterType__, message__,...)
Definition Log.h:190
#define TC_LOG_INFO(filterType__, message__,...)
Definition Log.h:184
#define TC_LOG_WARN(filterType__, message__,...)
Definition Log.h:187
@ CONNECTION_SYNCH
@ CONNECTION_ASYNC
static bool Execute(MySQLConnection *conn, char const *sql)
static QueryResult Query(MySQLConnection *conn, char const *sql)
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
T * GetAsyncConnectionForCurrentThread() const
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
static void WarnAboutSyncQueries(bool warn)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
std::atomic< size_t > _queueSize
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically rollback if not commited....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, prevent the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Apply escape string'ing for current collation. (utf8)
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
static PreparedQueryResult Query(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, SQLQueryHolderBase *holder)
static bool Execute(MySQLConnection *conn, std::shared_ptr< TransactionBase > trans)
QueueSizeTracker & operator=(QueueSizeTracker const &other)
QueueSizeTracker(DatabaseWorkerPool *pool)
QueueSizeTracker(QueueSizeTracker &&other) noexcept
QueueSizeTracker & operator=(QueueSizeTracker &&other) noexcept
QueueSizeTracker(QueueSizeTracker const &other)