import asyncio
import os
from typing import AsyncIterator
from uuid import uuid4
from literalai import AsyncLiteralClient
from openai import AsyncOpenAI
# API credentials come from the environment; either may be None if unset,
# in which case the corresponding client fails at request time, not here.
LITERAL_API_KEY = os.getenv("LITERAL_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# System prompt sent with every completion request below.
PROMPT = """Return the user message uppercase"""
# Module-level clients shared by every coroutine in this script.
literalai_client = AsyncLiteralClient(api_key=LITERAL_API_KEY, environment="dev")
# NOTE(review): presumably starts the client's background event processing —
# confirm against the literalai SDK docs for this version.
literalai_client.initialize()
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
async def call_completion(msg: str) -> AsyncIterator[str]:
    """Stream a chat completion for *msg* and yield its text deltas.

    Sends PROMPT as the system message and *msg* as the user message to
    gpt-4o-mini with streaming enabled.

    Args:
        msg: The user message to send.

    Yields:
        Each non-empty text fragment of the assistant's reply, in arrival order.
    """
    stream = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": PROMPT},
            {"role": "user", "content": msg},
        ],
        stream=True,
    )
    async for chunk in stream:
        # Guard against chunks with an empty `choices` list (the API can emit
        # such chunks, e.g. a trailing usage chunk) before indexing into it.
        if chunk.choices and chunk.choices[0].delta.content is not None:
            yield chunk.choices[0].delta.content
async def make_request(num: int) -> None:
    """Run one traced request end to end.

    Opens a Literal AI thread named after this request, records the outgoing
    user message, streams the completion inside a "run" step, and records the
    assembled reply as the assistant message.
    """
    print(f"Running {num}")
    thread_id = f"req{num}_{uuid4()}"
    msg = f"Hello {num}"
    with literalai_client.thread(thread_id=thread_id, name=thread_id):
        literalai_client.message(content=msg, type="user_message")
        print(f"Querying {num}")
        pieces: list[str] = []
        with literalai_client.step(type="run", name="run_stream"):
            async for piece in call_completion(msg):
                pieces.append(piece)
                # Yield control so the sibling request tasks can interleave.
                await asyncio.sleep(0)
        print(f"Got response {num}")
        literalai_client.message(content="".join(pieces), type="assistant_message")
async def main() -> None:
    """Fan out three concurrent traced requests, then drain the Literal AI queue."""
    await asyncio.gather(*(make_request(i) for i in range(3)))
    # Flush any queued telemetry before the process exits.
    literalai_client.flush_and_stop()
if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())