36#include <boost/asio/use_future.hpp>
37#include <mysqld_error.h>
40#include <boost/stacktrace.hpp>
48 for (std::size_t i = 0; i < chars.length(); ++i)
54 throw "Too many . characters in version string";
56 result += partialResult * multiplier;
60 else if (c >=
'0' && c <=
'9')
63 partialResult += c -
'0';
66 throw "Invalid input character";
69 result += partialResult * multiplier;
77template<
typename Database>
78thread_local bool WarnSyncQueries =
false;
97 if (_pool != other.
_pool)
112 if (_pool != other._pool)
117 _pool = std::exchange(other._pool,
nullptr);
134 : _async_threads(0), _synch_threads(0)
139#if TRINITY_PLATFORM == TRINITY_PLATFORM_WINDOWS
140#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
141#define TRINITY_COMPILED_CLIENT_VERSION MARIADB_PACKAGE_VERSION_ID
143#define TRINITY_COMPILED_CLIENT_VERSION MYSQL_VERSION_ID
146#undef TRINITY_COMPILED_CLIENT_VERSION
157 uint8 const asyncThreads,
uint8 const synchThreads)
159 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
161 _async_threads = asyncThreads;
162 _synch_threads = synchThreads;
168 WPFatal(_connectionInfo.get(),
"Connection info was not set!");
170 TC_LOG_INFO(
"sql.driver",
"Opening DatabasePool '{}'. "
171 "Asynchronous connections: {}, synchronous connections: {}.",
172 GetDatabaseName(), _async_threads, _synch_threads);
174 _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
176 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
181 error = OpenConnections(IDX_SYNCH, _synch_threads);
186 for (std::unique_ptr<T>
const& connection : _connections[IDX_ASYNC])
187 connection->StartWorkerThread(_ioContext.get());
189 TC_LOG_INFO(
"sql.driver",
"DatabasePool '{}' opened successfully. "
190 "{} total connections running.", GetDatabaseName(),
191 (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
199 TC_LOG_INFO(
"sql.driver",
"Closing down DatabasePool '{}'.", GetDatabaseName());
205 _connections[IDX_ASYNC].clear();
209 TC_LOG_INFO(
"sql.driver",
"Asynchronous connections on DatabasePool '{}' terminated. "
210 "Proceeding with synchronous connections.",
217 _connections[IDX_SYNCH].clear();
219 TC_LOG_INFO(
"sql.driver",
"All connections on DatabasePool '{}' closed.", GetDatabaseName());
225 for (
auto& connections : _connections)
227 for (
auto& connection : connections)
229 connection->LockIfReady();
230 if (!connection->PrepareStatements())
232 connection->Unlock();
237 connection->Unlock();
239 size_t const preparedSize = connection->m_stmts.size();
240 if (_preparedStatementSize.size() < preparedSize)
241 _preparedStatementSize.resize(preparedSize);
243 for (
size_t i = 0; i < preparedSize; ++i)
247 if (_preparedStatementSize[i] > 0)
252 uint32 const paramCount = stmt->GetParameterCount();
255 ASSERT(paramCount < std::numeric_limits<uint8>::max());
257 _preparedStatementSize[i] =
static_cast<uint8>(paramCount);
270 connection = GetFreeConnection();
273 connection->Unlock();
281 T* connection = GetFreeConnection();
283 connection->Unlock();
294 std::future<QueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([
this, sql = std::string(sql), tracker =
QueueSizeTracker(
this)]
296 T* conn = GetAsyncConnectionForCurrentThread();
305 std::future<PreparedQueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([
this, stmt = std::unique_ptr<
PreparedStatement<T>>(stmt), tracker =
QueueSizeTracker(
this)]
307 T* conn = GetAsyncConnectionForCurrentThread();
316 std::future<void> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([
this, holder, tracker =
QueueSizeTracker(
this)]
318 T* conn = GetAsyncConnectionForCurrentThread();
321 return { std::move(holder), std::move(result) };
327 return std::make_shared<Transaction<T>>();
337 switch (transaction->GetSize())
340 TC_LOG_DEBUG(
"sql.driver",
"Transaction contains 0 queries. Not executing.");
343 TC_LOG_DEBUG(
"sql.driver",
"Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
350 boost::asio::post(_ioContext->get_executor(), [
this, transaction, tracker =
QueueSizeTracker(
this)]
352 T* conn = GetAsyncConnectionForCurrentThread();
364 switch (transaction->GetSize())
367 TC_LOG_DEBUG(
"sql.driver",
"Transaction contains 0 queries. Not executing.");
370 TC_LOG_DEBUG(
"sql.driver",
"Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
377 std::future<bool> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([
this, transaction, tracker =
QueueSizeTracker(
this)]
379 T* conn = GetAsyncConnectionForCurrentThread();
388 T* connection = GetFreeConnection();
389 int errorCode = connection->ExecuteTransaction(transaction);
392 connection->Unlock();
398 if (errorCode == ER_LOCK_DEADLOCK)
401 uint8 loopBreaker = 5;
402 for (
uint8 i = 0; i < loopBreaker; ++i)
404 if (!connection->ExecuteTransaction(transaction))
410 transaction->Cleanup();
412 connection->Unlock();
427 char* buf =
new char[str.size() * 2 + 1];
428 EscapeString(buf, str.c_str(),
uint32(str.size()));
437 for (
auto& connection : _connections[IDX_SYNCH])
439 if (connection->LockIfReady())
442 connection->Unlock();
449 auto const count = _connections[IDX_ASYNC].size();
450 for (
uint8 i = 0; i < count; ++i)
452 boost::asio::post(_ioContext->get_executor(), [
this, tracker =
QueueSizeTracker(
this)]
454 T* conn = GetAsyncConnectionForCurrentThread();
464 WarnSyncQueries<T> = warn;
471 for (
uint8 i = 0; i < numConnections; ++i)
476 std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo,
flags[type]);
478 if (
uint32 error = connection->Open())
481 _connections[type].clear();
484 else if (
uint32 serverVersion = connection->GetServerVersion(); serverVersion <
ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION))
486 TC_LOG_ERROR(
"sql.driver",
"TrinityCore does not support " TRINITY_MYSQL_FLAVOR
" versions below " TRINITY_REQUIRED_MYSQL_VERSION
" (found id {}, need id >= {}), please update your " TRINITY_MYSQL_FLAVOR
" server", serverVersion,
ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION));
491 _connections[type].push_back(std::move(connection));
502 if (!to || !from || !length)
505 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
518 if (WarnSyncQueries<T>)
520 TC_LOG_WARN(
"sql.performances",
"Sync query at:\n{}", boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
525 auto const num_cons = _connections[IDX_SYNCH].size();
526 T* connection =
nullptr;
530 connection = _connections[IDX_SYNCH][i++ % num_cons].get();
532 if (connection->LockIfReady())
542 std::thread::id
id = std::this_thread::get_id();
543 for (
auto&& connection : _connections[IDX_ASYNC])
544 if (connection->GetWorkerThreadId() == id)
545 return connection.get();
553 return _connectionInfo->database.c_str();
562 boost::asio::post(_ioContext->get_executor(), [
this, sql = std::string(sql), tracker =
QueueSizeTracker(
this)]
564 T* conn = GetAsyncConnectionForCurrentThread();
574 T* conn = GetAsyncConnectionForCurrentThread();
585 T* connection = GetFreeConnection();
587 connection->Unlock();
593 T* connection = GetFreeConnection();
595 connection->Unlock();
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
static consteval uint32 ParseVersionString(std::string_view chars)
#define TRINITY_COMPILED_CLIENT_VERSION
#define WPFatal(cond,...)
#define TC_LOG_DEBUG(filterType__, message__,...)
#define TC_LOG_ERROR(filterType__, message__,...)
#define TC_LOG_INFO(filterType__, message__,...)
#define TC_LOG_WARN(filterType__, message__,...)
static bool Execute(MySQLConnection *conn, char const *sql)
static QueryResult Query(MySQLConnection *conn, char const *sql)
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
T * GetAsyncConnectionForCurrentThread() const
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
static void WarnAboutSyncQueries(bool warn)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
std::atomic< size_t > _queueSize
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically rollback if not commited....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, prevent the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Apply escape string'ing for current collation. (utf8)
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
static PreparedQueryResult Query(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, SQLQueryHolderBase *holder)
static bool Execute(MySQLConnection *conn, std::shared_ptr< TransactionBase > trans)
QueueSizeTracker & operator=(QueueSizeTracker const &other)
QueueSizeTracker(DatabaseWorkerPool *pool)
QueueSizeTracker(QueueSizeTracker &&other) noexcept
QueueSizeTracker & operator=(QueueSizeTracker &&other) noexcept
QueueSizeTracker(QueueSizeTracker const &other)
DatabaseWorkerPool * _pool