Counter Strike : Global Offensive Source Code
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

550 lines
15 KiB

  1. // network.cpp - written and placed in the public domain by Wei Dai
  2. #include "pch.h"
  3. #include "network.h"
  4. #include "wait.h"
  5. #define CRYPTOPP_TRACE_NETWORK 0
  6. NAMESPACE_BEGIN(CryptoPP)
  7. #ifdef HIGHRES_TIMER_AVAILABLE
  8. lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
  9. {
  10. if (!m_maxBytesPerSecond)
  11. return ULONG_MAX;
  12. double curTime = GetCurTimeAndCleanUp();
  13. lword total = 0;
  14. for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
  15. total += m_ops[i].second;
  16. return SaturatingSubtract(m_maxBytesPerSecond, total);
  17. }
  18. double LimitedBandwidth::TimeToNextTransceive()
  19. {
  20. if (!m_maxBytesPerSecond)
  21. return 0;
  22. if (!m_nextTransceiveTime)
  23. ComputeNextTransceiveTime();
  24. return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
  25. }
  26. void LimitedBandwidth::NoteTransceive(lword size)
  27. {
  28. if (m_maxBytesPerSecond)
  29. {
  30. double curTime = GetCurTimeAndCleanUp();
  31. m_ops.push_back(std::make_pair(curTime, size));
  32. m_nextTransceiveTime = 0;
  33. }
  34. }
  35. void LimitedBandwidth::ComputeNextTransceiveTime()
  36. {
  37. double curTime = GetCurTimeAndCleanUp();
  38. lword total = 0;
  39. for (unsigned int i=0; i!=m_ops.size(); ++i)
  40. total += m_ops[i].second;
  41. m_nextTransceiveTime =
  42. (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
  43. }
  44. double LimitedBandwidth::GetCurTimeAndCleanUp()
  45. {
  46. if (!m_maxBytesPerSecond)
  47. return 0;
  48. double curTime = m_timer.ElapsedTimeAsDouble();
  49. while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
  50. m_ops.pop_front();
  51. return curTime;
  52. }
  53. void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
  54. {
  55. double nextTransceiveTime = TimeToNextTransceive();
  56. if (nextTransceiveTime)
  57. container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
  58. }
  59. // *************************************************************
  60. size_t NonblockingSource::GeneralPump2(
  61. lword& byteCount, bool blockingOutput,
  62. unsigned long maxTime, bool checkDelimiter, byte delimiter)
  63. {
  64. m_blockedBySpeedLimit = false;
  65. if (!GetMaxBytesPerSecond())
  66. {
  67. size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
  68. m_doPumpBlocked = (ret != 0);
  69. return ret;
  70. }
  71. bool forever = (maxTime == INFINITE_TIME);
  72. unsigned long timeToGo = maxTime;
  73. Timer timer(Timer::MILLISECONDS, forever);
  74. lword maxSize = byteCount;
  75. byteCount = 0;
  76. timer.StartTimer();
  77. while (true)
  78. {
  79. lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
  80. if (curMaxSize || m_doPumpBlocked)
  81. {
  82. if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  83. size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
  84. m_doPumpBlocked = (ret != 0);
  85. if (curMaxSize)
  86. {
  87. NoteTransceive(curMaxSize);
  88. byteCount += curMaxSize;
  89. }
  90. if (ret)
  91. return ret;
  92. }
  93. if (maxSize != ULONG_MAX && byteCount >= maxSize)
  94. break;
  95. if (!forever)
  96. {
  97. timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  98. if (!timeToGo)
  99. break;
  100. }
  101. double waitTime = TimeToNextTransceive();
  102. if (!forever && waitTime > timeToGo)
  103. {
  104. m_blockedBySpeedLimit = true;
  105. break;
  106. }
  107. WaitObjectContainer container;
  108. LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
  109. container.Wait((unsigned long)waitTime);
  110. }
  111. return 0;
  112. }
  113. size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
  114. {
  115. if (messageCount == 0)
  116. return 0;
  117. messageCount = 0;
  118. lword byteCount;
  119. do {
  120. byteCount = LWORD_MAX;
  121. RETURN_IF_NONZERO(Pump2(byteCount, blocking));
  122. } while(byteCount == LWORD_MAX);
  123. if (!m_messageEndSent && SourceExhausted())
  124. {
  125. RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
  126. m_messageEndSent = true;
  127. messageCount = 1;
  128. }
  129. return 0;
  130. }
  131. lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
  132. {
  133. m_blockedBySpeedLimit = false;
  134. size_t curBufSize = GetCurrentBufferSize();
  135. if (curBufSize <= targetSize && (targetSize || !EofPending()))
  136. return 0;
  137. if (!GetMaxBytesPerSecond())
  138. return DoFlush(maxTime, targetSize);
  139. bool forever = (maxTime == INFINITE_TIME);
  140. unsigned long timeToGo = maxTime;
  141. Timer timer(Timer::MILLISECONDS, forever);
  142. lword totalFlushed = 0;
  143. timer.StartTimer();
  144. while (true)
  145. {
  146. size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
  147. if (flushSize || EofPending())
  148. {
  149. if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  150. size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
  151. if (ret)
  152. {
  153. NoteTransceive(ret);
  154. curBufSize -= ret;
  155. totalFlushed += ret;
  156. }
  157. }
  158. if (curBufSize <= targetSize && (targetSize || !EofPending()))
  159. break;
  160. if (!forever)
  161. {
  162. timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  163. if (!timeToGo)
  164. break;
  165. }
  166. double waitTime = TimeToNextTransceive();
  167. if (!forever && waitTime > timeToGo)
  168. {
  169. m_blockedBySpeedLimit = true;
  170. break;
  171. }
  172. WaitObjectContainer container;
  173. LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
  174. container.Wait((unsigned long)waitTime);
  175. }
  176. return totalFlushed;
  177. }
  178. bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
  179. {
  180. TimedFlush(blocking ? INFINITE_TIME : 0);
  181. return hardFlush && (!!GetCurrentBufferSize() || EofPending());
  182. }
  183. // *************************************************************
  184. NetworkSource::NetworkSource(BufferedTransformation *attachment)
  185. : NonblockingSource(attachment), m_buf(1024*16)
  186. , m_waitingForResult(false), m_outputBlocked(false)
  187. , m_dataBegin(0), m_dataEnd(0)
  188. {
  189. }
  190. unsigned int NetworkSource::GetMaxWaitObjectCount() const
  191. {
  192. return LimitedBandwidth::GetMaxWaitObjectCount()
  193. + GetReceiver().GetMaxWaitObjectCount()
  194. + AttachedTransformation()->GetMaxWaitObjectCount();
  195. }
  196. void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
  197. {
  198. if (BlockedBySpeedLimit())
  199. LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
  200. else if (!m_outputBlocked)
  201. {
  202. if (m_dataBegin == m_dataEnd)
  203. AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
  204. else
  205. container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
  206. }
  207. AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
  208. }
  209. size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
  210. {
  211. NetworkReceiver &receiver = AccessReceiver();
  212. lword maxSize = byteCount;
  213. byteCount = 0;
  214. bool forever = maxTime == INFINITE_TIME;
  215. Timer timer(Timer::MILLISECONDS, forever);
  216. BufferedTransformation *t = AttachedTransformation();
  217. if (m_outputBlocked)
  218. goto DoOutput;
  219. while (true)
  220. {
  221. if (m_dataBegin == m_dataEnd)
  222. {
  223. if (receiver.EofReceived())
  224. break;
  225. if (m_waitingForResult)
  226. {
  227. if (receiver.MustWaitForResult() &&
  228. !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  229. CallStack("NetworkSource::DoPump() - wait receive result", 0)))
  230. break;
  231. unsigned int recvResult = receiver.GetReceiveResult();
  232. #if CRYPTOPP_TRACE_NETWORK
  233. OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
  234. #endif
  235. m_dataEnd += recvResult;
  236. m_waitingForResult = false;
  237. if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
  238. goto ReceiveNoWait;
  239. }
  240. else
  241. {
  242. m_dataEnd = m_dataBegin = 0;
  243. if (receiver.MustWaitToReceive())
  244. {
  245. if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  246. CallStack("NetworkSource::DoPump() - wait receive", 0)))
  247. break;
  248. receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
  249. m_waitingForResult = true;
  250. }
  251. else
  252. {
  253. ReceiveNoWait:
  254. m_waitingForResult = true;
  255. // call Receive repeatedly as long as data is immediately available,
  256. // because some receivers tend to return data in small pieces
  257. #if CRYPTOPP_TRACE_NETWORK
  258. OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
  259. #endif
  260. while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
  261. {
  262. unsigned int recvResult = receiver.GetReceiveResult();
  263. #if CRYPTOPP_TRACE_NETWORK
  264. OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
  265. #endif
  266. m_dataEnd += recvResult;
  267. if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
  268. {
  269. m_waitingForResult = false;
  270. break;
  271. }
  272. }
  273. }
  274. }
  275. }
  276. else
  277. {
  278. m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
  279. if (checkDelimiter)
  280. m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
  281. DoOutput:
  282. size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
  283. if (result)
  284. {
  285. if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  286. CallStack("NetworkSource::DoPump() - wait attachment", 0)))
  287. goto DoOutput;
  288. else
  289. {
  290. m_outputBlocked = true;
  291. return result;
  292. }
  293. }
  294. m_outputBlocked = false;
  295. byteCount += m_putSize;
  296. m_dataBegin += m_putSize;
  297. if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
  298. break;
  299. if (maxSize != ULONG_MAX && byteCount == maxSize)
  300. break;
  301. // once time limit is reached, return even if there is more data waiting
  302. // but make 0 a special case so caller can request a large amount of data to be
  303. // pumped as long as it is immediately available
  304. if (maxTime > 0 && timer.ElapsedTime() > maxTime)
  305. break;
  306. }
  307. }
  308. return 0;
  309. }
  310. // *************************************************************
  311. NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
  312. : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
  313. , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
  314. , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
  315. , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
  316. , m_currentSpeed(0), m_maxObservedSpeed(0)
  317. {
  318. }
  319. float NetworkSink::ComputeCurrentSpeed()
  320. {
  321. if (m_speedTimer.ElapsedTime() > 1000)
  322. {
  323. m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
  324. m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
  325. m_byteCountSinceLastTimerReset = 0;
  326. m_speedTimer.StartTimer();
  327. // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
  328. }
  329. return m_currentSpeed;
  330. }
  331. float NetworkSink::GetMaxObservedSpeed() const
  332. {
  333. lword m = GetMaxBytesPerSecond();
  334. return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
  335. }
  336. unsigned int NetworkSink::GetMaxWaitObjectCount() const
  337. {
  338. return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
  339. }
  340. void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
  341. {
  342. if (BlockedBySpeedLimit())
  343. LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
  344. else if (m_wasBlocked)
  345. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
  346. else if (!m_buffer.IsEmpty())
  347. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
  348. else if (EofPending())
  349. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
  350. }
  351. size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
  352. {
  353. if (m_eofState == EOF_DONE)
  354. {
  355. if (length || messageEnd)
  356. throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
  357. return 0;
  358. }
  359. if (m_eofState > EOF_NONE)
  360. goto EofSite;
  361. {
  362. if (m_skipBytes)
  363. {
  364. assert(length >= m_skipBytes);
  365. inString += m_skipBytes;
  366. length -= m_skipBytes;
  367. }
  368. m_buffer.Put(inString, length);
  369. if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
  370. TimedFlush(0, 0);
  371. size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
  372. if (blocking)
  373. TimedFlush(INFINITE_TIME, targetSize);
  374. if (m_buffer.CurrentSize() > targetSize)
  375. {
  376. assert(!blocking);
  377. m_wasBlocked = true;
  378. m_skipBytes += length;
  379. size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
  380. return STDMAX<size_t>(blockedBytes, 1);
  381. }
  382. m_wasBlocked = false;
  383. m_skipBytes = 0;
  384. }
  385. if (messageEnd)
  386. {
  387. m_eofState = EOF_PENDING_SEND;
  388. EofSite:
  389. TimedFlush(blocking ? INFINITE_TIME : 0, 0);
  390. if (m_eofState != EOF_DONE)
  391. return 1;
  392. }
  393. return 0;
  394. }
  395. lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
  396. {
  397. NetworkSender &sender = AccessSender();
  398. bool forever = maxTime == INFINITE_TIME;
  399. Timer timer(Timer::MILLISECONDS, forever);
  400. unsigned int totalFlushSize = 0;
  401. while (true)
  402. {
  403. if (m_buffer.CurrentSize() <= targetSize)
  404. break;
  405. if (m_needSendResult)
  406. {
  407. if (sender.MustWaitForResult() &&
  408. !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  409. CallStack("NetworkSink::DoFlush() - wait send result", 0)))
  410. break;
  411. unsigned int sendResult = sender.GetSendResult();
  412. #if CRYPTOPP_TRACE_NETWORK
  413. OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
  414. #endif
  415. m_buffer.Skip(sendResult);
  416. totalFlushSize += sendResult;
  417. m_needSendResult = false;
  418. if (!m_buffer.AnyRetrievable())
  419. break;
  420. }
  421. unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
  422. if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
  423. break;
  424. size_t contiguousSize = 0;
  425. const byte *block = m_buffer.Spy(contiguousSize);
  426. #if CRYPTOPP_TRACE_NETWORK
  427. OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
  428. #endif
  429. sender.Send(block, contiguousSize);
  430. m_needSendResult = true;
  431. if (maxTime > 0 && timeOut == 0)
  432. break; // once time limit is reached, return even if there is more data waiting
  433. }
  434. m_byteCountSinceLastTimerReset += totalFlushSize;
  435. ComputeCurrentSpeed();
  436. if (m_buffer.IsEmpty() && !m_needSendResult)
  437. {
  438. if (m_eofState == EOF_PENDING_SEND)
  439. {
  440. sender.SendEof();
  441. m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
  442. }
  443. while (m_eofState == EOF_PENDING_DELIVERY)
  444. {
  445. unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
  446. if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
  447. break;
  448. if (sender.EofSent())
  449. m_eofState = EOF_DONE;
  450. }
  451. }
  452. return totalFlushSize;
  453. }
  454. #endif // #ifdef HIGHRES_TIMER_AVAILABLE
  455. NAMESPACE_END