Эквивалент asyncio.Queues с рабочими «потоками»
Я пытаюсь выяснить, как портировать многопоточные программы для использованияasyncio
, У меня много кода, который синхронизируется вокруг нескольких стандартных библиотекQueues
в основном так:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
Один поток создает значения (возможно, пользовательский ввод), а другой поток что-то делает с ними. Дело в том, что эти потоки бездействуют, пока не появятся новые данные, и в этот момент они просыпаются и что-то с ними делают.
Я пытаюсь реализовать этот шаблон с помощью Asyncio, но я не могу понять, как заставить его «идти».
Мои попытки выглядят примерно так (и ничего не делают).
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()
Я пробовал разные варианты использования сопрограмм, а не их использования, упаковки объектов в задачи, попытки заставить их создавать или возвращать фьючерсы и т. Д.
Я начинаю думать, что у меня неправильное представление о том, как я должен использовать asyncio (возможно, этот шаблон должен быть реализован другим способом, о котором я не знаю). Любые указатели будут оценены.