asyncio на прикладі

12 хв. читання

Всі ми вже чули про асинхронність, про asyncio, про async/await. Ще більше пітонерів чули про потоки та модуль threading. Сьогодні я б хотів порівняти їх на прикладі скрипту, що буде перевіряти CMS сайту.

Тут я не буду описувати, що таке асинхронність, конкурентність і все таке. Тут суто практика. А для теорії в нас є чудова стаття. Я дуже рекомендую прочитати спочатку її, сьогоднішня стаття скоріше логічне продовження вищевказаної.

Принцип роботи

Наш скрипт буде приймати один головний аргумент — файл, що місить сайти, які слід перевірити. Він завантажує його, потім проходиться по цьому списку, виконує HTTP-запит головної сторінки и перевіряє чи немає деяких специфічних мета-тегів, шляхів чи посилань на сторінці.

Як бачите, наш скрипт майже не буде використовувати обчислювальні можливості, але натомість буде навантажувати мережу.

Однопоточний варіант

Давайте почнемо з найпростішого, без всяких потоків і асинхронності.

import requests
import argparse
import time

def prepare_url(url):
    if not "http://" in url:
        return ("http://" + url).strip()
    return url

def check_site(url):
    r = requests.get(url)
    if not r.ok:
        return "other"

    if "/wp-content/" in r.text or "/wp-includes/" in r.text:
        return "wp"
    elif ('<meta content="Joomla! - Open Source Content Management" name="generator">' in r.text 
          or 'Joomla = {}' in r.text or 'href="/templates/' in r.text):
        return "joomla"
    elif "misc\\/drupal.js" in r.text or "/misc/drupal.js" in r.text:
        return "drupal"
    else:
        return "other"

start = time.time()

parser = argparse.ArgumentParser(description='Check the site CMS')
parser.add_argument("input_file")

args = parser.parse_args()

wp_sites = []
joomla_sites = []
drupal_sites = []
other_sites = []

results = {"wp": wp_sites,
           "joomla": joomla_sites,
           "drupal": drupal_sites,
           "other": other_sites}

with open(args.input_file) as f:
    sites = f.readlines()

for site in sites:
    try:
        res = check_site(prepare_url(site))
    except Exception:
        res = "other"
    results[res].append(site)
    print(res, site.strip())

print("RECORDS: {}\
TIME: {}".format(len(sites), (time.time() - start)))

Не лякайтеся такого об'єму, тут все просто. Ми отримуємо назву файлу як аргумент, читаємо його і перевіряємо кожен сайт: за допомогою модуля requests робимо запит і перевіряємо чи є якісь специфічні для певної CMS дані. Мій алгоритм дуже неточний, але для прикладу саме те. І по результатах ми заносимо сайт до потрібного списку. На моєму ПК (з доволі слабким інтернетом) ось такі показники:

python3.5 onethread.py db.txt
...
RECORDS: 100
TIME: 107.26902484893799

Дуже повільно. Давайте трошки прокачаємо наш скрипт.

Багатопоточний варіант

Трошки змінимо нашу утиліту для роботи з декількома потоками:

import requests
import argparse
import time
import threading

def prepare_url(url):
    if not "http://" in url:
        return ("http://" + url).strip()
    return url

def check_site(url):
    r = requests.get(url)
    if not r.ok:
        return "other"

    if "/wp-content/" in r.text or "/wp-includes/" in r.text:
        return "wp"
    elif ('<meta content="Joomla! - Open Source Content Management" name="generator">' in r.text 
          or 'Joomla = {}' in r.text or 'href="/templates/' in r.text):
        return "joomla"
    elif "misc\\/drupal.js" in r.text or "/misc/drupal.js" in r.text:
        return "drupal"
    else:
        return "other"

def main():
    while sites:
        site = sites.pop()
        try:
            res = check_site(prepare_url(site))
        except Exception:
            res = "other"
        results[res].append(site)
        print(res, site.strip())

start = time.time()

parser = argparse.ArgumentParser(description='Check the site CMS')
parser.add_argument("input_file")
parser.add_argument("threads_count")

args = parser.parse_args()


wp_sites = []
joomla_sites = []
drupal_sites = []
other_sites = []

results = {"wp": wp_sites,
           "joomla": joomla_sites,
           "drupal": drupal_sites,
           "other": other_sites}

with open(args.input_file) as f:
    sites = f.readlines()

records = len(sites)

for i in range(int(args.threads_count)):
    t = threading.Thread(target=main)
    t.start()

while threading.active_count() != 1: # Чекаємо поки лишиться лише головний поток
    time.sleep(0.1)
    
print("RECORDS: {}\
TIME: {}".format(records, (time.time() - start)))

Тут ми лише трошки модифікували логіку утиліти щоб адаптувати її до використання з декількома потоками, кількість яких можна вказувати аргументом: ми винесли основу логіку скрипта в окрему функцію щоб мати можливість запустити її декілька разів. Але результати все ще не дуже, хоча й краще попередніх:

➜ python3.5 manythreads.py db.txt 30
...
RECORDS: 400
TIME: 48.35842537879944

Ось тут в гру вступає асинхронне виконання коду.

Асинхронний варіант

Хоча й асинхронна варіація нашого скрипту буде по структурі схожою з багатопоточним варіантом, але треба внести багато змін. Перш за все, для запитів ми будемо використовувати іншу бібліотеку: aiohttp. Вона дозволяє як виконувати запити, так і будувати веб-додатки. По-друге, нам потрібно перетворити наші функції (не всі) в співпрограми, щоб отримати можливість використовувати переваги асинхронного підходу.

Спочатку встановимо потрібну бібліотеку:

sudo pip install aiohttp

Я використовую Python 3.5, що і вам рекомендую, якщо ви хочете писати асинхронний код зручно. А з виходом Python 3.6 (грудень 2016) з'являться асинхронні функції-генератори (що таке функції-генератори) та асинхронні генератори списків (їх іноді не вистачає в 3.5, що змушує використовувати хаки). В Python 3.5 додали нативні співпрограми та конструкції async/await, які я буду використовувати.

import argparse
import time
import asyncio
import aiohttp
import async_timeout

def prepare_url(url):
    if not "http://" in url:
        return ("http://" + url).strip()
    return url

async def check_site(url):
    try:
        session = aiohttp.ClientSession(loop=loop)
        with async_timeout.timeout(5):
            async with session.get(url) as response:
                text = await response.text()
    except Exception:
        return "other"
    finally:
        session.close()

    if "/wp-content/" in text or "/wp-includes/" in text:
        return "wp"
    elif ('<meta content="Joomla! - Open Source Content Management" name="generator">' in text 
          or 'Joomla = {}' in text or 'href="/templates/' in text):
        return "joomla"
    elif "misc\\/drupal.js" in text or "/misc/drupal.js" in text:
        return "drupal"
    else:
        return "other"

async def main():
    while sites:
        site = sites.pop()
        try:
            res = await check_site(prepare_url(site))
        except Exception:
            res = "other"
        results[res].append(site)
        print(res, site.strip())



start = time.time()

parser = argparse.ArgumentParser(description='Check the site CMS')
parser.add_argument("input_file")
parser.add_argument("workers_count")

args = parser.parse_args()


wp_sites = []
joomla_sites = []
drupal_sites = []
other_sites = []

results = {"wp": wp_sites,
           "joomla": joomla_sites,
           "drupal": drupal_sites,
           "other": other_sites}

with open(args.input_file) as f:
    sites = f.readlines()

records = len(sites)

loop = asyncio.get_event_loop()

futures = []
for i in range(int(args.workers_count)):
    futures.append(asyncio.ensure_future(main(), loop=loop))


loop.run_until_complete(asyncio.wait(futures))
loop.stop()
    
print("RECORDS: {}\
TIME: {}".format(records, (time.time() - start)))

Як ви помітили, тепер функції main та check_site стали співпрограмами (оголошені за допомогою async def), а в місцях, при виконанні яких інтерпретатору дозволено перемкнутися на іншу співпрограму ми використовуємо await. Це може бути інша співпрограма, future, та будь який інший awaitable-обєкт. Потім ми створюємо наш цикл подій та запускаємо потрібну кількість співпрограм за допомогою функції asyncio.ensure_future. Вона приймає параметром співпрограму та запускає її в циклі подій, а нам повертає об'єкт future, що буде зберігати в собі результат виконання цієї функції. Ми зберігаємо в список, щоб потім використати в функції asyncio.wait. Ця співпрограма приймає список future і чекає поки вони не отримають результат. Через те, що співпрограми можна викликати тільки з інших співпрограм або з циклу подій, ми змушені використовувати loop.run_until_complete, ця функція приймає співпрограму і блокує подальше виконання коду поки вона не виконається. Після виконання цикл потрібно зупинити.

Якщо хочете детальніше дізнатися про те, як працювати з asyncio, в нас є стаття про це.

Хоча aiohttp і називає свій API схожим на той, що в requests, але вони все ж відрізняються. Спочатку ми створюємо сесію для виконання запитів, тут це обов'язково. Потім ми виконуємо сам запит, зберігаючи код сторінки в змінну і оброблюємо його як і в минулих прикладах.

Я не рекомендую ставити велику кількість воркерів, так як це спричинить багато майже одночасних запитів, що призведе до того, що багато з них просто впадуть з таймаутом. Для себе я знайшов оптимальне число, це 25-30.

Запустивши цей приклад, я отримав такі результати:

➜ python3.5 async.py db.txt 30 
...
RECORDS: 400
TIME: 35.65217161178589

Так, різниця не дуже велика. Але при збільшенні кількості даних вона буде зростати. Так, при ста сайтах, скрипти виконуются майже однаковий час, при чотирьохсот вже є 13 секунд різниці (~28%).

Висновок

Як бачите, асинхронний варіант нашої утиліти виявився найшвидшим, але це не означає, що asyncio - панацея від всіх проблем з потужністю. Вона показала такі добрі результати, тому що задача була підібрана правильно. В вирахуванні числа пі, наприклад, вона не допоможе, тут потрібна multiprocessing. Зараз головним ареалом використання asyncio є веб: як на стороні клієнта, так і на стороні сервера. А от з файлами asyncio працювати не вміє, хоча це теж I/O-операція.

Невеличкий бонус

asyncio дозволяє використовувати власні цикли подій. І одним з таких, що заслуговує уваги, є uvloop. Це цикл подій, що використовується в Node.js, але портований на Python. Тести показують в середньому приріст потужності в 2-2.2 рази в порівнянні з стандартним циклом asyncio. Встановити його можна так:

sudo pip install uvloop

В код потрібно додати лише декілька рядків коду, щоб все запрацювало на uvloop:

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

Тепер виклик asyncio.get_event_loop() буде повертати цикл uvloop. Але без мінусів не обійшлося. Наскільки я знаю, Windows в даний момент не підтримується, тільки Unix-like. Якщо вже щось змінилося, поправте будь ласка. Звісно, він допоможе не у всіх проектах, але про нього слід знати.

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Codeguida 5.6K
Приєднався: 8 місяців тому
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація