Распределенная подготовка многих небольших моделей ML

У меня есть приложение для научных исследований, которое включает в себя обучение десяткам тысяч небольших гауссовых моделей. Под «маленьким» я подразумеваю, что любая отдельная модель может быть легко обучена на одном из наших рабочих серверов. Фактически, мы должны иметь возможность тренировать сразу несколько на каждом узле.

Я изучаю Spark с пряжей, но Spark, похоже, действительно предназначен для обучения больших моделей на нескольких машинах, а не для многих небольших моделей, содержащихся на отдельных машинах.

Я представляю рабочий процесс, который выглядит следующим образом:

  1. данные группы по ключу
  2. отправить целые группы данных на отдельные рабочие машины,
  3. модели поездов для групп на одном рабочем компьютере.
  4. Откройте или сохраните подготовленные модели.

Я мог бы использовать некоторые рекомендации о том, как это реализовать. Модельное обучение неловко распараллеливается.

1 голос | спросил andrew 17 thEurope/Moscowp30Europe/Moscow09bEurope/MoscowMon, 17 Sep 2018 21:22:13 +0300 2018, 21:22:13

2 ответа


2

Нет никакого конкретного права или неправильного способа сделать это, потому что это зависит от ваших проектов и от того, можете ли вы использовать структуру своих данных для эффективности.

например. для одноразового проекта вы можете установить необходимое программное обеспечение на всех серверах, подготовить пакеты работы, подготовить SSH-логин для всех серверов, а затем использовать GNU Parallel, чтобы все серверы заняты обработкой рабочих пакетов. Это особенно хорошо подходит для ad-hoc-подхода, где исходные данные и модели вывода хранятся в виде простых файлов, и если вам удобно работать с командной строкой.

Если вы хотите регулярно и автоматически обучать новые модели, может быть лучше создать очередь рабочих элементов, то есть общую базу данных, содержащую все рабочие элементы и результаты. Затем вы используете какое-либо программное обеспечение для развертывания программного обеспечения для рабочего сервера на всех узлах вашего кластера. Этот рабочий сервер ожидает очереди для рабочих пакетов и записывает результаты обратно в базу данных. Это можно было бы сочетать с умным автоматическим масштабированием, чтобы адаптировать количество рабочих к количеству ожидающих работы, но это может быть излишним для простого проекта.

В любом случае:

  1. Начните с написания простого рабочего программного обеспечения, которое может обучать модели локально.
  2. Расширьте программное обеспечение, чтобы несколько рабочих на вашем локальном компьютере могли обучаться параллельно - не используйте несколько потоков. Это может включать базу данных или программное обеспечение, подобное GNU. Параллельно синхронизировать рабочих.
  3. Найдите способ запускать эти рабочие, распределенные по нескольким компьютерам. Ваше рабочее программное обеспечение уже способно к этому, этот шаг в основном является проблемой sysadmin («ops»).
ответил amon 18 thEurope/Moscowp30Europe/Moscow09bEurope/MoscowTue, 18 Sep 2018 14:57:12 +0300 2018, 14:57:12
0

Вот пример MCVE того, как я закончил реализацию этого. Это полностью работает в PySpark V2.1.1 с помощью scikit-learn при некоторых строгих предположениях (см. Требование 2).

Требования:

  1. scikit-learn устанавливается на каждом рабочем компьютере
  2. для одной модели все данные и учебные накладные расходы могут помещаться на одном рабочем компьютере

Общий рабочий процесс:

  1. Преобразование DataFrame в RDD [(K, V)], где ключи являются идентификаторами групп и значениями являются отдельными наблюдениями данных.
  2. Используйте groupByKey, чтобы перетасовать все данные для одного ключа на одном рабочем компьютере.
  3. Модели поездов для каждого ключа на рабочих машинах.
  4. Соберите подготовленные модели для сериализации для последующего поиска

Для моего приложения этот подход работает в минутах на большом Spark-кластере. Я тренирую десятки тысяч моделей.

import numpy as np
from pyspark.sql import SparkSession
from sklearn.mixture import GaussianMixture

##
# Make the example PySpark DataFrame
##

# Generate example data
nsamps = 500
cv1_1 = np.array([[1,0],[0,1]])
cv1_2 = np.array([[2,0],[0,0.5]])
cv2_1 = np.array([[2, -1.5,],[-1.5, 2]])
cv2_2 = np.array([[2, 1.5,],[1.5, 2]])

mu1_1 = np.array([0,0])
mu1_2 = np.array([0,3])

mu2_1 = mu1_1 + np.array([5,5])
mu2_2 = mu2_1 + np.array([5,5])

# Group 1 data
x1_1 = np.matmul(np.random.randn(nsamps,2), cv1_1) + mu1_1
x1_2 = np.matmul(np.random.randn(nsamps,2), cv1_2) + mu1_2
X1 = np.concatenate([x1_1, x1_2])

# Group 2 data
# X2 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_1 = np.matmul(np.random.randn(nsamps,2), cv2_1) + mu2_1
x2_2 = np.matmul(np.random.randn(nsamps,2), cv2_2) + mu2_2
X2 = np.concatenate([x2_1, x2_2])

# Group lables
labs = 2*nsamps*["a"] + 2*nsamps*["b"]

# Create the data frame
X = np.concatenate([X1, X2]).tolist()
dat = [(i, x[0], x[1]) for (i, x) in zip(labs, X)]
cols = ["id", "x", "y"]
df = spark.createDataFrame(dat, cols)

##
# Shuffle groups to individual workers and train models
##

# group by ids
kv = df.rdd.map(lambda r: (r.id, [r.x, r.y]))
# create a distrributed RDD where each group is localized on a single worker node
groups = kv.groupByKey()
# a single group is a tuple of id and an iterable with the data
# e.g. (u'a', <pyspark.resultiterable.ResultIterable at 0x7effd7debb90>)

# helper function to train GMMs on the data iterables
def trainGMM(data_itr):
    # Returns a trained GMM
    X = np.array(data_itr.data).astype(np.float64)
    gmm = GaussianMixture(n_components=2, covariance_type='full', tol=0.001, reg_covar=1e-06, max_iter=100,
                          n_init=1, init_params='kmeans', weights_init=None, means_init=None, precisions_init=None,
                          random_state=None, warm_start=False, verbose=0, verbose_interval=10)
    gmm.fit(X)
    return gmm

# Train GMMs
gmms = groups.mapValues(trainGMM)  # still just a transformation

##
# Collect and Serialize GMMs
##

# the trained models are small, so we can collect to a single machine
collected_gmms = gmms.collect()

# pickle models for restoring later
outRoot = "local/output/dir/"

for tup in collected_gmms:
    id = tup[0]
    gmm = tup[1]
    with open("%s/%s_gmm.pkl" % (outRoot, id), 'w') as fout:
        pickle.dump(gmm, fout)
ответил andrew 25 thEurope/Moscowp30Europe/Moscow09bEurope/MoscowTue, 25 Sep 2018 19:54:16 +0300 2018, 19:54:16

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132