Python Threading стандартный ввод /вывод

У меня есть файл, который содержит много данных. Каждая строка является записью. И я пытаюсь сделать некоторую работу ETL против всего файла. Прямо сейчас я использую стандартный ввод для чтения данных построчно. Круто то, что ваш скрипт может быть очень гибким для интеграции с другими командами скрипта и оболочки. Я записываю результат в стандартный вывод. Например.

$ cat input_file
line1 
line2
line3
line4
...

Мой текущий код на Python выглядит следующим образом - parse.py

import sys
for line in sys.stdin:
    result = ETL(line)    # ETL is some self defined function which takes a while to execute.
    print result

Приведенный ниже код показывает, как это работает сейчас:

cat input_file | python parse.py > output_file

Я посмотрел на модуль Threading в Python и мне интересно, будет ли производительность значительно улучшена, если я использую этот модуль.

Вопрос1: Как планировать квоты для каждого потока, почему?

...
counter = 0
buffer = []
for line in sys.stdin:
    buffer.append(line)
    if counter % 5 == 0:   # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine
        counter = 0
        thread = parser(buffer)
        buffer = []
        thread.start() 

Вопрос2: Несколько потоков могут одновременно выводить результат обратно на стандартный вывод, как их организовать и избежать ситуации, описанной ниже?

import threading
import time

class parser(threading.Thread):
    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            print elem + 'Finished'

work = ['a', 'b', 'c', 'd', 'e', 'f']

thread1 = parser(['a', 'b'])  
thread2 = parser(['c', 'd'])
thread3 = parser(['e', 'f'])

thread1.start()
thread2.start()
thread3.start()   

Вывод действительно уродливый, где одна строка содержит выводы из двух потоков.

aFinished
cFinishedeFinished

bFinished
fFinished
dFinished
4 голоса | спросил B.Mr.W. 21 AM00000080000005731 2013, 08:41:57

2 ответа


0

Сначала ответим на ваш второй вопрос. Вот что такое мьютексы для. Вы можете получить более чистый вывод, который вам нужен, используя блокировку для координации между синтаксическими анализаторами и гарантируя, что только один поток имеет доступ к выходному потоку в течение заданного периода времени:

class parser(threading.Thread):
    output_lock = threading.Lock()

    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            with self.output_lock:
                print elem + 'Finished'

Что касается вашего первого вопроса, обратите внимание, что, возможно, многопоточность не принесет пользы для вашей конкретной рабочей нагрузки. Это в значительной степени зависит от того, является ли работа, которую вы выполняете с каждой строкой ввода (ваша функция ETL), в основном, связана с процессором или IO. Если первое (что, я подозреваю, вероятно), потоки не помогут, из-за глобальной блокировки интерпретатора . В этом случае вы хотели бы использовать модуль multiprocessing для распределения работы между несколькими процессами, а не между потоками.

Но вы можете получить те же результаты с более простым в реализации рабочим процессом: разделите входной файл на части n (используя, например, команда split); вызывать сценарий извлечения и преобразования отдельно для каждого субфайла; затем объединить полученные выходные файлы.

Один клочок: «использование стандартного ввода для построчного чтения данных, поскольку он не загружает весь файл в память», связано с неправильным представлением. Вы можете прочитать файл построчно из Python, например, заменив sys.stdin на объект файла в такой конструкции, как:

for line in sys.stdin:

См. также метод readline() для файловых объектов и обратите внимание, что read() может принимать в качестве параметра максимальное количество байтов для чтения.

ответил Alp 21 AM000000100000002431 2013, 10:03:24
0

Будет ли полезна многопоточность, зависит от вашей ситуации. В частности, если ваша функция ETL() требует большого доступа к диску, то многопоточность, вероятно, даст вам довольно значительное повышение скорости.

Отвечая на ваш первый вопрос, я всегда обнаруживал, что это просто зависит. При определении идеального числа потоков действует множество факторов, и многие из них зависят от программы. Например, если у вас много обращений к диску (что довольно медленно), вам нужно, чтобы больше потоков использовали время простоя во время ожидания доступа к диску. Однако, если программа связана с процессором, тонны потоков могут быть не очень полезными. Таким образом, хотя можно проанализировать все факторы, чтобы получить идеальное количество потоков, обычно гораздо быстрее сделать первоначальное предположение и затем откорректировать его.

Более конкретно, однако, присвоение определенного количества строк каждому потоку, вероятно, не лучший способ разделить работу. Рассмотрим, например, если для обработки одной строки требуется особенно много времени. Было бы лучше, если бы один поток мог работать с этой одной строкой, а другие потоки могли бы тем временем сделать еще несколько строк. Лучший способ справиться с этим - использовать Очередь. Если вы помещаете каждую строку в очередь, то каждый поток может вытянуть строку из очереди, обработать ее и повторять до тех пор, пока очередь не станет пустой. Таким образом, работа распределяется так, что ни один поток не обходится без работы (конечно, до конца).

Теперь второй вопрос. Вы совершенно правы, что запись в стандартный поток из нескольких потоков одновременно не является идеальным решением. В идеале вы должны расположить все так, чтобы запись на стандартный вывод происходила только в одном месте. Отличный способ сделать это - использовать Очередь. Если каждый поток записывает свой вывод в общую очередь, вы можете создать дополнительный поток, единственной задачей которого является извлечение элементов из этой очереди и печать их в стандартный вывод. Ограничив печать одним потоком, вы избежите проблем, присущих нескольким потокам, которые пытаются печатать одновременно.

ответил mculhane 21 AM000000100000005531 2013, 10:07:55

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132