Ako vývojári a vedci v oblasti DTA sa často stretávame s tým, že potrebujeme komunikovať s týmito výkonnými modelmi prostredníctvom rozhraní API. S rastúcou zložitosťou a rozsahom našich aplikácií sa však potreba efektívnych a výkonných interakcií API stáva kľúčovou. Toto je miesto, kde žiari asynchrónne programovanie, ktoré nám umožňuje maximalizovať priepustnosť a minimalizovať latenciu pri práci s LLM API.
V tejto komplexnej príručke preskúmame svet asynchrónnych volaní LLM API v Pythone. Pokryjeme všetko od základov asynchrónneho programovania až po pokročilé techniky na zvládanie zložitých pracovných postupov. Na konci tohto článku budete dobre rozumieť tomu, ako využiť asynchrónne programovanie na prebitie vašich aplikácií poháňaných LLM.
Skôr než sa ponoríme do špecifík asynchrónnych volaní API LLM, položme pevný základ v konceptoch asynchrónneho programovania.
Asynchrónne programovanie umožňuje vykonávať viacero operácií súčasne bez blokovania hlavného vlákna vykonávania. V Pythone sa to primárne dosahuje prostredníctvom asyncio modul, ktorý poskytuje rámec na písanie súbežného kódu pomocou korutín, slučiek udalostí a budúcnosti.
Kľúčové pojmy:
- Korutíny: Funkcie definované pomocou async def ktoré možno pozastaviť a obnoviť.
- Slučka udalostí: Centrálny spúšťací mechanizmus, ktorý riadi a spúšťa asynchrónne úlohy.
- Očakávané: Objekty, ktoré možno použiť s kľúčovým slovom wait (korutíny, úlohy, budúcnosť).
Tu je jednoduchý príklad na ilustráciu týchto konceptov:
import asyncio async def greet(name): await asyncio.sleep(1) # Simulate an I/O operation print(f"Hello, {name}!") async def main(): await asyncio.gather( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(main())
V tomto príklade definujeme asynchrónnu funkciu greet
ktorý simuluje I/O operáciu s asyncio.sleep()
. The main
využitie funkcií asyncio.gather()
spustiť viacero pozdravov súčasne. Napriek oneskoreniu spánku sa všetky tri pozdravy vytlačia približne po 1 sekunde, čo demonštruje silu asynchrónneho vykonávania.
Potreba async vo volaniach LLM API
Pri práci s LLM API sa často stretávame so scenármi, kedy potrebujeme uskutočniť viacero API volaní, buď postupne alebo paralelne. Tradičný synchrónny kód môže viesť k významným prekážkam výkonu, najmä pri operáciách s vysokou latenciou, ako sú sieťové požiadavky na služby LLM.
Zvážte scenár, v ktorom potrebujeme vygenerovať súhrny pre 100 rôznych článkov pomocou LLM API. Pri synchrónnom prístupe by sa každé volanie API zablokovalo, kým nedostane odpoveď, čo môže trvať niekoľko minút, kým sa všetky požiadavky dokončia. Asynchrónny prístup nám na druhej strane umožňuje iniciovať viacero volaní API súčasne, čím sa dramaticky skráti celkový čas vykonávania.
Nastavenie vášho prostredia
Ak chcete začať s asynchrónnymi volaniami LLM API, budete musieť nastaviť svoje prostredie Python s potrebnými knižnicami. Tu je to, čo budete potrebovať:
- Python 3.7 alebo vyššie (pre natívnu podporu asyncio)
- aiohttp: Asynchrónna knižnica HTTP klienta
- openai: Úradník Klient OpenAI Python (ak používate modely GPT OpenAI)
- langchain: Rámec na vytváranie aplikácií pomocou LLM (voliteľné, ale odporúčané pre zložité pracovné postupy)
Tieto závislosti môžete nainštalovať pomocou pip:
pip install aiohttp openai langchain <div class="relative flex flex-col rounded-lg">
Základné volania ASync LLM API s asyncio a aiohttp
Začnime jednoduchým asynchrónnym volaním LLM API pomocou aiohttp. Ako príklad použijeme API GPT-3.5 OpenAI, ale koncepty platia aj pre iné LLM API.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(prompt, client): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": prompt}) ) return response.choices(0).message.content async def main(): prompts = ( "Explain quantum computing in simple terms.", "Write a haiku about artificial intelligence.", "Describe the process of photosynthesis." ) async with AsyncOpenAI() as client: tasks = (generate_text(prompt, client) for prompt in prompts) results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
V tomto príklade definujeme asynchrónnu funkciu generate_text
ktorý zavolá OpenAI API pomocou klienta AsyncOpenAI. The main
vytvára viacero úloh pre rôzne výzvy a použitia asyncio.gather()
spustiť ich súbežne.
Tento prístup nám umožňuje odosielať viaceré požiadavky do LLM API súčasne, čím sa výrazne znižuje celkový čas potrebný na spracovanie všetkých výziev.
Pokročilé techniky: Dávkovanie a kontrola súbežnosti
Zatiaľ čo predchádzajúci príklad demonštruje základy asynchrónnych volaní LLM API, reálne aplikácie často vyžadujú sofistikovanejšie prístupy. Poďme preskúmať dve dôležité techniky: dávkovanie požiadaviek a riadenie súbežnosti.
Dávkové požiadavky: Pri spracovávaní veľkého počtu výziev je často efektívnejšie ich dávkovať do skupín, než posielať jednotlivé požiadavky pre každú výzvu. To znižuje réžiu viacerých volaní API a môže viesť k lepšiemu výkonu.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, client): responses = await asyncio.gather(*( client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": prompt}) ) for prompt in batch )) return (response.choices(0).message.content for response in responses) async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(100)) batch_size = 10 async with AsyncOpenAI() as client: results = () for i in range(0, len(prompts), batch_size): batch = prompts(i:i+batch_size) batch_results = await process_batch(batch, client) results.extend(batch_results) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Kontrola súbežnosti: Zatiaľ čo asynchrónne programovanie umožňuje súbežné vykonávanie, je dôležité kontrolovať úroveň súbežnosti, aby sa predišlo preťaženiu servera API alebo prekročeniu limitov rýchlosti. Na tento účel môžeme použiť asyncio.Semaphore.
import asyncio from openai import AsyncOpenAI async def generate_text(prompt, client, semaphore): async with semaphore: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": prompt}) ) return response.choices(0).message.content async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(100)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = (generate_text(prompt, client, semaphore) for prompt in prompts) results = await asyncio.gather(*tasks) for prompt, result in zip(prompts, results): print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
V tomto príklade používame semafor na obmedzenie počtu súbežných požiadaviek na 5, čím zaisťujeme, že nepreťažíme server API.
Spracovanie chýb a opakovanie v asynchrónnych hovoroch LLM
Pri práci s externými rozhraniami API je dôležité implementovať robustné spracovanie chýb a mechanizmy opakovania. Poďme vylepšiť náš kód tak, aby zvládal bežné chyby a implementujme exponenciálne stiahnutie pre opakovanie.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): pass @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(prompt, client): try: response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": prompt}) ) return response.choices(0).message.content except Exception as e: print(f"Error occurred: {e}") raise APIError("Failed to generate text") async def process_prompt(prompt, client, semaphore): async with semaphore: try: result = await generate_text_with_retry(prompt, client) return prompt, result except APIError: return prompt, "Failed to generate response after multiple attempts." async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(20)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as client: tasks = (process_prompt(prompt, client, semaphore) for prompt in prompts) results = await asyncio.gather(*tasks) for prompt, result in results: print(f"Prompt: {prompt}\nResponse: {result}\n") asyncio.run(main())
Táto vylepšená verzia obsahuje:
- Zvyk
APIError
výnimka pre chyby súvisiace s API. - A
generate_text_with_retry
funkcia zdobená@retry
z knižnice húževnatosti, ktorá implementuje exponenciálny ústup. - Spracovanie chýb v
process_prompt
funkcia zachytávania a hlásenia porúch.
Optimalizácia výkonu: Streamovanie odpovedí
Pri vytváraní dlhodobého obsahu môžu odozvy streamovania výrazne zlepšiť vnímaný výkon vašej aplikácie. Namiesto čakania na celú odpoveď môžete spracovať a zobraziť kúsky textu, keď budú k dispozícii.
import asyncio from openai import AsyncOpenAI async def stream_text(prompt, client): stream = await client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": prompt}), stream=True ) full_response = "" async for chunk in stream: if chunk.choices(0).delta.content is not None: content = chunk.choices(0).delta.content full_response += content print(content, end='', flush=True) print("\n") return full_response async def main(): prompt = "Write a short story about a time-traveling scientist." async with AsyncOpenAI() as client: result = await stream_text(prompt, client) print(f"Full response:\n{result}") asyncio.run(main())
Tento príklad ukazuje, ako streamovať odpoveď z rozhrania API, pričom každý blok sa vytlačí hneď, ako príde. Tento prístup je obzvlášť užitočný pre chatovacie aplikácie alebo akýkoľvek scenár, kde chcete používateľovi poskytnúť spätnú väzbu v reálnom čase.
Vytváranie asynchrónnych pracovných postupov s LangChain
Pre komplexnejšie aplikácie s podporou LLM je rámec LangChain poskytuje vysokoúrovňovú abstrakciu, ktorá zjednodušuje proces reťazenia viacerých LLM hovorov a integráciu ďalších nástrojov. Pozrime sa na príklad použitia LangChain s asynchrónnymi schopnosťami:
Tento príklad ukazuje, ako možno LangChain použiť na vytvorenie zložitejších pracovných postupov so streamovaním a asynchrónnym vykonávaním. The AsyncCallbackManager
a StreamingStdOutCallbackHandler
umožňujú streamovanie generovaného obsahu v reálnom čase.
import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.manager import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(topic): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler()))) prompt = PromptTemplate( input_variables=("topic"), template="Write a short story about {topic}." ) chain = LLMChain(llm=llm, prompt=prompt) return await chain.arun(topic=topic) async def main(): topics = ("a magical forest", "a futuristic city", "an underwater civilization") tasks = (generate_story(topic) for topic in topics) stories = await asyncio.gather(*tasks) for topic, story in zip(topics, stories): print(f"\nTopic: {topic}\nStory: {story}\n{'='*50}\n") asyncio.run(main())
Poskytovanie asynchrónnych aplikácií LLM pomocou FastAPI
Aby bola vaša asynchrónna LLM aplikácia dostupná ako webová služba, FastAPI je skvelou voľbou vďaka svojej natívnej podpore asynchrónnych operácií. Tu je príklad, ako vytvoriť jednoduchý koncový bod API na generovanie textu:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() client = AsyncOpenAI() class GenerationRequest(BaseModel): prompt: str class GenerationResponse(BaseModel): generated_text: str @app.post("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await client.chat.completions.create( model="gpt-3.5-turbo", messages=({"role": "user", "content": request.prompt}) ) generated_text = response.choices(0).message.content # Simulate some post-processing in the background background_tasks.add_task(log_generation, request.prompt, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(prompt: str, generated_text: str): # Simulate logging or additional processing await asyncio.sleep(2) print(f"Logged: Prompt '{prompt}' generated text of length {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
Táto aplikácia FastAPI vytvára koncový bod /generate
ktorý prijme výzvu a vráti vygenerovaný text. Tiež ukazuje, ako používať úlohy na pozadí na ďalšie spracovanie bez blokovania odpovede.
Osvedčené postupy a bežné úskalia
Pri práci s asynchrónnymi LLM API majte na pamäti tieto osvedčené postupy:
- Použite združovanie pripojení: Pri vytváraní viacerých požiadaviek znova použite pripojenia, aby ste znížili réžiu.
- Implementujte správne spracovanie chýb: Vždy zohľadnite problémy so sieťou, chyby API a neočakávané odpovede.
- Rešpektujte limity sadzieb: Použite semafory alebo iné mechanizmy kontroly súbežnosti, aby ste sa vyhli preťaženiu rozhrania API.
- Monitorujte a zaznamenávajte: Implementujte komplexné protokolovanie na sledovanie výkonu a identifikáciu problémov.
- Pre dlhý obsah použite streamovanie: Zlepšuje používateľskú skúsenosť a umožňuje skoré spracovanie čiastkových výsledkov.