AsynFunctionT.hpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /*
  2. 异步 任务 处理
  3. */
  4. #include <process.h>
  5. #include <functional>
  6. #include <list>
  7. template<class ParamType=LPVOID> // char* string void*
  8. class AsynTaskHandle
  9. {
  10. typedef struct tagTask
  11. {
  12. std::function<void(ParamType)> fun;
  13. ParamType lpParam;
  14. }AsynTask, *PAsynTask;
  15. protected:
  16. volatile bool m_bQuit;
  17. HANDLE* m_phThread;
  18. unsigned int m_nThreadCount;
  19. //UINT m_dwThreadId;
  20. //HANDLE m_hListEvent;
  21. HANDLE m_hSemaphore; //信号量 句柄
  22. CRITICAL_SECTION m_csLock; //维持队列同步
  23. std::list<PAsynTask> m_TaskList;
  24. public:
  25. AsynTaskHandle(unsigned int nThreadCount=4)
  26. : m_bQuit(false)
  27. , m_nThreadCount(nThreadCount)
  28. {
  29. InitializeCriticalSection(&m_csLock);
  30. m_hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);
  31. //m_hListEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
  32. assert(m_nThreadCount);
  33. m_phThread = new HANDLE[m_nThreadCount];
  34. for (unsigned int i=0; i<m_nThreadCount; ++i)
  35. {
  36. m_phThread[i] = (HANDLE)_beginthreadex(NULL, 0, ThreadFun, this, NULL, NULL);
  37. }
  38. }
  39. virtual ~AsynTaskHandle()
  40. {
  41. if(NULL != m_hSemaphore)
  42. {
  43. CloseHandle(m_hSemaphore);
  44. m_hSemaphore = NULL;
  45. }
  46. if(m_TaskList.size() > 0) // 要退出了 且 任务没执行完 要清理缓存
  47. {
  48. for each (auto var in m_TaskList)
  49. {
  50. delete var;
  51. }
  52. }
  53. for (unsigned int i=0; i<m_nThreadCount; ++i)
  54. {
  55. CloseHandle(m_phThread[i]);
  56. }
  57. delete[] m_phThread;
  58. m_phThread = NULL;
  59. DeleteCriticalSection(&m_csLock);
  60. }
  61. public:
  62. template<class Fun, class This>
  63. void AddTask(Fun fun, This t, ParamType lpParam)
  64. {
  65. auto task = new AsynTask;
  66. task->fun = std::bind(fun, t, std::placeholders::_1);
  67. task->lpParam = lpParam;
  68. EnterCriticalSection(&m_csLock);
  69. m_TaskList.push_back(task); //交换数据
  70. LeaveCriticalSection(&m_csLock);
  71. //SetEvent(m_hListEvent);
  72. ReleaseSemaphore(m_hSemaphore, 1, NULL);
  73. }
  74. void AddTask(std::function<void(ParamType)> fun, ParamType lpParam)
  75. {
  76. auto task = new AsynTask;
  77. task->fun = fun;
  78. task->lpParam = lpParam;
  79. EnterCriticalSection(&m_csLock);
  80. m_TaskList.push_back(task); //交换数据
  81. LeaveCriticalSection(&m_csLock);
  82. //SetEvent(m_hListEvent);
  83. ReleaseSemaphore(m_hSemaphore, 1, NULL);
  84. }
  85. void Close(DWORD dwWait=1000)
  86. {
  87. m_bQuit = true;
  88. //SetEvent(m_hListEvent);
  89. ReleaseSemaphore(m_hSemaphore, m_nThreadCount, NULL);
  90. //
  91. for (unsigned int i=0; i<m_nThreadCount; ++i)
  92. {
  93. WaitForSingleObject(m_hThread[i], dwWait);
  94. }
  95. }
  96. protected:
  97. unsigned int Handle()
  98. {
  99. while (!m_bQuit)
  100. {
  101. ::WaitForSingleObject(m_hSemaphore, INFINITE);
  102. if(m_bQuit)
  103. break;
  104. /*OutputDebugString(_T("AsyncFun"));*/
  105. while (!m_bQuit)
  106. {
  107. PAsynTask pTask = NULL;
  108. EnterCriticalSection(&m_csLock);
  109. if(m_TaskList.size() > 0)
  110. {
  111. pTask = m_TaskList.front();
  112. m_TaskList.pop_front();
  113. }
  114. LeaveCriticalSection(&m_csLock);
  115. if(NULL == pTask)
  116. break;
  117. pTask->fun(pTask->lpParam);
  118. delete pTask;
  119. }
  120. }
  121. return 0;
  122. }
  123. private:
  124. //静态 线程 函数
  125. static unsigned int __stdcall ThreadFun(void* lp)
  126. {
  127. AsynTaskHandle* pThis = (AsynTaskHandle*)lp;
  128. if(NULL == pThis )
  129. return 0;
  130. return pThis->Handle();
  131. }
  132. };