Browse Source

Added `thread_pool`

Added `timer::current`, `timer::detached`
master
Jessica James 7 years ago
parent
commit
4c2b4480ed
  1. 2
      src/common/CMakeLists.txt
  2. 159
      src/common/thread_pool.cpp
  3. 30
      src/common/timer.cpp
  4. 11
      src/common/timer_manager.cpp
  5. 4
      src/include/impl/timer_manager.hpp
  6. 73
      src/include/thread_pool.hpp
  7. 2
      src/include/timer.hpp
  8. 2
      src/test/CMakeLists.txt
  9. 25
      src/test/test.hpp
  10. 103
      src/test/thread_pool.cpp
  11. 4
      src/test/timer.cpp

2
src/common/CMakeLists.txt

@ -3,6 +3,6 @@ cmake_minimum_required(VERSION 3.8)
# Setup source files # Setup source files
include_directories(../include) include_directories(../include)
set(SOURCE_FILES set(SOURCE_FILES
timer.cpp timer_manager.cpp) timer.cpp timer_manager.cpp thread_pool.cpp)
add_library(jessilib ${SOURCE_FILES}) add_library(jessilib ${SOURCE_FILES})

159
src/common/thread_pool.cpp

@ -0,0 +1,159 @@
/**
* Copyright (C) 2018 Jessica James.
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Written by Jessica James <jessica.aj@outlook.com>
*/
#include "thread_pool.hpp"
#include <cassert>
namespace jessilib {
// thread_pool
thread_pool::thread_pool()
: thread_pool{ std::thread::hardware_concurrency() } {
}
thread_pool::thread_pool(size_t in_threads)
: m_threads{ in_threads } {
assert(in_threads != 0);
while (in_threads != 0) {
thread& self = m_threads[--in_threads];
self.m_thread = std::thread([this, &self]() {
while (true) {
// Run next pending task, if there is any
self.m_task = pop_task();
if (self.m_task != nullptr) {
self.run_task();
continue;
}
if (self.m_shutdown) {
break;
}
// Push inactive thread
{
std::lock_guard<std::mutex> guard(m_inactive_threads_mutex);
m_inactive_threads.push(&self);
}
// Wait for notification
self.wait();
// Run task
self.run_task();
}
});
}
}
template<typename T>
T lock_helper(T&& in_obj, std::mutex& in_mutex) {
std::lock_guard<std::mutex> guard(in_mutex);
return std::move(in_obj);
}
thread_pool::~thread_pool() {
join();
}
void thread_pool::push(task_t in_task) {
thread* target_thread = inactive_thread();
if (target_thread != nullptr) {
target_thread->m_task = in_task;
target_thread->m_notifier.notify_one();
}
else {
std::lock_guard<std::mutex> guard(m_tasks_mutex);
m_tasks.push(in_task);
}
}
void thread_pool::join() {
std::lock_guard<std::mutex> guard(m_threads_mutex);
// Join threads
for (thread& thread : m_threads) {
// Mark thread for shutdown
thread.m_shutdown = true;
thread.m_notifier.notify_one();
// Wait for thread to complete
thread.m_thread.join();
}
// Cleanup threads
m_threads.clear();
while (!m_inactive_threads.empty()) {
m_inactive_threads.pop();
}
}
size_t thread_pool::threads() const {
//std::lock_guard<std::mutex> guard(m_threads_mutex);
return m_threads.size();
}
size_t thread_pool::active() const {
std::lock_guard<std::mutex> guard(m_inactive_threads_mutex);
return threads() - m_inactive_threads.size();
}
// thread_pool private functions
thread_pool::thread* thread_pool::inactive_thread() {
std::lock_guard<std::mutex> guard(m_inactive_threads_mutex);
if (!m_inactive_threads.empty()) {
thread* result = m_inactive_threads.front();
m_inactive_threads.pop();
return result;
}
return nullptr;
}
thread_pool::task_t thread_pool::pop_task() {
std::lock_guard<std::mutex> guard(m_tasks_mutex);
if (!m_tasks.empty()) {
task_t task = m_tasks.front();
m_tasks.pop();
return task;
}
return nullptr;
}
// thread_pool::thread
void thread_pool::thread::run_task() {
if (m_task) {
m_active = true;
m_task();
m_task = nullptr;
m_active = false;
}
}
void thread_pool::thread::wait() {
std::unique_lock<std::mutex> guard(m_notifier_mutex);
m_notifier.wait(guard);
}
} // namespace jessilib

30
src/common/timer.cpp

@ -70,6 +70,8 @@ timer::timer(duration_t in_period, function_t in_callback)
m_next{ calc_next() }, m_next{ calc_next() },
m_callback{ in_callback }, m_callback{ in_callback },
m_self{ impl::timer_manager::instance().m_detached_timers.end() } { m_self{ impl::timer_manager::instance().m_detached_timers.end() } {
// PROBLEM: timer may be executing while moving data????
// Assertion checks // Assertion checks
assert(m_callback != nullptr); assert(m_callback != nullptr);
assert(m_period.count() != 0); assert(m_period.count() != 0);
@ -90,6 +92,15 @@ timer::timer(duration_t in_period, iterations_t in_iterations, function_t in_cal
// Empty ctor body // Empty ctor body
} }
timer& timer::operator=(timer&& in_timer) {
impl::timer_manager& manager = impl::timer_manager::instance();
m_period = in_timer.m_period;
m_next = in_timer.m_next;
m_callback = std::move(in_timer.m_callback);
m_self = in_timer.m_self;
}
timer::~timer() { timer::~timer() {
// If it's null, then it was either never added (default constructed) or already removed (as part of move) // If it's null, then it was either never added (default constructed) or already removed (as part of move)
if (!null()) { if (!null()) {
@ -113,11 +124,24 @@ bool timer::null() const {
return m_callback == nullptr; return m_callback == nullptr;
} }
bool timer::current() const {
impl::timer_manager& manager = impl::timer_manager::instance();
return manager.m_current_timer == this && manager.m_thread.get_id() == std::this_thread::get_id();
}
bool timer::detached() const {
impl::timer_manager& manager = impl::timer_manager::instance();
std::lock_guard<std::mutex> lock(manager.m_detached_timers_mutex);
return m_self != manager.m_detached_timers.end();
}
void timer::detach() { void timer::detach() {
impl::timer_manager& manager = impl::timer_manager::instance(); impl::timer_manager& manager = impl::timer_manager::instance();
assert(!manager.is_current(*this)); // you cannot detach a timer from within itself, because that would destroy the callback you're currently executing assert(!current()); // you cannot detach a timer from within itself, because that would destroy the callback you're currently executing
assert(!manager.is_detached(*this)); // you cannot detach a timer that is already detached assert(!detached()); // you cannot detach a timer that is already detached
std::lock_guard<std::mutex> lock(manager.m_detached_timers_mutex); std::lock_guard<std::mutex> lock(manager.m_detached_timers_mutex);
manager.m_detached_timers.emplace_back(std::move(*this)); manager.m_detached_timers.emplace_back(std::move(*this));
--manager.m_detached_timers.back().m_self; // Is this a race condition? --manager.m_detached_timers.back().m_self; // Is this a race condition?
@ -126,7 +150,7 @@ void timer::detach() {
void timer::cancel() { void timer::cancel() {
impl::timer_manager& manager = impl::timer_manager::instance(); impl::timer_manager& manager = impl::timer_manager::instance();
if (manager.is_current(*this)) { if (current()) {
manager.m_active_timers.erase(this); manager.m_active_timers.erase(this);
return; return;
} }

11
src/common/timer_manager.cpp

@ -45,7 +45,7 @@ void timer_manager::loop() {
// Set active // Set active
timer* target = *itr; timer* target = *itr;
m_current_timer = target; m_current_timer = target;
bool detached = is_detached(*target); bool detached = target->detached();
// Execute timer // Execute timer
target->m_callback(*target); target->m_callback(*target);
@ -80,14 +80,5 @@ void timer_manager::loop() {
} }
} }
bool timer_manager::is_current(timer& in_timer) {
return m_current_timer == &in_timer && m_thread.get_id() == std::this_thread::get_id();
}
bool timer_manager::is_detached(timer& in_timer) {
std::lock_guard<std::mutex> lock(m_detached_timers_mutex);
return in_timer.m_self != m_detached_timers.end();
}
} // namespace impl } // namespace impl
} // namespace jessilib } // namespace jessilib

4
src/include/impl/timer_manager.hpp

@ -48,8 +48,8 @@ class timer_manager {
void loop(); void loop();
// helpers // helpers
bool is_current(timer& in_timer); bool is_current(const timer& in_timer);
bool is_detached(timer& in_timer); bool is_detached(const timer& in_timer);
// Members // Members
std::list<timer> m_detached_timers; std::list<timer> m_detached_timers;

73
src/include/thread_pool.hpp

@ -0,0 +1,73 @@
/**
* Copyright (C) 2018 Jessica James.
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Written by Jessica James <jessica.aj@outlook.com>
*/
#pragma once
#include <cstdint>
#include <functional>
#include <vector>
#include <thread>
#include <queue>
#include <atomic>
#include <mutex>
#include <condition_variable>
namespace jessilib {
class thread_pool {
public:
// Types
using task_t = std::function<void(void)>;
thread_pool();
thread_pool(size_t in_threads);
thread_pool(const thread_pool&) = delete;
thread_pool(thread_pool&&) = delete;
~thread_pool();
void push(task_t in_task); // push a task to the pool
void join(); // join all threads
size_t threads() const; // how many threads are in the pool
size_t active() const; // how many threads are running tasks
private:
struct thread {
void run_task();
void wait();
std::atomic<bool> m_active{ false };
std::atomic<bool> m_shutdown{ false };
task_t m_task;
std::condition_variable m_notifier;
std::mutex m_notifier_mutex;
std::thread m_thread;
};
thread* inactive_thread();
task_t pop_task();
std::vector<thread> m_threads;
std::queue<thread*> m_inactive_threads;
std::queue<task_t> m_tasks;
mutable std::mutex m_threads_mutex;
mutable std::mutex m_inactive_threads_mutex;
mutable std::mutex m_tasks_mutex;
}; // class thread_pool
} // namespace jessilib

2
src/include/timer.hpp

@ -67,6 +67,8 @@ public:
duration_t period() const; duration_t period() const;
function_t function() const; function_t function() const;
bool null() const; bool null() const;
bool current() const;
bool detached() const;
// Mutators // Mutators
void detach(); void detach();

2
src/test/CMakeLists.txt

@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.8)
# Setup source files # Setup source files
include_directories(../include) include_directories(../include)
set(SOURCE_FILES set(SOURCE_FILES
timer.cpp) timer.cpp thread_pool.cpp)
# Setup gtest # Setup gtest
add_subdirectory(googletest/googletest) add_subdirectory(googletest/googletest)

25
src/test/test.hpp

@ -0,0 +1,25 @@
/**
* Copyright (C) 2018 Jessica James.
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Written by Jessica James <jessica.aj@outlook.com>
*/
#pragma once
// Common includes
#include "gtest/gtest.h"
// Helper macros
#define repeat( ITERATIONS ) for (size_t iteration__ = 0; iteration__ != (ITERATIONS); ++iteration__)

103
src/test/thread_pool.cpp

@ -0,0 +1,103 @@
/**
* Copyright (C) 2018 Jessica James.
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* Written by Jessica James <jessica.aj@outlook.com>
*/
#include <chrono>
#include "test.hpp"
#include "thread_pool.hpp"
using namespace jessilib;
using namespace std::literals;
constexpr size_t total_iterations{ 100 };
TEST(ThreadPoolTest, initialDefault) {
thread_pool pool;
std::this_thread::sleep_for(10ms);
EXPECT_EQ(pool.threads(), std::thread::hardware_concurrency());
std::this_thread::sleep_for(10ms);
EXPECT_EQ(pool.active(), 0);
pool.join();
EXPECT_EQ(pool.active(), 0);
EXPECT_EQ(pool.threads(), 0);
}
TEST(ThreadPoolTest, initialSizeDefined) {
thread_pool pool{ 7 };
EXPECT_EQ(pool.threads(), 7);
std::this_thread::sleep_for(10ms);
EXPECT_EQ(pool.active(), 0);
pool.join();
EXPECT_EQ(pool.active(), 0);
EXPECT_EQ(pool.threads(), 0);
}
TEST(ThreadPoolTest, push) {
std::atomic<size_t> iterations{ 0 };
thread_pool pool;
repeat (total_iterations) {
pool.push([&iterations, &pool]() {
++iterations;
});
}
pool.join();
EXPECT_EQ(iterations, total_iterations);
}
TEST(ThreadPoolTest, deadlockSingleThread) {
std::atomic<size_t> iterations{ 0 };
thread_pool pool{ 1 };
repeat (total_iterations) {
pool.push([&iterations, &pool]() {
++iterations;
// Neither of the below should cause a deadlock
EXPECT_EQ(pool.threads(), 1);
EXPECT_EQ(pool.active(), 1);
});
}
pool.join();
EXPECT_EQ(iterations, total_iterations);
}
TEST(ThreadPoolTest, deadlockMultiThread) {
std::atomic<size_t> iterations{ 0 };
thread_pool pool{ 8 };
repeat (total_iterations) {
pool.push([&iterations, &pool]() {
++iterations;
// Neither of the below should cause a deadlock
pool.threads();
pool.active();
});
}
pool.join();
EXPECT_EQ(iterations, total_iterations);
}

4
src/test/timer.cpp

@ -17,7 +17,7 @@
*/ */
#include <future> #include <future>
#include "gtest/gtest.h" #include "test.hpp"
#include "timer.hpp" #include "timer.hpp"
using namespace jessilib; using namespace jessilib;
@ -25,7 +25,7 @@ using namespace std::literals;
constexpr size_t total_iterations{ 4 }; constexpr size_t total_iterations{ 4 };
constexpr std::chrono::steady_clock::duration period = 1ms; constexpr std::chrono::steady_clock::duration period = 1ms;
constexpr std::chrono::steady_clock::duration timeout = 1s; constexpr std::chrono::steady_clock::duration timeout = period * total_iterations * 2 + 1s;
TEST(TimerTest, scoped) { TEST(TimerTest, scoped) {
size_t iterations{ 0 }; size_t iterations{ 0 };

Loading…
Cancel
Save