Делать запросы на 1 миллион с aiohttp / asyncio - буквально

Я продолжил этот урок:https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html и все работает нормально, когда я делаю как 50 000 запросов. Но мне нужно сделать 1 миллион вызовов API, а затем у меня проблема с этим кодом:

    url = "http://some_url.com/?id={}"
    tasks = set()

    sem = asyncio.Semaphore(MAX_SIM_CONNS)
    for i in range(1, LAST_ID + 1):
        task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
        tasks.add(task)

    responses = asyncio.gather(*tasks)
    return await responses

Поскольку Python должен создавать задачи на 1 миллион, он в основном просто отстает, а затем печатаетKilled сообщение в терминале. Есть ли способ использовать генератор из предварительно созданного набора (или списка) URL-адресов? Благодарю.

 Peter Jung08 авг. 2016 г., 16:12
Разве это не семафор для этого? Даже если я установлю семафор на 10, я получу задержку и сообщение "Killed"
 roman26 сент. 2016 г., 22:39
Я попробовал пример по ссылке, и он работает. Как сказал автор, около 11-12 минут на 1000000 запросов. Я даже настроил его для работы с python3.4. Работает здесь Возможно, что-то не так с вашим кодом в другом месте. Вы можете опубликовать весь код?
 amirouche18 июл. 2017 г., 19:49
Вы можете опубликоватьdmesg вывод команды после того, как программа была убита, и / или полная ошибка, полученная интерпретатором Python. Это должно быть MemoryError.

Ответы на вопрос(2)

оздавать больше задач, которые может держать память. Я предполагаю, что вы достигли предела памяти. Проверьте dmesg для получения дополнительной информации.

1 миллион RPS не означает, что есть 1M задач. Задача может выполнить несколько запросов в одну секунду.

Запланируйте все 1 миллион задач одновременно

о котором вы говорите. Требуется до 3 ГБ оперативной памяти, поэтому вполне возможно, что она будет прервана операционной системой, если у вас мало свободной памяти.

import asyncio
from aiohttp import ClientSession

MAX_SIM_CONNS = 50
LAST_ID = 10**6

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    async with sem:
        await fetch(url, session)

async def fetch_all():
    url = "http://localhost:8080/?id={}"
    tasks = set()
    async with ClientSession() as session:
        sem = asyncio.Semaphore(MAX_SIM_CONNS)
        for i in range(1, LAST_ID + 1):
            task = asyncio.create_task(bound_fetch(sem, url.format(i), session))
            tasks.add(task)
        return await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(fetch_all())
Используйте очередь, чтобы оптимизировать работу

Это мое предложение, как использоватьasyncio.Queue передавать URL-адреса рабочим задачам. Очередь заполняется по мере необходимости, предварительно не составлен список URL-адресов.

Требуется всего 30 МБ ОЗУ :)

import asyncio
from aiohttp import ClientSession

MAX_SIM_CONNS = 50
LAST_ID = 10**6

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def fetch_worker(url_queue):
    async with ClientSession() as session:
        while True:
            url = await url_queue.get()
            try:
                if url is None:
                    # all work is done
                    return
                response = await fetch(url, session)
                # ...do something with the response
            finally:
                url_queue.task_done()
                # calling task_done() is necessary for the url_queue.join() to work correctly

async def fetch_all():
    url = "http://localhost:8080/?id={}"
    url_queue = asyncio.Queue(maxsize=100)
    worker_tasks = []
    for i in range(MAX_SIM_CONNS):
        wt = asyncio.create_task(fetch_worker(url_queue))
        worker_tasks.append(wt)
    for i in range(1, LAST_ID + 1):
        await url_queue.put(url.format(i))
    for i in range(MAX_SIM_CONNS):
        # tell the workers that the work is done
        await url_queue.put(None)
    await url_queue.join()
    await asyncio.gather(*worker_tasks)

if __name__ == '__main__':
    asyncio.run(fetch_all())

Ваш ответ на вопрос