Параллельный FIFO в C ++ 11

Я реализовал простой FIFO, который может быть произвольно использован либо одним потоком, либо способом передачи данных между потоками. Класс templated с аргументами для типов, которые будет содержать очередь, а также количества элементов в очереди (данные очереди хранятся в std::array). Для использования с одним потоком он создается посредством:

ST_FIFO<T, BUFFER_SIZE>

и для использования в многопоточном контексте:

MT_FIFO<T, BUFFER_SIZE>

Он имеет следующие методы:

  • push (с семантикой и без перемещения): поместите один элемент в буфер. Всегда успешно - если буфер заполнен, выдается самый старый элемент.
  • try_push: поместите один элемент в буфер. Если буфер заполнен, нажатие не выполняется.
  • multi_push: нажмите несколько элементов на буфер. В основном полезно для многопоточности, где все элементы подталкиваются под одну и ту же блокировку. Всегда удается.
  • try_multi_push: тот же, что и multi_push, но сбой, если буфер заполнен.
  • pop: выталкивает один элемент из буфера. В однопоточной версии возвращается немедленно, если буфер пуст. В многопоточной версии этот метод блокирует (используя переменную условия), пока данные не станут доступными.
  • multi_pop: выдает заданное количество элементов из буфера. Если буфер содержит меньше элементов, чем запрошенный номер, возвращает все элементы в буфере. В многопоточной версии блокируются до тех пор, пока не станет доступным один элемент. Возвращает количество элементов, выведенных из буфера.
  • peek: получить элемент из буфера, не удаляя его. Может быть любым элементом в буфере, но должен быть меньше количества элементов в буфере.
  • size: возвращает количество элементов в буфере.
  • is_empty: возвращает true, если буфер пуст.

В частности, для многопоточной версии:

  • try_pop: выдает элемент из буфера. Если буфер пуст, немедленно возвращается false.
  • wait_off: если поток ожидает появления элемента, этот метод можно вызвать из другого потока, чтобы завершить ожидание (полезно разблокировать поток при выходе.
  • wait_on: ожидание.

После этого я нашел в этой статье и в этой статье , которые реализуют параллельную очередь в очень похожим образом. Я думаю, что у меня все хорошо. Любые комментарии? Должен ли я освобождать блокировки перед уведомлением переменных условия? (если я это делаю, хладнокровно жалуется, что блокировки не удерживаются, но это большая сделка?) Есть одна вещь, которую я хотел бы выяснить, если это возможно. Теперь, когда установлены блокировки, невозможно одновременно нажать и выталкивать из буфера. Наличие одной блокировки для входа и одно для выхода будет проблематичным, если очередь пуста или имеет один элемент. Любые мысли об этом?

fifo.h

#ifndef FIFO_H
#define FIFO_H

#include <array>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <type_traits>

//compiler I have is not c++14 compliant
template< bool B, class T = void >
using enable_if_t = typename std::enable_if<B,T>::type;

template<class T, std::size_t CAPACITY, bool THREADSAFE = true>
class FIFO_t
{
public:
    FIFO_t(): buf_capacity_(CAPACITY + 1), // add dummy unused element so we can tell the difference between when buffer is full and when it is empty.
        input_index_(0), output_index_(0), wait_flag_(true) { }

    // copy constructor needs to be fancy because the mutex is not copyable or
    // movable.  Uses private helper functions to pick single or
    // multi-threaded version

    FIFO_t(const FIFO_t &that):buf_capacity_(CAPACITY + 1)
    {
        copy_fifo(that, std::integral_constant<bool, THREADSAFE>{});
    }

    FIFO_t& operator=(const FIFO_t& that)
    {
        return copy_fifo(that, std::integral_constant<bool, THREADSAFE>{});
    }

//Single thread public definitions*********************************************

    //push --pop oldest element if queue is full
    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    push(const T &data)
    {
        return push_(data);
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    push(T &&data)
    {
        return push_(std::move(data));
    }

    // push and fail if queue is full
    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    try_push(const T &data)
    {
        return try_push_(data);
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    multi_push(const T data[], size_t count)
    {
        for (size_t i = 0; i < count; ++i)
        {
            push_(data[i]);
        }
        return true;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, size_t>
    try_multi_push(const T data[], size_t count)
    {
        size_t num_pushed = 0;
        for (size_t i = 0; i < count && get_size() < capacity() ; ++i)
        {
            try_push_(data[i]);
            num_pushed++;
        }

        return num_pushed;
    }

    // no need for pop_nowait, wait_on or wait_off functions in
    // single threaded version

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    pop(T &data)
    {
        return pop_(data);
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, size_t>
    multi_pop(T data[], size_t count)
    {
        size_t num_popped = 0;
        for (size_t i = 0; i < count && !is_empty_(); ++i)
        {
            pop_(data[i]);
            num_popped++;
        }
        return num_popped;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    peek(std::size_t ind, T &data) const
    {
        std::size_t buf_size = get_size();
        if (ind >= buf_size)
        {
            return false;
        }
        else
        {
            data = buffer_data_[(output_index_ + ind) % buf_capacity_];
            return true;
        }
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, std::size_t>
    size() const
    {
        return get_size();
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<not trigger, bool>
    is_empty() const
    {
       return is_empty_();
    }

//mutlithreaded public definitions

    //push --pop oldest element if queue is full
    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    push(const T &data)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
        bool result = push_(data);
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    push(T &&data)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
        bool result = push_(std::move(data));
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    // push and fail if queue is full
    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    try_push(const T &data)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
            this_lock.lock();
        bool result = try_push_(data);
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    multi_push(const T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);

        for (size_t i = 0; i < count; ++i)
        {
            push_(data[i]);
        }
        //this_lock.unlock();
        cv_.notify_one();

        return true;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, size_t>
    try_multi_push(const T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);

        size_t num_pushed = 0;
        for (size_t i = 0; i < count && get_size() < capacity() ; ++i)
        {
            try_push_(data[i]);
            num_pushed++;
        }
        //this_lock.unlock();
        cv_.notify_one();

        return num_pushed;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    pop(T &data)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
        cv_.wait(this_lock, [this]{return !(is_empty_() && wait_flag_);});
        return pop_(data);
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    try_pop(T &data)
    {
        std::unique_lock<std::mutex> lock(this->mutex_);
        return pop_(data);
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, size_t>
    multi_pop(T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
        cv_.wait(this_lock, [this]{return !(is_empty_() && wait_flag_);});

        size_t num_popped = 0;
        for (size_t i = 0; i < count && !is_empty_(); ++i)
        {
            pop_(data[i]);
            num_popped++;
        }
        return num_popped;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    peek(std::size_t ind, T &data) const
    {
        std::lock_guard<std::mutex> this_lock(this->mutex_);
        std::size_t buf_size = get_size();

        if (ind >= buf_size)
        {
            return false;
        }
        else
        {
            data = buffer_data_[(output_index_ + ind) % buf_capacity_];
            return true;
        }
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, size_t>
    size() const
    {
        std::lock_guard<std::mutex> this_lock(this->mutex_);
        return get_size();
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger, bool>
    is_empty() const
    {
        std::lock_guard<std::mutex> this_lock(this->mutex_);
        return is_empty_();
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger>
    wait_on()
    {
        std::lock_guard<std::mutex> this_lock(this->mutex_);
        wait_flag_ = true;
    }

    template<bool trigger = THREADSAFE>
    enable_if_t<trigger>
    wait_off()
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_);
            wait_flag_ = false;
        //this_lock.unlock();
            cv_.notify_all();
        }

    //same for single and multi-threaded versions
    std::size_t capacity() const
    {
        return  CAPACITY;
    }

private:
    std::array<T, CAPACITY+1> buffer_data_;
    const std::size_t buf_capacity_;
    std::size_t input_index_;
    std::size_t output_index_;
    std::atomic_bool wait_flag_;
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;


    FIFO_t& copy_fifo(const FIFO_t& that, std::true_type)
    {
        std::unique_lock<std::mutex> this_lock(this->mutex_, std::defer_lock);
        std::unique_lock<std::mutex> that_lock(that.mutex_, std::defer_lock);
        std::lock(this_lock, that_lock);

        buffer_data_ = that.buffer_data_;
        input_index_ = that.input_index_;
        output_index_ = that.output_index_;
        wait_flag_    = that.wait_flag_.load();
        return *this;
    }

    FIFO_t& copy_fifo(const FIFO_t& that, std::false_type)
    {
        buffer_data_ = that.buffer_data_;
        input_index_ = that.input_index_;
        output_index_ = that.output_index_;
        wait_flag_    = that.wait_flag_.load();
        return *this;
    }

    // non thread safe version for internal use (assumes calling function
    // will acquire the lock)
    std::size_t get_size() const
    {
        if (input_index_ == output_index_)
        {
            return 0;
        }
        else if (input_index_ > output_index_)
        {
            return input_index_ - output_index_;
        }
        else
        {
            return input_index_ + buf_capacity_ - output_index_;
        }
    }

    bool is_empty_() const
    {
        return (input_index_ == output_index_);
    }

    bool push_(const T &data)
    {
        if(get_size() == CAPACITY)
        {
            T temp;
            pop_(temp);
        }

        buffer_data_[input_index_] = data;
        input_index_ = (input_index_ + 1) % buf_capacity_;
        return true;
    }

    bool push_(T &&data)
    {
        if (get_size() == CAPACITY)
        {
            T temp;
            pop_(temp);
        }

        buffer_data_[input_index_] = std::move(data);
        input_index_ = (input_index_ + 1) % buf_capacity_;
        return true;
    }

    bool try_push_(const T &data)
    {
        if (get_size() == CAPACITY)
        {
            return false;
        }
        else
        {
            buffer_data_[input_index_] = data;
            input_index_ = (input_index_ + 1) % buf_capacity_;
            return true;
        }
    }

    bool pop_(T &data)
    {
        if (is_empty_())
        {
            return false;
        }
        else
        {
            data = std::move(buffer_data_[output_index_]);
            output_index_ = (output_index_ + 1) % buf_capacity_;
            return true;
        }
    }
};

template<class T, std::size_t N>
using ST_FIFO = FIFO_t<T, N, false>; //alias for a single threaded fifo

template<class T, std::size_t N> //alias for a multi-threaded fifo
using MT_FIFO = FIFO_t<T, N, true>;

#endif // FIFO_H

Тест-драйвер (не завершен):

main.cpp

#include <iostream>
#include <vector>
#include "fifo.h"
#include <cstdlib>
#include <chrono>
#include <unistd.h>
#include <thread>
#include <future>
#include <map>

bool TEST_Initially_Empty()
{
    ST_FIFO<int, 10> buf;

    if (buf.size() == 0)
    {
        return true;
    }
    else
    {
        return false;
    }
}

bool TEST_Put_Get_FIFO()
{
    const int num_elements = 5;
    int a[num_elements], b[num_elements];
    ST_FIFO<int, 10> buf;

    for(unsigned i = 0; i < num_elements; ++i)
    {
        a[i] = std::rand() % 1000;
    }

    for (unsigned i = 0; i < num_elements; ++i)
    {
        if (!buf.push(a[i]))
        {
            return false;
        }
    }

    for(unsigned i = 0; i < num_elements; ++i)
    {
        if (!buf.pop(b[i]))
        {
            return false;
        }
    }

    for (unsigned i = 0; i < num_elements; ++i)
    {
        if (a[i] != b[i])
        {
            return false;
        }
    }

    return true;
}

bool TEST_Put_to_full()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        if (!buf.push(i))
        {
            return false;
        }
    }

    if (buf.try_push(cnt))
    {
        return false;
    }

    return true;
}

bool TEST_Put_to_full_DiscardOldest()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;
    int a[10], temp, temp2;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
        if (!buf.push(i))
        {
            std::cout << "returning false" << std::endl;
            return false;
        }
    }

    buf.push(cnt);
    buf.peek(0,temp);
    buf.peek(9,temp2);
    if (temp != 1 || temp2 != 10)
    {
        return false;
    }

    buf.pop(temp);
    if (a[1] != temp)
    {
        return false;
    }

    return true;
}

bool TEST_Put_to_full_DiscardNewest()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;
    int a[10], temp;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
        if (!buf.try_push(i))
        {
            std::cout << "returning false" << std::endl;
            return false;
        }
    }

    if (buf.try_push(cnt))
    {
        return false;
    }
    buf.pop(temp);
    if (a[0] != temp)
    {
        return false;
    }

    return true;
}

bool TEST_multi_push()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;
    int a[10], temp;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
    }
    buf.multi_push(a, 10);

    for (int i=0; i<10; ++i)
    {
        buf.peek(i,temp);
        if (temp !=a[i])
        {
            return false;
        }
    }
    return true;
}

bool TEST_multi_push_overflow()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 7> buf;
    int a[cnt], temp;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
    }
    buf.multi_push(a, cnt);

    for (int i=0; i<7; ++i)
    {
        buf.peek(i,temp);
        if (temp !=a[i+3])
        {
           return false;
        }
    }
    return true;
}

bool TEST_try_multi_push()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;
    int a[cnt], temp;

    buf.push(234);
    buf.push(249);
    buf.push(233);
    buf.pop(temp);
    buf.pop(temp);
    buf.pop(temp);

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
    }
    buf.try_multi_push(a, cnt);

    for (int i=0; i<10; ++i)
    {
        buf.peek(i,temp);
        if (temp !=a[i])
        {
           return false;
        }
    }
    return true;
}

bool TEST_try_multi_push_overflow()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 7> buf;
    int a[cnt], temp;

    buf.push(234);
    buf.push(249);
    buf.push(233);
    buf.pop(temp);
    buf.pop(temp);
    buf.pop(temp);

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
    }
    buf.try_multi_push(a, cnt);

    for (int i=0; i<7; ++i)
    {
        buf.peek(i,temp);
        if (temp !=a[i])
        {
           return false;
        }
    }
    return true;
}

bool TEST_multi_pop()
{
    std::size_t cnt = 10;

    ST_FIFO<int, 10> buf;
    int a[cnt], b[cnt], temp;

    for (std::size_t i = 0; i < cnt; ++i)
    {
        a[i] = i;
    }
    buf.push(28);
    buf.push(240);
    buf.try_multi_push(a, cnt);

    buf.pop(temp);
    if (temp != 28)
    {
        return false;
    }
    buf.pop(temp);

    if (temp != 240)
    {
        return false;
    }

    int count = buf.multi_pop(b,10);
    if (count != 8)
    {
        std::cout << "count = " << count << ", expected value 8." << std::endl;
        return false;
    }

    for (int i=0; i<count; ++i)
    {

        if (b[i] !=a[i])
        {
           return false;
        }
    }
    return true;
}

void emptyTest(MT_FIFO<int, 100> &buf, const std::size_t num_elements)
{
    for (int ctr = 0; ctr < 5; ++ctr)
    {
        std::cout << "waiting..." << ctr << std::endl;
        usleep(1000000);
    }


    for(std::size_t i = 0; i < num_elements; ++i)
    {
        buf.push(std::rand() % 1000);
    }
}

bool TEST_Get_From_Empty()
{
    MT_FIFO<int,100> buf;
    const std::size_t num_elements = 5;
    int a[num_elements];

    std::thread t1(emptyTest, std::ref(buf), num_elements);

    std::chrono::system_clock::time_point start =
            std::chrono::system_clock::now();
    if(buf.pop(a[0]))
    {
        std::chrono::system_clock::time_point stop =
                std::chrono::system_clock::now();
        double diff = std::chrono::duration_cast<std::chrono::duration<double>> (stop-start).count();
        if (diff < 4)
        {
            std::cout << "diff = " << diff << std::endl;
            return false;
        }
    }

    t1.join();

    return true;
}

bool TEST_peek()
{
    const std::size_t num_elements = 100;
    ST_FIFO<int,100> buf;
    int a[num_elements];
    int b[num_elements];

    if (buf.peek(0,a[0]))
    {
        return false;
    }

    //condition buffer so next time we fill it, the counters will transition
    for(std::size_t i = 0; i < num_elements/2; ++i)
    {
        if (! buf.push(rand() % 1000))
        {
            return false;
        }

        if (! buf.pop(a[i]))
        {
            return false;
        }
    }

    if (! buf.is_empty())
    {
        return false;
    }

    for (std::size_t i = 0; i < num_elements; ++i)
    {
        a[i] = rand() % 1000;
        if(!buf.push(a[i]))
        {
            return false;
        }
        if (!buf.peek(i,b[i]))
        {
            return false;
        }
        if (a[i] != b[i])
        {
            return false;
        }

    }

    if (buf.peek(num_elements,b[0]))
    {
        return false;
    }

    return true;
}

bool TEST_Wrap_around()
{
    const int num_elements = 100;
    int a[num_elements], b[num_elements];
    bool test_result = true;
    ST_FIFO<int,10> buf;

    for(unsigned i = 0; i < num_elements - 1; ++i)
    {
        a[i] = std::rand() % 1000;
        a[i + 1] = std::rand() % 1000;

        if (!buf.push(a[i]))
        {
            test_result = false;
        }

        if (!buf.push(a[i+1]))
        {
            test_result =  false;
        }

        if (!buf.pop(b[i]))
        {
            test_result = false;
        }
        if (!buf.pop(b[i+1]))
        {
            test_result = false;
        }

        if (a[i] != b[i] || a[i + 1] != b[i + 1])
        {
            test_result = false;
        }
    }

    return test_result;
}

bool TEST_size()
{
    ST_FIFO<unsigned char, 15> buf;

    for (std::size_t i = 0; i < buf.capacity(); ++i)
    {
        if (buf.size() != i)
        {
            std::cout << "size fail buf_size = " << buf.size() << ", i = " << i << std::endl;
            return false;
        }
        if (! buf.push(rand() % 255))
        {
            std::cout << "push fail i = " << i << std::endl;
            return false;
        }
        if (buf.size() != i+1)
        {
            std::cout << "size fail buf_size = " << buf.size() << ", i = " << i << std::endl;
            return false;
        }
    }
    return true;
}

bool TEST_capacity()
{
    std::size_t num_elements = 15;
    ST_FIFO<unsigned char, 15> buf;

    if (buf.capacity() != num_elements)
    {
        return false;
    }
    return true;
}

bool TEST_st_copy()
{
    const int num_elements = 5;
    int a[num_elements], b[num_elements];
    ST_FIFO<int, 10> buf;

    for(unsigned i = 0; i < num_elements; ++i)
    {
        a[i] = std::rand() % 1000;
    }

    for (unsigned i = 0; i < num_elements; ++i)
    {
        if (!buf.push(a[i]))
        {
            return false;
        }
    }

    ST_FIFO<int,10> buf2(buf);


    for (unsigned i = 0; i < num_elements; i++)
    {
        buf2.pop(b[i]);
        if (b[i] != a[i])
        {
            return false;
        }
    }
    return true;
}


bool TEST_mt_copy()
{
    const int num_elements = 5;
    int a[num_elements], b[num_elements];
    MT_FIFO<int, 10> buf;

    for(unsigned i = 0; i < num_elements; ++i)
    {
        a[i] = std::rand() % 1000;
    }

    for (unsigned i = 0; i < num_elements; ++i)
    {
        if (!buf.push(a[i]))
        {
            return false;
        }
    }

    MT_FIFO<int,10> buf2(buf);


    for (unsigned i = 0; i < num_elements; i++)
    {
        buf2.pop(b[i]);
        if (b[i] != a[i])
        {
            return false;
        }
    }
    return true;
}

template<class T>
struct ts_obj
{
    ts_obj() {}
    ts_obj(T obj):obj_(obj) {}

    T operator()()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return obj_;
    }

     void operator=(const T &obj)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        obj_ = obj;
    }


private:
    T obj_;
    std::mutex mutex_;
};


void worker_thread(MT_FIFO<std::packaged_task<int()>,10> &fifo, ts_obj<bool> &flag)
{
    while(flag())
    {
        std::packaged_task<int()> task{};
        if (fifo.pop(task))
        {
            task();
        }
        else
        {
            //std::cout << "didn't execute task..." << std::endl;
        }
    }
}

bool TEST_MT()
{
    ts_obj<bool> flag(true);
    MT_FIFO<std::packaged_task<int()>, 10> fifo;

    std::thread thread1(&worker_thread, std::ref(fifo), std::ref(flag));
    std::packaged_task<int()> task1([]{return 1;});

    auto result = task1.get_future();

    fifo.push(std::move(task1));


    sleep(1);
    //std::cout << "Result is " << result.get() << std::endl;
    flag = false;
    fifo.wait_off();

    thread1.join();
    return true;
}

enum class task_status{
    INVALID = -1,
    NOT_STARTED = 0,
    IN_PROGRESS,
    INTERRUPTED,
    FAILED,
    SUCCESS
};

const static std::map<task_status, std::string> enum_str {
    {task_status::INVALID, "INVALID"},
    {task_status::NOT_STARTED, "NOT_STARTED"},
    {task_status::IN_PROGRESS, "IN_PROGRESS"},
    {task_status::INTERRUPTED, "INTERRUPTED"},
    {task_status::FAILED, "FAILED"},
    {task_status::SUCCESS, "SUCCESS"}
};

class baseTask
{
public:
    baseTask(): m_status(task_status::INVALID) {}
    virtual ~baseTask () {}
    virtual void execute() = 0;
    virtual void cancel() = 0;
    task_status status() {return m_status();}
protected:
    //std::atomic<task_status> m_status;
    ts_obj<task_status> m_status;
private:

};

class derivedtask : public baseTask
{
public:
    derivedtask() {}
    derivedtask(int wait):wait_time(wait), flag(false) {m_status = task_status::NOT_STARTED;}
    ~derivedtask() {}
    void execute()
    {
        m_status = task_status::IN_PROGRESS;
        int ctr = 0;
        while (ctr < wait_time && !flag())
        {
            //std::cout << "waiting " << ctr << std::endl;
            sleep(1);
            ctr++;
        }

        if (flag())
        {
            m_status = task_status::INTERRUPTED;
        }
        else
        {
            m_status = task_status::SUCCESS;
        }

    }

    void cancel()
    {
        flag = true;
    }

private:
    int wait_time = 0;
    ts_obj<bool> flag;
};

void worker_thread2(MT_FIFO<std::shared_ptr<baseTask>, 10> &fifo, ts_obj<bool> &flag)
{
    while(flag())
    {
        std::shared_ptr<baseTask> task{};
        if (fifo.pop(task))
        {
            task->execute();
        }
        else
        {
            //std::cout << "didn't execute task..." << std::endl;
        }

    }
}

bool TEST_MT2()
{
    ts_obj<bool> flag(true);
    MT_FIFO<std::shared_ptr<baseTask>, 10> fifo;

    std::thread thread1(&worker_thread2, std::ref(fifo), std::ref(flag));
    //std::packaged_task<int()> task1([]{return 1;});
    std::shared_ptr<baseTask> task1(new derivedtask(5));
    std::shared_ptr<baseTask> task2(new derivedtask(5));
    //auto result = task1.get_future();

    fifo.push(task1);
    fifo.push(task2);
    int ctr = 0;
    while (task1->status() == task_status::NOT_STARTED ||
           task1->status() == task_status::IN_PROGRESS)
    {
        ctr++;
        //std::cout <<"waiting for task 1 to complete" << std::endl;
        sleep(1);
        if (ctr == 3)
        {
            task1->cancel();
        }
    }
    while (task2->status() == task_status::NOT_STARTED ||
           task2->status() == task_status::IN_PROGRESS)
    {
        //std::cout <<"waiting for task 2 to complete" << std::endl;
        sleep(1);
    }
    //std::cout << "Result is " << enum_str.at(task1->status()) << std::endl;
    //std::cout << "Result is " << enum_str.at(task2->status()) << std::endl;

    task1.reset();
    task2.reset();

    flag = false;
    fifo.wait_off();

    thread1.join();
    return true;
}

int main()
{
    std::cout << "TEST Initially empty: " << (TEST_Initially_Empty() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Get Put: " << (TEST_Put_Get_FIFO() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Wrap Around: " << (TEST_Wrap_around() ? "PASS" : "FAIL")<< std::endl  << std::endl;
    std::cout << "TEST Put_to_full: " << (TEST_Put_to_full() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Put_to_full, discarding oldest: " << (TEST_Put_to_full_DiscardOldest() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Put_to_full, discarding newest: " << (TEST_Put_to_full_DiscardNewest() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Multi-push: " << (TEST_multi_push() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Multi-push-overflow: " << (TEST_multi_push_overflow() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Try-Multi-push: " << (TEST_try_multi_push() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Try-Multi-push-overflow: " << (TEST_try_multi_push_overflow() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Try-multi-pop-overflow: " << (TEST_multi_pop() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Get_from_empty: " << (TEST_Get_From_Empty() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Peek: " << (TEST_peek() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Size: " << (TEST_size() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST Capacity: " << (TEST_capacity() ? "PASS" : "FAIL")  << std::endl << std::endl;
    std::cout << "TEST ST Copy: " << (TEST_st_copy() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST MT Copy: " << (TEST_mt_copy() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST MT: " << (TEST_MT() ? "PASS" : "FAIL") << std::endl << std::endl;
    std::cout << "TEST MT2: " << (TEST_MT2() ? "PASS" : "FAIL") << std::endl << std::endl;
    return 0;
}

UPDATE: Основываясь на полученной обратной связи, я разделил ее на два класса. С целью возиться и учиться это также показалось хорошим примером использования Curiously Recurring Template Pattern, поэтому я использовал его в моих пересмотренных классах. Я также сделал несколько небольших настроек, которые заменяют изменяемые ссылочные аргументы указателями на функции pop и peek. Вот он:

#ifndef FIFO_H
#define FIFO_H

#include <array>
#include <mutex>
#include <condition_variable>
#include <atomic>

// A FIFO class.  Implements a statically allocated FIFO.  Two versions
// are defined--a single threaded version and a threadsafe version.
// uses CRTP for experimentation.
template<class T, size_t CAPACITY, class DERIVED>
class FIFO_BASE_t
{
public:
    bool push(const T &data) {return derived_type().derived_push(data);}

    bool push(T &&data){return derived_type().derived_push(std::move(data));}

    bool try_push(const T &data){return derived_type().derived_try_push(data);}

    bool multi_push(const T data[], size_t count)
        {return derived_type().derived_multi_push(data, count);}

    size_t try_multi_push(const T data[], size_t count)
        {return derived_type().derived_try_multi_push(data, count);}

    bool pop(T *data) {return derived_type().derived_pop(data);}

    size_t multi_pop(T data[], size_t count)
        {return derived_type().derived_multi_pop(data, count);}

    bool peek(size_t index, T *data) const
        {return derived_const().derived_peek(index, data);}

    size_t size() const {return derived_const().derived_size();}

    bool is_empty() const {return derived_const().derived_is_empty();}

protected:
    FIFO_BASE_t() {}
    DERIVED& copy_fifo(const DERIVED& rhs)
    {
        buffer_data_  = rhs.buffer_data_;
        input_index_  = rhs.input_index_;
        output_index_ = rhs.output_index_;
        return derived_type();
    }

    size_t size_() const
    {
        if (input_index_ == output_index_)
        {
            return 0;
        }
        if (input_index_ > output_index_)
        {
            return input_index_ - output_index_;
        }

        return input_index_ + CAPACITY + 1 - output_index_;
    }

    bool is_empty_() const
    {
        return input_index_ == output_index_;
    }

    bool push_one_(const T &data)
    {
        if(size_() == CAPACITY)
        {
            T temp;
            pop_one_(&temp);
        }

        buffer_data_[input_index_] = data;
        input_index_ = (input_index_ + 1) % (CAPACITY + 1);
        return true;
    }

    bool push_one_(T &&data)
    {
        if (size_() == CAPACITY)
        {
            T temp;
            pop_one_(&temp);
        }

        buffer_data_[input_index_] = std::move(data);
        input_index_ = (input_index_ + 1) % (CAPACITY + 1);
        return true;
    }

    bool try_push_one_(const T &data)
    {
        if (size_() < CAPACITY)
        {
            buffer_data_[input_index_] = data;
            input_index_ = (input_index_ + 1) % (CAPACITY + 1);
            return true;
        }
        return false;
    }

    bool multi_push_(const T data[], size_t count)
    {
        for (size_t i = 0; i < count; ++i)
        {
            push_one_(data[i]);
        }
        return true;
    }

    size_t try_multi_push_(const T data[], size_t count)
    {
        size_t num_pushed = 0;
        while(num_pushed < count)
        {
            if(!try_push_one_(data[num_pushed]))
            {
                break;
            }
            num_pushed++;
        }
        return num_pushed;
    }

    bool pop_one_(T *data)
    {
        if (is_empty_())
        {
            return false;
        }
        *data = std::move(buffer_data_[output_index_]);
        output_index_ = (output_index_ + 1) % (CAPACITY + 1);
        return true;
    }

    size_t multi_pop_(T data[], size_t count)
    {
        size_t num_popped = 0;
        while(num_popped < count)
        {
            if (!pop_one_(&data[num_popped]))
            {
                break;
            }
            num_popped++;
        }
        return num_popped;
    }

    bool peek_(size_t index, T *data) const
    {
        if (index < size_())
        {
            *data = buffer_data_[(output_index_ + index) % (CAPACITY + 1)];
            return true;
        }
        return false;
    }

private:
    DERIVED& derived_type() {return static_cast<DERIVED&>(*this); }
    const DERIVED& derived_const() const
        {return static_cast<const DERIVED&>(*this);}

    std::array<T, CAPACITY + 1> buffer_data_;
    size_t input_index_ = 0;
    size_t output_index_ = 0;

};


//FIFo for single threaded operation
//T is object type the FIFO will store. CAPACITY is the number of items the
// FIFO can concurrently store.
template<class T, size_t CAPACITY>
class ST_FIFO: public FIFO_BASE_t<T, CAPACITY, ST_FIFO<T, CAPACITY>>
{
public:
    ST_FIFO() {}
    ST_FIFO(const ST_FIFO& rhs)
    {
        this->copy_fifo(rhs);
    }

    ST_FIFO& operator=(const ST_FIFO& rhs)
    {
        return this->copy_fifo(rhs);
    }

    friend class FIFO_BASE_t<T, CAPACITY, ST_FIFO>;
private:
    bool derived_push(const T &data) {return this->push_one_(data);}

    bool derived_push(T &&data){return this->push_one_(std::move(data));}

    bool derived_try_push(const T &data){return this->try_push_one_(data);}

    bool derived_multi_push(const T data[], size_t count)
        {return this->multi_push_(data, count);}

    size_t derived_try_multi_push(const T data[], size_t count)
        {return this->try_multi_push_(data, count);}

    bool derived_pop(T *data) {return this->pop_one_(data);}

    size_t derived_multi_pop(T data[], size_t count)
        {return this->multi_pop_(data, count);}

    bool derived_peek(size_t index, T *data) const {return this->peek_(index, data);}

    size_t derived_size() const {return this->size_();}

    bool derived_is_empty() const {return this->is_empty_();}
};


//threadsafe version of FIFO
//T is object type the FIFO will store. CAPACITY is the number of items the
// FIFO can concurrently store.
template<class T, size_t CAPACITY>
class MT_FIFO: public FIFO_BASE_t<T, CAPACITY, MT_FIFO<T, CAPACITY>>
{
public:
    MT_FIFO():wait_flag_(true) {}

    MT_FIFO(const MT_FIFO& rhs)
    {
        lock(mutex_, rhs.mutex_);
        std::lock_guard<std::mutex> (mutex_, std::adopt_lock);
        std::lock_guard<std::mutex> (rhs.mutex_, std::adopt_lock);
        wait_flag_ = wait_flag_.load();
        this->copy_fifo(rhs);
    }

    MT_FIFO& operator=(const MT_FIFO& rhs)
    {
        if(this == &rhs)
        {
            return this;
        }
        lock(mutex_, rhs.mutex_);
        std::lock_guard<std::mutex> (mutex_, std::adopt_lock);
        std::lock_guard<std::mutex> (rhs.mutex_, std::adopt_lock);
        wait_flag_ = wait_flag_.load();
        return this->copy_fifo(rhs);
    }

    bool try_pop(T *data)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return this->pop_one_(data);
    }

    void wait_off() {wait_flag_ = false; cv_.notify_all();}

    void wait_on() {wait_flag_ = true;}


  friend class FIFO_BASE_t<T, CAPACITY, MT_FIFO>;
private:
    bool derived_push(const T &data)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        bool result = this->push_one_(data);
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    bool derived_push(T &&data)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        bool result = this->push_one_(std::move(data));
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    bool derived_try_push(const T &data)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        bool result =  this->try_push_one_(data);
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    bool derived_multi_push(const T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        bool result = multi_push_(data, count);
        //this_lock.unlock();
        cv_.notify_one();
        return result;
    }

    size_t derived_try_multi_push(const T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        size_t num_pushed =  try_multi_push_(data, count);
        //this_lock.unlock();
        cv_.notify_one();
        return num_pushed;
    }

    bool derived_pop(T *data)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        cv_.wait(this_lock, [this]{return !(this->is_empty_() && wait_flag_);});
        return this->pop_one_(data);
    }

    size_t derived_multi_pop(T data[], size_t count)
    {
        std::unique_lock<std::mutex> this_lock(mutex_);
        cv_.wait(this_lock, [this]{return !(this->is_empty_() && wait_flag_);});
        return multi_pop_(data, count);
    }

    bool derived_peek(size_t index, T *data) const
    {
        std::lock_guard<std::mutex> this_lock(mutex_);
        return peek_(index, data);
    }

    size_t derived_size() const
    {
        std::lock_guard<std::mutex> this_lock(mutex_);
        return this->size_();
    }

    bool derived_is_empty() const
    {
        std::lock_guard<std::mutex> this_lock(mutex_);
        return this->is_empty_();
    }

    std::atomic_bool wait_flag_;
    mutable std::mutex mutex_;
    mutable std::condition_variable cv_;

};


#endif // FIFO_H

см. его в coliru

8 голосов | спросил schrödinbug 6 Mayam17 2017, 07:05:09

1 ответ


4
  1. Не злоупотребляйте SFINAE, чтобы обеспечить два класса в одном.
    Он не работает для полей и обычно неуклюж.

    Вместо этого, либо инвестируйте в личное наследование и разумное использование using, либо напишите их отдельно.

  2. Твой код operator= в самозадаче в многопоточном случае.

  3. Если вы делаете раннее возвращение, нет смысла писать блок else.

    if (boolean_expression)
    {
        some_code;
        return true;
    }
    else
    {
        some_other_code;
        return false;
    }
    

    Лучше измените это на:

    if (boolean_expression) {
        some_code;
        return true;
    }
    some_other_code;
    return false;
    
  4. Вы можете напрямую вернуть bool, не нужно ставить его в состояние:

    if (boolean_expression)
        return true;
    else
        return false;
    

    Лучше измените это на:

    return boolean_expression;
    
  5. buf_capacity_ имеет ровно два использования: отмена вашего класса и пессимизирующего кода.
    Это всегда точно CAPACITY + 1.

  6. Большая часть вашего многопоточного кода имеет форму:

    std::unique_lock<std::mutex> this_lock(this->mutex_);
    auto result = somecode();
    //this_lock.unlock();
    cv_.notify_one(); // or occassionally better cv_.notify_all();
    return result;
    

    Определение простого частного шаблона-члена помогает очистить его:

    template<bool all = false, class F>
    auto locked(F f) noexcept(noexcept(f())) {
        auto result = (std::unique_lock<std::mutex>(mutex_), f()); // minimal lock-duration
        all ? cv_.notify_all() : cv_.notify_one();
        return result;
    }
    // Used like:
    return locked([&]{ return somecode(); });
    

Размещено переписывание с помощью моих предложений для обзора здесь: (необязательно параллельный) FIFO

ответил Deduplicator 6 Maypm17 2017, 21:27:53

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132