From 4c2b4480ed0ab5608131d5726cf932b8c8c0e578 Mon Sep 17 00:00:00 2001 From: Jessica James Date: Wed, 3 Jan 2018 17:39:36 -0600 Subject: [PATCH] Added `thread_pool` Added `timer::current`, `timer::detached` --- src/common/CMakeLists.txt | 2 +- src/common/thread_pool.cpp | 159 +++++++++++++++++++++++++++++ src/common/timer.cpp | 30 +++++- src/common/timer_manager.cpp | 11 +- src/include/impl/timer_manager.hpp | 4 +- src/include/thread_pool.hpp | 73 +++++++++++++ src/include/timer.hpp | 2 + src/test/CMakeLists.txt | 2 +- src/test/test.hpp | 25 +++++ src/test/thread_pool.cpp | 103 +++++++++++++++++++ src/test/timer.cpp | 4 +- 11 files changed, 396 insertions(+), 19 deletions(-) create mode 100644 src/common/thread_pool.cpp create mode 100644 src/include/thread_pool.hpp create mode 100644 src/test/test.hpp create mode 100644 src/test/thread_pool.cpp diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 8f3b1c3..6edfa5d 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -3,6 +3,6 @@ cmake_minimum_required(VERSION 3.8) # Setup source files include_directories(../include) set(SOURCE_FILES - timer.cpp timer_manager.cpp) + timer.cpp timer_manager.cpp thread_pool.cpp) add_library(jessilib ${SOURCE_FILES}) diff --git a/src/common/thread_pool.cpp b/src/common/thread_pool.cpp new file mode 100644 index 0000000..a05e14b --- /dev/null +++ b/src/common/thread_pool.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2018 Jessica James. + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY + * SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION + * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN + * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Written by Jessica James + */ + +#include "thread_pool.hpp" +#include + +namespace jessilib { + +// thread_pool + +thread_pool::thread_pool() + : thread_pool{ std::thread::hardware_concurrency() } { +} + +thread_pool::thread_pool(size_t in_threads) + : m_threads{ in_threads } { + assert(in_threads != 0); + + while (in_threads != 0) { + thread& self = m_threads[--in_threads]; + self.m_thread = std::thread([this, &self]() { + while (true) { + // Run next pending task, if there is any + self.m_task = pop_task(); + if (self.m_task != nullptr) { + self.run_task(); + continue; + } + + if (self.m_shutdown) { + break; + } + + // Push inactive thread + { + std::lock_guard guard(m_inactive_threads_mutex); + m_inactive_threads.push(&self); + } + + // Wait for notification + self.wait(); + + // Run task + self.run_task(); + } + }); + } +} + +template +T lock_helper(T&& in_obj, std::mutex& in_mutex) { + std::lock_guard guard(in_mutex); + return std::move(in_obj); +} + +thread_pool::~thread_pool() { + join(); +} + +void thread_pool::push(task_t in_task) { + thread* target_thread = inactive_thread(); + + if (target_thread != nullptr) { + target_thread->m_task = in_task; + target_thread->m_notifier.notify_one(); + } + else { + std::lock_guard guard(m_tasks_mutex); + m_tasks.push(in_task); + } +} + +void thread_pool::join() { + std::lock_guard guard(m_threads_mutex); + + // Join threads + for (thread& thread : m_threads) { + // Mark thread for shutdown + thread.m_shutdown = true; + thread.m_notifier.notify_one(); + + // Wait for thread to complete + thread.m_thread.join(); + } + + // Cleanup threads + m_threads.clear(); + while (!m_inactive_threads.empty()) { + m_inactive_threads.pop(); + } +} + +size_t thread_pool::threads() const { + //std::lock_guard guard(m_threads_mutex); + return m_threads.size(); +} + +size_t thread_pool::active() const { + std::lock_guard guard(m_inactive_threads_mutex); + return threads() - m_inactive_threads.size(); +} + +// thread_pool private functions + +thread_pool::thread* thread_pool::inactive_thread() { + std::lock_guard guard(m_inactive_threads_mutex); + + if (!m_inactive_threads.empty()) { + thread* result = m_inactive_threads.front(); + m_inactive_threads.pop(); + return result; + } + + return nullptr; +} + +thread_pool::task_t thread_pool::pop_task() { + std::lock_guard guard(m_tasks_mutex); + if (!m_tasks.empty()) { + task_t task = m_tasks.front(); + m_tasks.pop(); + return task; + } + + return nullptr; +} + +// thread_pool::thread + +void thread_pool::thread::run_task() { + if (m_task) { + m_active = true; + m_task(); + m_task = nullptr; + m_active = false; + } +} + +void thread_pool::thread::wait() { + std::unique_lock guard(m_notifier_mutex); + m_notifier.wait(guard); +} + +} // namespace jessilib diff --git a/src/common/timer.cpp b/src/common/timer.cpp index 5686d95..55332ea 100644 --- a/src/common/timer.cpp +++ b/src/common/timer.cpp @@ -70,6 +70,8 @@ timer::timer(duration_t in_period, function_t in_callback) m_next{ calc_next() }, m_callback{ in_callback }, m_self{ impl::timer_manager::instance().m_detached_timers.end() } { + // PROBLEM: timer may be executing while moving data???? + // Assertion checks assert(m_callback != nullptr); assert(m_period.count() != 0); @@ -90,6 +92,15 @@ timer::timer(duration_t in_period, iterations_t in_iterations, function_t in_cal // Empty ctor body } +timer& timer::operator=(timer&& in_timer) { + impl::timer_manager& manager = impl::timer_manager::instance(); + + m_period = in_timer.m_period; + m_next = in_timer.m_next; + m_callback = std::move(in_timer.m_callback); + m_self = in_timer.m_self; +} + timer::~timer() { // If it's null, then it was either never added (default constructed) or already removed (as part of move) if (!null()) { @@ -113,11 +124,24 @@ bool timer::null() const { return m_callback == nullptr; } +bool timer::current() const { + impl::timer_manager& manager = impl::timer_manager::instance(); + + return manager.m_current_timer == this && manager.m_thread.get_id() == std::this_thread::get_id(); +} + +bool timer::detached() const { + impl::timer_manager& manager = impl::timer_manager::instance(); + + std::lock_guard lock(manager.m_detached_timers_mutex); + return m_self != manager.m_detached_timers.end(); +} + void timer::detach() { impl::timer_manager& manager = impl::timer_manager::instance(); - assert(!manager.is_current(*this)); // you cannot detach a timer from within itself, because that would destroy the callback you're currently executing - assert(!manager.is_detached(*this)); // you cannot detach a timer that is already detached + assert(!current()); // you cannot detach a timer from within itself, because that would destroy the callback you're currently executing + assert(!detached()); // you cannot detach a timer that is already detached std::lock_guard lock(manager.m_detached_timers_mutex); manager.m_detached_timers.emplace_back(std::move(*this)); --manager.m_detached_timers.back().m_self; // Is this a race condition? @@ -126,7 +150,7 @@ void timer::detach() { void timer::cancel() { impl::timer_manager& manager = impl::timer_manager::instance(); - if (manager.is_current(*this)) { + if (current()) { manager.m_active_timers.erase(this); return; } diff --git a/src/common/timer_manager.cpp b/src/common/timer_manager.cpp index 490b7ed..5ec9dc2 100644 --- a/src/common/timer_manager.cpp +++ b/src/common/timer_manager.cpp @@ -45,7 +45,7 @@ void timer_manager::loop() { // Set active timer* target = *itr; m_current_timer = target; - bool detached = is_detached(*target); + bool detached = target->detached(); // Execute timer target->m_callback(*target); @@ -80,14 +80,5 @@ void timer_manager::loop() { } } -bool timer_manager::is_current(timer& in_timer) { - return m_current_timer == &in_timer && m_thread.get_id() == std::this_thread::get_id(); -} - -bool timer_manager::is_detached(timer& in_timer) { - std::lock_guard lock(m_detached_timers_mutex); - return in_timer.m_self != m_detached_timers.end(); -} - } // namespace impl } // namespace jessilib diff --git a/src/include/impl/timer_manager.hpp b/src/include/impl/timer_manager.hpp index c8e9ab0..5b9b529 100644 --- a/src/include/impl/timer_manager.hpp +++ b/src/include/impl/timer_manager.hpp @@ -48,8 +48,8 @@ class timer_manager { void loop(); // helpers - bool is_current(timer& in_timer); - bool is_detached(timer& in_timer); + bool is_current(const timer& in_timer); + bool is_detached(const timer& in_timer); // Members std::list m_detached_timers; diff --git a/src/include/thread_pool.hpp b/src/include/thread_pool.hpp new file mode 100644 index 0000000..541e87c --- /dev/null +++ b/src/include/thread_pool.hpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2018 Jessica James. + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY + * SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION + * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN + * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Written by Jessica James + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace jessilib { + +class thread_pool { +public: + // Types + using task_t = std::function; + + thread_pool(); + thread_pool(size_t in_threads); + thread_pool(const thread_pool&) = delete; + thread_pool(thread_pool&&) = delete; + ~thread_pool(); + + void push(task_t in_task); // push a task to the pool + void join(); // join all threads + size_t threads() const; // how many threads are in the pool + size_t active() const; // how many threads are running tasks + +private: + struct thread { + void run_task(); + void wait(); + + std::atomic m_active{ false }; + std::atomic m_shutdown{ false }; + task_t m_task; + std::condition_variable m_notifier; + std::mutex m_notifier_mutex; + std::thread m_thread; + }; + + thread* inactive_thread(); + task_t pop_task(); + + std::vector m_threads; + std::queue m_inactive_threads; + std::queue m_tasks; + + mutable std::mutex m_threads_mutex; + mutable std::mutex m_inactive_threads_mutex; + mutable std::mutex m_tasks_mutex; +}; // class thread_pool + +} // namespace jessilib diff --git a/src/include/timer.hpp b/src/include/timer.hpp index aec7656..051dbd9 100644 --- a/src/include/timer.hpp +++ b/src/include/timer.hpp @@ -67,6 +67,8 @@ public: duration_t period() const; function_t function() const; bool null() const; + bool current() const; + bool detached() const; // Mutators void detach(); diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index d836d32..18cbd18 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.8) # Setup source files include_directories(../include) set(SOURCE_FILES - timer.cpp) + timer.cpp thread_pool.cpp) # Setup gtest add_subdirectory(googletest/googletest) diff --git a/src/test/test.hpp b/src/test/test.hpp new file mode 100644 index 0000000..9439ea4 --- /dev/null +++ b/src/test/test.hpp @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2018 Jessica James. + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY + * SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION + * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN + * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Written by Jessica James + */ + +#pragma once + +// Common includes +#include "gtest/gtest.h" + +// Helper macros +#define repeat( ITERATIONS ) for (size_t iteration__ = 0; iteration__ != (ITERATIONS); ++iteration__) diff --git a/src/test/thread_pool.cpp b/src/test/thread_pool.cpp new file mode 100644 index 0000000..4047481 --- /dev/null +++ b/src/test/thread_pool.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2018 Jessica James. + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY + * SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION + * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN + * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Written by Jessica James + */ + +#include +#include "test.hpp" +#include "thread_pool.hpp" + +using namespace jessilib; +using namespace std::literals; + +constexpr size_t total_iterations{ 100 }; + +TEST(ThreadPoolTest, initialDefault) { + thread_pool pool; + std::this_thread::sleep_for(10ms); + + EXPECT_EQ(pool.threads(), std::thread::hardware_concurrency()); + std::this_thread::sleep_for(10ms); + EXPECT_EQ(pool.active(), 0); + + pool.join(); + + EXPECT_EQ(pool.active(), 0); + EXPECT_EQ(pool.threads(), 0); +} + +TEST(ThreadPoolTest, initialSizeDefined) { + thread_pool pool{ 7 }; + + EXPECT_EQ(pool.threads(), 7); + std::this_thread::sleep_for(10ms); + EXPECT_EQ(pool.active(), 0); + + pool.join(); + + EXPECT_EQ(pool.active(), 0); + EXPECT_EQ(pool.threads(), 0); +} + +TEST(ThreadPoolTest, push) { + std::atomic iterations{ 0 }; + thread_pool pool; + + repeat (total_iterations) { + pool.push([&iterations, &pool]() { + ++iterations; + }); + } + + pool.join(); + EXPECT_EQ(iterations, total_iterations); +} + +TEST(ThreadPoolTest, deadlockSingleThread) { + std::atomic iterations{ 0 }; + thread_pool pool{ 1 }; + + repeat (total_iterations) { + pool.push([&iterations, &pool]() { + ++iterations; + + // Neither of the below should cause a deadlock + EXPECT_EQ(pool.threads(), 1); + EXPECT_EQ(pool.active(), 1); + }); + } + + pool.join(); + EXPECT_EQ(iterations, total_iterations); +} + +TEST(ThreadPoolTest, deadlockMultiThread) { + std::atomic iterations{ 0 }; + thread_pool pool{ 8 }; + + repeat (total_iterations) { + pool.push([&iterations, &pool]() { + ++iterations; + + // Neither of the below should cause a deadlock + pool.threads(); + pool.active(); + }); + } + + pool.join(); + EXPECT_EQ(iterations, total_iterations); +} diff --git a/src/test/timer.cpp b/src/test/timer.cpp index 3e823f9..123d2be 100644 --- a/src/test/timer.cpp +++ b/src/test/timer.cpp @@ -17,7 +17,7 @@ */ #include -#include "gtest/gtest.h" +#include "test.hpp" #include "timer.hpp" using namespace jessilib; @@ -25,7 +25,7 @@ using namespace std::literals; constexpr size_t total_iterations{ 4 }; constexpr std::chrono::steady_clock::duration period = 1ms; -constexpr std::chrono::steady_clock::duration timeout = 1s; +constexpr std::chrono::steady_clock::duration timeout = period * total_iterations * 2 + 1s; TEST(TimerTest, scoped) { size_t iterations{ 0 };