Простая диспетчерская реализацияТеперь, если строка является palindromeMy собственной реализацией snprintf в игре CPong в создании навыков C ++ в Python на основе текста RPGshort или простое решение для размещения значений от метки до столбца в коде excelSuch, много ничего себе, очень FizzBuzzReverse порядок символов слов в строковой системе входа в систему с использованием Python Flask и проблемы MySQLSalesTax (версия C #) Итерация над Dungeon MapSimple реализация общего стека Равномерное равенство точки в Java - последующий процессПроцедурный мир в Unity3DAСлужебная служба для студентовПроверка существования объекта по id, имя, отображаемое имя или веб-idIs моя реализация FizzBuzz overkill? Суммы некоторых элементов массиваUnit тестирование IValueConverterLine сегмент для объединения алгоритма столкновений PHP User SystemGuessing тип файла на основе его contentMy Rational struct, версия 1

Мне нужна простая реализация диспетчера (вызывать методы в одном потоке из многих потоков) как только вы отправляете метод curent thread, вы должны ждать результатов, поэтому я думаю о чем-то вроде этого:

public class Dispatcher
{
    private readonly BlockingCollection<Tuple<Delegate, object[]>> runQueue = new BlockingCollection<Tuple<Delegate, object[]>>();
    private readonly BlockingCollection<object> resultQueue = new BlockingCollection<object>();
    private readonly CancellationTokenSource source = new CancellationTokenSource();
    private readonly Task task;
    public Dispatcher()
    {
        Task.Run(() =>
        {
            using (source)
            using (runQueue)
            using (resultQueue)
            {
                Debug.WriteLine("Dispatcher started with thread {0}", Thread.CurrentThread.ManagedThreadId);
                while (!source.IsCancellationRequested)
                {
                    var run = runQueue.Take(source.Token);
                    resultQueue.Add(run.Item1.DynamicInvoke(run.Item2));
                }
                Debug.WriteLine("Dispatcher ended");
            }
        });
    }

    public void Stop()
    {
        source.Cancel();
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    public object Invoke(Delegate @delegate, params object[] @params)
    {
        runQueue.Add(new Tuple<Delegate, object[]>(@delegate, @params));
        return resultQueue.Take(source.Token);
    }
}

Я тестировал его с таким кодом, и кажется, что он работает хорошо ...

class Program
{
    static void Main(string[] args)
    {
        Func<int, int, int> func = (sleep, i) =>
        {
            CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(sleep);
            CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
            return i; //return task number
        };
        var rnd = new Random();
        var disp = new Dispatcher();
        {
            var tasks = Enumerable.Range(0, 10).Select(i =>
                            Task.Run(() =>
                            {
                                CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                                CC.WriteLine(i, "Task {0} ended with {1}", i, disp.Invoke(func, rnd.Next(3, 15) * 100, i));
                            })).ToArray();
            Task.WaitAll(tasks);
            disp.Stop();
        }

        Console.ReadKey();

    }
    static class CC
    {
        public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
        static CC()
        {
            COLORS.RemoveAt(0);
            COLORS.RemoveAt(0);
        }
        public static void WriteLine(int colorIndex, string format, params object[] args)
        {
            lock (COLORS)
            {
                Console.ForegroundColor = COLORS[colorIndex];
                Console.WriteLine(format, args);
            }
        }
    }
}

Является ли BlockingCollection хорошим выбором? Или, может быть, это может быть сделано с чем-то более простым? (SlimSemaphore)?

0 голосов | спросил 481b8423202598ecfb233c5fa68cafaddisonLunainblankjL 1 J000000Friday16 2016, 15:47:42

2 ответа


7
  • Поскольку задача в Диспетчере - это долго выполняемая задача, вы должны использовать опцию TaskCreationOptions.LongRunning. В противном случае операция блокирует поток потока, который обычно используется для коротких операций. В качестве альтернативы вы также можете использовать Thread.
  • Вместо того, чтобы использовать сбор блокировки для возврата результата, вы можете использовать TaskCompletionSource. Это изменит возвращаемое значение метода Invoke на Task<T>. Таким образом, вызывающий код может использовать шаблон async /await.
  • Предположим, что вы выбрали исключение, если был остановлен Dispatcher и Invoke. В противном случае вызов блокируется навсегда, правильно?
ответил JanDotNet 14 J000000Thursday16 2016, 18:23:16
5

Спасибо @JanDotNet за вашу помощь ... googling вокруг ваших подсказок дает мне то, что мне действительно нужно SingleThreadTaskScheduler на основе StaTaskScheduler из Рой

public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private BlockingCollection<Task> _tasks;
    private readonly Thread _thread;

    public SingleThreadTaskScheduler()
    {
        // Initialize the tasks collection
        _tasks = new BlockingCollection<Task>();
        _thread = new Thread(() =>
        {
            // Continually get the next task and try to execute it.
            // This will continue until the scheduler is disposed and no more tasks remain.
            foreach (var t in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(t);
            }
        });
        _thread.IsBackground = true;
        _thread.Name = "SingleThreadTaskScheduler-"+Guid.NewGuid().ToString();
        _thread.Start();
    }

    /// <summary>Queues a Task to be executed by this scheduler.</summary>
    /// <param name="task">The task to be executed.</param>
    protected override void QueueTask(Task task)
    {
        // Push it into the blocking collection of tasks
        _tasks.Add(task);
    }

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
    /// <returns>An enumerable of all tasks currently scheduled.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Serialize the contents of the blocking collection of tasks for the debugger
        return _tasks.ToArray();
    }

    /// <summary>Determines whether a Task may be inlined.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread.ManagedThreadId == _thread.ManagedThreadId && TryExecuteTask(task);
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    /// <summary>
    /// Cleans up the scheduler by indicating that no more tasks will be queued.
    /// This method blocks until all threads successfully shutdown.
    /// </summary>
    public void Dispose()
    {
        if (_tasks != null)
        {
            // Indicate that no new tasks will be coming in
            _tasks.CompleteAdding();
            // Wait for all threads to finish processing tasks
            _thread.Join();
            // Cleanup
            _tasks.Dispose();
            _tasks = null;
        }
    }

и использование

class Program
{
    static void Main(string[] args)
    {

        Func<int, int, int> func = (sleep, i) =>
        {
            CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(sleep);
            CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
            if (i == 5)
                throw new Exception("Exception test!");
            return i; //return task number
        };
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            var rnd = new Random();
            var tasks = Enumerable.Range(0, 10).Select(i =>
                Task.Run(async () =>
                {
                    CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                    object r = null;
                    try
                    {
                        r = await Task.Factory.StartNew(() => func(rnd.Next(3, 15) * 100, i), CancellationToken.None, TaskCreationOptions.None, scheduler).ConfigureAwait(false);
                        CC.WriteLine(i, "Task {0} ended with {1}", i, r);
                    }
                    catch (Exception ex)
                    {
                        CC.WriteLine(i, "Task {0} exception: {1}", i, ex);
                    }

                })).ToArray();
            Task.WaitAll(tasks);
            Console.ReadKey();
        }
    }
    static class CC
    {
        public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
        static CC()
        {
            COLORS.RemoveAt(0);
            COLORS.RemoveAt(0);
        }
        public static void WriteLine(int colorIndex, string format, params object[] args)
        {
            lock (COLORS)
            {
                Console.ForegroundColor = COLORS[colorIndex];
                Console.WriteLine(format, args);
            }
        }
    }
}
ответил Selvin 14 J000000Thursday16 2016, 19:09:41

Похожие вопросы

0
Простая современная реализация класса строк C ++. Проверить, может ли быть все фильмы. Распечатайте N самых длинных строк в файле. Функция для удаления запрещенных символов. Проверьте, заканчивается ли строка с stringOptimizing List <string> performancePassword Strength DetectorProgram, который удаляет комментарии из программ на C и C ++. Вы продвигаетесь, вы продвигаетесь, все мы продвигаемся! • Тестирование того, являются ли символы строки уникальными. Отключение игры с шоколадом в одном машинном симуляторе класса 900.Энигма - улучшающий рекурсивный алгоритм performanceReally простой неизменный пользовательский классSudoku решение без использования классовOperations для банковской учетной записиCode для умножения матриц вместеJulia Fractal Рисование в C ++ Builder Шаблон для Car classDisplaying каждого числа целого числа в последовательностиReplacing буквы с номерами с его положением в алфавитеPythons Вокруг RoseSimple lexer в C ++ Self-made Linked ListRock, Paper Scissors игра в Python Определить, если 2 изображения одинаковы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132