Поддельные потери в реализации очереди без блокировки C ++

Я пытаюсь реализовать очередь без блокировки, которая использует линейный круговой буфер для хранения данных. В отличие от очереди без блокировки общего назначения, у меня есть следующие расслабляющие условия:

  • Я знаю наихудшее количество элементов, которые когда-либо будут храниться в очереди. Очередь является частью системы, которая работает с фиксированным набором элементов. Код никогда не будет пытаться сохранить больше элементов в очереди, поскольку в этом фиксированном наборе есть элементы.
  • Нет мультипроизводителя /мультипотребителя. Очередь будет использоваться либо в настройке мультипроизводителя /одного потребителя , либо в настройке одного производителя /мультипотребителя.

Концептуально очередь реализована следующим образом

  • Стандартный кольцевой буфер степени двойки. Базовая структура данных представляет собой стандартный кольцевой буфер, использующий трюк со степенью двойки . Индексы чтения и записи только увеличиваются. Они ограничены размером базового массива при индексации в массив с помощью простой битовой маски. Указатель чтения атомарно увеличивается в pop(), указатель записи атомарно увеличивается в push().
  • Переменная размера позволяет получить доступ к pop(). Дополнительная переменная «размер» отслеживает количество элементов в очереди. Это избавляет от необходимости выполнять арифметику по индексам чтения и записи. Переменная размера атомарно увеличивается после выполнения всей операции записи, то есть данные записываются в резервное хранилище и курсор записи увеличивается. Я использую операцию сравнения и замены (CAS) уменьшать размер атомарно в pop() и продолжать, только если размер не равен нулю. Таким образом, pop() должно гарантированно вернуть действительные данные.

Моя реализация очереди следующая. Обратите внимание на код отладки, который останавливает выполнение всякий раз, когда pop() пытается прочитать память, ранее записанную push(). Это никогда не должно происходить, поскольку - по крайней мере, концептуально - pop() может продолжаться только в том случае, если в очереди есть элементы (не должно быть переполнений) .

#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging

template <typename T>
class Queue {
private:
    uint32_t m_data_size;   // Number of elements allocated
    std::atomic<T> *m_data; // Queue data, size is power of two
    uint32_t m_mask;        // Bitwise AND mask for m_rd_ptr and m_wr_ptr
    std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
    std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
    std::atomic<uint32_t> m_size;   // Number of elements in the queue

    static uint32_t upper_power_of_two(uint32_t v) {
        v--; // https://graphics.stanford.edu/~seander/bithacks.html
        v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
        v++;
        return v;
    }

public:
    struct Optional { // Minimal replacement for std::optional
        bool good;
        T value;
        Optional() : good(false) {}
        Optional(T value) : good(true), value(std::move(value)) {}
        explicit operator bool() const { return good; }
    };

    Queue(uint32_t max_size)
        : // XXX Allocate 1 MiB of additional memory for debugging purposes
          m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
          m_data(new std::atomic<T>[m_data_size]),
          m_mask(m_data_size - 1),
          m_rd_ptr(0),
          m_wr_ptr(0),
          m_size(0) {
        // XXX Debug code begin
        // Fill the memory with a marker so we can detect invalid reads
        for (uint32_t i = 0; i < m_data_size; i++) {
            m_data[i] = 0xDEADBEAF;
        }
        // XXX Debug code end
    }

    ~Queue() { delete[] m_data; }

    Optional pop() {
        // Atomically decrement the size variable
        uint32_t size = m_size.load();
        while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
        }

        // The queue is empty, abort
        if (size <= 0) {
            return Optional();
        }

        // Read the actual element, atomically increase the read pointer
        T res = m_data[(m_rd_ptr++) & m_mask].load();

        // XXX Debug code begin
        if (res == T(0xDEADBEAF)) {
            std::raise(SIGTRAP);
        }
        // XXX Debug code end
        return res;
    }

    void push(T t) {
        m_data[(m_wr_ptr++) & m_mask].store(t);
        m_size++;
    }

    bool empty() const { return m_size == 0; }
};

Тем не менее, переполнение происходит и может быть легко запущено в многопоточном стресс-тесте. В этом конкретном тесте я поддерживаю две очереди: q1 и q2

Это прекрасно работает, если существует только один рабочий поток (один производитель /один потребитель) или если все рабочие потоки находятся на одном ЦП с основным потоком. Однако он завершается неудачно, как только два рабочих потока явно планируются на другой процессор, отличный от основного потока.

Следующий код реализует этот тест

#include <pthread.h>
#include <thread>
#include <vector>

static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
                                   Queue<int> &queue_rd, Queue<int> &queue_wr) {
    for (size_t i = 0; i < (1UL << 24); i++) {
        auto res = queue_rd.pop();
        if (res) {
            queue_wr.push(res.value);
        }
    }
    done_count++;
}

static void set_thread_affinity(pthread_t thread, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
                               &cpuset) != 0) {
        throw "Error while calling pthread_setaffinity_np";
    }
}

int main() {
    static constexpr uint32_t n_threads{2U}; // Number of worker threads
    //static constexpr uint32_t n_threads{1U}; // < Works fine
    static constexpr uint32_t max_size{16U}; // Elements in the queue
    std::atomic<uint32_t> done_count{0};     // Number of finished threads
    Queue<int> queue1(max_size), queue2(max_size);

    // Launch n_threads threads, make sure the main thread and the two worker
    // threads are on different CPUs.
    std::vector<std::thread> threads;
    for (uint32_t i = 0; i < n_threads; i++) {
        threads.emplace_back(queue_stress_test_main, std::ref(done_count),
                             std::ref(queue1), std::ref(queue2));
        set_thread_affinity(threads.back().native_handle(), 0);
    }
    set_thread_affinity(pthread_self(), 1);
    //set_thread_affinity(pthread_self(), 0); // < Works fine

    // Pump data from queue2 into queue1
    uint32_t elems_written = 0;
    while (done_count < n_threads || !queue2.empty()) {
        // Initially fill queue1 with all values from 0..max_size-1
        if (elems_written < max_size) {
            queue1.push(elems_written++);
        }

        // Read elements from queue2 and put them into queue1
        auto res = queue2.pop();
        if (res) {
            queue1.push(res.value);
        }
    }

    // Wait for all threads to finish
    for (uint32_t i = 0; i < n_threads; i++) {
        threads[i].join();
    }
}

Большую часть времени эта программа вызывает ловушку в коде очереди, что означает, что pop() пытается прочитать память, которая никогда не была тронут push() - хотя pop() должен быть успешным только в том случае, если push() вызывается как минимум так же часто, как pop()

Вы можете скомпилировать и запустить вышеупомянутую программу с помощью GCC /clang в Linux, используя

c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue

Либо просто объедините два вышеупомянутых блока кода, либо загрузите полную программу .

Обратите внимание, что я начинающий, когда дело доходит до структур данных без блокировки. Я прекрасно понимаю, что для C ++ существует множество проверенных реализаций очередей без блокировок. Однако я просто не могу понять, почему приведенный выше код не работает должным образом.

4 голоса | спросил Andreas Stöckel 30 AMpMon, 30 Apr 2018 09:04:24 +030004Monday 2018, 09:04:24

1 ответ


0
У вас есть две ошибки, одна из которых может вызвать сбой, который вы наблюдаете.Давайте посмотрим на ваш push-код, за исключением того, что мы допустим только одну операцию для каждого оператора:Теперь для очереди с двумя производителями существует окно уязвимости к состоянию гонки между операциями 1 и 4:До:Производитель А:Планировщик усыпляет Producer A здесьПроизводитель Б:После:Теперь потребитель работает, видит ---- +: = 5 =: + ---- и читает из ---- +: = 6 =: + ---- при увеличении ---- +: = 7=: + ---- от 1 до 2. Но ---- +: = 8 =: + ---- еще не написано продюсером A, а продюсер B написал ---- +: =9 =: + ---- .Вторая ошибка - дополнительный случай в ---- +: = 10 =: + ----, когда поток потребителя прерывается между действием ---- +: = 11 =: + ---- и ---- +: = 12 =: + ---- звонок.Это может привести к чтению значений не по порядку, потенциально настолько не по порядку, что очередь полностью обведена и перезаписывает исходное значение.Тот факт, что две операции в одном выражении источника являются атомарными, не делает весь оператор атомарным.
ответил Ben Voigt 30 AMpMon, 30 Apr 2018 10:20:14 +030020Monday 2018, 10:20:14

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132