Home Umela inteligencia Asynchrónne volania API LLM v Pythone: Komplexná príručka

Asynchrónne volania API LLM v Pythone: Komplexná príručka

by
mm

Ako vývojári a vedci v oblasti DTA sa často stretávame s tým, že potrebujeme komunikovať s týmito výkonnými modelmi prostredníctvom rozhraní API. S rastúcou zložitosťou a rozsahom našich aplikácií sa však potreba efektívnych a výkonných interakcií API stáva kľúčovou. Toto je miesto, kde žiari asynchrónne programovanie, ktoré nám umožňuje maximalizovať priepustnosť a minimalizovať latenciu pri práci s LLM API.

V tejto komplexnej príručke preskúmame svet asynchrónnych volaní LLM API v Pythone. Pokryjeme všetko od základov asynchrónneho programovania až po pokročilé techniky na zvládanie zložitých pracovných postupov. Na konci tohto článku budete dobre rozumieť tomu, ako využiť asynchrónne programovanie na prebitie vašich aplikácií poháňaných LLM.

Skôr než sa ponoríme do špecifík asynchrónnych volaní API LLM, položme pevný základ v konceptoch asynchrónneho programovania.

Asynchrónne programovanie umožňuje vykonávať viacero operácií súčasne bez blokovania hlavného vlákna vykonávania. V Pythone sa to primárne dosahuje prostredníctvom asyncio modul, ktorý poskytuje rámec na písanie súbežného kódu pomocou korutín, slučiek udalostí a budúcnosti.

Kľúčové pojmy:

  • Korutíny: Funkcie definované pomocou async def ktoré možno pozastaviť a obnoviť.
  • Slučka udalostí: Centrálny spúšťací mechanizmus, ktorý riadi a spúšťa asynchrónne úlohy.
  • Očakávané: Objekty, ktoré možno použiť s kľúčovým slovom wait (korutíny, úlohy, budúcnosť).

Tu je jednoduchý príklad na ilustráciu týchto konceptov:

import asyncio
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    print(f"Hello, {name}!")
async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
asyncio.run(main())

V tomto príklade definujeme asynchrónnu funkciu greet ktorý simuluje I/O operáciu s asyncio.sleep(). The main využitie funkcií asyncio.gather() spustiť viacero pozdravov súčasne. Napriek oneskoreniu spánku sa všetky tri pozdravy vytlačia približne po 1 sekunde, čo demonštruje silu asynchrónneho vykonávania.

Potreba async vo volaniach LLM API

Pri práci s LLM API sa často stretávame so scenármi, kedy potrebujeme uskutočniť viacero API volaní, buď postupne alebo paralelne. Tradičný synchrónny kód môže viesť k významným prekážkam výkonu, najmä pri operáciách s vysokou latenciou, ako sú sieťové požiadavky na služby LLM.

Zvážte scenár, v ktorom potrebujeme vygenerovať súhrny pre 100 rôznych článkov pomocou LLM API. Pri synchrónnom prístupe by sa každé volanie API zablokovalo, kým nedostane odpoveď, čo môže trvať niekoľko minút, kým sa všetky požiadavky dokončia. Asynchrónny prístup nám na druhej strane umožňuje iniciovať viacero volaní API súčasne, čím sa dramaticky skráti celkový čas vykonávania.

Nastavenie vášho prostredia

Ak chcete začať s asynchrónnymi volaniami LLM API, budete musieť nastaviť svoje prostredie Python s potrebnými knižnicami. Tu je to, čo budete potrebovať:

  • Python 3.7 alebo vyššie (pre natívnu podporu asyncio)
  • aiohttp: Asynchrónna knižnica HTTP klienta
  • openai: Úradník Klient OpenAI Python (ak používate modely GPT OpenAI)
  • langchain: Rámec na vytváranie aplikácií pomocou LLM (voliteľné, ale odporúčané pre zložité pracovné postupy)

Tieto závislosti môžete nainštalovať pomocou pip:

pip install aiohttp openai langchain
<div class="relative flex flex-col rounded-lg">

Základné volania ASync LLM API s asyncio a aiohttp

Začnime jednoduchým asynchrónnym volaním LLM API pomocou aiohttp. Ako príklad použijeme API GPT-3.5 OpenAI, ale koncepty platia aj pre iné LLM API.

import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(prompt, client):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=({"role": "user", "content": prompt})
    )
    return response.choices(0).message.content
async def main():
    prompts = (
        "Explain quantum computing in simple terms.",
        "Write a haiku about artificial intelligence.",
        "Describe the process of photosynthesis."
    )
    
    async with AsyncOpenAI() as client:
        tasks = (generate_text(prompt, client) for prompt in prompts)
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

V tomto príklade definujeme asynchrónnu funkciu generate_text ktorý zavolá OpenAI API pomocou klienta AsyncOpenAI. The main vytvára viacero úloh pre rôzne výzvy a použitia asyncio.gather() spustiť ich súbežne.

Tento prístup nám umožňuje odosielať viaceré požiadavky do LLM API súčasne, čím sa výrazne znižuje celkový čas potrebný na spracovanie všetkých výziev.

Pokročilé techniky: Dávkovanie a kontrola súbežnosti

Zatiaľ čo predchádzajúci príklad demonštruje základy asynchrónnych volaní LLM API, reálne aplikácie často vyžadujú sofistikovanejšie prístupy. Poďme preskúmať dve dôležité techniky: dávkovanie požiadaviek a riadenie súbežnosti.

Dávkové požiadavky: Pri spracovávaní veľkého počtu výziev je často efektívnejšie ich dávkovať do skupín, než posielať jednotlivé požiadavky pre každú výzvu. To znižuje réžiu viacerých volaní API a môže viesť k lepšiemu výkonu.

import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, client):
    responses = await asyncio.gather(*(
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=({"role": "user", "content": prompt})
        ) for prompt in batch
    ))
    return (response.choices(0).message.content for response in responses)
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(100))
    batch_size = 10
    
    async with AsyncOpenAI() as client:
        results = ()
        for i in range(0, len(prompts), batch_size):
            batch = prompts(i:i+batch_size)
            batch_results = await process_batch(batch, client)
            results.extend(batch_results)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

Kontrola súbežnosti: Zatiaľ čo asynchrónne programovanie umožňuje súbežné vykonávanie, je dôležité kontrolovať úroveň súbežnosti, aby sa predišlo preťaženiu servera API alebo prekročeniu limitov rýchlosti. Na tento účel môžeme použiť asyncio.Semaphore.

import asyncio
from openai import AsyncOpenAI
async def generate_text(prompt, client, semaphore):
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=({"role": "user", "content": prompt})
        )
        return response.choices(0).message.content
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(100))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = (generate_text(prompt, client, semaphore) for prompt in prompts)
        results = await asyncio.gather(*tasks)
    
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

V tomto príklade používame semafor na obmedzenie počtu súbežných požiadaviek na 5, čím zaisťujeme, že nepreťažíme server API.

Spracovanie chýb a opakovanie v asynchrónnych hovoroch LLM

Pri práci s externými rozhraniami API je dôležité implementovať robustné spracovanie chýb a mechanizmy opakovania. Poďme vylepšiť náš kód tak, aby zvládal bežné chyby a implementujme exponenciálne stiahnutie pre opakovanie.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
    pass
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(prompt, client):
    try:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=({"role": "user", "content": prompt})
        )
        return response.choices(0).message.content
    except Exception as e:
        print(f"Error occurred: {e}")
        raise APIError("Failed to generate text")
async def process_prompt(prompt, client, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(prompt, client)
            return prompt, result
        except APIError:
            return prompt, "Failed to generate response after multiple attempts."
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(20))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as client:
        tasks = (process_prompt(prompt, client, semaphore) for prompt in prompts)
        results = await asyncio.gather(*tasks)
    
    for prompt, result in results:
        print(f"Prompt: {prompt}\nResponse: {result}\n")
asyncio.run(main())

Táto vylepšená verzia obsahuje:

  • Zvyk APIError výnimka pre chyby súvisiace s API.
  • A generate_text_with_retry funkcia zdobená @retry z knižnice húževnatosti, ktorá implementuje exponenciálny ústup.
  • Spracovanie chýb v process_prompt funkcia zachytávania a hlásenia porúch.

Optimalizácia výkonu: Streamovanie odpovedí

Pri vytváraní dlhodobého obsahu môžu odozvy streamovania výrazne zlepšiť vnímaný výkon vašej aplikácie. Namiesto čakania na celú odpoveď môžete spracovať a zobraziť kúsky textu, keď budú k dispozícii.

import asyncio
from openai import AsyncOpenAI
async def stream_text(prompt, client):
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=({"role": "user", "content": prompt}),
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices(0).delta.content is not None:
            content = chunk.choices(0).delta.content
            full_response += content
            print(content, end='', flush=True)
    
    print("\n")
    return full_response
async def main():
    prompt = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as client:
        result = await stream_text(prompt, client)
    
    print(f"Full response:\n{result}")
asyncio.run(main())

Tento príklad ukazuje, ako streamovať odpoveď z rozhrania API, pričom každý blok sa vytlačí hneď, ako príde. Tento prístup je obzvlášť užitočný pre chatovacie aplikácie alebo akýkoľvek scenár, kde chcete používateľovi poskytnúť spätnú väzbu v reálnom čase.

Vytváranie asynchrónnych pracovných postupov s LangChain

Pre komplexnejšie aplikácie s podporou LLM je rámec LangChain poskytuje vysokoúrovňovú abstrakciu, ktorá zjednodušuje proces reťazenia viacerých LLM hovorov a integráciu ďalších nástrojov. Pozrime sa na príklad použitia LangChain s asynchrónnymi schopnosťami:

Tento príklad ukazuje, ako možno LangChain použiť na vytvorenie zložitejších pracovných postupov so streamovaním a asynchrónnym vykonávaním. The AsyncCallbackManager a StreamingStdOutCallbackHandler umožňujú streamovanie generovaného obsahu v reálnom čase.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler())))
    prompt = PromptTemplate(
        input_variables=("topic"),
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    return await chain.arun(topic=topic)
async def main():
    topics = ("a magical forest", "a futuristic city", "an underwater civilization")
    tasks = (generate_story(topic) for topic in topics)
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n")
asyncio.run(main())

Poskytovanie asynchrónnych aplikácií LLM pomocou FastAPI

Aby bola vaša asynchrónna LLM aplikácia dostupná ako webová služba, FastAPI je skvelou voľbou vďaka svojej natívnej podpore asynchrónnych operácií. Tu je príklad, ako vytvoriť jednoduchý koncový bod API na generovanie textu:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
client = AsyncOpenAI()
class GenerationRequest(BaseModel):
    prompt: str
class GenerationResponse(BaseModel):
    generated_text: str
@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=({"role": "user", "content": request.prompt})
    )
    generated_text = response.choices(0).message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.prompt, generated_text)
    
    return GenerationResponse(generated_text=generated_text)
async def log_generation(prompt: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Táto aplikácia FastAPI vytvára koncový bod /generate ktorý prijme výzvu a vráti vygenerovaný text. Tiež ukazuje, ako používať úlohy na pozadí na ďalšie spracovanie bez blokovania odpovede.

Osvedčené postupy a bežné úskalia

Pri práci s asynchrónnymi LLM API majte na pamäti tieto osvedčené postupy:

  1. Použite združovanie pripojení: Pri vytváraní viacerých požiadaviek znova použite pripojenia, aby ste znížili réžiu.
  2. Implementujte správne spracovanie chýb: Vždy zohľadnite problémy so sieťou, chyby API a neočakávané odpovede.
  3. Rešpektujte limity sadzieb: Použite semafory alebo iné mechanizmy kontroly súbežnosti, aby ste sa vyhli preťaženiu rozhrania API.
  4. Monitorujte a zaznamenávajte: Implementujte komplexné protokolovanie na sledovanie výkonu a identifikáciu problémov.
  5. Pre dlhý obsah použite streamovanie: Zlepšuje používateľskú skúsenosť a umožňuje skoré spracovanie čiastkových výsledkov.

Source Link

Related Posts

Leave a Comment