36#include <boost/asio/use_future.hpp>
37#include <mysqld_error.h>
40#include <boost/stacktrace.hpp>
// Minimum supported database server / client library versions.
//
// Every numeric limit is paired with a human-readable string of the same version.
// Going by the pairs below (50700u <-> "5.7", 100209u <-> "10.2.9", 30003u <-> "3.0.3")
// the numeric form is major * 10000 + minor * 100 + patch.
// These stay preprocessor macros so they remain usable in #if conditions and in
// adjacent string-literal concatenation.
// NOTE(review): the places that compare against these limits are not visible in this
// excerpt; presumably they are checked against the versions reported at connect time - confirm.

// Oldest accepted MySQL server and MySQL client library.
#define MIN_MYSQL_SERVER_VERSION 50700u
#define MIN_MYSQL_SERVER_VERSION_STRING "5.7"
#define MIN_MYSQL_CLIENT_VERSION 50700u
#define MIN_MYSQL_CLIENT_VERSION_STRING "5.7"

// Oldest accepted MariaDB server and MariaDB client library.
#define MIN_MARIADB_SERVER_VERSION 100209u
#define MIN_MARIADB_SERVER_VERSION_STRING "10.2.9"
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"
//! Per-thread flag, one instance per Database type, controlling sync-query warnings.
//!
//! Starts out false on every thread. Elsewhere in this file it is assigned from a
//! `warn` argument, and while it is true a "Sync query at:" warning carrying the
//! current stack trace is written to the "sql.performances" log.
template<typename Database>
thread_local bool WarnSyncQueries = false;
76 if (_pool != other.
_pool)
91 if (_pool != other._pool)
96 _pool = std::exchange(other._pool,
nullptr);
113 : _async_threads(0), _synch_threads(0)
115 WPFatal(mysql_thread_safe(),
"Used MySQL library isn't thread-safe.");
117#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
119 WPFatal(mysql_get_client_version() == MARIADB_PACKAGE_VERSION_ID,
"Used MariaDB library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MARIADB_PACKAGE_VERSION_ID);
122 WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID,
"Used MySQL library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), MYSQL_VERSION_ID);
133 uint8 const asyncThreads,
uint8 const synchThreads)
135 _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);
137 _async_threads = asyncThreads;
138 _synch_threads = synchThreads;
144 WPFatal(_connectionInfo.get(),
"Connection info was not set!");
146 TC_LOG_INFO(
"sql.driver",
"Opening DatabasePool '{}'. "
147 "Asynchronous connections: {}, synchronous connections: {}.",
148 GetDatabaseName(), _async_threads, _synch_threads);
150 _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
152 uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
157 error = OpenConnections(IDX_SYNCH, _synch_threads);
162 for (std::unique_ptr<T>
const& connection : _connections[IDX_ASYNC])
163 connection->StartWorkerThread(_ioContext.get());
165 TC_LOG_INFO(
"sql.driver",
"DatabasePool '{}' opened successfully. "
166 "{} total connections running.", GetDatabaseName(),
167 (_connections[IDX_SYNCH].
size() + _connections[IDX_ASYNC].
size()));
175 TC_LOG_INFO(
"sql.driver",
"Closing down DatabasePool '{}'.", GetDatabaseName());
181 _connections[IDX_ASYNC].clear();
185 TC_LOG_INFO(
"sql.driver",
"Asynchronous connections on DatabasePool '{}' terminated. "
186 "Proceeding with synchronous connections.",
193 _connections[IDX_SYNCH].clear();
195 TC_LOG_INFO(
"sql.driver",
"All connections on DatabasePool '{}' closed.", GetDatabaseName());
201 for (
auto& connections : _connections)
203 for (
auto& connection : connections)
205 connection->LockIfReady();
206 if (!connection->PrepareStatements())
208 connection->Unlock();
213 connection->Unlock();
215 size_t const preparedSize = connection->m_stmts.size();
216 if (_preparedStatementSize.size() < preparedSize)
217 _preparedStatementSize.resize(preparedSize);
219 for (
size_t i = 0; i < preparedSize; ++i)
223 if (_preparedStatementSize[i] > 0)
228 uint32 const paramCount = stmt->GetParameterCount();
231 ASSERT(paramCount < std::numeric_limits<uint8>::max());
233 _preparedStatementSize[i] =
static_cast<uint8>(paramCount);
246 connection = GetFreeConnection();
249 connection->Unlock();
257 T* connection = GetFreeConnection();
259 connection->Unlock();
270 std::future<QueryResult> result =
boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([
this, sql = std::string(sql), tracker =
QueueSizeTracker(
this)]
272 T* conn = GetAsyncConnectionForCurrentThread();
283 T* conn = GetAsyncConnectionForCurrentThread();
294 T* conn = GetAsyncConnectionForCurrentThread();
297 return { std::move(holder), std::move(result) };
303 return std::make_shared<Transaction<T>>();
313 switch (transaction->GetSize())
316 TC_LOG_DEBUG(
"sql.driver",
"Transaction contains 0 queries. Not executing.");
319 TC_LOG_DEBUG(
"sql.driver",
"Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
328 T* conn = GetAsyncConnectionForCurrentThread();
340 switch (transaction->GetSize())
343 TC_LOG_DEBUG(
"sql.driver",
"Transaction contains 0 queries. Not executing.");
346 TC_LOG_DEBUG(
"sql.driver",
"Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
355 T* conn = GetAsyncConnectionForCurrentThread();
364 T* connection = GetFreeConnection();
365 int errorCode = connection->ExecuteTransaction(transaction);
368 connection->Unlock();
374 if (errorCode == ER_LOCK_DEADLOCK)
377 uint8 loopBreaker = 5;
378 for (
uint8 i = 0; i < loopBreaker; ++i)
380 if (!connection->ExecuteTransaction(transaction))
386 transaction->Cleanup();
388 connection->Unlock();
403 char* buf =
new char[str.size() * 2 + 1];
404 EscapeString(buf, str.c_str(),
uint32(str.size()));
413 for (
auto& connection : _connections[IDX_SYNCH])
415 if (connection->LockIfReady())
418 connection->Unlock();
425 auto const count = _connections[IDX_ASYNC].size();
426 for (
uint8 i = 0; i < count; ++i)
430 T* conn = GetAsyncConnectionForCurrentThread();
440 WarnSyncQueries<T> = warn;
447 for (
uint8 i = 0; i < numConnections; ++i)
452 std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo,
flags[type]);
454 if (
uint32 error = connection->Open())
457 _connections[type].clear();
476 _connections[type].push_back(std::move(connection));
487 if (!to || !from || !length)
490 return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
503 if (WarnSyncQueries<T>)
505 TC_LOG_WARN(
"sql.performances",
"Sync query at:\n{}", boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
510 auto const num_cons = _connections[IDX_SYNCH].size();
511 T* connection =
nullptr;
515 connection = _connections[IDX_SYNCH][i++ % num_cons].get();
517 if (connection->LockIfReady())
527 std::thread::id
id = std::this_thread::get_id();
528 for (
auto&& connection : _connections[IDX_ASYNC])
529 if (connection->GetWorkerThreadId() ==
id)
530 return connection.get();
538 return _connectionInfo->database.c_str();
549 T* conn = GetAsyncConnectionForCurrentThread();
559 T* conn = GetAsyncConnectionForCurrentThread();
570 T* connection = GetFreeConnection();
572 connection->Unlock();
578 T* connection = GetFreeConnection();
580 connection->Unlock();
std::shared_ptr< ResultSet > QueryResult
std::shared_ptr< Transaction< T > > SQLTransaction
std::shared_ptr< PreparedResultSet > PreparedQueryResult
#define MIN_MARIADB_CLIENT_VERSION
#define MIN_MARIADB_CLIENT_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION_STRING
#define MIN_MARIADB_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION
#define MIN_MARIADB_SERVER_VERSION_STRING
#define MIN_MYSQL_SERVER_VERSION
#define MIN_MYSQL_CLIENT_VERSION_STRING
#define WPFatal(cond,...)
#define TC_LOG_DEBUG(filterType__, message__,...)
#define TC_LOG_ERROR(filterType__, message__,...)
#define TC_LOG_INFO(filterType__, message__,...)
#define TC_LOG_WARN(filterType__, message__,...)
static bool Execute(MySQLConnection *conn, char const *sql)
static QueryResult Query(MySQLConnection *conn, char const *sql)
uint32 OpenConnections(InternalIndex type, uint8 numConnections)
void CommitTransaction(SQLTransaction< T > transaction)
QueryResult Query(char const *sql, T *connection=nullptr)
bool PrepareStatements()
Prepares all prepared statements.
T * GetAsyncConnectionForCurrentThread() const
void SetConnectionInfo(std::string const &infoString, uint8 const asyncThreads, uint8 const synchThreads)
static void WarnAboutSyncQueries(bool warn)
void ExecuteOrAppend(SQLTransaction< T > &trans, char const *sql)
std::atomic< size_t > _queueSize
SQLTransaction< T > BeginTransaction()
Begins an automanaged transaction pointer that will automatically roll back if not committed....
void DirectCommitTransaction(SQLTransaction< T > &transaction)
void KeepAlive()
Keeps all our MySQL connections alive, preventing the server from disconnecting us.
T::Statements PreparedStatementIndex
char const * GetDatabaseName() const
PreparedStatement< T > * GetPreparedStatement(PreparedStatementIndex index)
void DirectExecute(char const *sql)
SQLQueryHolderCallback DelayQueryHolder(std::shared_ptr< SQLQueryHolder< T > > holder)
void EscapeString(std::string &str)
Applies string escaping for the current collation (utf8).
void Execute(char const *sql)
QueryCallback AsyncQuery(char const *sql)
TransactionCallback AsyncCommitTransaction(SQLTransaction< T > transaction)
static PreparedQueryResult Query(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, PreparedStatementBase *stmt)
static bool Execute(MySQLConnection *conn, SQLQueryHolderBase *holder)
static bool Execute(MySQLConnection *conn, std::shared_ptr< TransactionBase > trans)
void Execute(Creature *me, EventMap &events, uint32 eventId)
decltype(auto) post(boost::asio::io_context &ioContext, T &&t)
constexpr std::size_t size()
QueueSizeTracker & operator=(QueueSizeTracker const &other)
QueueSizeTracker(DatabaseWorkerPool *pool)
QueueSizeTracker(QueueSizeTracker &&other) noexcept
QueueSizeTracker & operator=(QueueSizeTracker &&other) noexcept
QueueSizeTracker(QueueSizeTracker const &other)
DatabaseWorkerPool * _pool