|
|
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
// $NoKeywords: $
//=============================================================================//
// Nasty headers!
#include "MySqlDatabase.h"
#include "tier1/strtools.h"
#include "vmpi.h"
#include "vmpi_dispatch.h"
#include "mpi_stats.h"
#include "cmdlib.h"
#include "imysqlwrapper.h"
#include "threadhelpers.h"
#include "vmpi_tools_shared.h"
#include "tier0/icommandline.h"
/*
-- MySQL code to create the databases, create the users, and set access privileges. -- You only need to ever run this once.
create database vrad;
use mysql;
create user vrad_worker; create user vmpi_browser;
-- This updates the "user" table, which is checked when someone tries to connect to the database.
grant select,insert,update on vrad.* to vrad_worker;
grant select on vrad.* to vmpi_browser;
flush privileges;
/*
-- SQL code to (re)create the tables.
-- Master generates a unique job ID (in job_master_start) and sends it to workers. -- Each worker (and the master) make a job_worker_start, link it to the primary job ID, -- get their own unique ID, which represents that process in that job. -- All JobWorkerID fields link to the JobWorkerID field in job_worker_start.
-- NOTE: do a "use vrad" or "use vvis" first, depending on the DB you want to create.
use vrad;
drop table job_master_start; create table job_master_start ( JobID INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, index id( JobID, MachineName(5) ), BSPFilename TINYTEXT NOT NULL, StartTime TIMESTAMP NOT NULL, MachineName TEXT NOT NULL, RunningTimeMS INTEGER UNSIGNED NOT NULL, NumWorkers INTEGER UNSIGNED NOT NULL default 0 );
drop table job_master_end; create table job_master_end ( JobID INTEGER UNSIGNED NOT NULL, PRIMARY KEY ( JobID ), NumWorkersConnected SMALLINT UNSIGNED NOT NULL, NumWorkersDisconnected SMALLINT UNSIGNED NOT NULL, ErrorText TEXT NOT NULL );
drop table job_worker_start;
create table job_worker_start (
	JobWorkerID INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
	index index_jobid( JobID ),
	index index_jobworkerid( JobWorkerID ),
	JobID INTEGER UNSIGNED NOT NULL, -- links to job_master_start::JobID
	IsMaster BOOL NOT NULL, -- Set to 1 if this "worker" is the master process.
	RunningTimeMS INTEGER UNSIGNED NOT NULL default 0,
	MachineName TEXT NOT NULL,
	WorkerState SMALLINT UNSIGNED NOT NULL default 0, -- 0 = disconnected, 1 = connected
	NumWorkUnits INTEGER UNSIGNED NOT NULL default 0, -- how many work units this worker has completed
	CurrentStage TINYTEXT NOT NULL, -- which compile stage is it on
	Thread0WU INTEGER NOT NULL default 0, -- which WU thread 0 is on
	Thread1WU INTEGER NOT NULL default 0, -- which WU thread 1 is on
	Thread2WU INTEGER NOT NULL default 0, -- which WU thread 2 is on
	Thread3WU INTEGER NOT NULL default 0 -- which WU thread 3 is on
	);
drop table text_messages; create table text_messages ( JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID, MessageIndex ), MessageIndex INTEGER UNSIGNED NOT NULL, Text TEXT NOT NULL );
drop table graph_entry; create table graph_entry ( JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID ), MSSinceJobStart INTEGER UNSIGNED NOT NULL, BytesSent INTEGER UNSIGNED NOT NULL, BytesReceived INTEGER UNSIGNED NOT NULL );
drop table events; create table events ( JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID ), Text TEXT NOT NULL ); */
// Worker connect/disconnect counters; the application updates these and they are
// written into the job_master_end row.
int g_nWorkersConnected = 0;
int g_nWorkersDisconnected = 0;

// GetTickCount() value captured when the stats system starts (VMPI_Stats_Init_Worker).
DWORD g_StatsStartTime;

// Asynchronous command queue, plus the dynamically loaded MySQL wrapper and its module handle.
CMySqlDatabase *g_pDB = NULL;
IMySQL *g_pSQL = NULL;
CSysModule *g_hMySQLDLL = NULL;

// Base name of the BSP being compiled (filled in by the master).
char g_BSPFilename[256];

// True in the master process.
bool g_bMaster = false;

// Key of the job_master_start row: identifies the whole job, not a particular machine.
unsigned long g_JobPrimaryID = 0;

// Key of this process's job_worker_start row: identifies this machine within this job.
unsigned long g_JobWorkerID = 0;

char g_MachineName[MAX_COMPUTERNAME_LENGTH+1] = {0};

// Counter written as MessageIndex into text_messages rows.
unsigned long g_CurrentMessageIndex = 0;

// Perf-sampling thread, its ID, and the event used to ask it to exit.
HANDLE g_hPerfThread = NULL;
DWORD g_PerfThreadID = 0xFEFEFEFE;
HANDLE g_hPerfThreadExitEvent = NULL;

// Provided by the application; these values are written into the database.
extern uint64 g_ThreadWUs[4];
extern uint64 VMPI_GetNumWorkUnitsCompleted( int iProc );
// ---------------------------------------------------------------------------------------------------- //
// Small helper for building a query string printf-style and sending it to an IMySQL connection.
// ---------------------------------------------------------------------------------------------------- //
class CMySQLQuery
{
	friend class CMySQL;

public:
	// printf-style formatting into the internal buffer, which is grown until the
	// whole formatted text fits.
	void Format( const char *pFormat, ... );

	// Sends the formatted text to pDB, releases the buffer, and returns pDB->Execute's result.
	int Execute( IMySQL *pDB );

private:
	CUtlVector<char> m_QueryText;
};
void CMySQLQuery::Format( const char *pFormat, ... )
{
#define QUERYTEXT_GROWSIZE 1024

	// Format into m_QueryText, growing the buffer and retrying until the complete
	// string plus its terminator fits.
	//
	// MSVC's _vsnprintf returns -1 on truncation, and returns exactly the buffer size
	// (without writing a terminator) when the text fills the buffer precisely.
	// A C99-style vsnprintf instead returns the length it *would* have written.
	// Both "ret < 0" and "ret >= buffer size" therefore mean "buffer too small";
	// accepting ret == size would chop off the final character of the query.
	m_QueryText.SetSize( QUERYTEXT_GROWSIZE );
	while ( 1 )
	{
		va_list marker;
		va_start( marker, pFormat );
		int ret = _vsnprintf( m_QueryText.Base(), m_QueryText.Count(), pFormat, marker );
		va_end( marker );

		if ( ret >= 0 && ret < m_QueryText.Count() )
		{
			// Everything fit; make sure the string is terminated right after the text.
			m_QueryText[ret] = 0;
			break;
		}

		// Grow by a chunk, or straight to the required size when the CRT reported it.
		int newSize = m_QueryText.Count() + QUERYTEXT_GROWSIZE;
		if ( ret >= m_QueryText.Count() && ret + 1 > newSize )
			newSize = ret + 1;
		m_QueryText.SetSize( newSize );
	}
}
// Runs the previously formatted query on pDB and frees the query buffer.
// Returns whatever pDB->Execute returns.
int CMySQLQuery::Execute( IMySQL *pDB )
{
	const int result = pDB->Execute( m_QueryText.Base() );

	// Release the text; the next Format() call re-sizes the buffer.
	m_QueryText.Purge();
	return result;
}
// ---------------------------------------------------------------------------------------------------- //
// Returns a new[]-allocated copy of pText in which every double quote and backslash is
// preceded by a backslash, so it can be placed inside a double-quoted SQL string literal.
// The caller owns the result and must release it with delete [].
// ---------------------------------------------------------------------------------------------------- //
char* FormatStringForSQL( const char *pText )
{
	// Pass 1: measure. Characters that need escaping take two bytes, others one.
	int nEscapedLen = 0;
	for ( const char *pIn = pText; *pIn != 0; ++pIn )
	{
		const bool bSpecial = ( *pIn == '\"' || *pIn == '\\' );
		nEscapedLen += bSpecial ? 2 : 1;
	}

	// Pass 2: copy, inserting the escape backslashes.
	char *pEscaped = new char[nEscapedLen + 1];
	char *pOut = pEscaped;
	for ( const char *pIn = pText; *pIn != 0; ++pIn )
	{
		if ( *pIn == '\"' || *pIn == '\\' )
			*pOut++ = '\\';
		*pOut++ = *pIn;
	}
	*pOut = 0;

	return pEscaped;
}
// -------------------------------------------------------------------------------- //
// Commands to add data to the database.
// -------------------------------------------------------------------------------- //
// Common base for the database commands in this file. Provides a virtual destructor
// and deleteThis(), which destroys the command with delete (the commands here are
// all created with new).
class CSQLDBCommandBase : public ISQLDBCommand
{
public:
	virtual ~CSQLDBCommandBase()
	{
	}

	virtual void deleteThis()
	{
		delete this;
	}
};
// Queued by the master's perf thread: refreshes job_master_start.NumWorkers and, for each
// known worker, its WorkerState and NumWorkUnits in job_worker_start.
class CSQLDBCommand_WorkerStats : public CSQLDBCommandBase
{
public:
	virtual int RunCommand()
	{
		int nCurConnections = VMPI_GetCurrentNumberOfConnections();

		// Update the NumWorkers entry.
		char query[2048];
		Q_snprintf( query, sizeof( query ), "update job_master_start set NumWorkers=%d where JobID=%lu",
			nCurConnections,
			g_JobPrimaryID );
		g_pSQL->Execute( query );

		// Update each worker's job_worker_start row. Index 0 is skipped
		// (presumably the master's own connection — confirm against VMPI).
		for ( int i=1; i < nCurConnections; i++ )
		{
			// Use the ID we validated here rather than fetching it a second time,
			// so the checked value and the written value cannot disagree.
			unsigned long jobWorkerID = VMPI_GetJobWorkerID( i );
			if ( jobWorkerID == 0xFFFFFFFF )
				continue; // sentinel: no job_worker_start ID known for this connection

			Q_snprintf( query, sizeof( query ), "update "
				"job_worker_start set WorkerState=%d, NumWorkUnits=%d where JobWorkerID=%lu",
				VMPI_IsProcConnected( i ),
				(int) VMPI_GetNumWorkUnitsCompleted( i ),
				jobWorkerID );
			g_pSQL->Execute( query );
		}
		return 1;
	}
};
// Queued by the master at shutdown: inserts the job_master_end row (worker connect /
// disconnect counts) and stores the master's total running time in job_master_start.
class CSQLDBCommand_JobMasterEnd : public CSQLDBCommandBase
{
public:
	virtual int RunCommand()
	{
		CMySQLQuery query;

		query.Format( "insert into job_master_end values ( %lu, %d, %d, \"no errors\" )",
			g_JobPrimaryID,
			g_nWorkersConnected,
			g_nWorkersDisconnected );
		query.Execute( g_pSQL );

		// Now set RunningTimeMS (milliseconds since stats started).
		const unsigned long elapsedMS = GetTickCount() - g_StatsStartTime;
		query.Format( "update job_master_start set RunningTimeMS=%lu where JobID=%lu",
			elapsedMS,
			g_JobPrimaryID );
		query.Execute( g_pSQL );

		return 1;
	}
};
void UpdateJobWorkerRunningTime() { unsigned long runningTimeMS = GetTickCount() - g_StatsStartTime; char curStage[256]; VMPI_GetCurrentStage( curStage, sizeof( curStage ) ); CMySQLQuery query; query.Format( "update job_worker_start set RunningTimeMS=%lu, CurrentStage=\"%s\", " "Thread0WU=%d, Thread1WU=%d, Thread2WU=%d, Thread3WU=%d where JobWorkerID=%lu", runningTimeMS, curStage, (int) g_ThreadWUs[0], (int) g_ThreadWUs[1], (int) g_ThreadWUs[2], (int) g_ThreadWUs[3], g_JobWorkerID ); query.Execute( g_pSQL ); }
// One graph_entry sample: elapsed time plus bytes sent/received during the sample interval.
class CSQLDBCommand_GraphEntry : public CSQLDBCommandBase
{
public:
	CSQLDBCommand_GraphEntry( DWORD msTime, DWORD nBytesSent, DWORD nBytesReceived )
		: m_nBytesSent( nBytesSent ),
		  m_nBytesReceived( nBytesReceived ),
		  m_msTime( msTime )
	{
	}

	// Inserts the graph_entry row, then refreshes this worker's job_worker_start row.
	virtual int RunCommand()
	{
		CMySQLQuery query;
		query.Format( "insert into graph_entry (JobWorkerID, MSSinceJobStart, BytesSent, BytesReceived) "
			"values ( %lu, %lu, %lu, %lu )",
			g_JobWorkerID,
			m_msTime,
			m_nBytesSent,
			m_nBytesReceived );
		query.Execute( g_pSQL );

		UpdateJobWorkerRunningTime();

		// NOTE(review): graph_entry has no MessageIndex column; advancing the text_messages
		// counter here only leaves gaps in that sequence. Intent unclear — confirm.
		++g_CurrentMessageIndex;
		return 1;
	}

	DWORD m_nBytesSent;
	DWORD m_nBytesReceived;
	DWORD m_msTime;
};
// Inserts one text_messages row for this worker. The text is SQL-escaped at
// construction and owned by the command.
class CSQLDBCommand_TextMessage : public CSQLDBCommandBase
{
public:
	CSQLDBCommand_TextMessage( const char *pText )
	{
		m_pText = FormatStringForSQL( pText );
	}

	virtual ~CSQLDBCommand_TextMessage()
	{
		delete [] m_pText;
	}

	virtual int RunCommand()
	{
		CMySQLQuery query;
		query.Format( "insert into text_messages (JobWorkerID, MessageIndex, Text) values ( %lu, %lu, \"%s\" )",
			g_JobWorkerID,
			g_CurrentMessageIndex,
			m_pText );
		query.Execute( g_pSQL );

		++g_CurrentMessageIndex;
		return 1;
	}

	char *m_pText;

private:
	// m_pText is an owned new[] buffer: a member-wise copy would delete it twice.
	// Copying is disabled (declared but intentionally never defined).
	CSQLDBCommand_TextMessage( const CSQLDBCommand_TextMessage & );
	CSQLDBCommand_TextMessage &operator=( const CSQLDBCommand_TextMessage & );
};
// -------------------------------------------------------------------------------- //
// Internal helpers.
// -------------------------------------------------------------------------------- //
// Spew text waiting to be sent to the database: appended by VMPI_Stats_SpewHook and
// drained by PerfThread_SendSpewText. Both access it under g_SpewTextCS.
CCriticalSection g_SpewTextCS;
CUtlVector<char> g_SpewText( 1024 );
// Extra spew hook: appends pMsg (without its terminator) to g_SpewText so the perf
// thread can forward it to the database once it runs.
void VMPI_Stats_SpewHook( const char *pMsg )
{
	CCriticalSectionLock csLock( &g_SpewTextCS );
	csLock.Lock();

	int nLen = (int)strlen( pMsg );
	g_SpewText.AddMultipleToTail( nLen, pMsg );
}
// Flushes text queued in g_SpewText to the database as a text_messages command.
// When text output is disabled (g_bMPI_StatsTextOutput false), the text is discarded
// and, only the first time, a short "<option> not enabled" notice is queued instead so
// the vmpi_job_watch window shows which command-line option turns output on.
void PerfThread_SendSpewText()
{
	static bool s_bSentDisabledNotice = false;

	CCriticalSectionLock csLock( &g_SpewTextCS );
	csLock.Lock();

	if ( g_SpewText.Count() > 0 )
	{
		// Terminate the accumulated bytes so they can be used as a C string.
		g_SpewText.AddToTail( 0 );

		if ( g_bMPI_StatsTextOutput )
		{
			g_pDB->AddCommandToQueue( new CSQLDBCommand_TextMessage( g_SpewText.Base() ), NULL );
		}
		else if ( !s_bSentDisabledNotice )
		{
			s_bSentDisabledNotice = true;

			char notice[512];
			V_snprintf( notice, sizeof( notice ), "%s not enabled", VMPI_GetParamString( mpi_Stats_TextOutput ) );
			g_pDB->AddCommandToQueue( new CSQLDBCommand_TextMessage( notice ), NULL );
		}

		g_SpewText.RemoveAll();
	}

	csLock.Unlock();
}
// Queues a graph_entry sample with the bytes sent/received (unicast + multicast) since the
// previous call, then records the current totals in lastSent/lastReceived for the next call.
void PerfThread_AddGraphEntry( DWORD startTicks, DWORD &lastSent, DWORD &lastReceived )
{
	const DWORD totalSent = g_nBytesSent + g_nMulticastBytesSent;
	const DWORD totalReceived = g_nBytesReceived + g_nMulticastBytesReceived;

	const DWORD elapsedMS = GetTickCount() - startTicks;
	const DWORD deltaSent = totalSent - lastSent;
	const DWORD deltaReceived = totalReceived - lastReceived;
	g_pDB->AddCommandToQueue( new CSQLDBCommand_GraphEntry( elapsedMS, deltaSent, deltaReceived ), NULL );

	lastSent = totalSent;
	lastReceived = totalReceived;
}
// Perf-sampling thread started by VMPI_Stats_Init_Worker. Each time the 1000 ms wait on
// g_hPerfThreadExitEvent times out, it queues a graph_entry, flushes queued spew text and,
// in the master, queues a worker-stats refresh. Once the exit event is signalled it flushes
// the remaining text and adds one final graph entry before returning.
DWORD WINAPI PerfThreadFn( LPVOID pParameter )
{
	DWORD startTicks = GetTickCount();
	DWORD lastSent = 0;
	DWORD lastReceived = 0;

	for ( ;; )
	{
		if ( WaitForSingleObject( g_hPerfThreadExitEvent, 1000 ) == WAIT_OBJECT_0 )
			break;

		PerfThread_AddGraphEntry( startTicks, lastSent, lastReceived );

		// Send updates for text output.
		PerfThread_SendSpewText();

		// The master also refreshes every worker's stats.
		if ( g_bMaster )
		{
			g_pDB->AddCommandToQueue( new CSQLDBCommand_WorkerStats, NULL );
		}
	}

	// Final flush: remaining text, then one last graph entry (whose command also
	// updates this worker's running time and current stage).
	PerfThread_SendSpewText();
	PerfThread_AddGraphEntry( startTicks, lastSent, lastReceived );

	// NOTE(review): re-signals the exit event; VMPI_Stats_Term waits on the thread
	// handle rather than the event, so this appears redundant.
	SetEvent( g_hPerfThreadExitEvent );
	return 0;
}
// -------------------------------------------------------------------------------- //
// VMPI_Stats interface.
// -------------------------------------------------------------------------------- //
// Registers VMPI_Stats_SpewHook as an extra spew hook so spew text is queued in g_SpewText.
void VMPI_Stats_InstallSpewHook()
{
	InstallExtraSpewHook( VMPI_Stats_SpewHook );
}
// Releases the IMySQL interface (if any) and unloads the mysql_wrapper module.
// Does nothing when no module is loaded.
void UnloadMySQLWrapper()
{
	if ( !g_hMySQLDLL )
		return;

	if ( g_pSQL )
	{
		g_pSQL->Release();
		g_pSQL = NULL;
	}

	Sys_UnloadModule( g_hMySQLDLL );
	g_hMySQLDLL = NULL;
}
// Loads the mysql_wrapper module, stores its IMySQL interface in g_pSQL and connects to
// the given database. Any previously loaded wrapper is unloaded first, and the module is
// unloaded again if the connection attempt fails. Returns true on success.
bool LoadMySQLWrapper( const char *pHostName, const char *pDBName, const char *pUserName )
{
	UnloadMySQLWrapper();

	if ( !Sys_LoadInterface( "mysql_wrapper", MYSQL_WRAPPER_VERSION_NAME, &g_hMySQLDLL, (void**)&g_pSQL ) )
		return false;

	// InitMySQL takes the database name first, then the host.
	if ( g_pSQL->InitMySQL( pDBName, pHostName, pUserName ) )
		return true;

	UnloadMySQLWrapper();
	return false;
}
bool VMPI_Stats_Init_Master( const char *pHostName, const char *pDBName, const char *pUserName, const char *pBSPFilename, unsigned long *pDBJobID ) { Assert( !g_pDB );
g_bMaster = true; // Connect the database.
g_pDB = new CMySqlDatabase; if ( !g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper( pHostName, pDBName, pUserName ) ) { delete g_pDB; g_pDB = NULL; return false; }
DWORD size = sizeof( g_MachineName ); GetComputerName( g_MachineName, &size );
// Create the job_master_start row.
Q_FileBase( pBSPFilename, g_BSPFilename, sizeof( g_BSPFilename ) );
g_JobPrimaryID = 0; CMySQLQuery query; query.Format( "insert into job_master_start ( BSPFilename, StartTime, MachineName, RunningTimeMS ) values ( \"%s\", null, \"%s\", %lu )", g_BSPFilename, g_MachineName, RUNNINGTIME_MS_SENTINEL ); query.Execute( g_pSQL );
g_JobPrimaryID = g_pSQL->InsertID(); if ( g_JobPrimaryID == 0 ) { delete g_pDB; g_pDB = NULL; return false; }
// Now init the worker portion.
*pDBJobID = g_JobPrimaryID; return VMPI_Stats_Init_Worker( NULL, NULL, NULL, g_JobPrimaryID ); }
bool VMPI_Stats_Init_Worker( const char *pHostName, const char *pDBName, const char *pUserName, unsigned long DBJobID ) { g_StatsStartTime = GetTickCount(); // If pDBServerName is null, then we're the master and we just want to make the job_worker_start entry.
if ( pHostName ) { Assert( !g_pDB ); // Connect the database.
g_pDB = new CMySqlDatabase; if ( !g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper( pHostName, pDBName, pUserName ) ) { delete g_pDB; g_pDB = NULL; return false; } // Get our machine name to store in the database.
DWORD size = sizeof( g_MachineName ); GetComputerName( g_MachineName, &size ); }
g_JobPrimaryID = DBJobID; g_JobWorkerID = 0;
CMySQLQuery query; query.Format( "insert into job_worker_start ( JobID, CurrentStage, IsMaster, MachineName ) values ( %lu, \"none\", %d, \"%s\" )", g_JobPrimaryID, g_bMaster, g_MachineName ); query.Execute( g_pSQL ); g_JobWorkerID = g_pSQL->InsertID(); if ( g_JobWorkerID == 0 ) { delete g_pDB; g_pDB = NULL; return false; }
// Now create a thread that samples perf data and stores it in the database.
g_hPerfThreadExitEvent = CreateEvent( NULL, FALSE, FALSE, NULL ); g_hPerfThread = CreateThread( NULL, 0, PerfThreadFn, NULL, 0, &g_PerfThreadID );
return true; }
void VMPI_Stats_Term() { if ( !g_pDB ) return;
// Stop the thread.
SetEvent( g_hPerfThreadExitEvent ); WaitForSingleObject( g_hPerfThread, INFINITE ); CloseHandle( g_hPerfThreadExitEvent ); g_hPerfThreadExitEvent = NULL;
CloseHandle( g_hPerfThread ); g_hPerfThread = NULL;
if ( g_bMaster ) { // (Write a job_master_end entry here).
g_pDB->AddCommandToQueue( new CSQLDBCommand_JobMasterEnd, NULL ); }
// Wait for up to a second for the DB to finish writing its data.
DWORD startTime = GetTickCount(); while ( GetTickCount() - startTime < 1000 ) { if ( g_pDB->QueriesInOutQueue() == 0 ) break; }
delete g_pDB; g_pDB = NULL;
UnloadMySQLWrapper(); }
// Reads one line from fp into pStr (capacity strSize bytes, always NUL-terminated).
// - The newline is not stored.
// - A trailing '\r' is dropped, so CRLF files work even when not opened in text mode.
// - If the line is longer than strSize-1 characters, it is truncated and the rest of it
//   is consumed, so the next call starts on the following line instead of
//   mid-line.
// Returns true if a non-empty string was read.
static bool ReadStringFromFile( FILE *fp, char *pStr, int strSize )
{
	if ( !pStr || strSize <= 0 )
		return false;

	int nLen = 0;
	for ( ;; )
	{
		const int c = fgetc( fp );
		if ( c == EOF || c == '\n' )
			break;

		// Store while there is room for the terminator; otherwise just skip the excess.
		if ( nLen < strSize - 1 )
			pStr[nLen++] = (char)c;
	}

	if ( nLen > 0 && pStr[nLen - 1] == '\r' )
		--nLen;

	pStr[nLen] = 0;
	return nLen != 0;
}
// This looks for pDBInfoFilename in the same path as pBaseExeFilename.
// The file has 3 lines: machine name (with database), database name, username
void GetDBInfo( const char *pDBInfoFilename, CDBInfo *pInfo ) { char baseExeFilename[512]; if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) Error( "GetModuleFileName failed." ); // Look for the info file in the same directory as the exe.
char dbInfoFilename[512]; Q_strncpy( dbInfoFilename, baseExeFilename, sizeof( dbInfoFilename ) ); Q_StripFilename( dbInfoFilename );
if ( dbInfoFilename[0] == 0 ) Q_strncpy( dbInfoFilename, ".", sizeof( dbInfoFilename ) );
Q_strncat( dbInfoFilename, "/", sizeof( dbInfoFilename ), COPY_ALL_CHARACTERS ); Q_strncat( dbInfoFilename, pDBInfoFilename, sizeof( dbInfoFilename ), COPY_ALL_CHARACTERS );
FILE *fp = fopen( dbInfoFilename, "rt" ); if ( !fp ) { Error( "Can't open %s for database info.\n", dbInfoFilename ); }
if ( !ReadStringFromFile( fp, pInfo->m_HostName, sizeof( pInfo->m_HostName ) ) || !ReadStringFromFile( fp, pInfo->m_DBName, sizeof( pInfo->m_DBName ) ) || !ReadStringFromFile( fp, pInfo->m_UserName, sizeof( pInfo->m_UserName ) ) ) { Error( "%s is not a valid database info file.\n", dbInfoFilename ); }
fclose( fp ); }
void RunJobWatchApp( char *pCmdLine ) { STARTUPINFO si; memset( &si, 0, sizeof( si ) ); si.cb = sizeof( si );
PROCESS_INFORMATION pi; memset( &pi, 0, sizeof( pi ) );
// Working directory should be the same as our exe's directory.
char dirName[512]; if ( GetModuleFileName( NULL, dirName, sizeof( dirName ) ) != 0 ) { char *s1 = V_strrchr( dirName, '\\' ); char *s2 = V_strrchr( dirName, '/' ); if ( s1 || s2 ) { // Get rid of the last slash.
s1 = max( s1, s2 ); s1[0] = 0; if ( !CreateProcess( NULL, pCmdLine, NULL, // security
NULL, TRUE, 0, // flags
NULL, // environment
dirName, // current directory
&si, &pi ) ) { Warning( "%s - error launching '%s'\n", VMPI_GetParamString( mpi_Job_Watch ), pCmdLine ); } } } }
void StatsDB_InitStatsDatabase( int argc, char **argv, const char *pDBInfoFilename ) { // Did they disable the stats database?
if ( !g_bMPI_Stats && !VMPI_IsParamUsed( mpi_Job_Watch ) ) return;
unsigned long jobPrimaryID;
// Now open the DB.
if ( g_bMPIMaster ) { CDBInfo dbInfo; GetDBInfo( pDBInfoFilename, &dbInfo );
if ( !VMPI_Stats_Init_Master( dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, argv[argc-1], &jobPrimaryID ) ) { Warning( "VMPI_Stats_Init_Master( %s, %s, %s ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName );
// Tell the workers not to use stats.
dbInfo.m_HostName[0] = 0; }
char cmdLine[2048]; Q_snprintf( cmdLine, sizeof( cmdLine ), "vmpi_job_watch -JobID %d", jobPrimaryID ); Msg( "\nTo watch this job, run this command line:\n%s\n\n", cmdLine ); if ( VMPI_IsParamUsed( mpi_Job_Watch ) ) { // Convenience thing to automatically launch the job watch for this job.
RunJobWatchApp( cmdLine ); }
// Send the database info to all the workers.
SendDBInfo( &dbInfo, jobPrimaryID ); } else { // Wait to get DB info so we can connect to the MySQL database.
CDBInfo dbInfo; unsigned long jobPrimaryID; RecvDBInfo( &dbInfo, &jobPrimaryID ); if ( dbInfo.m_HostName[0] != 0 ) { if ( !VMPI_Stats_Init_Worker( dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID ) ) Error( "VMPI_Stats_Init_Worker( %s, %s, %s, %d ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID ); } } }
// Returns this job's job_master_start JobID (g_JobPrimaryID); it stays 0 until one of the
// stats init functions assigns it.
unsigned long StatsDB_GetUniqueJobID()
{
	const unsigned long jobID = g_JobPrimaryID;
	return jobID;
}
// Returns this process's job_worker_start JobWorkerID (g_JobWorkerID); it stays 0 until
// VMPI_Stats_Init_Worker successfully inserts the row.
unsigned long VMPI_Stats_GetJobWorkerID()
{
	const unsigned long workerID = g_JobWorkerID;
	return workerID;
}
|