Очікуємо результат асинхронних операцій в Python

2 хв. читання

Одне з основних призначень asyncio в Python — виконання декількох корутин асинхронно. А чи добре ви знаєте про способи очікування результату таких операцій? Поговоримо про них детальніше в цій статті.

Є чимало методів отримати результат асинхронної операції в Python. Всі вони досить різні й розраховані на певні ситуацій. Було б корисно мати перед очима шпаргалку з особливостями кожного способу, аби застосовувати їх правильно.

Перш ніж перейдемо до огляду способів, трохи роз'яснень та термінів:

  • корутина (coroutine) — асинхронна функція, що виконується. Тобто, якщо ви визначите функцію async def f(): ... та викличете її як f() — ви отримаєте корутину;
  • awaitable — все, що підтримує await, тобто корутини, asyncio.Futures , asyncio.Tasks , об'єкти з методом __await__;
  • В прикладах будуть використовуватись дві асинхронні функції — f та g. Немає значення, що саме вони роблять, для нас важливо лише те, що вони асинхронні, і що зрештою вони припинять своє виконання.

await

Найпростіший спосіб дочекатись результату корутин:

result_f = await f()
result_g = await g()

Варто пам'ятати:

  1. У такий спосіб корутини не виконуються асинхронно. g виконується після того, як виконалась f.
  2. Ви не можете скасувати операцію, якщо використовуєте await.

Ви можете запропонувати подібний підхід для розв'язання першої проблеми:

coro_f = f()
coro_g = g()

result_f = await coro_f
result_g = await coro_g

Тут функція g/coro_g не починає виконуватись, доки ви не використали await. Тобто цей код аналогічний першому. Щоб уникнути проблем вище, необхідно огорнути корутини у завдання (tasks).

Завдання (Tasks)

asyncio.Tasks огортає ваші корутини, аби їх виконання могло незалежно плануватись циклом подій, коли ви передаєте йому управління (зазвичай за допомогою await). Створити завдання можна за допомогою asyncio.create_task():

task_f = asyncio.create_task(f())
task_g = asyncio.create_task(g())

await asyncio.sleep(0.1) # <- f() та g() вже виконуються!
result_f = await task_f
result_g = await task_g

Тепер ваші завдання виконуються в конкурентному режимі, ви можете скасувати будь-яке з них за допомогою функції cancel(). Зверніть увагу, що обидва завдання необхідно створити до того, як ви вирішите викликати await вперше, інакше ви не отримаєте результату. Мета await — зібрати результати та очистити ресурси (asyncio видасть застереження, якщо цього не зробите).

В реальних проєктах очікувати кожне завдання не дуже практично, адже часто ви навіть не знаєте, скільки викликів await знадобиться. Тож нам треба зібрати декілька результатів від awaitables в одному місці.

asyncio.gather()

asyncio.gather() приймає 1 або більше awaitables як *args, огортає їх в завдання, якщо це потрібно, і очікує їх завершення. Так ми отримуємо результат всіх awaitables у тому ж порядку, в якому вони були передані:

result_f, result_g = await asyncio.gather(f(), g())

Якщо виняток виникає в g() або f(), gather() миттєво поверне його, однак на інші завдання це не вплине. Якщо ж скасувати gather(), всі його awaitables, які ще не завершили своє виконання, також будуть скасовані.

Ви можете передати параметр return_exceptions=True, тоді винятки будуть повертатись як звичайні результати, а вам доведеться власноруч перевіряти, які саме результати успішні (наприклад, за допомогою isinstance(result, BaseException)).

Підсумуємо:

  • asyncio.gather() приймає один чи більше аргументів як *args.
  • Огортає кожен awaitable у завдання, якщо це необхідно.
  • Повертає перелік результатів у тому ж порядку, в якому їх було передано.
  • Дозволяє повертати помилки як звичайні результати (якщо передасте return_exceptions=True).
  • Якщо ви не передали параметр return_exceptions, то gather() одразу повідомить про винятки у будь-якому з awaitables, однак інші завдання продовжать виконуватись.
  • Якщо скасувати сам gather(), скасуються і всі завдання, які він виконував.

Тепер ми дізнались, як дочекатись результату декількох awaitables за раз. Однак добре збалансовані розподілені системи не можуть працювати без затримок. Тут gather() вже не допоможе, нам потрібне інше рішення.

asyncio.wait_for()

asyncio.wait_for() приймає два аргументи: один awaitable та затримку в секундах. Якщо awaitable — це корутина, вона автоматично огортається в завдання. Тому ось така конструкція доволі поширена:

try:
    result_f, result_g = await asyncio.wait_for(
        asyncio.gather(f(), g()),
        timeout=5.0.
    )
except asyncio.TimeoutError:
    print("oops took longer than 5s!")

Щойно закінчується затримка, внутрішнє завдання скасовується. Всі завдання в gather() також скасовуються (в нашому прикладі f() та g()).

Зверніть увагу: якщо ви просто заміните create_task() на wait_for(), це не спрацює. create_task() — звичайна функція, яка повертає завдання. wait_for() — асинхронна функція, яка повертає корутину. Тобто вона не виконається, поки ви не викличете await:

# Не конкурентний режим
cf = asyncio.wait_for(f(), timeout=0.5)
cg = asyncio.wait_for(g(), timeout=0.5)

# cf та cg тут — це корутини, а не завдання!
# На цьому етапі цикл подій нічого не планує

await cf  # g() ще не виконується!
await cg  # тільки тут wait_for створює завдання для g()

Якщо тепер ви думаєте, що wait_for() був би не потрібен, якби gather() підтримував затримку, то ви такі не одні.

Підсумуємо:

  • asyncio.wait_for() приймає один awaitable.
  • Огортає awaitable в завдання, якщо потрібно.
  • Приймає затримку, після закінчення якої завдання скасовується.
  • На відміну від create_task(), це корутина, тому її виконання не починається, доки ви не викликали await.

Відволічемось на async-timeout

Більш елегантний підхід до затримок — пакет async-timeout на PyPI. З менеджером асинхронного контексту ви можете використати загальну затримку, навіть якщо корутини треба виконати послідовно:

async with async_timeout.timeout(5.0):
    await f()
    await g()

Іноді не хочеться чекати виконання всіх awaitables. Краще, аби була можливість працювати з ними одразу, як їх виконання припиняється, аби хутчіш повідомляти користувачу результат.

asyncio.as_completed()

asyncio.as_completed() приймає ітерований об'єкт (наприклад, список, кортеж, сет) з awaitables. Функція повертає ітератор, який генерує asyncio.Futures в порядку завершення виконання awaitables:

for fut in asyncio.as_completed([task_f, task_g], timeout=5.0):
    try:
        await fut
        print("one task down!")
    except Exception:
        print("ouch")

При такому підході немає можливості з'ясувати, яку саме awaitable чекати (насправді способи існують, однак вони достатньо потворні й використовують незадокументовані інструменти).

Підсумуємо:

  • asyncio.as_completed() приймає багато awaitables та формує з них iterable.
  • Повертає Futures, результат яких ви отримаєте з await одразу, як щось завершить своє виконання.
  • Не гарантується повернення awaitables у початковому стані.
  • Огортає awaitables у завдання (під капотом викликається asyncio.ensure_future().
  • Приймає необов'язковий параметр затримки.

Ви вже маєте інструмент для багатьох асинхронних задач, однак все ще не вистачає контролю над процесом очікування результату.

asyncio.wait()

asyncio.wait() — найбільш громіздка й водночас потужна функція з перелічених в матеріалі. Вона трохи нагадує старий-добрий системний виклик select().

Подібно до as_completed(), функція приймає awaitables і формує ітерований об'єкт. asyncio.wait() повертає два сети: awaitables, які завершили виконання, і ті, що ще в очікуванні. Уся відповідальність за отримання та перевірку результату на вас. Оскільки частина повернених awaitables гарантовано будуть виконані, ви можете перевірити їх результат за допомогою Task.result() чи Task.exception(). Але, на думку автора, await більш характерний спосіб для такого випадку.

done, pending = await asyncio.wait([task_f, task_g])

for t in done:
    try:
        if t is task_f:
            print(f"The result of f() is { await task_f }.")
    except Exception as e:
        print(f"f() failed with { repr(e) }.")

# ...те ж саме для g()

Наш код не працюватиме, якщо ми передамо корутину та функцію wait(), огорнуті в завдання. Усе тому, що повернений awaitable відрізнятиметься від переданого і перевірка на ідентичність завжди провалюватиметься. (Це стосується також функції wait_for(), яка повертає корутину. Обидві функції не змішуватимуться, поки ви не огорнете виклик wait_for() у завдання.) Зараз, за таких умов, wait() все одно продовжить своє виконання, але попередить вас про можливі баги.

Як awaitable може залишатись в очікуванні, якщо wait() вже завершив виконання? Існує два варіанти:

  1. Ви можете передати затримку, після якої wait() припинить виконання. Але на відміну від gather(), з awaitables нічого не відбувається, коли затримка спливає. Функція просто завершує виконання та розподіляє завдання на виконані та ті, що ще в очікуванні.
  2. Ви можете зробити так, аби wait() не чекав виконання всіх awaitables, за допомогою аргументу return_when. Автоматично цей аргумент приймає значення asyncio.ALL_COMPLETED. Ви можете змінити значення на asyncio.FIRST_EXCEPTION, яке очікує завершення всіх awaitables, якщо лише якесь з них не спровокує виняток. А от з asyncio.FIRST_COMPLETED функція завершує виконання одразу, коли якийсь awaitables завершив виконання.

Всі особливості функції тримати в голові складно, однак з ними ви можете створити потужні рішення для асинхронних завдань.

Підсумуємо:

  • asyncio.wait() приймає багато awaitables і огортає в ітерований об'єкт.
  • Не поверне результат, але відсортує передані awaitables у два набори: ті, що виконались, і ті, що в очікуванні. Відповідальність за обробку результату лежить на вас.
  • Функція огортає awaitables в завдання, але попередить вас про те, що на виході ви отримаєте інші awaitables. Уникайте такого підходу і передавайте лише завдання.
  • З asyncio.wait() ви отримуєте контроль над моментом сортування завдань та їхнім поверненням, однак ви не зможете скасувати їх:
    • Передайте затримку, аби обмежити максимальний час очікування;
      • Передайте параметр return_when зі значенням:
  1. asyncio.ALL_COMPLETED: якщо хочете дочекатись завершення всіх awaitables.
  2. asyncio.FIRST_EXCEPTION: якщо хочете дочекатись завершення всіх awaitables, або якщо хочете припинити виконання при появі винятку.
  3. asyncio.FIRST_COMPLETED: припиняє виконання, якщо хоча б один awaitable завершився.

Наступні кроки

Якщо ви хочете заглибитись в матеріал про asyncio, автор рекомендує такі ресурси:

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Codeguida 5.6K
Приєднався: 8 місяців тому
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація