Utility function to load a flow created with the Langflow library.

Parameters

schema
Union[Dict, str]

The Langflow schema, given either as a dict or as a URL string.

tweaks
Optional[Dict]

Optional Langflow tweaks to be processed.

Usage

import json

from langchain.agents import AgentExecutor

from chainlit.langflow import load_flow
import chainlit as cl


# Read the Langflow schema from disk. JSON text is UTF-8, so the encoding is
# stated explicitly instead of relying on the platform's default locale.
with open("./schema.json", "r", encoding="utf-8") as f:
    schema = json.load(f)


@cl.on_chat_start
async def start():
    """Load the flow from the schema and keep it in the user session."""
    loaded_flow = await load_flow(schema=schema)
    # Stored under the "flow" key so it can be fetched from the session later.
    cl.user_session.set("flow", loaded_flow)


@cl.on_message
async def main(message: cl.Message):
    """Run the session's flow on the message content and send the result."""
    # Fetch the flow stored under the "flow" key of the user session
    agent = cl.user_session.get("flow")  # type: AgentExecutor

    # Wrap the flow's run method with cl.make_async so it can be awaited
    run_flow = cl.make_async(agent.run)
    answer = await run_flow(
        message.content, callbacks=[cl.LangchainCallbackHandler()]
    )

    # Send the flow's output back as a new message
    reply = cl.Message(content=answer)
    await reply.send()