はじめに
こんにちは!今週のテックブログは、AI SaaS 事業本部で「Hikari」の開発を担当している町田が執筆します。
AI SaaS 事業本部で開発・提供している Hikari は、生成 AI を用いて建設業の業務の効率化をサポートする Web アプリケーションです。
建設業界には、書類作成の負荷が高く本来の業務に集中できないといった課題があります。Hikari は建設業に特化した LLM サービスとして社内文書のデータ活用を支援することで、ユーザが本来の現場業務に向き合う時間を増やしています。
AI SaaS 事業本部では、以前 Langflow というノーコードで LLM エージェントの実装ができる OSS を AWS にセルフホストして利用していました。ある時、Langflow で LLM エージェントを開発していた同僚から、エージェントを実行すると途中でエラーが起きてしまうので調査してほしいという依頼がありました。
本記事では実際に起こった問題を踏まえながら、ストリーミング通信をプロキシサーバで中継する際の注意点をお伝えします。
まずは調査から
Langflow の GUI では、Error Building Component という文言が表示されており、ネットワークエラーであることが明記されていました。
また、AWS 上で展開した Langflow のサーバログを確認したところ、Build cancelled という文言がありました。
ネットワーク周りで問題が起きてるのかな🤔


エラーの再現を試みると、エージェントの実行後から 30 秒程度経つと必ずこのエラーが発生することがわかりました。
何か機械的な原因があることが推測されます。
また、アプリケーション側が故意に 30 秒毎にネットワークエラーをスローするとは考えられないため、AWS 上でのインフラ構成を確認することにしました。

我々は Langflow のコンテナを ECS で展開しています。インフラ構成図を確認すると、クライアントと ECS の間に幾つかのサービスがあります。CloudFront はリバースプロキシの役割を担うサービスであるため、怪しく感じて設定を確認しました。

Response timeout という設定が怪しい...🤔
こちらを 60 秒に設定すると、実際に Network Error が起こるタイミングがエージェントの実行開始の 60 秒後になりました。どうやらこの設定が影響しているようです。
Response timeout は、CloudFront がバックエンドサーバにリクエストを送ってからレスポンスが返ってくるまで待機する最大秒数を設定しています。何かレスポンスが返った場合には、待ち時間がリセットされ、長時間のストリーミング通信ができるようになります。一方で、何もレスポンスが返ってこないと設定時間を超えた時に接続を切断します。
同僚が実行していたエージェントは実行時間が非常に長いツールを実行しており、1分以上レスポンスがないような状況はザラにありました。設定時間を超過したため、接続が切断されていたようです。
すぐに思いつく解決策は2種類ありました。
- CloudFront の Response timeout を可能な限り長くする。
- バックエンド側から定期的にメッセージを送信し、Response timeout に引っかからないようにする。
1の方法は、CloudFront の Response timeout に設定できる最大時間が1分であること、エラーを引き起こすエージェントがツール実行に1分以上かかるのは仕様上仕方ないことが理由で、採用できませんでした。2の方法で対応することにしました。
タイムアウトへの対応
代表的なストリーミング通信である Server-Sent Events(SSE)では、接続のタイムアウトが起こりうるケースにおいて、クライアントに影響を及ぼさない「コメント」的なメッセージ(keep-alive メッセージ)を定期的に送信して、接続が切断されないようにすることが推奨されています。 今回のケースでは長時間データが流れないことはエージェントの仕様上仕方がないため、Langflow の実装を修正し、定期的にメッセージを送信することで接続が切断されないようにしました。
Langflow では、ストリーミング出力する文字列を生成して Queue に追加する関数と Queue からデータを取り出してユーザに送る関数の2つが実装されています。今回は後者の関数を、一定時間データが流れない場合に Keep-Alive メッセージを送信する実装に変更しました。
## こんな感じのコードです ## async def run_flow_generator( queue: asyncio.Queue ) -> None: # エージェントを実行して結果を格納している。詳細は省略 result = await run_flow() queue.put_nowait(result) async def consume_and_yield( queue: asyncio.Queue ) -> AsyncGenerator[bytes]: while True: try: value = await asyncio.wait_for( queue.get(), timeout=KEEP_ALIVE_TIMEOUT ) # イベントの処理の詳細は省略 yield value except asyncio.TimeoutError: # KEEP_ALIVE_TIMEOUT 秒以上待ってもデータが来なかった場合は、 # 接続が切断されないようにメッセージを送信する yield b'{"event": "keepalive", "data": {}}\n\n'
その後、圧倒的当事者意識で Langflow にプルリクを送るなどしました✌️
(管理者からは❤️マークだけ付けられて、レスポンスがないですが...)
プロキシサーバを立てて様々なストリーミングを試してみよう
今回は SSE に近い仕様のストリーミング通信で問題が発生していましたが、WebSocket や gRPC も同様の問題が起こるはずです。検証のため、ローカル環境に HTTP サーバとプロキシサーバを構築し、様々なストリーミング通信を試してみます。
検証にはプロキシサーバの envoy を利用します。envoy はマイクロサービスのためのサイドカーコンテナとして開発されており、様々な通信の中継を簡単に設定できます。
envoy のタイムアウト設定には様々な項目がありますが、今回は以下の2つの設定を利用しました。
- timeout: リクエスト全体のタイムアウト時間を設定します。デフォルトは 15 秒であり、ほとんどの通信はどのような状況であれ、この設定時間を超えると接続が切断されます。
- stream_idle_timeout: ストリーミング通信において、データが一定時間流れない場合に接続を切断する時間を設定します。デフォルトは5分であり、それ以上の時間データが流れない場合は接続が切断されます。
今回は envoy の timeout を 30 秒、stream_idle_timeout を 20 秒に設定します。また、アプリケーション側では Keep-Alive メッセージを 15 秒毎に送信する場合と、送信しない場合の2つのケースを試します。
# envoy.yaml の抜粋 stream_idle_timeout: 20s # **この行に全てのrouteに対するstream_idle_timeoutを設定** route_config: name: local_route virtual_hosts: - name: local_service domains: ["*"] cors: allow_origin_string_match: - prefix: "*" allow_methods: GET, PUT, DELETE, POST, OPTIONS allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout max_age: "1728000" expose_headers: custom-header-1,grpc-status,grpc-message routes: # ****************ここの下にroute毎のtimeout設定を記述******************** - match: prefix: "/hello.Greeter/" route: cluster: grpc_cluster timeout: 30s - match: prefix: "/fastapi/" route: cluster: fastapi_cluster prefix_rewrite: "/" timeout: 30s upgrade_configs: - upgrade_type: websocket - match: prefix: "/fastapi" route: cluster: fastapi_cluster prefix_rewrite: "/" timeout: 30s - match: prefix: "/" route: cluster: nextjs_cluster timeout: 30s
結果
| Keep-Alive なし | Keep-Alive あり | |
|---|---|---|
| SSE (HTTP1.1) | 20 秒でタイムアウト | 30 秒でタイムアウト |
| WebSocket | 20 秒でタイムアウト | 10 分待ってもタイムアウトしない |
| gRPC (HTTP2) | 20 秒でタイムアウト | 30 秒でタイムアウト |
全てのプロトコルで stream_idle_timeout の影響を Keep-Alive メッセージで回避できることがわかりました。
また、envoy の timeout の影響を SSE と gRPC は受けますが、WebSocket は例外的に timeout の影響を受けない仕様になっているようです。
プロキシサーバの仕様に応じて、この辺りの仕様が異なる可能性があります。プロトコル毎に想定した待ち時間でタイムアウトが発生するかどうかを、顧客にアプリケーションを展開する前に確認しておくと良いでしょう。
余談ですが、Amazon CloudFront の Response timeout と envoy の stream_idle_timeout は同じような役割を担っていますが、デフォルトの値が大きく異なっていた点が面白く感じました。プロキシごとに設計思想が異なっており、このような差異があるものと考えられます。プロキシサーバを利用する際は、デフォルトの設定を確認しておくと良いでしょう。
まとめ
プロキシサーバを介してストリーミング通信を行う場合、メッセージを長時間送信しない状況が続くとプロキシが接続を切断する可能性があります。 そのため、必要に応じて定期的にメッセージを送信することで、接続を切断されないようにすることが重要です。
また、プロキシサーバのタイムアウト設定を確認し、必要に応じて調整することも重要です。
ただし、ストリーミング通信において長時間何もメッセージをクライアントに送信しない仕様は、ユーザビリティの観点では望ましくありません。Keep-Alive の仕組みは接続を保持することはできますが、ユーザビリティの向上には繋がらないので、レスポンスの設計を見直した方が良いでしょう。
We're Hiring!
燈では、自分たちのコードを読むことはもちろん、OSS の実装をガンガン読み込むことも楽しめるエンジニアを募集しています!
ぜひカジュアル面談でお話ししましょう!