Python から 対話型アプリ(CLI / Web / デスクトップ)を作る手順は、だいたい次の設計で「外せない部品」を揃えるのが最短です。OpenAI 側は新規開発なら Responses API が推奨ルートです。 (OpenAI Platform)
1) 方式を決める(まずは最小構成)
対話型といっても、入出力の“器”で作り方が変わります。
- CLI(最短で動く):
input()で受けて表示。まずこれで会話状態・ストリーミングを固める - Web(FastAPI 等):フロント(ブラウザ)⇄API(Python)⇄OpenAI
- デスクトップ(Tkinter / PySide / Electron 等):UI イベント駆動+バックグラウンド通信
最初は CLI で 会話状態管理とエラー処理まで作ってから UI に載せ替えるのが楽です。
2) APIキーを安全に用意
APIキーは 環境変数に置くのが基本です(コードやリポジトリに直書きしない)。 (OpenAI Help Center)
- Windows(例):
setx OPENAI_API_KEY "<yourkey>" - macOS/Linux(例):
export OPENAI_API_KEY="<yourkey>"
3) Python SDK を入れる
OpenAI の Python SDK を入れて、最小の疎通を通します。 (OpenAI Platform)
pip install openai
4) 会話状態(メモリ)をどう持つか決める
対話アプリの肝はここです。選択肢は大きく2つ。
A. アプリ側で履歴を保持(シンプル・確実)
毎回「system/developer 指示+ユーザー履歴+直前のアシスタント出力」を送る方式。
ガチの業務アプリだとこの方が制御しやすいことが多いです。 (OpenAI Platform)
B. Responses API の連鎖/Conversations を使う(状態を扱いやすい)
Responses API は「前回応答を次回入力に使う」「会話状態を保持する」ための仕組みが用意されています。 (OpenAI Platform)
5) まずは CLI の最小実装(会話が続く)を作る
以下は **「履歴をアプリ側で保持」**する、最小の対話ループ例です(UI を何にしてもこの層は使い回せます)。
from openai import OpenAI
client = OpenAI()
SYSTEM = (
"あなたは業務システム開発を支援するアシスタント。"
"結論→理由→手順→注意点の順で簡潔に答える。"
)
def main():
history = [{"role": "system", "content": SYSTEM}]
while True:
user = input("you> ").strip()
if user.lower() in {"exit", "quit"}:
break
history.append({"role": "user", "content": user})
resp = client.responses.create(
model="gpt-4.1-mini", # 例:実際は用途/コスト/品質で選定
input=history,
)
# 取り出し方はSDK/レスポンス形式で多少差が出るため、
# まずは raw を print して確認→関数化するのが安全です。
text = resp.output_text
print(f"ai> {text}")
history.append({"role": "assistant", "content": text})
if __name__ == "__main__":
main()
ポイント
- 「会話状態」は history が担う(=UI に依存しない)
- 量が増えたら、要約・圧縮・古い履歴の間引きが必要になります(長期運用の定石) (OpenAI Platform)
6) “対話型”を気持ちよくする(ストリーミング/リアルタイム)
- テキストを 逐次表示したい → Responses のストリーミング(SDKの stream 口)を使う
- 音声や超低遅延の対話 → Realtime 系のガイドに沿って SSE/WS で組む (OpenAI Platform)
(UI が Web なら SSE、デスクトップなら別スレッド/asyncio で表示更新、が典型です)
7) 業務レベルに上げるチェックリスト
ベテランSE向けに、ここを押さえると事故が減ります。
- プロンプトを“コードから分離”:役割(system/developer)と業務ルールをテンプレ化
- タイムアウト/リトライ/サーキットブレーカ:ネットワーク前提の作法
- ログ:入力・出力・モデル・コスト・レイテンシを相関IDで追えるように
- PII/機密:マスキング、保存ポリシー、監査(必要なら)
- ツール呼び出し(関数呼び出し):DB参照や社内APIを「モデルに直接やらせず」安全な関数として提供(拡張しやすい) (OpenAI Platform)
もし「最終形」が **Web(FastAPI + フロント)**なのか、**デスクトップ(Tkinter/PySide)**なのか、CLIで完結なのかだけ教えてくれたら、その形に合わせて
- ディレクトリ構成
- 非同期/ストリーミング実装
- 会話履歴の永続化(SQLite/Redis 等)
まで、実装ひな形を一式で出します。
デスクトップ(Tkinter/PySide)
了解です。デスクトップ(Tkinter / PySide)で OpenAI モデルを使った対話アプリを作るときは、UI から API 呼び出しを 分離して「UI を固めない」構成にするのが王道です。OpenAI 側は新規なら Responses API を中心に組むのが今の推奨です。 (OpenAI Platform)
全体手順(共通アーキテクチャ)
1) “UI層”と“LLM層”を分ける
- UI層:入力欄、送信ボタン、ログ表示、キャンセル、履歴一覧など
- LLM層(サービス層):OpenAI 呼び出し、会話履歴管理、リトライ/タイムアウト、ストリーミング取り回し
UIイベント →(キュー/シグナル)→ LLM層 →(逐次イベント)→ UI更新
この構造にすると Tkinter/PySide どちらでも同じロジックを使い回せます。
2) 会話履歴(メモリ)の保持方式を決める
最初は「履歴をアプリ側で持つ」でOKです。Responses API は input に user/assistant を交互に入れることで会話状態を表現できます。 (OpenAI Platform)
3) UIを固めない(重要)
- Tkinter:threading + Queue + after() でUIスレッドに返す
- PySide:QThread / QRunnable + Signal でUI更新
4) ストリーミング(任意だが体験が良くなる)
長文や応答待ちがあるなら、ストリーミングで「生成中…」を表示すると体験が段違いです。 (OpenAI Platform)
OpenAI 呼び出しの最小形(共通:Responses API)
client.responses.create(...)を使い- 出力は
response.output_textで取り出すのが安全(output配列は複数要素になり得る) (OpenAI Platform)
Tkinter 版:実装テンプレ(非ブロッキング + ストリーミング対応)
ポイント:API呼び出しは別スレッド、UIは after() でポーリングして更新。
import os
import threading
import queue
import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from openai import OpenAI
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Desktop Chat (Tkinter + OpenAI)")
self.geometry("900x600")
self.log = ScrolledText(self, wrap=tk.WORD)
self.log.pack(fill=tk.BOTH, expand=True)
bottom = tk.Frame(self)
bottom.pack(fill=tk.X)
self.entry = tk.Entry(bottom)
self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5, pady=5)
self.entry.bind("<Return>", lambda e: self.on_send())
self.send_btn = tk.Button(bottom, text="Send", command=self.on_send)
self.send_btn.pack(side=tk.LEFT, padx=5, pady=5)
self.q = queue.Queue()
self.worker_busy = False
# 会話履歴(最小)
self.history = [{"role": "system", "content": SYSTEM}]
self.after(30, self.pump_events)
def append_log(self, text: str):
self.log.insert(tk.END, text)
self.log.see(tk.END)
def on_send(self):
if self.worker_busy:
return
user = self.entry.get().strip()
if not user:
return
self.entry.delete(0, tk.END)
self.append_log(f"you> {user}\n")
self.history.append({"role": "user", "content": user})
self.worker_busy = True
self.send_btn.config(state=tk.DISABLED)
threading.Thread(target=self.run_llm_stream, daemon=True).start()
def run_llm_stream(self):
"""
ストリーミングでイベントをQueueに流す。
例では event.delta を想定し、UI側で逐次追記。
"""
try:
# Streaming guide 参照:SSE で逐次イベントが来る :contentReference[oaicite:4]{index=4}
stream = client.responses.create(
model="gpt-4o-mini",
input=self.history,
stream=True,
)
self.q.put(("ai_start", None))
chunks = []
for event in stream:
# SDKのイベント形は更新され得るので、まずは event を print/inspect して調整するのが安全
# ここは代表的な「増分テキスト」を拾う実装例(必要に応じて差し替え)
delta = getattr(event, "delta", None)
if isinstance(delta, str) and delta:
chunks.append(delta)
self.q.put(("ai_delta", delta))
full = "".join(chunks).strip()
if full:
self.history.append({"role": "assistant", "content": full})
self.q.put(("ai_end", None))
except Exception as e:
self.q.put(("error", str(e)))
def pump_events(self):
try:
while True:
kind, payload = self.q.get_nowait()
if kind == "ai_start":
self.append_log("ai> ")
elif kind == "ai_delta":
self.append_log(payload)
elif kind == "ai_end":
self.append_log("\n")
self.worker_busy = False
self.send_btn.config(state=tk.NORMAL)
elif kind == "error":
self.append_log(f"\n[ERROR] {payload}\n")
self.worker_busy = False
self.send_btn.config(state=tk.NORMAL)
except queue.Empty:
pass
self.after(30, self.pump_events)
if __name__ == "__main__":
App().mainloop()
実務で必ず入れるもの
- タイムアウト/リトライ(ネットワーク前提)
- 送信中のキャンセル(フラグ+ストリーム停止)
- 履歴が肥大したら「要約して圧縮」か「直近 Nターンのみ」 (OpenAI Platform)
PySide6 版:実装テンプレ(QThread + Signal)
PySide は「UI更新はメインスレッドで」が鉄則なので、Signalで増分を飛ばします。
import os
from PySide6.QtCore import QThread, Signal
from PySide6.QtWidgets import QApplication, QWidget, QVBoxLayout, QTextEdit, QLineEdit, QPushButton
from openai import OpenAI
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"
class LLMWorker(QThread):
delta = Signal(str)
started_ai = Signal()
finished_ai = Signal()
failed = Signal(str)
def __init__(self, history):
super().__init__()
self.history = history
def run(self):
try:
stream = client.responses.create(
model="gpt-4o-mini",
input=self.history,
stream=True,
)
self.started_ai.emit()
chunks = []
for event in stream:
d = getattr(event, "delta", None)
if isinstance(d, str) and d:
chunks.append(d)
self.delta.emit(d)
full = "".join(chunks).strip()
if full:
self.history.append({"role": "assistant", "content": full})
self.finished_ai.emit()
except Exception as e:
self.failed.emit(str(e))
class Main(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("Desktop Chat (PySide6 + OpenAI)")
self.resize(900, 600)
self.history = [{"role": "system", "content": SYSTEM}]
self.worker = None
self.chat = QTextEdit()
self.chat.setReadOnly(True)
self.input = QLineEdit()
self.btn = QPushButton("Send")
layout = QVBoxLayout()
layout.addWidget(self.chat)
layout.addWidget(self.input)
layout.addWidget(self.btn)
self.setLayout(layout)
self.btn.clicked.connect(self.on_send)
self.input.returnPressed.connect(self.on_send)
def on_send(self):
if self.worker and self.worker.isRunning():
return
text = self.input.text().strip()
if not text:
return
self.input.clear()
self.chat.insertPlainText(f"you> {text}\n")
self.history.append({"role": "user", "content": text})
self.worker = LLMWorker(self.history)
self.worker.started_ai.connect(lambda: self.chat.insertPlainText("ai> "))
self.worker.delta.connect(lambda d: self.chat.insertPlainText(d))
self.worker.finished_ai.connect(lambda: self.chat.insertPlainText("\n"))
self.worker.failed.connect(lambda e: self.chat.insertPlainText(f"\n[ERROR] {e}\n"))
self.worker.start()
if __name__ == "__main__":
app = QApplication([])
w = Main()
w.show()
app.exec()
次に足すと“アプリっぽくなる”機能(優先順)
- 履歴の永続化:SQLite(会話ID/メッセージ/時刻/モデル/トークン等)
- プロンプトテンプレ管理:system 指示を外部ファイル化、切替UI
- ツール呼び出し(関数呼び出し):DB参照・社内API・ファイル操作は「モデルに直にやらせず」関数として安全に提供 (OpenAI Platform)
- ガード:入力サイズ制限、機密語マスキング、ログポリシー
- ストリーミングのイベント取り回しの堅牢化:イベント形が変わっても壊れにくいパーサ(
output_text中心+イベントはフォールバック) (OpenAI Platform)
もしあなたの想定が「業務向け(社内ツール)」なら、次は どこまでローカルで完結させたいかで最適構成が変わります:
- A) 完全ローカル(APIだけ外部)
- B) 社内プロキシサーバ経由(監査・鍵管理)
- C) 社内データ/DB検索を組み込み(function calling / RAG)
この3つのどれを想定していますか?(聞きつつ、どれでも良いように“推奨の雛形ディレクトリ構成”もすぐ出せます)
A) **完全ローカル(APIだけ外部)**なら、設計の芯はこれです:
- APIキーはローカル端末内に安全保管(環境変数/OSの資格情報ストア)
- UIスレッドを絶対にブロックしない(ワーカー + ストリームイベント)
- 会話履歴・設定・ログをローカル永続化(SQLite)
- OpenAI 側は新規なら Responses API(テキスト/画像入力、ストリーミング、状態的なやり取りの基盤) (OpenAI Platform)
以下、“手順”としてそのまま実装計画に落とせる形でまとめます。
1) ディレクトリ構成(まずこれで迷子防止)
最小でもこの分離にしておくと、Tkinter/PySide を後で差し替えできます。
app/
ui/ # Tkinter or PySide 実装
core/
chat_service.py # OpenAI呼び出し・履歴管理・ストリーミング
models.py # Message/Settings などの型
storage.py # SQLite 永続化
redact.py # マスキング(任意)
resources/
main.py
2) キー管理(ローカル完結の“正しい落としどころ”)
完全ローカルでは「ユーザー端末にキーを置く」前提なので、漏洩対策が重要です。
- 環境変数
OPENAI_API_KEYを基本にする(SDKもこれを読む) (OpenAI Platform) - キーを コード/設定ファイルに直書きしない (OpenAI Help Center)
- 可能なら OS の資格情報ストア(Windows Credential Manager / macOS Keychain 等)も検討(運用で差が出ます)
注意:OpenAI の公式でも「クライアント(ブラウザ/モバイル)にキーを埋め込むな」は強く言っています。デスクトップも“配布形態”によっては同じリスクになります。社内配布ならまだ現実的ですが、外部配布なら プロキシ方式(B) を再検討が安全です。 (OpenAI Help Center)
3) OpenAI 呼び出しは Responses API を“1箇所”に閉じ込める
- Responses は 最も進んだ生成インターフェースとして位置付けられています (OpenAI Platform)
- Python SDK の導入と基本呼び出しは Quickstart / Libraries に準拠 (OpenAI Platform)
ストリーミングを標準にする理由
「対話型」体験の差が大きいです。OpenAI 公式も SSE で逐次イベントが出る形をガイドしています。 (OpenAI Platform)
4) UI(Tkinter/PySide)側の鉄則:UI更新はメインスレッド
推奨パターン
- Tkinter:
threading+queue.Queue+after()で UI 反映 - PySide:
QThread/QRunnable+Signalで UI 反映
ここが守れていれば、OpenAI 呼び出し方式(通常/ストリーミング)やモデル切替を後からいくらでも変えられます。
5) “キャンセル”は最初から入れる(運用で効く)
ローカルアプリは「ユーザーが連打する」ので、最低限これを入れると安定します。
- 送信ボタン無効化(多重送信防止)
- 生成中は キャンセルボタンを有効化
- キャンセル時は
- ストリーム読み取りを止める(フラグで break)
- UI に「中断しました」を追記
- 履歴には “中断” として扱う(次ターンの破綻を避ける)
6) ローカル永続化(SQLite)—実務向けの最小スキーマ
最低でもこの3つがあると「アプリ」になります。
conversations(id, title, created_at, updated_at, system_prompt_version, model)messages(id, conversation_id, role, content, created_at)settings(key, value)(JSONでOK)
そして起動時に
- 前回の会話一覧表示
- クリックで復元
ができるようにする。
7) 履歴肥大対策(長期利用の必須)
ローカルで使い続けると、必ず履歴が重くなります。
現実的な戦略は2つ:
- 直近Nターンのみ送る(簡単・堅牢)
- 古い履歴は 要約して圧縮し、systemメモに格納して送る
“まず1)”で十分です。必要になったら2)を追加。
8) 構造化出力(業務アプリで事故を減らす)
対話アプリでも「ボタン操作」「フォーム入力」「設定変更」などをモデルに頼むなら、自由文より スキーマ固定が強いです。OpenAI は Structured Outputs(JSON Schema / Pydantic など)を案内しています。 (OpenAI Platform)
例:
- 「次のアクションは
search / summarize / draft / noopのどれか」 - 「必要パラメータを必ず揃える」
みたいな事故防止ができます。
9) 配布(完全ローカルなら最終的にここが壁)
- PyInstaller(1ファイル/1フォルダ配布)
- 設定ファイルとDBの置き場所(
%APPDATA%/~/.config) - ログローテート(肥大化対策)
配布先が社内端末なら、ここまで整えると運用が回ります。
10) ここまでの“最短ロードマップ”
- CLIで疎通(SDK導入・Responsesで1回応答) (OpenAI Platform)
- UIだけ作る(入力欄/送信/ログ)
- ワーカー化してUIフリーズ解消
- ストリーミング対応(体験が完成) (OpenAI Platform)
- SQLite 永続化(会話一覧/復元)
- キャンセル/エラー処理/ログ
- 構造化出力(必要なら) (OpenAI Platform)
もしよければ、次のどちらで **“完成形のひな形一式”**を出します(コード+構成+SQLite+キャンセル+ストリーミングまで含める):
- Tkinter 版(標準ライブラリ中心で軽い)
- PySide6 版(業務アプリっぽいUIにしやすい)
どっちに寄せます?(迷うなら PySide6 推しですが、まず動かすなら Tkinter が速いです)
Tkinter 版(標準ライブラリ中心で軽い)
了解です。Tkinter(標準ライブラリ中心)+OpenAI Responses APIで「完全ローカル(APIだけ外部)」の対話アプリを作るなら、次の“最小で実務耐性がある”構成が鉄板です。
- OpenAI 呼び出しは Responses API に一本化(
output_textが使える/statefulにも拡張しやすい) (OpenAI Platform) - UI を固めない:worker thread → Queue →
after()でUI更新 - 永続化:SQLite(会話一覧・復元)
- ストリーミング:SSEイベント(
response.output_text.delta)で逐次表示 (OpenAI Platform)
以下、そのままコピペで動く「ひな形一式」です。
0) 依存関係と起動
pip install openai setx OPENAI_API_KEY "YOUR_KEY" # Windows例(PowerShellなら $env:OPENAI_API_KEY="...") python main.py
OpenAI SDK は OPENAI_API_KEY を読むのが標準です。 (OpenAI Platform)
1) ディレクトリ構成
desktop_chat/
main.py
core/
chat_service.py
storage.py
models.py
2) コード一式
core/models.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
Role = Literal["system", "user", "assistant"]
@dataclass
class Message:
role: Role
content: str
core/storage.py(SQLite 永続化)
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable, Optional, Tuple
SCHEMA = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY(conversation_id) REFERENCES conversations(id)
);
CREATE INDEX IF NOT EXISTS idx_messages_conv_time
ON messages(conversation_id, created_at);
"""
class Storage:
def __init__(self, db_path: str = "chat.db"):
self.path = Path(db_path)
self.conn = sqlite3.connect(self.path, check_same_thread=False)
self.conn.execute("PRAGMA foreign_keys=ON;")
self.conn.executescript(SCHEMA)
self.conn.commit()
def create_conversation(self, title: str) -> int:
cur = self.conn.cursor()
cur.execute("INSERT INTO conversations(title) VALUES(?)", (title,))
self.conn.commit()
return int(cur.lastrowid)
def touch_conversation(self, conversation_id: int) -> None:
self.conn.execute(
"UPDATE conversations SET updated_at=datetime('now') WHERE id=?",
(conversation_id,),
)
self.conn.commit()
def list_conversations(self, limit: int = 50) -> Iterable[Tuple[int, str, str]]:
# id, title, updated_at
cur = self.conn.cursor()
cur.execute(
"SELECT id, title, updated_at FROM conversations ORDER BY updated_at DESC LIMIT ?",
(limit,),
)
return cur.fetchall()
def add_message(self, conversation_id: int, role: str, content: str) -> None:
self.conn.execute(
"INSERT INTO messages(conversation_id, role, content) VALUES(?,?,?)",
(conversation_id, role, content),
)
self.touch_conversation(conversation_id)
def load_messages(self, conversation_id: int) -> list[tuple[str, str]]:
cur = self.conn.cursor()
cur.execute(
"SELECT role, content FROM messages WHERE conversation_id=? ORDER BY id ASC",
(conversation_id,),
)
return cur.fetchall()
def close(self) -> None:
self.conn.close()
core/chat_service.py(Responses API 呼び出し + ストリーミング)
from __future__ import annotations
import os
from typing import Iterable, List, Optional
from openai import OpenAI
from .models import Message
DEFAULT_SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"
class ChatService:
"""
UIから独立した“LLM層”。
- history: [Message(...), ...] を受け取り
- streamならテキストdeltaを逐次yield
- non-streamなら全文を返す
"""
def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None):
self.client = OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
self.model = model
def ensure_system(self, history: List[Message]) -> List[Message]:
if not history or history[0].role != "system":
return [Message("system", DEFAULT_SYSTEM)] + history
return history
def create_once(self, history: List[Message]) -> str:
history = self.ensure_system(history)
resp = self.client.responses.create(
model=self.model,
input=[{"role": m.role, "content": m.content} for m in history],
)
# output_text はSDKの便利プロパティ(output配列のテキストを集約) :contentReference[oaicite:3]{index=3}
return (resp.output_text or "").strip()
def stream_text(self, history: List[Message]) -> Iterable[str]:
"""
SSEイベントを逐次処理。代表的には event.type == 'response.output_text.delta' で event.delta が増分。 :contentReference[oaicite:4]{index=4}
"""
history = self.ensure_system(history)
stream = self.client.responses.create(
model=self.model,
input=[{"role": m.role, "content": m.content} for m in history],
stream=True,
)
for event in stream:
etype = getattr(event, "type", None)
if etype == "response.output_text.delta":
delta = getattr(event, "delta", "")
if delta:
yield delta
# 他イベント(開始/完了/エラー等)は必要に応じて拾う :contentReference[oaicite:5]{index=5}
main.py(Tkinter UI:非ブロッキング + キャンセル + 会話一覧)
from __future__ import annotations
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
from core.models import Message
from core.storage import Storage
from core.chat_service import ChatService, DEFAULT_SYSTEM
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Desktop Chat (Tkinter + OpenAI)")
self.geometry("980x650")
self.storage = Storage("chat.db")
self.svc = ChatService(model="gpt-4o-mini")
self.event_q: "queue.Queue[tuple[str, str | None]]" = queue.Queue()
self.cancel_flag = threading.Event()
self.worker: threading.Thread | None = None
self.current_conv_id = self.storage.create_conversation("New chat")
self.history: list[Message] = [Message("system", DEFAULT_SYSTEM)]
self._build_ui()
self._load_conversation_list()
self.after(30, self._pump_events)
def _build_ui(self):
outer = ttk.Frame(self)
outer.pack(fill=tk.BOTH, expand=True)
# 左:会話一覧
left = ttk.Frame(outer, width=260)
left.pack(side=tk.LEFT, fill=tk.Y)
ttk.Label(left, text="Conversations").pack(anchor="w", padx=8, pady=(8,2))
self.conv_list = tk.Listbox(left, height=20)
self.conv_list.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
self.conv_list.bind("<<ListboxSelect>>", self.on_select_conversation)
btns = ttk.Frame(left)
btns.pack(fill=tk.X, padx=8, pady=6)
ttk.Button(btns, text="New", command=self.on_new_chat).pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Button(btns, text="Reload", command=self._load_conversation_list).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6,0))
# 右:チャット
right = ttk.Frame(outer)
right.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.log = ScrolledText(right, wrap=tk.WORD)
self.log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
bottom = ttk.Frame(right)
bottom.pack(fill=tk.X, padx=8, pady=(0,8))
self.entry = ttk.Entry(bottom)
self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.entry.bind("<Return>", lambda e: self.on_send())
self.send_btn = ttk.Button(bottom, text="Send", command=self.on_send)
self.send_btn.pack(side=tk.LEFT, padx=(8,0))
self.cancel_btn = ttk.Button(bottom, text="Cancel", command=self.on_cancel, state=tk.DISABLED)
self.cancel_btn.pack(side=tk.LEFT, padx=(8,0))
def _append(self, text: str):
self.log.insert(tk.END, text)
self.log.see(tk.END)
def _load_conversation_list(self):
self.conv_list.delete(0, tk.END)
self.conv_rows = list(self.storage.list_conversations(limit=100))
for (cid, title, updated_at) in self.conv_rows:
self.conv_list.insert(tk.END, f"[{cid}] {title} ({updated_at})")
# いま開いてる会話を選択状態に
for i, (cid, _, _) in enumerate(self.conv_rows):
if cid == self.current_conv_id:
self.conv_list.selection_set(i)
self.conv_list.see(i)
break
def on_new_chat(self):
if self._busy():
return
self.current_conv_id = self.storage.create_conversation("New chat")
self.history = [Message("system", DEFAULT_SYSTEM)]
self.log.delete("1.0", tk.END)
self._load_conversation_list()
def on_select_conversation(self, _evt=None):
if self._busy():
return
sel = self.conv_list.curselection()
if not sel:
return
idx = int(sel[0])
cid = self.conv_rows[idx][0]
self.current_conv_id = cid
# DBから復元
self.log.delete("1.0", tk.END)
self.history = [Message("system", DEFAULT_SYSTEM)]
rows = self.storage.load_messages(cid)
for role, content in rows:
if role == "user":
self._append(f"you> {content}\n")
self.history.append(Message("user", content))
elif role == "assistant":
self._append(f"ai> {content}\n")
self.history.append(Message("assistant", content))
def _busy(self) -> bool:
return self.worker is not None and self.worker.is_alive()
def on_cancel(self):
self.cancel_flag.set()
def on_send(self):
if self._busy():
return
user = self.entry.get().strip()
if not user:
return
self.entry.delete(0, tk.END)
self._append(f"you> {user}\n")
self.history.append(Message("user", user))
self.storage.add_message(self.current_conv_id, "user", user)
self.send_btn.configure(state=tk.DISABLED)
self.cancel_btn.configure(state=tk.NORMAL)
self.cancel_flag.clear()
self.worker = threading.Thread(target=self._worker_stream, daemon=True)
self.worker.start()
def _worker_stream(self):
try:
self.event_q.put(("ai_start", None))
buf = []
for delta in self.svc.stream_text(self.history):
if self.cancel_flag.is_set():
self.event_q.put(("ai_cancelled", None))
return
buf.append(delta)
self.event_q.put(("ai_delta", delta))
full = "".join(buf).strip()
if full:
self.history.append(Message("assistant", full))
self.storage.add_message(self.current_conv_id, "assistant", full)
self.event_q.put(("ai_end", None))
except Exception as e:
self.event_q.put(("error", str(e)))
def _pump_events(self):
try:
while True:
kind, payload = self.event_q.get_nowait()
if kind == "ai_start":
self._append("ai> ")
elif kind == "ai_delta":
self._append(payload or "")
elif kind == "ai_end":
self._append("\n")
self._finish_turn()
elif kind == "ai_cancelled":
self._append("\n[Cancelled]\n")
self._finish_turn()
elif kind == "error":
self._append(f"\n[ERROR] {payload}\n")
self._finish_turn()
except queue.Empty:
pass
self.after(30, self._pump_events)
def _finish_turn(self):
self.send_btn.configure(state=tk.NORMAL)
self.cancel_btn.configure(state=tk.DISABLED)
self._load_conversation_list()
def destroy(self):
try:
self.storage.close()
finally:
super().destroy()
if __name__ == "__main__":
App().mainloop()
3) このひな形が “実務で効く”ポイント
- UIが固まらない:OpenAI呼び出しはスレッド、UI更新は
after()だけ - ストリーミング:
response.output_text.deltaを拾って逐次描画(体感が大幅に良くなる) (OpenAI Platform) output_text前提の設計:output配列の構造に依存せず安全(SDK側が集約) (OpenAI Platform)- SQLiteで会話が残る:再起動しても続きから使える
4) 次に足すとさらに安定する(優先度順)
- タイムアウト/リトライ(ネットワーク瞬断対策)
- 履歴肥大の制御(直近Nターンのみ送信、または要約圧縮)
- 設定画面(モデル名、system prompt、保存先、ログレベル)
- Structured Outputs(ボタン操作やフォーム入力をモデルにやらせるなら事故防止に強い) (OpenAI Platform)
このままでも動きますが、あなたがベテランSEなら次に決めると設計が一気に締まります:
- **「会話履歴は直近Nターン」**にしますか、それとも **「要約圧縮」**も最初から入れますか?(Nターン方式が運用楽です)
要約圧縮
了解です。要約圧縮を入れると「長期利用でも破綻しにくい」アプリになります。やり方はシンプルで、
- 古い履歴をまとめて “会話サマリ” に圧縮
- 次回以降はそのサマリを system(またはdeveloper相当) として先頭に付ける
- DBにはサマリも保存(再起動しても継続)
という流れにします。
以下、いまの Tkinter ひな形に 最小改造で入れる実装を出します。
1) 方針(実装の肝)
いつ要約する?
- 送信のたびに
historyの総文字数(またはメッセージ数)が閾値を超えたら要約- 例:合計 12,000 文字超で要約
- あるいは「最新 N ターンは残し、残りを要約」にする
- 例:直近 8 ターンは残す(user/assistant を 8 往復)
何を要約する?
- system はそのまま
- 直近 N ターンより前の user/assistant を要約対象にして、
summary_message = Message("system", "【会話要約】...")を 1 つ作る
- 既存の要約があれば、新しい要約を 追記・統合(“要約の要約”)
2) core/chat_service.py に「要約関数」を追加
ChatService に summarize() を足します(モデルは軽量でOK)。
# core/chat_service.py に追記
SUMMARY_PROMPT = """あなたは会話履歴を圧縮する要約エンジンです。
次の会話ログを、今後の会話を継続するための「保持すべき情報」に圧縮してください。
要件:
- 事実/前提/決定事項/制約/未解決タスク/ユーザーの意図 を優先
- 余談や重複は捨てる
- 日本語で簡潔に
- 出力形式:
1) 前提・背景:
2) 決定事項:
3) 未解決/次のアクション:
4) 用語・固有名詞:
"""
class ChatService:
# 既存...
def summarize_text(self, existing_summary: str, chunk_text: str) -> str:
"""
existing_summary: 既にある要約(無ければ空)
chunk_text: 今回まとめたい古い履歴(プレーンテキスト)
"""
input_messages = [
{"role": "system", "content": SUMMARY_PROMPT},
]
if existing_summary.strip():
input_messages.append({"role": "user", "content": f"既存の要約:\n{existing_summary}\n"})
input_messages.append({"role": "user", "content": f"新しい会話ログ:\n{chunk_text}\n"})
resp = self.client.responses.create(
model="gpt-4o-mini", # 要約は軽量で十分なことが多い
input=input_messages,
)
return (resp.output_text or "").strip()
3) main.py に「要約圧縮」を組み込む
3-1) 閾値と直近保持ターン数を追加
App.__init__ のあたりに定数を置きます。
self.SUMMARY_MAX_CHARS = 12000 # 閾値(適宜調整) self.KEEP_LAST_TURNS = 8 # 直近何往復残すか self.summary_text = "" # 現在の要約(DBから復元できるように後で保存)
3-2) DBに要約を保存する(最小は messages に role=”summary” を入れる)
いちばん改造が少ない方法は、messages に role=”summary” を保存する方式です。
(スキーマ変更なしで動きます)
- 保存:
storage.add_message(conversation_id, "summary", summary_text) - 復元:
load_messages()で role==”summary” を拾う
on_select_conversation() の復元処理を少し変更
# on_select_conversation 内の rows ループに追記
self.summary_text = ""
for role, content in rows:
if role == "summary":
self.summary_text = content # 最新だけ使う(複数あるなら最後のものを採用)
そして、通常ログ表示は user/assistant だけでOK(summaryは表示しない or 表示するなら別UI)。
3-3) history の先頭に「要約」を挿入する関数を追加
main.py にユーティリティを作ります。
def build_effective_history(self) -> list[Message]:
"""
OpenAIに送る履歴(system + summary + 直近メッセージ)
"""
h = [Message("system", DEFAULT_SYSTEM)]
if self.summary_text.strip():
h.append(Message("system", "【会話要約】\n" + self.summary_text.strip()))
# self.history は system含みで持っていたが、ここでは user/assistant のみでもOK
for m in self.history:
if m.role in ("user", "assistant"):
h.append(m)
return h
3-4) 送信前に要約圧縮を走らせる
on_send() で user を history に append したあと、worker を起動する前に要約判定を入れます。
def total_chars_of_history(self) -> int:
return sum(len(m.content) for m in self.history if m.role in ("user","assistant"))
def maybe_summarize(self):
# 閾値未満なら何もしない
if self.total_chars_of_history() <= self.SUMMARY_MAX_CHARS:
return
# 直近 KEEP_LAST_TURNS 往復を残す: user/assistant のペアを数える
ua = [m for m in self.history if m.role in ("user","assistant")]
# 往復=2メッセージなので keep_msgs = KEEP_LAST_TURNS*2
keep_msgs = self.KEEP_LAST_TURNS * 2
if len(ua) <= keep_msgs:
return
old = ua[:-keep_msgs]
recent = ua[-keep_msgs:]
# old をプレーンテキスト化
chunk = []
for m in old:
prefix = "USER" if m.role == "user" else "ASSISTANT"
chunk.append(f"{prefix}: {m.content}")
chunk_text = "\n".join(chunk)
# 既存要約と統合して更新
new_summary = self.svc.summarize_text(self.summary_text, chunk_text)
self.summary_text = new_summary
# DB保存(role=summary で1件追加)
self.storage.add_message(self.current_conv_id, "summary", self.summary_text)
# history は recent だけ残す(古い分を圧縮したため)
self.history = recent
on_send() の中で、userメッセージ保存後に呼びます:
self.history.append(Message("user", user))
self.storage.add_message(self.current_conv_id, "user", user)
self.maybe_summarize() # ← 追加
3-5) OpenAI呼び出しは「effective_history」を使う
_worker_stream() で self.svc.stream_text(self.history) となっている部分を差し替えます:
effective = self.build_effective_history()
for delta in self.svc.stream_text(effective):
...
assistant全文を self.history.append(Message("assistant", full)) するのはそのままでOKです。
4) 運用で効くチューニング(最初からおすすめ)
- **閾値は「メッセージ数」より「文字数」**が安定(日本語はトークン換算がブレやすいので)
KEEP_LAST_TURNSは 6〜12 くらいが扱いやすい- 要約は軽量モデルでOK(コスト/速度面)
- 要約文は「曖昧さ」を減らすために、制約・決定・未決を必ず分ける(上のSUMMARY_PROMPTがそれ)
5) これで起きがちな罠(先に潰す)
- 要約が肥大化する
→ 要約も一定長を超えたら「要約の要約」を作る(同じ summarize_text で existing_summary に突っ込むだけでOK) - 要約が“仕様”を改変する
→ systemの「不変ルール」は要約に混ぜない。systemは system のまま固定、要約は“事実と決定”中心に。
必要なら、ここまでの変更を反映した「完成版ファイル一式」をそのまま貼れる形でまとめ直します。
次はどっちを優先します?
- 要約の要約(summary肥大対策) も最初から入れる
- まずは今回の最小版で動かして、挙動を見て調整
まずは今回の最小版で動かして、挙動を見て調整
了解です。では「今回の最小版(要約圧縮つき)」として、そのままコピペで動く状態にまとめます。
(前回のひな形に対して、差分が少ない構成です)
1) 変更点まとめ(最小)
ChatServiceに summarize_text() を追加(要約用)main.pyにsummary_text(現在の要約)maybe_summarize()(閾値超過時に要約して履歴を圧縮)build_effective_history()(OpenAIに送る履歴=system + 要約 + 直近ログ)
- SQLite は既存テーブルのまま、
messages.roleに"summary"を追加して保存(スキーマ変更不要)
2) コード一式(最小版)
core/chat_service.py(差し替え)
from __future__ import annotations
import os
from typing import Iterable, List, Optional
from openai import OpenAI
from .models import Message
DEFAULT_SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"
SUMMARY_PROMPT = """あなたは会話履歴を圧縮する要約エンジンです。
次の会話ログを、今後の会話を継続するための「保持すべき情報」に圧縮してください。
要件:
- 事実/前提/決定事項/制約/未解決タスク/ユーザーの意図 を優先
- 余談や重複は捨てる
- 日本語で簡潔に
- 出力形式:
1) 前提・背景:
2) 決定事項:
3) 未解決/次のアクション:
4) 用語・固有名詞:
"""
class ChatService:
def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None):
self.client = OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
self.model = model
def ensure_system(self, history: List[Message]) -> List[Message]:
if not history or history[0].role != "system":
return [Message("system", DEFAULT_SYSTEM)] + history
return history
def create_once(self, history: List[Message]) -> str:
history = self.ensure_system(history)
resp = self.client.responses.create(
model=self.model,
input=[{"role": m.role, "content": m.content} for m in history],
)
return (resp.output_text or "").strip()
def stream_text(self, history: List[Message]) -> Iterable[str]:
"""
event.type == 'response.output_text.delta' の delta を逐次拾う方式(代表例)。
SDKのイベント形は更新され得るので、動かしながら必要に応じて調整してください。
"""
history = self.ensure_system(history)
stream = self.client.responses.create(
model=self.model,
input=[{"role": m.role, "content": m.content} for m in history],
stream=True,
)
for event in stream:
etype = getattr(event, "type", None)
if etype == "response.output_text.delta":
delta = getattr(event, "delta", "")
if delta:
yield delta
def summarize_text(self, existing_summary: str, chunk_text: str) -> str:
"""
既存要約(existing_summary)があればそれを踏まえて統合しつつ、
chunk_text(古い会話ログ)を圧縮して返す。
"""
input_messages = [{"role": "system", "content": SUMMARY_PROMPT}]
if existing_summary.strip():
input_messages.append({"role": "user", "content": f"既存の要約:\n{existing_summary}\n"})
input_messages.append({"role": "user", "content": f"新しい会話ログ:\n{chunk_text}\n"})
resp = self.client.responses.create(
model=self.model, # 要約も同じモデルでOK。軽量モデルにしても良い
input=input_messages,
)
return (resp.output_text or "").strip()
main.py(差し替え)
from __future__ import annotations
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
from core.models import Message
from core.storage import Storage
from core.chat_service import ChatService, DEFAULT_SYSTEM
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Desktop Chat (Tkinter + OpenAI) - Summary Compression")
self.geometry("980x650")
self.storage = Storage("chat.db")
self.svc = ChatService(model="gpt-4o-mini")
self.event_q: "queue.Queue[tuple[str, str | None]]" = queue.Queue()
self.cancel_flag = threading.Event()
self.worker: threading.Thread | None = None
self.current_conv_id = self.storage.create_conversation("New chat")
# 履歴(systemは固定、user/assistantのみここに積む)
self.history: list[Message] = []
self.summary_text: str = ""
# 要約トリガ(最小設定)
self.SUMMARY_MAX_CHARS = 12000 # 古い履歴の合計文字数がこれを超えたら要約
self.KEEP_LAST_TURNS = 8 # 直近何往復残すか(user/assistantで1往復)
self._build_ui()
self._load_conversation_list()
self.after(30, self._pump_events)
def _build_ui(self):
outer = ttk.Frame(self)
outer.pack(fill=tk.BOTH, expand=True)
# 左:会話一覧
left = ttk.Frame(outer, width=260)
left.pack(side=tk.LEFT, fill=tk.Y)
ttk.Label(left, text="Conversations").pack(anchor="w", padx=8, pady=(8,2))
self.conv_list = tk.Listbox(left, height=20)
self.conv_list.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
self.conv_list.bind("<<ListboxSelect>>", self.on_select_conversation)
btns = ttk.Frame(left)
btns.pack(fill=tk.X, padx=8, pady=6)
ttk.Button(btns, text="New", command=self.on_new_chat).pack(side=tk.LEFT, fill=tk.X, expand=True)
ttk.Button(btns, text="Reload", command=self._load_conversation_list).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6,0))
# 右:チャット
right = ttk.Frame(outer)
right.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.log = ScrolledText(right, wrap=tk.WORD)
self.log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
bottom = ttk.Frame(right)
bottom.pack(fill=tk.X, padx=8, pady=(0,8))
self.entry = ttk.Entry(bottom)
self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.entry.bind("<Return>", lambda e: self.on_send())
self.send_btn = ttk.Button(bottom, text="Send", command=self.on_send)
self.send_btn.pack(side=tk.LEFT, padx=(8,0))
self.cancel_btn = ttk.Button(bottom, text="Cancel", command=self.on_cancel, state=tk.DISABLED)
self.cancel_btn.pack(side=tk.LEFT, padx=(8,0))
def _append(self, text: str):
self.log.insert(tk.END, text)
self.log.see(tk.END)
def _busy(self) -> bool:
return self.worker is not None and self.worker.is_alive()
# ---- 会話一覧 / 復元 ----
def _load_conversation_list(self):
self.conv_list.delete(0, tk.END)
self.conv_rows = list(self.storage.list_conversations(limit=100))
for (cid, title, updated_at) in self.conv_rows:
self.conv_list.insert(tk.END, f"[{cid}] {title} ({updated_at})")
for i, (cid, _, _) in enumerate(self.conv_rows):
if cid == self.current_conv_id:
self.conv_list.selection_set(i)
self.conv_list.see(i)
break
def on_new_chat(self):
if self._busy():
return
self.current_conv_id = self.storage.create_conversation("New chat")
self.history = []
self.summary_text = ""
self.log.delete("1.0", tk.END)
self._load_conversation_list()
def on_select_conversation(self, _evt=None):
if self._busy():
return
sel = self.conv_list.curselection()
if not sel:
return
idx = int(sel[0])
cid = self.conv_rows[idx][0]
self.current_conv_id = cid
self.log.delete("1.0", tk.END)
self.history = []
self.summary_text = ""
rows = self.storage.load_messages(cid)
# summaryは最後の1個を採用(複数保存されていても良い)
for role, content in rows:
if role == "summary":
self.summary_text = content
# user/assistant を表示・復元
for role, content in rows:
if role == "user":
self._append(f"you> {content}\n")
self.history.append(Message("user", content))
elif role == "assistant":
self._append(f"ai> {content}\n")
self.history.append(Message("assistant", content))
# ---- 要約圧縮ロジック ----
def total_chars_of_history(self) -> int:
return sum(len(m.content) for m in self.history if m.role in ("user", "assistant"))
def build_effective_history(self) -> list[Message]:
h = [Message("system", DEFAULT_SYSTEM)]
if self.summary_text.strip():
h.append(Message("system", "【会話要約】\n" + self.summary_text.strip()))
h.extend([m for m in self.history if m.role in ("user", "assistant")])
return h
def maybe_summarize(self):
# 閾値未満なら何もしない
if self.total_chars_of_history() <= self.SUMMARY_MAX_CHARS:
return
ua = [m for m in self.history if m.role in ("user", "assistant")]
keep_msgs = self.KEEP_LAST_TURNS * 2 # 1往復=2メッセージ
if len(ua) <= keep_msgs:
return
old = ua[:-keep_msgs]
recent = ua[-keep_msgs:]
chunk_lines = []
for m in old:
prefix = "USER" if m.role == "user" else "ASSISTANT"
chunk_lines.append(f"{prefix}: {m.content}")
chunk_text = "\n".join(chunk_lines)
# 既存要約と統合して更新
new_summary = self.svc.summarize_text(self.summary_text, chunk_text)
self.summary_text = new_summary
# DB保存(role=summary で追加)
self.storage.add_message(self.current_conv_id, "summary", self.summary_text)
# メモリ上の履歴は recent だけ残す(古いのは要約に圧縮済み)
self.history = recent
# ---- 送信/キャンセル ----
def on_cancel(self):
self.cancel_flag.set()
def on_send(self):
if self._busy():
return
user = self.entry.get().strip()
if not user:
return
self.entry.delete(0, tk.END)
self._append(f"you> {user}\n")
self.history.append(Message("user", user))
self.storage.add_message(self.current_conv_id, "user", user)
# ここで要約判定(最小版)
self.maybe_summarize()
self.send_btn.configure(state=tk.DISABLED)
self.cancel_btn.configure(state=tk.NORMAL)
self.cancel_flag.clear()
self.worker = threading.Thread(target=self._worker_stream, daemon=True)
self.worker.start()
def _worker_stream(self):
try:
self.event_q.put(("ai_start", None))
buf = []
effective = self.build_effective_history()
for delta in self.svc.stream_text(effective):
if self.cancel_flag.is_set():
self.event_q.put(("ai_cancelled", None))
return
buf.append(delta)
self.event_q.put(("ai_delta", delta))
full = "".join(buf).strip()
if full:
self.history.append(Message("assistant", full))
self.storage.add_message(self.current_conv_id, "assistant", full)
self.event_q.put(("ai_end", None))
except Exception as e:
self.event_q.put(("error", str(e)))
# ---- UIイベントポンプ ----
def _pump_events(self):
try:
while True:
kind, payload = self.event_q.get_nowait()
if kind == "ai_start":
self._append("ai> ")
elif kind == "ai_delta":
self._append(payload or "")
elif kind == "ai_end":
self._append("\n")
self._finish_turn()
elif kind == "ai_cancelled":
self._append("\n[Cancelled]\n")
self._finish_turn()
elif kind == "error":
self._append(f"\n[ERROR] {payload}\n")
self._finish_turn()
except queue.Empty:
pass
self.after(30, self._pump_events)
def _finish_turn(self):
self.send_btn.configure(state=tk.NORMAL)
self.cancel_btn.configure(state=tk.DISABLED)
self._load_conversation_list()
def destroy(self):
try:
self.storage.close()
finally:
super().destroy()
if __name__ == "__main__":
App().mainloop()
3) 動作確認の観点(最小版で見るべき挙動)
- しばらく会話して、履歴が増えたら
- 返答が遅くなりにくいか(要約で軽くなるか)
- 要約が走った後でも
- 文脈が維持されるか(決定事項・制約が残るか)
chat.dbが増える(role=summaryが入る)- 会話を切り替えても summary が復元されるか
4) 次の調整ポイント(挙動を見てからでOK)
SUMMARY_MAX_CHARSを上げ下げ(要約の頻度)KEEP_LAST_TURNSを増減(直近文脈の保持)SUMMARY_PROMPTの出力フォーマットをあなた好みに(“仕様書メモ”風にするなど)
動かしてみて、もし
- 「要約が早すぎる/遅すぎる」
- 「要約後に会話がズレる」
- 「要約が長くなりすぎる」
のどれかが出たら、その症状に合わせて 最小差分の改善案(要約の要約/要約を構造化/要約に“固定ルール”を入れる等)を出します。