Вычисление совместной вероятности n событий из выборочной последовательности вхождений

Я пишу алгоритм, чтобы взять примерный список последовательностей событий, вычислить 1-ступенчатые переходные вероятности из последовательностей, вперед или назад, а затем рассчитать общую вероятность n событий, происходящих из этих условных вероятностей.

Eg. если список примеров последовательностей:

  

['TTAGCAGTCAGT', 'TCAGCGTTCGACTGA']

cond_probs() создает словарь {'AT': n1, 'AG': n2, 'AA': n3, 'AC': n4, ... , 'TC': n16} , где для одного, n1 + n2 + n3 + n4 = 1, из этих последовательностей. В обратном случае переходные вероятности вычисляются с помощью последовательностей в обратном направлении.

В этом словаре exp_joint_probs() создает другой словарь, который при n = 3 выглядит как {'TTA': m1, 'TTC': m2, ..., 'GAC': m64}, где 'TTA' вычисляется по P('T') * P('TT') * P('TA'). В обратном случае это P('A') * P('AT') * P('TT'), но запись словаря должна быть 'TTA'.

Мне интересно, может ли мой алгоритм быть более чистым. Я чувствую, что, возможно, использование Numpy сделает его намного приятнее, поэтому слово от эксперта Numpy было бы здорово. Я также ищу помощь в двойной проверке математики.

from collections import Counter
from itertools import product

def exp_joint_probs(seqList, n, reverse = 0):
    probs = {}
    counts = Counter(''.join(seqList))
    states = counts.keys()
    sumv = sum(counts.values())
    initProbs = {c: counts[c] / float(sumv) for c in counts}
    condProbs = cond_probs(seqList, reverse)
    paths = product(counts.keys(), repeat = n)
    for p in paths:
        p = ''.join(p)
        if reverse == 0:
            probs[p] = initProbs[p[0]] 
            for i in range(n - 1):
                probs[p] = probs[p] * condProbs.get((p[i], p[i + 1]), 0)
        else:
            probs[p] = initProbs[p[2]]
            for i in range(n)[::-1][:-1]:
                probs[p] = probs[p] * condProbs.get((p[i], p[i - 1]), 0)
   return probs

def cond_probs(seqList, reverse = 0):
    counts = {}
    probs = {}
    for seq in seqList:
        if reverse:
            for i in range(len(seq))[::-1][:-1]:
                counts[seq[i],seq[i - 1]] = counts.get((seq[i], seq[i - 1]), 0) + 1
        else:
            for i in range(len(seq) - 1):
                counts[seq[i],seq[i + 1]] = counts.get((seq[i], seq[i + 1]), 0) + 1
    states = list(set(k[0] for k in counts.keys()))
    for s in states:
        if reverse:
            sCounts = dict((k,v) for (k,v) in counts.items() if k[1] == s)
        else:
            sCounts = dict((k,v) for (k,v) in counts.items() if k[0] == s)
        sumv = sum(sCounts.values())
        probs.update({(k,float(v)/sumv) for (k, v) in sCounts.items()})
    return probs
11 голосов | спросил Matt Hong 6 MaramThu, 06 Mar 2014 04:27:49 +04002014-03-06T04:27:49+04:0004 2014, 04:27:49

2 ответа


9

Я просто посмотрю на cond_probs здесь. Вы увидите, что в этой функции есть много возможностей для ответа.

1. Комментарии к вашему коду

  1. Строка return probs имеет неправильный отступ. Ошибка копирования /вставки?

  2. Нет докстрон. Что делают эти функции и как я их называю? (Текст из вашего вопроса будет хорошей отправной точкой для docstrings.)

  3. Нет тестовых примеров. Это тот код, который будет полезен из модульных тестов и /или doctests

  4. .
  5. Вы пишете: «cond_probs() производит словарь {'AT': n1, 'AG': n2, 'AA': n3, 'AC': n4, ... , 'TC': n16} ", но когда я запускаю его, словарь имеет кортежи для ключей, а не строки:

    >>> cond_probs(['AAC'])
    {('A', 'C'): 0.5, ('A', 'A'): 0.5}
    
  6. Я не понимаю поведение, когда reverse=True. Вы пишете: «В обратном случае переходные вероятности вычисляются с помощью последовательностей в обратном порядке», но это, похоже, не так:

    >>> cond_probs(['CAA'], reverse=True)
    {('A', 'A'): 1.0}
    

    Что случилось с C? Это выглядит неправильно.

    Если бы я писал это, я бы выполнил cond_probs для запуска только в прямом направлении; чтобы сделать обратное направление, я бы прошел в обратных последовательностях.

  7. Предпочитаете использовать True и False для Booleans, а не 1 и 0. Таким образом, второй параметр cond_probs должен быть reverse=False.

  8. Вместо:

    counts = {}
    # ...
    counts[X] = counts.get(X, 0) + 1
    

    используйте collections.Counter и напишите:

    counts = Counter()
    # ...
    counts[X] += 1
    

    (Но см. ниже, как использовать collections.Counter.update )

  9. Вместо:

    range(len(seq))[::-1][:-1]
    

    записи:

    range(len(seq) - 1, 0, -1)
    

    (Но см. ниже, как этого избежать.)

  10. Предпочитает итерацию над элементами последовательностей вместо итерации по их индексам. Поэтому вместо:

    for i in range(len(seq) - 1):
        counts[seq[i],seq[i + 1]] = counts.get((seq[i], seq[i + 1]), 0) + 1
    

    используйте zip :

    for t in zip(seq, seq[1:]):
        counts[t] += 1
    

    или, что еще проще, с помощью collections.Counter.update :

    counts.update(zip(seq, seq[1:]))
    

    (Если вас беспокоит использование памяти, вы можете использовать itertools.islice и напишите islice(seq, 1, None) вместо seq[1:], чтобы избежать копирования.)

  11. В этом коде:

    for s in states:
        sCounts = dict((k,v) for (k,v) in counts.items() if k[0] == s)
    

    вы перебираете все counts для каждого состояния, собирая нужные вам. Было бы более эффективно выполнять итерацию только по переходам из s. Ниже показано, как это будет выглядеть.

2. Пересмотренный код

def cond_probs_2(sequences):
    """Calculate 1-step transitional probabilities from sequences.
    Return dictionary mapping transitions to probabilities.

    >>> sequences = ['ACABC', 'ABCAB']
    >>> sorted(cond_probs_2(sequences).items())
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('A', 'B'), 0.75),
     (('A', 'C'), 0.25),
     (('B', 'C'), 1.0),
     (('C', 'A'), 1.0)]

    """
    counts = Counter()
    entries = set()
    for seq in sequences:
        entries.update(seq)
        for a, b in zip(seq, islice(seq, 1, None)):
            counts[a, b] += 1
    probs = {}
    for a in entries:
        suma = float(sum(counts[a, b] for b in entries))
        if suma != 0:
            probs.update(((a, b), counts[a, b] / suma) for b in entries
                         if counts[a, b])
    return probs

Это примерно на 30% быстрее, чем ваш код:

>>> from timeit import timeit
>>> from random import choice
>>> data = ''.join(choice('ACGT') for _ in range(1000000))
>>> cond_probs([data]) == cond_probs_2([data])
True
>>> timeit(lambda:cond_probs([data]), number=1) # yours
1.4706195190083236
>>> timeit(lambda:cond_probs_2([data]), number=1) # mine
0.9721288200234994

3. Перезапись в NumPy

Вот как я буду заниматься этой задачей в NumPy. NumPy работает лучше всего с числовыми данными фиксированного размера, поэтому я кодировал бы входные данные как маленькие целые числа:

>>> import numpy as np
>>> data = 'TTAGCAGTCAGTTCAGCGTTCGACTGA'
>>> distinct = set(data)
>>> coding = {j:i for i, j in enumerate(distinct)}
>>> coding
{'A': 0, 'C': 1, 'T': 2, 'G': 3}
>>> coded_data = np.fromiter((coding[i] for i in data), dtype=np.uint8)
>>> coded_data
array([2, 2, 0, 3, 1, 0, 3, 2, 1, 0, 3, 2, 2, 1, 0, 3, 1, 3, 2, 2, 1, 3, 0,
       1, 2, 3, 0], dtype=uint8)

(Я выбрал тип np.uint8 здесь, потому что во входных строках есть только четыре разных символа. Возможно, вам нужно будет выбрать другой тип.)

Затем вы можете кодироватьсмежные пары чисел от 0-3 как одно число от 0-15, подсчитывают количество вхождений каждой пары, используя numpy.bincount и декодировать результат с помощью numpy.reshape :

>>> n = len(distinct)
>>> pairs = coded_data[:-1] + n * coded_data[1:]
>>> counts = np.bincount(pairs, minlength=n*n).reshape(n, n)
>>> counts
array([[0, 3, 1, 2],
       [1, 0, 3, 2],
       [0, 1, 3, 3],
       [4, 2, 1, 0]])

Этот массив представляет таблицу перехода:

          source 
       │ A  C  T  G
     ──┼───────────
     A │ 0, 3, 1, 2
dest C │ 1, 0, 3, 2
     T │ 0, 1, 3, 3
     G │ 4, 2, 1, 0

Затем вы можете вычислить вероятности перехода, разделив каждый элемент на сумму его столбца:

>>> counts / counts.sum(axis=0)
array([[ 0.        ,  0.5       ,  0.125     ,  0.28571429],
       [ 0.2       ,  0.        ,  0.375     ,  0.28571429],
       [ 0.        ,  0.16666667,  0.375     ,  0.42857143],
       [ 0.8       ,  0.33333333,  0.125     ,  0.        ]])

Вот как указано выше:

from itertools import chain
import numpy as np

def cond_probs_np(sequences):
    """Calculate 1-step transitional probabilities from sequences.
    Return dictionary mapping transitions to probabilities.

    >>> sequences = ['ACABC', 'ABCAB']
    >>> sorted(cond_probs_np(sequences).items())
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('A', 'B'), 0.75),
     (('A', 'C'), 0.25),
     (('B', 'C'), 1.0),
     (('C', 'A'), 1.0)]

    """
    distinct = set(chain.from_iterable(sequences))
    n = len(distinct)
    assert(n < 256)             # so that they will fit in an np.uint8
    coding = {j:i for i, j in enumerate(distinct)}
    counts = np.zeros((n, n))
    for seq in sequences:
        coded_seq = np.fromiter((coding[i] for i in seq), dtype=np.uint8)
        pairs = coded_seq[:-1] + n * coded_seq[1:]
        counts += np.bincount(pairs, minlength=n*n).reshape(n, n)
    totals = counts.sum(axis=0)
    totals[totals == 0] = 1     # avoid division by zero
    probs = counts / totals
    return {(a, b): p for a, b in product(distinct, repeat=2) 
            for p in (probs[coding[b], coding[a]],) if p}

И это примерно в семь раз быстрее, чем исходный код:

>>> cond_probs([data]) == cond_probs_np([data])
True
>>> timeit(lambda:cond_probs_np([data]), number=1)
0.2206433709943667
ответил Gareth Rees 22 MaramSat, 22 Mar 2014 04:34:48 +04002014-03-22T04:34:48+04:0004 2014, 04:34:48
2

Подсчет в Python лучше всего использовать, используя collections.Counter

Проблема, которую вы описали, звучит как цепь Маркова, а вероятности лучше всего представить как Марковская цепочка переходных цепочек . Тогда вы могли бы получить совместные вероятности с использованием матричного умножения.

ответил 200_success 6 MaramThu, 06 Mar 2014 08:55:10 +04002014-03-06T08:55:10+04:0008 2014, 08:55:10

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132