Team Fortress 2 Source Code as on 22/4/2020
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

553 lines
15 KiB

  1. // network.cpp - written and placed in the public domain by Wei Dai
  2. #include "pch.h"
  3. #include "network.h"
  4. #include "wait.h"
  5. #define CRYPTOPP_TRACE_NETWORK 0
  6. NAMESPACE_BEGIN(CryptoPP)
  7. #ifdef HIGHRES_TIMER_AVAILABLE
  8. lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
  9. {
  10. if (!m_maxBytesPerSecond)
  11. return ULONG_MAX;
  12. const double curTime = GetCurTimeAndCleanUp();
  13. CRYPTOPP_UNUSED(curTime);
  14. lword total = 0;
  15. for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
  16. total += m_ops[i].second;
  17. return SaturatingSubtract(m_maxBytesPerSecond, total);
  18. }
  19. double LimitedBandwidth::TimeToNextTransceive()
  20. {
  21. if (!m_maxBytesPerSecond)
  22. return 0;
  23. if (!m_nextTransceiveTime)
  24. ComputeNextTransceiveTime();
  25. return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
  26. }
  27. void LimitedBandwidth::NoteTransceive(lword size)
  28. {
  29. if (m_maxBytesPerSecond)
  30. {
  31. double curTime = GetCurTimeAndCleanUp();
  32. m_ops.push_back(std::make_pair(curTime, size));
  33. m_nextTransceiveTime = 0;
  34. }
  35. }
  36. void LimitedBandwidth::ComputeNextTransceiveTime()
  37. {
  38. double curTime = GetCurTimeAndCleanUp();
  39. lword total = 0;
  40. for (unsigned int i=0; i!=m_ops.size(); ++i)
  41. total += m_ops[i].second;
  42. m_nextTransceiveTime =
  43. (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
  44. }
  45. double LimitedBandwidth::GetCurTimeAndCleanUp()
  46. {
  47. if (!m_maxBytesPerSecond)
  48. return 0;
  49. double curTime = m_timer.ElapsedTimeAsDouble();
  50. while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
  51. m_ops.pop_front();
  52. return curTime;
  53. }
  54. void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
  55. {
  56. double nextTransceiveTime = TimeToNextTransceive();
  57. if (nextTransceiveTime)
  58. container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
  59. }
  60. // *************************************************************
  61. size_t NonblockingSource::GeneralPump2(
  62. lword& byteCount, bool blockingOutput,
  63. unsigned long maxTime, bool checkDelimiter, byte delimiter)
  64. {
  65. m_blockedBySpeedLimit = false;
  66. if (!GetMaxBytesPerSecond())
  67. {
  68. size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
  69. m_doPumpBlocked = (ret != 0);
  70. return ret;
  71. }
  72. bool forever = (maxTime == INFINITE_TIME);
  73. unsigned long timeToGo = maxTime;
  74. Timer timer(Timer::MILLISECONDS, forever);
  75. lword maxSize = byteCount;
  76. byteCount = 0;
  77. timer.StartTimer();
  78. while (true)
  79. {
  80. lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
  81. if (curMaxSize || m_doPumpBlocked)
  82. {
  83. if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  84. size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
  85. m_doPumpBlocked = (ret != 0);
  86. if (curMaxSize)
  87. {
  88. NoteTransceive(curMaxSize);
  89. byteCount += curMaxSize;
  90. }
  91. if (ret)
  92. return ret;
  93. }
  94. if (maxSize != ULONG_MAX && byteCount >= maxSize)
  95. break;
  96. if (!forever)
  97. {
  98. timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  99. if (!timeToGo)
  100. break;
  101. }
  102. double waitTime = TimeToNextTransceive();
  103. if (!forever && waitTime > timeToGo)
  104. {
  105. m_blockedBySpeedLimit = true;
  106. break;
  107. }
  108. WaitObjectContainer container;
  109. LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
  110. container.Wait((unsigned long)waitTime);
  111. }
  112. return 0;
  113. }
  114. size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
  115. {
  116. if (messageCount == 0)
  117. return 0;
  118. messageCount = 0;
  119. lword byteCount;
  120. do {
  121. byteCount = LWORD_MAX;
  122. RETURN_IF_NONZERO(Pump2(byteCount, blocking));
  123. } while(byteCount == LWORD_MAX);
  124. if (!m_messageEndSent && SourceExhausted())
  125. {
  126. RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
  127. m_messageEndSent = true;
  128. messageCount = 1;
  129. }
  130. return 0;
  131. }
  132. lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
  133. {
  134. m_blockedBySpeedLimit = false;
  135. size_t curBufSize = GetCurrentBufferSize();
  136. if (curBufSize <= targetSize && (targetSize || !EofPending()))
  137. return 0;
  138. if (!GetMaxBytesPerSecond())
  139. return DoFlush(maxTime, targetSize);
  140. bool forever = (maxTime == INFINITE_TIME);
  141. unsigned long timeToGo = maxTime;
  142. Timer timer(Timer::MILLISECONDS, forever);
  143. lword totalFlushed = 0;
  144. timer.StartTimer();
  145. while (true)
  146. {
  147. size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
  148. if (flushSize || EofPending())
  149. {
  150. if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  151. size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
  152. if (ret)
  153. {
  154. NoteTransceive(ret);
  155. curBufSize -= ret;
  156. totalFlushed += ret;
  157. }
  158. }
  159. if (curBufSize <= targetSize && (targetSize || !EofPending()))
  160. break;
  161. if (!forever)
  162. {
  163. timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
  164. if (!timeToGo)
  165. break;
  166. }
  167. double waitTime = TimeToNextTransceive();
  168. if (!forever && waitTime > timeToGo)
  169. {
  170. m_blockedBySpeedLimit = true;
  171. break;
  172. }
  173. WaitObjectContainer container;
  174. LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
  175. container.Wait((unsigned long)waitTime);
  176. }
  177. return totalFlushed;
  178. }
  179. bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
  180. {
  181. TimedFlush(blocking ? INFINITE_TIME : 0);
  182. return hardFlush && (!!GetCurrentBufferSize() || EofPending());
  183. }
  184. // *************************************************************
  185. NetworkSource::NetworkSource(BufferedTransformation *attachment)
  186. : NonblockingSource(attachment), m_buf(1024*16)
  187. , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
  188. , m_waitingForResult(false), m_outputBlocked(false)
  189. {
  190. }
  191. unsigned int NetworkSource::GetMaxWaitObjectCount() const
  192. {
  193. return LimitedBandwidth::GetMaxWaitObjectCount()
  194. + GetReceiver().GetMaxWaitObjectCount()
  195. + AttachedTransformation()->GetMaxWaitObjectCount();
  196. }
  197. void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
  198. {
  199. if (BlockedBySpeedLimit())
  200. LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
  201. else if (!m_outputBlocked)
  202. {
  203. if (m_dataBegin == m_dataEnd)
  204. AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
  205. else
  206. container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
  207. }
  208. AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
  209. }
  210. size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
  211. {
  212. NetworkReceiver &receiver = AccessReceiver();
  213. lword maxSize = byteCount;
  214. byteCount = 0;
  215. bool forever = maxTime == INFINITE_TIME;
  216. Timer timer(Timer::MILLISECONDS, forever);
  217. BufferedTransformation *t = AttachedTransformation();
  218. if (m_outputBlocked)
  219. goto DoOutput;
  220. while (true)
  221. {
  222. if (m_dataBegin == m_dataEnd)
  223. {
  224. if (receiver.EofReceived())
  225. break;
  226. if (m_waitingForResult)
  227. {
  228. if (receiver.MustWaitForResult() &&
  229. !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  230. CallStack("NetworkSource::DoPump() - wait receive result", 0)))
  231. break;
  232. unsigned int recvResult = receiver.GetReceiveResult();
  233. #if CRYPTOPP_TRACE_NETWORK
  234. OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
  235. #endif
  236. m_dataEnd += recvResult;
  237. m_waitingForResult = false;
  238. if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
  239. goto ReceiveNoWait;
  240. }
  241. else
  242. {
  243. m_dataEnd = m_dataBegin = 0;
  244. if (receiver.MustWaitToReceive())
  245. {
  246. if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  247. CallStack("NetworkSource::DoPump() - wait receive", 0)))
  248. break;
  249. receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
  250. m_waitingForResult = true;
  251. }
  252. else
  253. {
  254. ReceiveNoWait:
  255. m_waitingForResult = true;
  256. // call Receive repeatedly as long as data is immediately available,
  257. // because some receivers tend to return data in small pieces
  258. #if CRYPTOPP_TRACE_NETWORK
  259. OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
  260. #endif
  261. while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
  262. {
  263. unsigned int recvResult = receiver.GetReceiveResult();
  264. #if CRYPTOPP_TRACE_NETWORK
  265. OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
  266. #endif
  267. m_dataEnd += recvResult;
  268. if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
  269. {
  270. m_waitingForResult = false;
  271. break;
  272. }
  273. }
  274. }
  275. }
  276. }
  277. else
  278. {
  279. m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
  280. if (checkDelimiter)
  281. m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
  282. DoOutput:
  283. size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
  284. if (result)
  285. {
  286. if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  287. CallStack("NetworkSource::DoPump() - wait attachment", 0)))
  288. goto DoOutput;
  289. else
  290. {
  291. m_outputBlocked = true;
  292. return result;
  293. }
  294. }
  295. m_outputBlocked = false;
  296. byteCount += m_putSize;
  297. m_dataBegin += m_putSize;
  298. if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
  299. break;
  300. if (maxSize != ULONG_MAX && byteCount == maxSize)
  301. break;
  302. // once time limit is reached, return even if there is more data waiting
  303. // but make 0 a special case so caller can request a large amount of data to be
  304. // pumped as long as it is immediately available
  305. if (maxTime > 0 && timer.ElapsedTime() > maxTime)
  306. break;
  307. }
  308. }
  309. return 0;
  310. }
  311. // *************************************************************
  312. NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
  313. : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
  314. , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
  315. , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
  316. , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
  317. , m_currentSpeed(0), m_maxObservedSpeed(0)
  318. {
  319. }
  320. float NetworkSink::ComputeCurrentSpeed()
  321. {
  322. if (m_speedTimer.ElapsedTime() > 1000)
  323. {
  324. m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
  325. m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
  326. m_byteCountSinceLastTimerReset = 0;
  327. m_speedTimer.StartTimer();
  328. // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
  329. }
  330. return m_currentSpeed;
  331. }
  332. float NetworkSink::GetMaxObservedSpeed() const
  333. {
  334. lword m = GetMaxBytesPerSecond();
  335. return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
  336. }
  337. unsigned int NetworkSink::GetMaxWaitObjectCount() const
  338. {
  339. return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
  340. }
  341. void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
  342. {
  343. if (BlockedBySpeedLimit())
  344. LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
  345. else if (m_wasBlocked)
  346. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
  347. else if (!m_buffer.IsEmpty())
  348. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
  349. else if (EofPending())
  350. AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
  351. }
  352. size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
  353. {
  354. if (m_eofState == EOF_DONE)
  355. {
  356. if (length || messageEnd)
  357. throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
  358. return 0;
  359. }
  360. if (m_eofState > EOF_NONE)
  361. goto EofSite;
  362. {
  363. if (m_skipBytes)
  364. {
  365. assert(length >= m_skipBytes);
  366. inString += m_skipBytes;
  367. length -= m_skipBytes;
  368. }
  369. m_buffer.Put(inString, length);
  370. if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
  371. TimedFlush(0, 0);
  372. size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
  373. if (blocking)
  374. TimedFlush(INFINITE_TIME, targetSize);
  375. if (m_buffer.CurrentSize() > targetSize)
  376. {
  377. assert(!blocking);
  378. m_wasBlocked = true;
  379. m_skipBytes += length;
  380. size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
  381. return STDMAX<size_t>(blockedBytes, 1);
  382. }
  383. m_wasBlocked = false;
  384. m_skipBytes = 0;
  385. }
  386. if (messageEnd)
  387. {
  388. m_eofState = EOF_PENDING_SEND;
  389. EofSite:
  390. TimedFlush(blocking ? INFINITE_TIME : 0, 0);
  391. if (m_eofState != EOF_DONE)
  392. return 1;
  393. }
  394. return 0;
  395. }
  396. lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
  397. {
  398. NetworkSender &sender = AccessSender();
  399. bool forever = maxTime == INFINITE_TIME;
  400. Timer timer(Timer::MILLISECONDS, forever);
  401. unsigned int totalFlushSize = 0;
  402. while (true)
  403. {
  404. if (m_buffer.CurrentSize() <= targetSize)
  405. break;
  406. if (m_needSendResult)
  407. {
  408. if (sender.MustWaitForResult() &&
  409. !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
  410. CallStack("NetworkSink::DoFlush() - wait send result", 0)))
  411. break;
  412. unsigned int sendResult = sender.GetSendResult();
  413. #if CRYPTOPP_TRACE_NETWORK
  414. OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
  415. #endif
  416. m_buffer.Skip(sendResult);
  417. totalFlushSize += sendResult;
  418. m_needSendResult = false;
  419. if (!m_buffer.AnyRetrievable())
  420. break;
  421. }
  422. unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
  423. if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
  424. break;
  425. size_t contiguousSize = 0;
  426. const byte *block = m_buffer.Spy(contiguousSize);
  427. #if CRYPTOPP_TRACE_NETWORK
  428. OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
  429. #endif
  430. sender.Send(block, contiguousSize);
  431. m_needSendResult = true;
  432. if (maxTime > 0 && timeOut == 0)
  433. break; // once time limit is reached, return even if there is more data waiting
  434. }
  435. m_byteCountSinceLastTimerReset += totalFlushSize;
  436. ComputeCurrentSpeed();
  437. if (m_buffer.IsEmpty() && !m_needSendResult)
  438. {
  439. if (m_eofState == EOF_PENDING_SEND)
  440. {
  441. sender.SendEof();
  442. m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
  443. }
  444. while (m_eofState == EOF_PENDING_DELIVERY)
  445. {
  446. unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
  447. if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
  448. break;
  449. if (sender.EofSent())
  450. m_eofState = EOF_DONE;
  451. }
  452. }
  453. return totalFlushSize;
  454. }
  455. #endif // #ifdef HIGHRES_TIMER_AVAILABLE
  456. NAMESPACE_END