Недостающие строки при записи файла с многопроцессорной блокировкой Python

Это мой код:

from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
    lock.acquire()
    with open(console_out, 'a') as out:
        out.write(message)
        out.flush()
    lock.release()

def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal

    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:[email protected]:5432/schema')

    writer("State {s} started  at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)

    writer("\tState {s} finished  at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]

    with open(console_out, 'w') as out:
        out.write("Starting at {n}\n".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "\n".format(p=nprocesses))

    with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)

    with open(console_out, 'a') as out:
        out.write("\nAll done at {n}".format(n=dt.now()))

Файлconsole_out никогда не имеет в себе все 7 состояний. Он всегда пропускает одно или несколько состояний. Вот вывод из последнего запуска:

Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started  at: 2016-07-27 21:47:01.482322
State 02 started  at: 2016-07-27 21:47:01.497947
State 11 started  at: 2016-07-27 21:47:01.529198
State 10 started  at: 2016-07-27 21:47:01.497947
    State 11 finished  at: 2016-07-27 21:47:15.701207
    State 15 finished  at: 2016-07-27 21:47:24.123164
    State 44 finished  at: 2016-07-27 21:47:32.029489
    State 50 finished  at: 2016-07-27 21:47:51.203107
    State 10 finished  at: 2016-07-27 21:47:53.046876
    State 33 finished  at: 2016-07-27 21:47:58.156301
    State 02 finished  at: 2016-07-27 21:48:18.856979

All done at 2016-07-27 21:48:18.992277

Зачем?

Обратите внимание, что ОС - это Windows Server 2012 R2.

 Kartik28 июл. 2016 г., 06:48
Окна. К несчастью.
 Tim Peters28 июл. 2016 г., 06:23
Какая операционная система? Если вы работаете в Windows, код нуждается в изменениях дляlock работать как задумано.

Ответы на вопрос(1)

Решение Вопроса

Так как вы работаете в Windows,ничего такого наследуется рабочими процессами. Каждый процесс запускает всю основную программу «с нуля».

В частности, с написанным кодом каждый процесс имеет свой собственный экземплярlockи эти случаи не имеют ничего общего друг с другом. Короче,lock не обеспечивает никакого взаимного исключения между процессами вообще.

Чтобы это исправить,Pool конструктор может быть изменен для вызова функции инициализации для каждого процесса, которой вы передаете экземплярLock(), Например, вот так:

def init(L):
    global lock
    lock = L

а затем добавить эти аргументы вPool() конструктор:

initializer=init, initargs=(Lock(),),

И вам больше не нужно:

lock = Lock()

линия.

Тогда межпроцессное взаимное исключение будет работать как задумано.

БЕЗ ЗАМКА

Если вы хотите делегировать весь вывод процессу записи, вы можете пропустить блокировку и использовать вместо этого очередь для подачи в этот процесс [и посмотрите позже для другой версии].

def writer_process(q):
    with open(console_out, 'w') as out:
        while True:
            message = q.get()
            if message is None:
                break
            out.write(message)
            out.flush() # can't guess whether you really want this

и изменитьwriter() чтобы просто:

def writer(message):
    q.put(message)

Вы бы снова должны использоватьinitializer= а такжеinitargs= вPool конструктор, так что все процессы используюттак же очередь.

Должен запускаться только один процессwriter_process()и это может быть начато само по себе в качестве примераmultiprocessing.Process.

Наконец, чтобыwriter_process() знаю, что пора бросать, когда этоявляется время для этого, чтобы истощить очередь и вернуться просто запустить

q.put(None)

в основном процессе.

ПОТОМ

Вместо этого OP остановился на этой версии, потому что им нужно было одновременно открыть выходной файл в другом коде:

def writer_process(q):
    while True:
        message = q.get()
        if message == 'done':
            break
        else:
            with open(console_out, 'a') as out:
                out.write(message)

Я не знаю, почему терминатор был изменен на"done", Любая уникальная ценность работает для этого;None традиционный

 Tim Peters28 июл. 2016 г., 07:28
Ничего такого используется всеми процессами в Windows по волшебству.init() функцияназывается в каждом процессе, и последний связывает глобальное имя процессаlock (из-заglobal lock заявление) в каждом процессе к одному экземпляруLock() созданный и прошедший мимо основного процесса.
 Kartik29 июл. 2016 г., 22:05
Итакelse в моей версии не требуется. Спасибо, что терпеливо научил меня всему этому.
 Kartik28 июл. 2016 г., 07:36
Ой! Я понял Если он не требует слишком много, можете ли вы написать ответ, который использует mp.Manager и Queue вместо примитивов, таких как Lock? Это был бы мой предпочтительный подход, но когда я попробовал это, это не сработало вообще. Поэтому я переключился на использование блокировки и случайно задал этот вопрос.
 Tim Peters28 июл. 2016 г., 08:19
См. Редактирование: дал полу-подробный набросок того, как вывод может обрабатываться очередью. Но до сих пор не знаю, что вы хотели быManager за.
 Kartik28 июл. 2016 г., 07:25
Что тогда происходит, так этоPool создаетlock в глобальном пространстве имен, которое совместно используется всеми рабочими процессами, правильно?
 Kartik28 июл. 2016 г., 20:30
Потрясающие! Спасибо за разъяснение и помощь в этом. Искренне ценю.
 Kartik29 июл. 2016 г., 21:12
'done' пришел, потому что это было скопировано из другого места. Я предполагал, чтоq.get() в пустой очереди может вернутьсяNone.
 Kartik11 авг. 2016 г., 08:33
Еще раз спасибо, Тим, вы не представляете, насколько это было полезно для меня. Если бы я мог, я бы сказал ваш ответ еще как минимум 10 раз.
 Tim Peters29 июл. 2016 г., 22:13
Хорошо, я несчитать это нужно, но не могу догадаться, иногда ли вы проходитеNone по какой-то неизвестной (для меня) причине. Я отредактирую снова, чтобы убрать эту часть.
 Tim Peters29 июл. 2016 г., 21:18
Нет,q.get() когда в очереди пустые блоки, пока что-то не добавлено в очередь. Логика блокировки слепакакие добавлен в очередь - он только ждетчто-то быть добавленным.None, пустая строка, список с миллиардом элементов ... не важно, что.
 Kartik28 июл. 2016 г., 10:33
И я также читал, чтоManager помогает с межпроцессным взаимодействием в Windows. Которого я думал, не хватало, потому что мойQueue подход не работал вообще.
 Kartik29 июл. 2016 г., 22:34
Это не нужно. Я пробовал безelse и это работает великолепно. Я не прохожуNoneЯ просто передаю два сообщения в моем вопросе: время начала и окончания каждого состояния. Это для мониторинга прогресса. Как я уже сказал, GDAL выводит на консоль весь груз дерьма, и мне нужен был другой способ отделить операторы выполнения от этого. (Вот связанный вопрос, который я задал:stackoverflow.com/questions/38601836/...)
 Kartik28 июл. 2016 г., 10:30
Спасибо. Я лично чувствую, что использование очереди более элегантно, чем установка блокировок. Это просто идея использования отдельного процесса для обработки выходных данных, а подача этого процесса выглядит более Pythonic. Моя главная цель - иметь файл, в котором регистрируется ход выполнения кода. Было время, когда я просто печатал выходные данные на экране, но затем я начал использовать GDAL, который выводит на экран всю массу сообщений, запутывающих маркеры прогресса. Отсюда и запись в файл. (PS. Я использовалout.flush() потому что я предположил, что файл не был сброшен, что привело к отсутствующим строкам.)
 Tim Peters28 июл. 2016 г., 07:39
Если на ваш текущий вопрос дан ответ, вы должны принять ответ и открыть новые вопросы для новых вопросов. Что должно сказать больше о том, что вы пытаетесь достичь, потому что я понятия не имею, что бы вы использовалиManager или жеQueue для вэтот вопрос: единственное, что требовалось в этом вопросе, это взаимное исключение между процессами, и это именно то, чтоmp.Lock() для.
 Tim Peters28 июл. 2016 г., 20:28
Я бы лично использовалmp.Queue в соответствии с показанными линиями, но запустите его в потоке основного процесса (нет необходимости создавать новый процесс для него);t = threading.Thread(target=writer_process, args=(q,)); t.start() а затем, когда программа заканчиваетсяwriter(None); t.join(). mp.Manager средства могут быть удобными, но их использование влечет за собой большие накладные расходы на межпроцессное взаимодействие, так что в конце концов я редко находил их «стоящими».

Ваш ответ на вопрос