TrinityCore
DatabaseWorkerPool.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#include "DatabaseWorkerPool.h"
19#include "AdhocStatement.h"
20#include "Common.h"
21#include "Errors.h"
22#include "IoContext.h"
27#include "Log.h"
29#include "PreparedStatement.h"
31#include "QueryCallback.h"
32#include "QueryHolder.h"
33#include "QueryResult.h"
34#include "Transaction.h"
35#include "MySQLWorkaround.h"
36#include <boost/asio/use_future.hpp>
37#include <mysqld_error.h>
38#include <utility>
39#ifdef TRINITY_DEBUG
40#include <sstream>
41#include <boost/stacktrace.hpp>
42#endif
43
// Minimum MySQL versions accepted by this file. The numeric ids are compared
// against the client library / server version ids at runtime; the *_STRING
// variants are the human-readable spellings spliced into the error messages.
44#define MIN_MYSQL_SERVER_VERSION 50700u
45#define MIN_MYSQL_SERVER_VERSION_STRING "5.7"
46#define MIN_MYSQL_CLIENT_VERSION 50700u
47#define MIN_MYSQL_CLIENT_VERSION_STRING "5.7"
48
// Equivalent limits used when the code is built with LIBMARIADB defined.
49#define MIN_MARIADB_SERVER_VERSION 100209u
50#define MIN_MARIADB_SERVER_VERSION_STRING "10.2.9"
51#define MIN_MARIADB_CLIENT_VERSION 30003u
52#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"
53
// Helper whose instances each account for one unit of the owning pool's
// _queueSize counter: constructing from a pool increments it, and the blocks
// below decrement it again when a tracker stops referring to a pool.
// NOTE(review): several header lines of this definition (the struct header,
// both assignment operators, the last member function and the data member
// declaration) are absent from this listing.
54template<typename T>
56{
// Registers with `pool` by bumping its queue counter. `pool` is dereferenced
// unconditionally, so it must not be null.
57 explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
58 {
59 ++_pool->_queueSize;
60 }
61
// Copy: both trackers count against the same pool, so the counter is bumped again.
// NOTE(review): unlike the assignment operators below, this does not check for a
// null _pool; copying a moved-from tracker would dereference nullptr - confirm
// that this cannot happen.
62 QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
// Move: takes over the source's pool pointer and nulls the source, leaving the
// counter unchanged.
63 QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
64
// Presumably the copy-assignment operator (header line missing). When the two
// trackers point at different pools, releases the current pool's count and adds
// one to the other's pool before adopting its pointer.
66 {
67 if (this != &other)
68 {
69 if (_pool != other._pool)
70 {
71 if (_pool)
72 --_pool->_queueSize;
73 if (other._pool)
74 ++other._pool->_queueSize;
75 }
76 _pool = other._pool;
77 }
78 return *this;
79 }
// Presumably the move-assignment operator (header line missing). Releases the
// current pool's count when the pools differ, then takes the source's pointer
// and leaves the source null.
81 {
82 if (this != &other)
83 {
84 if (_pool != other._pool)
85 {
86 if (_pool)
87 --_pool->_queueSize;
88 }
89 _pool = std::exchange(other._pool, nullptr);
90 }
91 return *this;
92 }
93
// Presumably the destructor (header line missing): gives the count back to the
// pool unless this tracker was moved from.
95 {
96 if (_pool)
97 --_pool->_queueSize;
98 }
99
100private:
102};
103
// Constructor (header line missing from this listing): zero-initialises both
// thread counts and validates the linked client library through WPFatal checks.
// NOTE(review): WPFatal is assumed to abort with the formatted message when its
// first argument is false - confirm in Errors.h.
104template <class T>
106 : _async_threads(0), _synch_threads(0)
107{
// The client library must report itself as thread-safe.
108 WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
109
// Two checks per flavour: the runtime client version must meet the minimum,
// and must equal the version id the binary was compiled against.
110#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
111 WPFatal(mysql_get_client_version() >= MIN_MARIADB_CLIENT_VERSION, "TrinityCore does not support MariaDB versions below " MIN_MARIADB_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MariaDB client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MARIADB_CLIENT_VERSION);
112 WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID, "Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
113#else
114 WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below " MIN_MYSQL_CLIENT_VERSION_STRING " (found %s id %lu, need id >= %u), please update your MySQL client library", mysql_get_client_info(), mysql_get_client_version(), MIN_MYSQL_CLIENT_VERSION);
115 WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
116#endif
117}
118
// Empty member definition. NOTE(review): its header line (listing line 120) is
// missing from this extract; presumably the destructor - confirm against
// DatabaseWorkerPool.h.
119template <class T>
121{
122}
123
124template <class T>
125void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
126 uint8 const asyncThreads, uint8 const synchThreads)
127{
128 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
129
130 _async_threads = asyncThreads;
131 _synch_threads = synchThreads;
132}
133
// Opens the pool: creates the io context, opens the asynchronous and then the
// synchronous connections, and calls StartWorkerThread on every asynchronous
// connection. Returns 0 on success, otherwise the first non-zero value
// returned by OpenConnections().
// NOTE(review): the header line of this definition (listing line 135) is
// missing from this extract.
134template <class T>
136{
// SetConnectionInfo() must have been called beforehand.
137 WPFatal(_connectionInfo.get(), "Connection info was not set!");
138
139 TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
140 "Asynchronous connections: {}, synchronous connections: {}.",
141 GetDatabaseName(), _async_threads, _synch_threads);
142
// The io context is constructed with the asynchronous thread count.
143 _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
144
145 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
146
147 if (error)
148 return error;
149
150 error = OpenConnections(IDX_SYNCH, _synch_threads);
151
152 if (error)
153 return error;
154
// Worker threads are started only after both connection sets opened successfully.
155 for (std::unique_ptr<T> const& connection : _connections[IDX_ASYNC])
156 connection->StartWorkerThread(_ioContext.get());
157
158 TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
159 "{} total connections running.", GetDatabaseName(),
160 (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
161
162 return 0;
163}
164
// Shuts the pool down: stops the io context (if one exists), destroys the
// asynchronous connections, releases the io context, then destroys the
// synchronous connections, logging progress along the way.
// NOTE(review): the header line (listing line 166) is missing from this
// extract. GetDatabaseName() dereferences _connectionInfo, so this presumably
// requires SetConnectionInfo() to have been called - confirm.
165template <class T>
167{
168 TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
169
170 if (_ioContext)
171 _ioContext->stop();
172
// Drop the asynchronous connections before releasing the io context they were
// started with.
174 _connections[IDX_ASYNC].clear();
175
176 _ioContext.reset();
177
178 TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
179 "Proceeding with synchronous connections.",
180 GetDatabaseName());
181
186 _connections[IDX_SYNCH].clear();
187
188 TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
189}
190
// Prepares the statements on every connection of the pool and records, per
// statement index, the parameter count in _preparedStatementSize.
// Returns false (after calling Close()) as soon as one connection fails to
// prepare its statements, true otherwise.
// NOTE(review): the header line (listing line 192) is missing from this extract.
191template <class T>
193{
194 for (auto& connections : _connections)
195 {
196 for (auto& connection : connections)
197 {
// NOTE(review): the result of LockIfReady() is ignored here, yet Unlock() is
// always called afterwards - confirm the lock cannot be contended at this point.
198 connection->LockIfReady();
199 if (!connection->PrepareStatements())
200 {
201 connection->Unlock();
// Close() clears the containers being iterated; this is only safe because the
// function returns immediately afterwards.
202 Close();
203 return false;
204 }
205 else
206 connection->Unlock();
207
// Grow the size table so it can hold every statement index of this connection.
208 size_t const preparedSize = connection->m_stmts.size();
209 if (_preparedStatementSize.size() < preparedSize)
210 _preparedStatementSize.resize(preparedSize);
211
212 for (size_t i = 0; i < preparedSize; ++i)
213 {
214 // already set by another connection
215 // (each connection only has prepared statements of its own type sync/async)
216 if (_preparedStatementSize[i] > 0)
217 continue;
218
// Slots without a statement on this connection are left untouched.
219 if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
220 {
221 uint32 const paramCount = stmt->GetParameterCount();
222
223 // TC only supports uint8 indices.
224 ASSERT(paramCount < std::numeric_limits<uint8>::max());
225
226 _preparedStatementSize[i] = static_cast<uint8>(paramCount);
227 }
228 }
229 }
230 }
231
232 return true;
233}
234
235template <class T>
236QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
237{
238 if (!connection)
239 connection = GetFreeConnection();
240
241 QueryResult result = BasicStatementTask::Query(connection, sql);
242 connection->Unlock();
243
244 return result;
245}
246
// Synchronous query variant that works on a heap-allocated `stmt`: takes a free
// connection, unlocks it again, deletes `stmt` and returns `ret`.
// NOTE(review): the header line (listing line 248) and the line declaring
// `ret` (listing line 251) are missing from this extract; presumably `ret` is
// the prepared-statement query result obtained on `connection` - confirm
// against the original file.
247template <class T>
249{
250 T* connection = GetFreeConnection();
252 connection->Unlock();
253
// This function owns `stmt` and frees it here; callers must not reuse it.
255 delete stmt;
256
257 return ret;
258}
259
// Posts an ad-hoc query to the io context's executor and returns a
// QueryCallback wrapping the future of its result.
// NOTE(review): header line (listing line 261) missing from this extract.
260template <class T>
262{
// The lambda owns a std::string copy of `sql`, so the caller's buffer need not
// outlive this call; the QueueSizeTracker capture keeps the pool's _queueSize
// incremented for as long as the task object exists.
263 QueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
264 {
// Resolved on the executing thread. GetAsyncConnectionForCurrentThread() can
// return nullptr; `conn` is passed on unchecked.
265 T* conn = GetAsyncConnectionForCurrentThread();
266 return BasicStatementTask::Query(conn, sql.c_str());
267 }));
268 return QueryCallback(std::move(result));
269}
270
// Posts a prepared-statement query to the io context's executor and returns a
// QueryCallback wrapping the future of its result.
// NOTE(review): header line (listing line 272) missing from this extract.
271template <class T>
273{
// Ownership of the raw `stmt` pointer moves into the lambda's unique_ptr, so it
// is freed when the task object is destroyed; the QueueSizeTracker capture
// keeps the pool's _queueSize incremented for the task's lifetime.
274 PreparedQueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
275 {
276 T* conn = GetAsyncConnectionForCurrentThread();
277 return PreparedStatementTask::Query(conn, stmt.get());
278 }));
279 return QueryCallback(std::move(result));
280}
281
// Posts execution of a query holder to the io context's executor and returns
// the holder together with the future that completes when the task has run.
// NOTE(review): header line (listing line 283) missing from this extract.
282template <class T>
284{
// `holder` is captured by copy so the task keeps it alive; `tracker` keeps the
// pool's _queueSize incremented while the task object exists.
285 QueryResultHolderFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
286 {
287 T* conn = GetAsyncConnectionForCurrentThread();
288 SQLQueryHolderTask::Execute(conn, holder.get());
289 }));
290 return { std::move(holder), std::move(result) };
291}
292
// Creates a new, empty Transaction<T> owned by a shared pointer and returns it.
// NOTE(review): header line (listing line 294) missing from this extract.
293template <class T>
295{
296 return std::make_shared<Transaction<T>>();
297}
298
// Posts a transaction to the io context's executor for asynchronous execution;
// no result is reported back to the caller.
// NOTE(review): header line (listing line 300) missing from this extract.
299template <class T>
301{
302#ifdef TRINITY_DEBUG
// Debug-only sanity checks: an empty transaction is skipped entirely and a
// single-query transaction is reported. Without TRINITY_DEBUG an empty
// transaction is still posted.
306 switch (transaction->GetSize())
307 {
308 case 0:
309 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
310 return;
311 case 1:
312 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
313 break;
314 default:
315 break;
316 }
317#endif // TRINITY_DEBUG
318
// `transaction` is captured by copy so the task keeps it alive; `tracker` keeps
// the pool's _queueSize incremented while the task object exists.
319 boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
320 {
321 T* conn = GetAsyncConnectionForCurrentThread();
322 TransactionTask::Execute(conn, transaction);
323 });
324}
325
// Posts a transaction to the io context's executor and returns a
// TransactionCallback wrapping the future of TransactionTask::Execute's result.
// NOTE(review): header line (listing line 327) missing from this extract.
326template <class T>
328{
329#ifdef TRINITY_DEBUG
// Debug-only diagnostics. NOTE(review): unlike the non-callback commit above,
// case 0 only logs ("Not executing") and then falls out of the switch, so an
// empty transaction is still posted below - the message and the behaviour
// disagree.
333 switch (transaction->GetSize())
334 {
335 case 0:
336 TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
337 break;
338 case 1:
339 TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
340 break;
341 default:
342 break;
343 }
344#endif // TRINITY_DEBUG
345
// `transaction` is captured by copy so the task keeps it alive; `tracker` keeps
// the pool's _queueSize incremented while the task object exists.
346 TransactionFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
347 {
348 T* conn = GetAsyncConnectionForCurrentThread();
349 return TransactionTask::Execute(conn, transaction);
350 }));
351 return TransactionCallback(std::move(result));
352}
353
// Executes a transaction synchronously on a free connection. If the first
// attempt fails with ER_LOCK_DEADLOCK it is retried up to five more times;
// after any failure the transaction's Cleanup() is called before the
// connection is unlocked.
// NOTE(review): header line (listing line 355) missing from this extract.
354template <class T>
356{
357 T* connection = GetFreeConnection();
358 int errorCode = connection->ExecuteTransaction(transaction);
359 if (!errorCode)
360 {
361 connection->Unlock(); // OK, operation successful
362 return;
363 }
364
// Only deadlocks are retried; any other error goes straight to cleanup.
367 if (errorCode == ER_LOCK_DEADLOCK)
368 {
369 //todo: handle multiple sync threads deadlocking in a similar way as async threads
370 uint8 loopBreaker = 5;
371 for (uint8 i = 0; i < loopBreaker; ++i)
372 {
// A zero return means the retry succeeded, so stop retrying.
373 if (!connection->ExecuteTransaction(transaction))
374 break;
375 }
376 }
377
// Reached after any initial failure, whether or not a retry succeeded.
379 transaction->Cleanup();
380
381 connection->Unlock();
382}
383
// Allocates a PreparedStatement<T> for `index`, sized with the parameter count
// recorded in _preparedStatementSize for that index.
// The result is a raw owning pointer; the query/execute functions in this file
// that accept such a pointer either delete it or wrap it in a unique_ptr.
// NOTE(review): header line (listing line 385) missing from this extract;
// `index` is not range-checked before indexing _preparedStatementSize.
384template <class T>
386{
387 return new PreparedStatement<T>(index, _preparedStatementSize[index]);
388}
389
390template <class T>
392{
393 if (str.empty())
394 return;
395
396 char* buf = new char[str.size() * 2 + 1];
397 EscapeString(buf, str.c_str(), uint32(str.size()));
398 str = buf;
399 delete[] buf;
400}
401
// Pings the pool's connections: synchronous ones directly, asynchronous ones
// via tasks posted to the io context's executor.
// NOTE(review): header line (listing line 403) missing from this extract.
402template <class T>
404{
// Synchronous connections that cannot be locked right now are skipped.
406 for (auto& connection : _connections[IDX_SYNCH])
407 {
408 if (connection->LockIfReady())
409 {
410 connection->Ping();
411 connection->Unlock();
412 }
413 }
414
// One ping task is posted per asynchronous connection. Each task pings the
// connection belonging to whichever thread runs it, so this presumably does not
// guarantee every connection is pinged exactly once - confirm.
418 auto const count = _connections[IDX_ASYNC].size();
419 for (uint8 i = 0; i < count; ++i)
420 {
421 boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
422 {
423 T* conn = GetAsyncConnectionForCurrentThread();
424 conn->Ping();
425 });
426 }
427}
428
429template <class T>
431{
432 for (uint8 i = 0; i < numConnections; ++i)
433 {
434 // Create the connection
435 constexpr std::array<ConnectionFlags, IDX_SIZE> flags = { { CONNECTION_ASYNC, CONNECTION_SYNCH } };
436
437 std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo, flags[type]);
438
439 if (uint32 error = connection->Open())
440 {
441 // Failed to open a connection or invalid version, abort and cleanup
442 _connections[type].clear();
443 return error;
444 }
445#ifndef LIBMARIADB
446 else if (connection->GetServerVersion() < MIN_MYSQL_SERVER_VERSION)
447#else
448 else if (connection->GetServerVersion() < MIN_MARIADB_SERVER_VERSION)
449#endif
450 {
451#ifndef LIBMARIADB
452 TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below " MIN_MYSQL_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MYSQL_SERVER_VERSION);
453#else
454 TC_LOG_ERROR("sql.driver", "TrinityCore does not support MariaDB versions below " MIN_MARIADB_SERVER_VERSION_STRING " (found id {}, need id >= {}), please update your MySQL server", connection->GetServerVersion(), MIN_MARIADB_SERVER_VERSION);
455#endif
456
457 return 1;
458 }
459 else
460 {
461 _connections[type].push_back(std::move(connection));
462 }
463 }
464
465 // Everything is fine
466 return 0;
467}
468
469template <class T>
470unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
471{
472 if (!to || !from || !length)
473 return 0;
474
475 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
476}
477
// Returns the current value of _queueSize, the counter maintained by
// QueueSizeTracker instances.
// NOTE(review): header line (listing line 479) missing from this extract.
478template <class T>
480{
481 return _queueSize;
482}
483
// Returns a synchronous connection that has been locked via LockIfReady().
// Cycles over the synchronous connections without sleeping until one can be
// locked, so the call does not return while all of them stay locked. The caller
// must Unlock() the connection it receives.
// NOTE(review): header line (listing line 485) missing from this extract.
484template <class T>
486{
487#ifdef TRINITY_DEBUG
// Debug aid: when _warnSyncQueries is set, log a stack trace for each
// synchronous query.
488 if (_warnSyncQueries)
489 {
490 std::ostringstream ss;
491 ss << boost::stacktrace::stacktrace();
492 TC_LOG_WARN("sql.performances", "Sync query at:\n{}", ss.str());
493 }
494#endif
495
// `i` is a uint8 and wraps at 256; only `i % num_cons` is used as the index.
// NOTE(review): with no synchronous connections num_cons is 0 and the modulo
// below divides by zero - confirm the pool always has at least one.
496 uint8 i = 0;
497 auto const num_cons = _connections[IDX_SYNCH].size();
498 T* connection = nullptr;
500 for (;;)
501 {
502 connection = _connections[IDX_SYNCH][i++ % num_cons].get();
504 if (connection->LockIfReady())
505 break;
506 }
507
508 return connection;
509}
510
// Finds the asynchronous connection whose worker thread id equals the calling
// thread's id. Returns nullptr when the calling thread matches none of them.
// NOTE(review): header line (listing line 512) missing from this extract.
511template <class T>
513{
514 std::thread::id id = std::this_thread::get_id();
// Linear scan over the asynchronous connections.
515 for (auto&& connection : _connections[IDX_ASYNC])
516 if (connection->GetWorkerThreadId() == id)
517 return connection.get();
518
519 return nullptr;
520}
521
// Returns the database name held in the connection info as a C string. The
// pointer refers to storage owned by _connectionInfo.
// NOTE(review): header line (listing line 523) missing from this extract;
// _connectionInfo is dereferenced without a null check.
522template <class T>
524{
525 return _connectionInfo->database.c_str();
526}
527
// Posts an ad-hoc statement to the io context's executor for asynchronous
// execution; no result is reported back. A null `sql` is ignored.
// NOTE(review): header line (listing line 529) missing from this extract.
528template <class T>
530{
531 if (!sql)
532 return;
533
// The lambda owns a std::string copy of `sql`; `tracker` keeps the pool's
// _queueSize incremented while the task object exists.
534 boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
535 {
536 T* conn = GetAsyncConnectionForCurrentThread();
537 BasicStatementTask::Execute(conn, sql.c_str());
538 });
539}
540
// Posts a prepared statement to the io context's executor for asynchronous
// execution; no result is reported back.
// NOTE(review): header line (listing line 542) missing from this extract.
541template <class T>
543{
// Ownership of the raw `stmt` pointer moves into the lambda's unique_ptr, so it
// is freed with the task object; `tracker` keeps the pool's _queueSize
// incremented while the task object exists.
544 boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
545 {
546 T* conn = GetAsyncConnectionForCurrentThread();
547 PreparedStatementTask::Execute(conn, stmt.get());
548 });
549}
550
// Executes an ad-hoc statement synchronously on a free connection and unlocks
// that connection afterwards. A null `sql` is ignored.
// NOTE(review): header line (listing line 552) missing from this extract.
551template <class T>
553{
554 if (!sql)
555 return;
556
557 T* connection = GetFreeConnection();
558 BasicStatementTask::Execute(connection, sql);
559 connection->Unlock();
560}
561
// Executes a prepared statement synchronously on a free connection, unlocks
// that connection, and then deletes `stmt`.
// NOTE(review): header line (listing line 563) missing from this extract.
562template <class T>
564{
565 T* connection = GetFreeConnection();
566 PreparedStatementTask::Execute(connection, stmt);
567 connection->Unlock();
568
// This function owns `stmt` and frees it here; callers must not reuse it.
570 delete stmt;
571}
572
// Runs `sql` through Execute() when no transaction is supplied; otherwise
// appends it to `trans`.
// NOTE(review): header line (listing line 574) missing from this extract.
573template <class T>
575{
576 if (!trans)
577 Execute(sql);
578 else
579 trans->Append(sql);
580}
581
// Runs `stmt` through Execute() when no transaction is supplied; otherwise
// appends it to `trans`.
// NOTE(review): header line (listing line 583) missing from this extract.
582template <class T>
584{
585 if (!trans)
586 Execute(stmt);
587 else
588 trans->Append(stmt);
589}
590
std::future< PreparedQueryResult > PreparedQueryResultFuture
std::future< QueryResult > QueryResultFuture
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
std::future< bool > TransactionFuture
std::future< void > QueryResultHolderFuture
#define MIN_MARIADB_CLIENT_VERSION
#define MIN_MARIADB_CLIENT_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION_STRING
#define MIN_MARIADB_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION
#define MIN_MARIADB_SERVER_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION_STRING
uint8_t uint8
Definition: Define.h:144
#define TC_DATABASE_API
Definition: Define.h:111
uint32_t uint32
Definition: Define.h:142
uint16 flags
Definition: DisableMgr.cpp:49
#define WPFatal(cond,...)
Definition: Errors.h:58
#define ASSERT
Definition: Errors.h:68
#define TC_LOG_WARN(filterType__,...)
Definition: Log.h:162
#define TC_LOG_DEBUG(filterType__,...)
Definition: Log.h:156
#define TC_LOG_ERROR(filterType__,...)
Definition: Log.h:165
#define TC_LOG_INFO(filterType__,...)
Definition: Log.h:159
@ CONNECTION_SYNCH
@ CONNECTION_ASYNC
static bool Execute(MySQLConnection *conn, char const *sql)
static QueryResult Query(MySQLConnection *conn, char const *sql)
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
T * GetAsyncConnectionForCurrentThread() const
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
std::atomic< size_t > _queueSize
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically roll back if not committed....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, prevent the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Apply escape string'ing for current collation. (utf8)
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
static PreparedQueryResult Query(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, SQLQueryHolderBase *holder)
Definition: QueryHolder.cpp:75
static bool Execute(MySQLConnection *conn, std::shared_ptr< TransactionBase > trans)
Definition: Transaction.cpp:57
decltype(auto) post(boost::asio::io_context &ioContext, T &&t)
Definition: IoContext.h:54
constexpr std::size_t size()
Definition: UpdateField.h:796
QueueSizeTracker & operator=(QueueSizeTracker const &other)
QueueSizeTracker(DatabaseWorkerPool *pool)
QueueSizeTracker(QueueSizeTracker &&other) noexcept
QueueSizeTracker & operator=(QueueSizeTracker &&other) noexcept
QueueSizeTracker(QueueSizeTracker const &other)