|
|
// wait.cpp - written and placed in the public domain by Wei Dai
#include "pch.h"
#include "wait.h"
#include "misc.h"
#ifdef SOCKETS_AVAILABLE
#ifdef USE_BERKELEY_STYLE_SOCKETS
#include <errno.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#endif
NAMESPACE_BEGIN(CryptoPP)
// Returns the largest number of wait objects one container can handle.
// Windows-style sockets: MAXIMUM_WAIT_OBJECTS helper threads, each waiting on
// MAXIMUM_WAIT_OBJECTS-1 handles (one slot per thread is used for the stop event).
// Berkeley-style sockets: limited by the capacity of an fd_set.
unsigned int WaitObjectContainer::MaxWaitObjects()
{
#ifdef USE_WINDOWS_STYLE_SOCKETS
	return MAXIMUM_WAIT_OBJECTS * (MAXIMUM_WAIT_OBJECTS-1);
#else
	return FD_SETSIZE;
#endif
}
// Constructs an empty container.
//   tracer - optional sink that receives a description when a no-wait loop is detected (may be NULL)
// Both timers count in milliseconds; the event timer is started immediately so that
// ScheduleEvent() can compute absolute event times from it.
WaitObjectContainer::WaitObjectContainer(WaitObjectsTracer* tracer)
	: m_tracer(tracer)
	, m_eventTimer(Timer::MILLISECONDS)
	, m_lastResult(0)	// SetLastResult()/DetectNoWait() compare against this, so it must not be left indeterminate
	, m_sameResultCount(0)
	, m_noWaitTimer(Timer::MILLISECONDS)
{
	Clear();
	m_eventTimer.StartTimer();
}
void WaitObjectContainer::Clear() { #ifdef USE_WINDOWS_STYLE_SOCKETS
m_handles.clear(); #else
m_maxFd = 0; FD_ZERO(&m_readfds); FD_ZERO(&m_writefds); #endif
m_noWait = false; m_firstEventTime = 0; }
// Records the outcome of the latest wait. A repeat of the previous outcome bumps
// the repeat counter; a different outcome replaces it and resets the counter.
inline void WaitObjectContainer::SetLastResult(LastResultType result)
{
	if (result != m_lastResult)
	{
		m_lastResult = result;
		m_sameResultCount = 0;
		return;
	}

	m_sameResultCount++;
}
// Heuristic detector for busy loops that never actually block.
//   result    - the outcome about to be produced; only acted upon if it equals the last recorded one
//   callStack - chain of callers, reported to the tracer when a loop is detected
// Once more than 1000 units of the no-wait timer have elapsed, the repeat count is
// compared against the elapsed time; if the same result repeated more often than that,
// the tracer (if any) is notified. The timer and counter are then restarted.
void WaitObjectContainer::DetectNoWait(LastResultType result, CallStack const& callStack)
{
	// the timer is only consulted when the result matches, as in a short-circuit &&
	if (result != m_lastResult || m_noWaitTimer.ElapsedTime() <= 1000)
		return;

	if (m_sameResultCount > m_noWaitTimer.ElapsedTime())
	{
		if (m_tracer)
		{
			std::string desc = "No wait loop detected - m_lastResult: ";
			desc.append(IntToString(m_lastResult));
			desc.append(", call stack:");

			CallStack const* frame = &callStack;
			while (frame)
			{
				desc.append("\n- ");
				desc.append(frame->Format());
				frame = frame->Prev();
			}

			m_tracer->TraceNoWaitLoop(desc);
		}

		// help debugger break
		try
		{
			throw 0;
		}
		catch (...)
		{
		}
	}

	m_noWaitTimer.StartTimer();
	m_sameResultCount = 0;
}
// Marks the container so that the next Wait() returns immediately.
// Runs the no-wait loop detector first, with this function added to the call stack.
void WaitObjectContainer::SetNoWait(CallStack const& callStack)
{
	CallStack const here("WaitObjectContainer::SetNoWait()", &callStack);
	DetectNoWait(LASTRESULT_NOWAIT, here);

	m_noWait = true;
}
void WaitObjectContainer::ScheduleEvent(double milliseconds, CallStack const& callStack) { if (milliseconds <= 3) DetectNoWait(LASTRESULT_SCHEDULED, CallStack("WaitObjectContainer::ScheduleEvent()", &callStack)); double thisEventTime = m_eventTimer.ElapsedTimeAsDouble() + milliseconds; if (!m_firstEventTime || thisEventTime < m_firstEventTime) m_firstEventTime = thisEventTime; }
#ifdef USE_WINDOWS_STYLE_SOCKETS
struct WaitingThreadData { bool waitingToWait, terminate; HANDLE startWaiting, stopWaiting; const HANDLE *waitHandles; unsigned int count; HANDLE threadHandle; DWORD threadId; DWORD* error; };
// Asks every helper thread to terminate, waits for all of them to exit, then
// closes the thread handles and the two shared events. Does nothing when no
// helper threads were ever created.
WaitObjectContainer::~WaitObjectContainer()
{
	try		// don't let exceptions escape destructor
	{
		if (m_threads.empty())
			return;

		const size_t threadCount = m_threads.size();
		HANDLE joinHandles[MAXIMUM_WAIT_OBJECTS];

		for (size_t n = 0; n < threadCount; n++)
		{
			WaitingThreadData &data = *m_threads[n];

			// spin until thread is in the initial "waiting to wait" state
			while (!data.waitingToWait)
				Sleep(0);

			data.terminate = true;
			joinHandles[n] = data.threadHandle;
		}

		// release the threads so they can observe the terminate flag
		PulseEvent(m_startWaiting);
		::WaitForMultipleObjects((DWORD)threadCount, joinHandles, TRUE, INFINITE);

		for (size_t n = 0; n < threadCount; n++)
			CloseHandle(joinHandles[n]);

		CloseHandle(m_startWaiting);
		CloseHandle(m_stopWaiting);
	}
	catch (...)
	{
	}
}
// Registers a handle to be waited on by the next Wait().
// The index the handle is about to occupy is fed to the no-wait loop detector first.
void WaitObjectContainer::AddHandle(HANDLE handle, CallStack const& callStack)
{
	CallStack const here("WaitObjectContainer::AddHandle()", &callStack);
	DetectNoWait(m_handles.size(), here);

	m_handles.push_back(handle);
}
// Entry point of a helper thread used when there are more handles than a single
// WaitForMultipleObjects call can take.
//   lParam - pointer to this thread's WaitingThreadData; the thread takes ownership
//            and frees it on exit
// Each round the thread blocks on startWaiting, then waits on its slice of handles
// plus the shared stopWaiting event, and signals stopWaiting when one of its own
// handles fires or its wait fails.
DWORD WINAPI WaitingThread(LPVOID lParam)
{
	std::auto_ptr<WaitingThreadData> owner((WaitingThreadData *)lParam);
	WaitingThreadData &self = *owner;
	std::vector<HANDLE> waitSet;

	for (;;)
	{
		self.waitingToWait = true;
		::WaitForSingleObject(self.startWaiting, INFINITE);
		self.waitingToWait = false;

		if (self.terminate)
			break;
		if (!self.count)
			continue;

		// slot 0 is the stop event, followed by this thread's share of the handles
		waitSet.resize(self.count + 1);
		waitSet[0] = self.stopWaiting;
		std::copy(self.waitHandles, self.waitHandles+self.count, waitSet.begin()+1);

		const DWORD result = ::WaitForMultipleObjects((DWORD)waitSet.size(), &waitSet[0], FALSE, INFINITE);

		if (result == WAIT_OBJECT_0)
			continue;	// another thread finished waiting first, so do nothing

		SetEvent(self.stopWaiting);

		const bool ownHandleSignaled = result > WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + waitSet.size();
		if (!ownHandleSignaled)
		{
			assert(!"error in WaitingThread");	// break here so we can see which thread has an error
			*self.error = ::GetLastError();
		}
	}

	return S_OK;	// return a value here to avoid compiler warning
}
void WaitObjectContainer::CreateThreads(unsigned int count) { size_t currentCount = m_threads.size(); if (currentCount == 0) { m_startWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL); m_stopWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL); }
if (currentCount < count) { m_threads.resize(count); for (size_t i=currentCount; i<count; i++) { m_threads[i] = new WaitingThreadData; WaitingThreadData &thread = *m_threads[i]; thread.terminate = false; thread.startWaiting = m_startWaiting; thread.stopWaiting = m_stopWaiting; thread.waitingToWait = false; thread.threadHandle = CreateThread(NULL, 0, &WaitingThread, &thread, 0, &thread.threadId); } } }
bool WaitObjectContainer::Wait(unsigned long milliseconds) { if (m_noWait || (m_handles.empty() && !m_firstEventTime)) { SetLastResult(LASTRESULT_NOWAIT); return true; }
bool timeoutIsScheduledEvent = false;
if (m_firstEventTime) { double timeToFirstEvent = SaturatingSubtract(m_firstEventTime, m_eventTimer.ElapsedTimeAsDouble());
if (timeToFirstEvent <= milliseconds) { milliseconds = (unsigned long)timeToFirstEvent; timeoutIsScheduledEvent = true; }
if (m_handles.empty() || !milliseconds) { if (milliseconds) Sleep(milliseconds); SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT); return timeoutIsScheduledEvent; } }
if (m_handles.size() > MAXIMUM_WAIT_OBJECTS) { // too many wait objects for a single WaitForMultipleObjects call, so use multiple threads
static const unsigned int WAIT_OBJECTS_PER_THREAD = MAXIMUM_WAIT_OBJECTS-1; unsigned int nThreads = (unsigned int)((m_handles.size() + WAIT_OBJECTS_PER_THREAD - 1) / WAIT_OBJECTS_PER_THREAD); if (nThreads > MAXIMUM_WAIT_OBJECTS) // still too many wait objects, maybe implement recursive threading later?
throw Err("WaitObjectContainer: number of wait objects exceeds limit"); CreateThreads(nThreads); DWORD error = S_OK; for (unsigned int i=0; i<m_threads.size(); i++) { WaitingThreadData &thread = *m_threads[i]; while (!thread.waitingToWait) // spin until thread is in the initial "waiting to wait" state
Sleep(0); if (i<nThreads) { thread.waitHandles = &m_handles[i*WAIT_OBJECTS_PER_THREAD]; thread.count = UnsignedMin(WAIT_OBJECTS_PER_THREAD, m_handles.size() - i*WAIT_OBJECTS_PER_THREAD); thread.error = &error; } else thread.count = 0; }
ResetEvent(m_stopWaiting); PulseEvent(m_startWaiting);
DWORD result = ::WaitForSingleObject(m_stopWaiting, milliseconds); if (result == WAIT_OBJECT_0) { if (error == S_OK) return true; else throw Err("WaitObjectContainer: WaitForMultipleObjects in thread failed with error " + IntToString(error)); } SetEvent(m_stopWaiting); if (result == WAIT_TIMEOUT) { SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT); return timeoutIsScheduledEvent; } else throw Err("WaitObjectContainer: WaitForSingleObject failed with error " + IntToString(::GetLastError())); } else { #if TRACE_WAIT
static Timer t(Timer::MICROSECONDS); static unsigned long lastTime = 0; unsigned long timeBeforeWait = t.ElapsedTime(); #endif
DWORD result = ::WaitForMultipleObjects((DWORD)m_handles.size(), &m_handles[0], FALSE, milliseconds); #if TRACE_WAIT
if (milliseconds > 0) { unsigned long timeAfterWait = t.ElapsedTime(); OutputDebugString(("Handles " + IntToString(m_handles.size()) + ", Woke up by " + IntToString(result-WAIT_OBJECT_0) + ", Busied for " + IntToString(timeBeforeWait-lastTime) + " us, Waited for " + IntToString(timeAfterWait-timeBeforeWait) + " us, max " + IntToString(milliseconds) + "ms\n").c_str()); lastTime = timeAfterWait; } #endif
if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + m_handles.size()) { if (result == m_lastResult) m_sameResultCount++; else { m_lastResult = result; m_sameResultCount = 0; } return true; } else if (result == WAIT_TIMEOUT) { SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT); return timeoutIsScheduledEvent; } else throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(::GetLastError())); } }
#else // #ifdef USE_WINDOWS_STYLE_SOCKETS
// Adds a descriptor to the set watched for readability and keeps track of the
// highest descriptor seen, which select() needs.
void WaitObjectContainer::AddReadFd(int fd, CallStack const& callStack)	// TODO: do something with callStack
{
	FD_SET(fd, &m_readfds);

	if (m_maxFd < fd)
		m_maxFd = fd;
}
// Adds a descriptor to the set watched for writability and keeps track of the
// highest descriptor seen, which select() needs.
void WaitObjectContainer::AddWriteFd(int fd, CallStack const& callStack)	// TODO: do something with callStack
{
	FD_SET(fd, &m_writefds);

	if (m_maxFd < fd)
		m_maxFd = fd;
}
bool WaitObjectContainer::Wait(unsigned long milliseconds) { if (m_noWait || (!m_maxFd && !m_firstEventTime)) return true;
bool timeoutIsScheduledEvent = false;
if (m_firstEventTime) { double timeToFirstEvent = SaturatingSubtract(m_firstEventTime, m_eventTimer.ElapsedTimeAsDouble()); if (timeToFirstEvent <= milliseconds) { milliseconds = (unsigned long)timeToFirstEvent; timeoutIsScheduledEvent = true; } }
timeval tv, *timeout;
if (milliseconds == INFINITE_TIME) timeout = NULL; else { tv.tv_sec = milliseconds / 1000; tv.tv_usec = (milliseconds % 1000) * 1000; timeout = &tv; }
int result = select(m_maxFd+1, &m_readfds, &m_writefds, NULL, timeout);
if (result > 0) return true; else if (result == 0) return timeoutIsScheduledEvent; else throw Err("WaitObjectContainer: select failed with error " + errno); }
#endif
// ********************************************************
// Returns the textual description of this call stack entry.
std::string CallStack::Format() const
{
	return std::string(m_info);
}
// Returns the entry's description followed by its associated number.
std::string CallStackWithNr::Format() const
{
	std::string text(m_info);
	text += " / nr: ";
	text += IntToString(m_nr);
	return text;
}
// Returns the entry's description followed by its associated string.
std::string CallStackWithStr::Format() const
{
	std::string text(m_info);
	text += " / ";
	text += std::string(m_z);
	return text;
}
bool Waitable::Wait(unsigned long milliseconds, CallStack const& callStack) { WaitObjectContainer container; GetWaitObjects(container, callStack); // reduce clutter by not adding this func to stack
return container.Wait(milliseconds); }
NAMESPACE_END
#endif
|