TrinityCore
DatabaseWorkerPool.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include "DatabaseWorkerPool.h"
19#include "AdhocStatement.h"
20#include "Common.h"
21#include "Errors.h"
22#include "IoContext.h"
27#include "Log.h"
29#include "PreparedStatement.h"
31#include "QueryCallback.h"
32#include "QueryHolder.h"
33#include "QueryResult.h"
34#include "Transaction.h"
35#include "MySQLWorkaround.h"
36#include <boost/asio/use_future.hpp>
37#include <mysqld_error.h>
38#include <utility>
39#ifdef TRINITY_DEBUG
40#include <boost/stacktrace.hpp>
41#endif
42
// Oldest MySQL builds accepted. The numeric ids are compared against the
// values reported by the client library and by each opened connection; the
// *_STRING forms are spliced into the matching error messages.
#define MIN_MYSQL_SERVER_VERSION            50700u
#define MIN_MYSQL_SERVER_VERSION_STRING     "5.7"
#define MIN_MYSQL_CLIENT_VERSION            50700u
#define MIN_MYSQL_CLIENT_VERSION_STRING     "5.7"

// Same limits for builds against MariaDB (LIBMARIADB).
#define MIN_MARIADB_SERVER_VERSION          100209u
#define MIN_MARIADB_SERVER_VERSION_STRING   "10.2.9"
#define MIN_MARIADB_CLIENT_VERSION          30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING   "3.0.3"
52
namespace
{
#ifdef TRINITY_DEBUG
// Per-thread, per-database-type switch (debug builds only). When set,
// GetFreeConnection() logs a stack trace for every synchronous query.
template <class Database>
thread_local bool WarnSyncQueries{ false };
#endif
}
60
61template<typename T>
63{
64 explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
65 {
66 ++_pool->_queueSize;
67 }
68
69 QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
70 QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
71
73 {
74 if (this != &other)
75 {
76 if (_pool != other._pool)
77 {
78 if (_pool)
79 --_pool->_queueSize;
80 if (other._pool)
81 ++other._pool->_queueSize;
82 }
83 _pool = other._pool;
84 }
85 return *this;
86 }
88 {
89 if (this != &other)
90 {
91 if (_pool != other._pool)
92 {
93 if (_pool)
94 --_pool->_queueSize;
95 }
96 _pool = std::exchange(other._pool, nullptr);
97 }
98 return *this;
99 }
100
102 {
103 if (_pool)
104 --_pool->_queueSize;
105 }
106
107private:
109};
110
// Constructor of DatabaseWorkerPool<T>: zeroes both connection counts and
// aborts (WPFatal) unless the MySQL/MariaDB client library is thread-safe,
// is at least the minimum supported version, and is the exact version the
// binary was compiled against.
// NOTE(review): the declarator line (listing line 112) is missing from this
// listing; only the initializer list and body are visible.
111template <class T>
113 : _async_threads(0), _synch_threads(0)
114{
115 WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
116
// MariaDB client builds are checked against MARIADB_PACKAGE_VERSION_ID,
// everything else against MYSQL_VERSION_ID.
117#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
118 WPFatal(mysql_get_client_version() >= MIN_MARIADB_CLIENT_VERSION, "TrinityCore does not support MariaDB versions below " MIN_MARIADB_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MariaDB client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MARIADB_CLIENT_VERSION);
119 WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID, "Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
120#else
121 WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below " MIN_MYSQL_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MySQL client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MYSQL_CLIENT_VERSION);
122 WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
123#endif
124}
125
// Empty-bodied member definition.
// NOTE(review): the declarator line (listing line 127) is missing from this
// listing; presumably this is the DatabaseWorkerPool<T> destructor - confirm
// against the header.
126template <class T>
128{
129}
130
131template <class T>
132void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
133 uint8 const asyncThreads, uint8 const synchThreads)
134{
135 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
136
137 _async_threads = asyncThreads;
138 _synch_threads = synchThreads;
139}
140
// Opens the pool: requires connection info to be set, creates the io context,
// opens the asynchronous and then the synchronous connections, and starts one
// worker thread per asynchronous connection.
// Returns 0 on success, otherwise the first non-zero error reported by
// OpenConnections().
// NOTE(review): the signature line (listing line 142) is missing from this
// listing.
141template <class T>
143{
144 WPFatal(_connectionInfo.get(), "Connection info was not set!");
145
146 TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
147 "Asynchronous connections: {}, synchronous connections: {}.",
148 GetDatabaseName(), _async_threads, _synch_threads);
149
// The io context is constructed with the asynchronous connection count.
150 _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
151
152 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
153
154 if (error)
155 return error;
156
157 error = OpenConnections(IDX_SYNCH, _synch_threads);
158
159 if (error)
160 return error;
161
// Worker threads are started only after every connection opened successfully.
162 for (std::unique_ptr<T> const& connection : _connections[IDX_ASYNC])
163 connection->StartWorkerThread(_ioContext.get());
164
165 TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
166 "{} total connections running.", GetDatabaseName(),
167 (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
168
169 return 0;
170}
171
// Shuts the pool down: stops the io context (if one exists), destroys the
// asynchronous connections, releases the io context, then destroys the
// synchronous connections, logging each stage.
// NOTE(review): the signature line (listing line 173) and several comment
// lines are missing from this listing.
172template <class T>
174{
175 TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
176
// _ioContext is null if Open() was never reached, hence the check.
177 if (_ioContext)
178 _ioContext->stop();
179
181 _connections[IDX_ASYNC].clear();
182
183 _ioContext.reset();
184
185 TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
186 "Proceeding with synchronous connections.",
187 GetDatabaseName());
188
193 _connections[IDX_SYNCH].clear();
194
195 TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
196}
197
198template <class T>
200{
201 for (auto& connections : _connections)
202 {
203 for (auto& connection : connections)
204 {
205 connection->LockIfReady();
206 if (!connection->PrepareStatements())
207 {
208 connection->Unlock();
209 Close();
210 return false;
211 }
212 else
213 connection->Unlock();
214
215 size_t const preparedSize = connection->m_stmts.size();
216 if (_preparedStatementSize.size() < preparedSize)
217 _preparedStatementSize.resize(preparedSize);
218
219 for (size_t i = 0; i < preparedSize; ++i)
220 {
221 // already set by another connection
222 // (each connection only has prepared statements of it's own type sync/async)
223 if (_preparedStatementSize[i] > 0)
224 continue;
225
226 if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
227 {
228 uint32 const paramCount = stmt->GetParameterCount();
229
230 // TC only supports uint8 indices.
231 ASSERT(paramCount < std::numeric_limits<uint8>::max());
232
233 _preparedStatementSize[i] = static_cast<uint8>(paramCount);
234 }
235 }
236 }
237 }
238
239 return true;
240}
241
242template <class T>
243QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
244{
245 if (!connection)
246 connection = GetFreeConnection();
247
248 QueryResult result = BasicStatementTask::Query(connection, sql);
249 connection->Unlock();
250
251 return result;
252}
253
// Synchronous query that takes a free connection, unlocks it again, deletes
// the caller's `stmt` and returns `ret`.
// NOTE(review): the signature line (listing line 255) and the line that
// defines `ret` (listing line 258) are missing from this listing; presumably
// this is the prepared-statement overload of Query - confirm against the
// header before editing.
254template <class T>
256{
257 T* connection = GetFreeConnection();
259 connection->Unlock();
260
// Ownership of stmt ends here: the statement object is destroyed by this call.
262 delete stmt;
263
264 return ret;
265}
266
267template <class T>
269{
270 std::future<QueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
271 {
272 T* conn = GetAsyncConnectionForCurrentThread();
273 return BasicStatementTask::Query(conn, sql.c_str());
274 }));
275 return QueryCallback(std::move(result));
276}
277
// Asynchronous query for a prepared statement: posts a task to the io context
// and returns a QueryCallback wrapping the future PreparedQueryResult.
// The raw `stmt` pointer is wrapped in a std::unique_ptr captured by the task,
// so the statement is destroyed together with the task.
// NOTE(review): the signature line (listing line 279) is missing from this
// listing.
278template <class T>
280{
281 std::future<PreparedQueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
282 {
// Executed on a worker thread: look up the connection owned by that thread.
283 T* conn = GetAsyncConnectionForCurrentThread();
284 return PreparedStatementTask::Query(conn, stmt.get());
285 }));
286 return QueryCallback(std::move(result));
287}
288
289template <class T>
291{
292 std::future<void> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
293 {
294 T* conn = GetAsyncConnectionForCurrentThread();
295 SQLQueryHolderTask::Execute(conn, holder.get());
296 }));
297 return { std::move(holder), std::move(result) };
298}
299
300template <class T>
302{
303 return std::make_shared<Transaction<T>>();
304}
305
306template <class T>
308{
309#ifdef TRINITY_DEBUG
313 switch (transaction->GetSize())
314 {
315 case 0:
316 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
317 return;
318 case 1:
319 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
320 break;
321 default:
322 break;
323 }
324#endif // TRINITY_DEBUG
325
326 boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
327 {
328 T* conn = GetAsyncConnectionForCurrentThread();
329 TransactionTask::Execute(conn, transaction);
330 });
331}
332
333template <class T>
335{
336#ifdef TRINITY_DEBUG
340 switch (transaction->GetSize())
341 {
342 case 0:
343 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
344 break;
345 case 1:
346 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
347 break;
348 default:
349 break;
350 }
351#endif // TRINITY_DEBUG
352
353 std::future<bool> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
354 {
355 T* conn = GetAsyncConnectionForCurrentThread();
356 return TransactionTask::Execute(conn, transaction);
357 }));
358 return TransactionCallback(std::move(result));
359}
360
361template <class T>
363{
364 T* connection = GetFreeConnection();
365 int errorCode = connection->ExecuteTransaction(transaction);
366 if (!errorCode)
367 {
368 connection->Unlock(); // OK, operation succesful
369 return;
370 }
371
374 if (errorCode == ER_LOCK_DEADLOCK)
375 {
376 //todo: handle multiple sync threads deadlocking in a similar way as async threads
377 uint8 loopBreaker = 5;
378 for (uint8 i = 0; i < loopBreaker; ++i)
379 {
380 if (!connection->ExecuteTransaction(transaction))
381 break;
382 }
383 }
384
386 transaction->Cleanup();
387
388 connection->Unlock();
389}
390
391template <class T>
393{
394 return new PreparedStatement<T>(index, _preparedStatementSize[index]);
395}
396
397template <class T>
399{
400 if (str.empty())
401 return;
402
403 char* buf = new char[str.size() * 2 + 1];
404 EscapeString(buf, str.c_str(), uint32(str.size()));
405 str = buf;
406 delete[] buf;
407}
408
409template <class T>
411{
413 for (auto& connection : _connections[IDX_SYNCH])
414 {
415 if (connection->LockIfReady())
416 {
417 connection->Ping();
418 connection->Unlock();
419 }
420 }
421
425 auto const count = _connections[IDX_ASYNC].size();
426 for (uint8 i = 0; i < count; ++i)
427 {
428 boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
429 {
430 T* conn = GetAsyncConnectionForCurrentThread();
431 conn->Ping();
432 });
433 }
434}
435
#ifdef TRINITY_DEBUG
/// Debug builds only: turns the "sync query" stack-trace warning on or off for
/// the calling thread and this database type.
template <class T>
void DatabaseWorkerPool<T>::WarnAboutSyncQueries([[maybe_unused]] bool warn)
{
    bool& perThreadFlag = WarnSyncQueries<T>;
    perThreadFlag = warn;
}
#endif
443
444template <class T>
446{
447 for (uint8 i = 0; i < numConnections; ++i)
448 {
449 // Create the connection
450 constexpr std::array<ConnectionFlags, IDX_SIZE> flags = { { CONNECTION_ASYNC, CONNECTION_SYNCH } };
451
452 std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo, flags[type]);
453
454 if (uint32 error = connection->Open())
455 {
456 // Failed to open a connection or invalid version, abort and cleanup
457 _connections[type].clear();
458 return error;
459 }
460#ifndef LIBMARIADB
461 else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
462#else
463 else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
464#endif
465 {
466#ifndef LIBMARIADB
467 TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
468#else
469 TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
470#endif
471
472 return 1;
473 }
474 else
475 {
476 _connections[type].push_back(std::move(connection));
477 }
478 }
479
480 // Everything is fine
481 return 0;
482}
483
484template <class T>
485unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
486{
487 if (!to || !from || !length)
488 return 0;
489
490 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
491}
492
// Returns the current value of _queueSize, the counter maintained by
// QueueSizeTracker objects.
// NOTE(review): the signature line (listing line 494) is missing from this
// listing.
493template <class T>
495{
496 return _queueSize;
497}
498
// Returns a locked synchronous connection: cycles over the synchronous
// connections and returns the first one whose LockIfReady() succeeds. The loop
// has no exit other than a successful lock, so the call spins until one
// becomes available. In TRINITY_DEBUG builds a stack trace is logged first
// when WarnSyncQueries<T> is set for the calling thread.
// NOTE(review): the signature line (listing line 500) is missing from this
// listing.
// NOTE(review): if there are no synchronous connections, num_cons is 0 and
// the modulo below divides by zero - confirm callers guarantee at least one.
499template <class T>
501{
502#ifdef TRINITY_DEBUG
503 if (WarnSyncQueries<T>)
504 {
505 TC_LOG_WARN("sql.performances", "Sync query at:\n{}", boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
506 }
507#endif
508
// i is a uint8 and wraps from 255 to 0; the index is always reduced modulo
// num_cons before use.
509 uint8 i = 0;
510 auto const num_cons = _connections[IDX_SYNCH].size();
511 T* connection = nullptr;
513 for (;;)
514 {
515 connection = _connections[IDX_SYNCH][i++ % num_cons].get();
517 if (connection->LockIfReady())
518 break;
519 }
520
521 return connection;
522}
523
524template <class T>
526{
527 std::thread::id id = std::this_thread::get_id();
528 for (auto&& connection : _connections[IDX_ASYNC])
529 if (connection->GetWorkerThreadId() == id)
530 return connection.get();
531
532 return nullptr;
533}
534
535template <class T>
537{
538 return _connectionInfo->database.c_str();
539}
540
541template <class T>
543{
544 if (!sql)
545 return;
546
547 boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
548 {
549 T* conn = GetAsyncConnectionForCurrentThread();
550 BasicStatementTask::Execute(conn, sql.c_str());
551 });
552}
553
// Fire-and-forget asynchronous execution of a prepared statement: posts a task
// to the io context. The raw `stmt` pointer is wrapped in a std::unique_ptr
// captured by the task, so the statement is destroyed together with the task.
// NOTE(review): the signature line (listing line 555) is missing from this
// listing.
554template <class T>
556{
557 boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
558 {
// Executed on a worker thread: look up the connection owned by that thread.
559 T* conn = GetAsyncConnectionForCurrentThread();
560 PreparedStatementTask::Execute(conn, stmt.get());
561 });
562}
563
564template <class T>
566{
567 if (!sql)
568 return;
569
570 T* connection = GetFreeConnection();
571 BasicStatementTask::Execute(connection, sql);
572 connection->Unlock();
573}
574
// Synchronous execution of a prepared statement on a free connection; the
// connection is unlocked afterwards and `stmt` is deleted.
// NOTE(review): the signature line (listing line 576) is missing from this
// listing.
575template <class T>
577{
578 T* connection = GetFreeConnection();
579 PreparedStatementTask::Execute(connection, stmt);
580 connection->Unlock();
581
// Ownership of stmt ends here: the statement object is destroyed by this call.
583 delete stmt;
584}
585
586template <class T>
588{
589 if (!trans)
590 Execute(sql);
591 else
592 trans->Append(sql);
593}
594
// Appends `stmt` to `trans` when a transaction is supplied; otherwise hands it
// to Execute().
// NOTE(review): the signature line (listing line 596) is missing from this
// listing; presumably the prepared-statement overload of ExecuteOrAppend -
// confirm against the header.
595template <class T>
597{
598 if (!trans)
599 Execute(stmt);
600 else
601 trans->Append(stmt);
602}
603
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
#define MIN_MARIADB_CLIENT_VERSION
#define MIN_MARIADB_CLIENT_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION_STRING
#define MIN_MARIADB_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION
#define MIN_MARIADB_SERVER_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION_STRING
uint8_t uint8
Definition: Define.h:144
#define TC_DATABASE_API
Definition: Define.h:111
uint32_t uint32
Definition: Define.h:142
uint16 flags
Definition: DisableMgr.cpp:49
#define WPFatal(cond,...)
Definition: Errors.h:58
#define ASSERT
Definition: Errors.h:68
#define TC_LOG_DEBUG(filterType__, message__,...)
Definition: Log.h:179
#define TC_LOG_ERROR(filterType__, message__,...)
Definition: Log.h:188
#define TC_LOG_INFO(filterType__, message__,...)
Definition: Log.h:182
#define TC_LOG_WARN(filterType__, message__,...)
Definition: Log.h:185
@ CONNECTION_SYNCH
@ CONNECTION_ASYNC
static bool Execute(MySQLConnection *conn, char const *sql)
static QueryResult Query(MySQLConnection *conn, char const *sql)
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
T * GetAsyncConnectionForCurrentThread() const
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
static void WarnAboutSyncQueries(bool warn)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
std::atomic< size_t > _queueSize
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically rollback if not commited....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, prevent the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Apply escape string'ing for current collation. (utf8)
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
static PreparedQueryResult Query(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, SQLQueryHolderBase *holder)
Definition: QueryHolder.cpp:75
static bool Execute(MySQLConnection *conn, std::shared_ptr< TransactionBase > trans)
Definition: Transaction.cpp:59
void Execute(Creature *me, EventMap &events, uint32 eventId)
decltype(auto) post(boost::asio::io_context &ioContext, T &&t)
Definition: IoContext.h:54
constexpr std::size_t size()
Definition: UpdateField.h:769
QueueSizeTracker & operator=(QueueSizeTracker const &other)
QueueSizeTracker(DatabaseWorkerPool *pool)
QueueSizeTracker(QueueSizeTracker &&other) noexcept
QueueSizeTracker & operator=(QueueSizeTracker &&other) noexcept
QueueSizeTracker(QueueSizeTracker const &other)