Python から 対話型アプリ(CLI / Web / デスクトップ)を作る手順

Python から 対話型アプリ(CLI / Web / デスクトップ)を作る手順は、だいたい次の設計で「外せない部品」を揃えるのが最短です。OpenAI 側は新規開発なら Responses API が推奨ルートです。 (OpenAI Platform)


目次

1) 方式を決める(まずは最小構成)

対話型といっても、入出力の“器”で作り方が変わります。

  • CLI(最短で動く)input() で受けて表示。まずこれで会話状態・ストリーミングを固める
  • Web(FastAPI 等):フロント(ブラウザ)⇄API(Python)⇄OpenAI
  • デスクトップ(Tkinter / PySide / Electron 等):UI イベント駆動+バックグラウンド通信

最初は CLI で 会話状態管理エラー処理まで作ってから UI に載せ替えるのが楽です。


2) APIキーを安全に用意

APIキーは 環境変数に置くのが基本です(コードやリポジトリに直書きしない)。 (OpenAI Help Center)

  • Windows(例):setx OPENAI_API_KEY "<yourkey>"
  • macOS/Linux(例):export OPENAI_API_KEY="<yourkey>"

3) Python SDK を入れる

OpenAI の Python SDK を入れて、最小の疎通を通します。 (OpenAI Platform)

pip install openai

4) 会話状態(メモリ)をどう持つか決める

対話アプリの肝はここです。選択肢は大きく2つ。

A. アプリ側で履歴を保持(シンプル・確実)

毎回「system/developer 指示+ユーザー履歴+直前のアシスタント出力」を送る方式。
ガチの業務アプリだとこの方が制御しやすいことが多いです。 (OpenAI Platform)

B. Responses API の連鎖/Conversations を使う(状態を扱いやすい)

Responses API は「前回応答を次回入力に使う」「会話状態を保持する」ための仕組みが用意されています。 (OpenAI Platform)


5) まずは CLI の最小実装(会話が続く)を作る

以下は **「履歴をアプリ側で保持」**する、最小の対話ループ例です(UI を何にしてもこの層は使い回せます)。

from openai import OpenAI

client = OpenAI()

SYSTEM = (
    "あなたは業務システム開発を支援するアシスタント。"
    "結論→理由→手順→注意点の順で簡潔に答える。"
)

def main():
    history = [{"role": "system", "content": SYSTEM}]

    while True:
        user = input("you> ").strip()
        if user.lower() in {"exit", "quit"}:
            break

        history.append({"role": "user", "content": user})

        resp = client.responses.create(
            model="gpt-4.1-mini",   # 例:実際は用途/コスト/品質で選定
            input=history,
        )

        # 取り出し方はSDK/レスポンス形式で多少差が出るため、
        # まずは raw を print して確認→関数化するのが安全です。
        text = resp.output_text
        print(f"ai> {text}")

        history.append({"role": "assistant", "content": text})

if __name__ == "__main__":
    main()

ポイント

  • 「会話状態」は history が担う(=UI に依存しない)
  • 量が増えたら、要約・圧縮・古い履歴の間引きが必要になります(長期運用の定石) (OpenAI Platform)

6) “対話型”を気持ちよくする(ストリーミング/リアルタイム)

  • テキストを 逐次表示したい → Responses のストリーミング(SDKの stream 口)を使う
  • 音声や超低遅延の対話 → Realtime 系のガイドに沿って SSE/WS で組む (OpenAI Platform)

(UI が Web なら SSE、デスクトップなら別スレッド/asyncio で表示更新、が典型です)


7) 業務レベルに上げるチェックリスト

ベテランSE向けに、ここを押さえると事故が減ります。

  • プロンプトを“コードから分離”:役割(system/developer)と業務ルールをテンプレ化
  • タイムアウト/リトライ/サーキットブレーカ:ネットワーク前提の作法
  • ログ:入力・出力・モデル・コスト・レイテンシを相関IDで追えるように
  • PII/機密:マスキング、保存ポリシー、監査(必要なら)
  • ツール呼び出し(関数呼び出し):DB参照や社内APIを「モデルに直接やらせず」安全な関数として提供(拡張しやすい) (OpenAI Platform)

もし「最終形」が **Web(FastAPI + フロント)**なのか、**デスクトップ(Tkinter/PySide)**なのか、CLIで完結なのかだけ教えてくれたら、その形に合わせて

  • ディレクトリ構成
  • 非同期/ストリーミング実装
  • 会話履歴の永続化(SQLite/Redis 等)
    まで、実装ひな形を一式で出します。

デスクトップ(Tkinter/PySide)

了解です。デスクトップ(Tkinter / PySide)で OpenAI モデルを使った対話アプリを作るときは、UI から API 呼び出しを 分離して「UI を固めない」構成にするのが王道です。OpenAI 側は新規なら Responses API を中心に組むのが今の推奨です。 (OpenAI Platform)


全体手順(共通アーキテクチャ)

1) “UI層”と“LLM層”を分ける

  • UI層:入力欄、送信ボタン、ログ表示、キャンセル、履歴一覧など
  • LLM層(サービス層):OpenAI 呼び出し、会話履歴管理、リトライ/タイムアウト、ストリーミング取り回し

UIイベント →(キュー/シグナル)→ LLM層 →(逐次イベント)→ UI更新
この構造にすると Tkinter/PySide どちらでも同じロジックを使い回せます。

2) 会話履歴(メモリ)の保持方式を決める

最初は「履歴をアプリ側で持つ」でOKです。Responses API は inputuser/assistant を交互に入れることで会話状態を表現できます。 (OpenAI Platform)

3) UIを固めない(重要)

  • Tkinter:threading + Queue + after() でUIスレッドに返す
  • PySide:QThread / QRunnable + Signal でUI更新

4) ストリーミング(任意だが体験が良くなる)

長文や応答待ちがあるなら、ストリーミングで「生成中…」を表示すると体験が段違いです。 (OpenAI Platform)


OpenAI 呼び出しの最小形(共通:Responses API)

  • client.responses.create(...) を使い
  • 出力は response.output_text で取り出すのが安全(output 配列は複数要素になり得る) (OpenAI Platform)

Tkinter 版:実装テンプレ(非ブロッキング + ストリーミング対応)

ポイント:API呼び出しは別スレッド、UIは after() でポーリングして更新。

import os
import threading
import queue
import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from openai import OpenAI

client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Desktop Chat (Tkinter + OpenAI)")
        self.geometry("900x600")

        self.log = ScrolledText(self, wrap=tk.WORD)
        self.log.pack(fill=tk.BOTH, expand=True)

        bottom = tk.Frame(self)
        bottom.pack(fill=tk.X)

        self.entry = tk.Entry(bottom)
        self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5, pady=5)
        self.entry.bind("<Return>", lambda e: self.on_send())

        self.send_btn = tk.Button(bottom, text="Send", command=self.on_send)
        self.send_btn.pack(side=tk.LEFT, padx=5, pady=5)

        self.q = queue.Queue()
        self.worker_busy = False

        # 会話履歴(最小)
        self.history = [{"role": "system", "content": SYSTEM}]

        self.after(30, self.pump_events)

    def append_log(self, text: str):
        self.log.insert(tk.END, text)
        self.log.see(tk.END)

    def on_send(self):
        if self.worker_busy:
            return
        user = self.entry.get().strip()
        if not user:
            return
        self.entry.delete(0, tk.END)

        self.append_log(f"you> {user}\n")
        self.history.append({"role": "user", "content": user})

        self.worker_busy = True
        self.send_btn.config(state=tk.DISABLED)

        threading.Thread(target=self.run_llm_stream, daemon=True).start()

    def run_llm_stream(self):
        """
        ストリーミングでイベントをQueueに流す。
        例では event.delta を想定し、UI側で逐次追記。
        """
        try:
            # Streaming guide 参照:SSE で逐次イベントが来る :contentReference[oaicite:4]{index=4}
            stream = client.responses.create(
                model="gpt-4o-mini",
                input=self.history,
                stream=True,
            )

            self.q.put(("ai_start", None))
            chunks = []

            for event in stream:
                # SDKのイベント形は更新され得るので、まずは event を print/inspect して調整するのが安全
                # ここは代表的な「増分テキスト」を拾う実装例(必要に応じて差し替え)
                delta = getattr(event, "delta", None)
                if isinstance(delta, str) and delta:
                    chunks.append(delta)
                    self.q.put(("ai_delta", delta))

            full = "".join(chunks).strip()
            if full:
                self.history.append({"role": "assistant", "content": full})
            self.q.put(("ai_end", None))

        except Exception as e:
            self.q.put(("error", str(e)))

    def pump_events(self):
        try:
            while True:
                kind, payload = self.q.get_nowait()
                if kind == "ai_start":
                    self.append_log("ai> ")
                elif kind == "ai_delta":
                    self.append_log(payload)
                elif kind == "ai_end":
                    self.append_log("\n")
                    self.worker_busy = False
                    self.send_btn.config(state=tk.NORMAL)
                elif kind == "error":
                    self.append_log(f"\n[ERROR] {payload}\n")
                    self.worker_busy = False
                    self.send_btn.config(state=tk.NORMAL)
        except queue.Empty:
            pass
        self.after(30, self.pump_events)

if __name__ == "__main__":
    App().mainloop()

実務で必ず入れるもの

  • タイムアウト/リトライ(ネットワーク前提)
  • 送信中のキャンセル(フラグ+ストリーム停止)
  • 履歴が肥大したら「要約して圧縮」か「直近 Nターンのみ」 (OpenAI Platform)

PySide6 版:実装テンプレ(QThread + Signal)

PySide は「UI更新はメインスレッドで」が鉄則なので、Signalで増分を飛ばします。

import os
from PySide6.QtCore import QThread, Signal
from PySide6.QtWidgets import QApplication, QWidget, QVBoxLayout, QTextEdit, QLineEdit, QPushButton
from openai import OpenAI

client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"

class LLMWorker(QThread):
    delta = Signal(str)
    started_ai = Signal()
    finished_ai = Signal()
    failed = Signal(str)

    def __init__(self, history):
        super().__init__()
        self.history = history

    def run(self):
        try:
            stream = client.responses.create(
                model="gpt-4o-mini",
                input=self.history,
                stream=True,
            )
            self.started_ai.emit()
            chunks = []
            for event in stream:
                d = getattr(event, "delta", None)
                if isinstance(d, str) and d:
                    chunks.append(d)
                    self.delta.emit(d)
            full = "".join(chunks).strip()
            if full:
                self.history.append({"role": "assistant", "content": full})
            self.finished_ai.emit()
        except Exception as e:
            self.failed.emit(str(e))

class Main(QWidget):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Desktop Chat (PySide6 + OpenAI)")
        self.resize(900, 600)

        self.history = [{"role": "system", "content": SYSTEM}]
        self.worker = None

        self.chat = QTextEdit()
        self.chat.setReadOnly(True)
        self.input = QLineEdit()
        self.btn = QPushButton("Send")

        layout = QVBoxLayout()
        layout.addWidget(self.chat)
        layout.addWidget(self.input)
        layout.addWidget(self.btn)
        self.setLayout(layout)

        self.btn.clicked.connect(self.on_send)
        self.input.returnPressed.connect(self.on_send)

    def on_send(self):
        if self.worker and self.worker.isRunning():
            return
        text = self.input.text().strip()
        if not text:
            return
        self.input.clear()

        self.chat.insertPlainText(f"you> {text}\n")
        self.history.append({"role": "user", "content": text})

        self.worker = LLMWorker(self.history)
        self.worker.started_ai.connect(lambda: self.chat.insertPlainText("ai> "))
        self.worker.delta.connect(lambda d: self.chat.insertPlainText(d))
        self.worker.finished_ai.connect(lambda: self.chat.insertPlainText("\n"))
        self.worker.failed.connect(lambda e: self.chat.insertPlainText(f"\n[ERROR] {e}\n"))
        self.worker.start()

if __name__ == "__main__":
    app = QApplication([])
    w = Main()
    w.show()
    app.exec()

次に足すと“アプリっぽくなる”機能(優先順)

  1. 履歴の永続化:SQLite(会話ID/メッセージ/時刻/モデル/トークン等)
  2. プロンプトテンプレ管理:system 指示を外部ファイル化、切替UI
  3. ツール呼び出し(関数呼び出し):DB参照・社内API・ファイル操作は「モデルに直にやらせず」関数として安全に提供 (OpenAI Platform)
  4. ガード:入力サイズ制限、機密語マスキング、ログポリシー
  5. ストリーミングのイベント取り回しの堅牢化:イベント形が変わっても壊れにくいパーサ(output_text中心+イベントはフォールバック) (OpenAI Platform)

もしあなたの想定が「業務向け(社内ツール)」なら、次は どこまでローカルで完結させたいかで最適構成が変わります:

  • A) 完全ローカル(APIだけ外部)
  • B) 社内プロキシサーバ経由(監査・鍵管理)
  • C) 社内データ/DB検索を組み込み(function calling / RAG)

この3つのどれを想定していますか?(聞きつつ、どれでも良いように“推奨の雛形ディレクトリ構成”もすぐ出せます)

A) **完全ローカル(APIだけ外部)**なら、設計の芯はこれです:

  • APIキーはローカル端末内に安全保管(環境変数/OSの資格情報ストア)
  • UIスレッドを絶対にブロックしない(ワーカー + ストリームイベント)
  • 会話履歴・設定・ログをローカル永続化(SQLite)
  • OpenAI 側は新規なら Responses API(テキスト/画像入力、ストリーミング、状態的なやり取りの基盤) (OpenAI Platform)

以下、“手順”としてそのまま実装計画に落とせる形でまとめます。


1) ディレクトリ構成(まずこれで迷子防止)

最小でもこの分離にしておくと、Tkinter/PySide を後で差し替えできます。

app/
  ui/                # Tkinter or PySide 実装
  core/
    chat_service.py  # OpenAI呼び出し・履歴管理・ストリーミング
    models.py        # Message/Settings などの型
    storage.py       # SQLite 永続化
    redact.py        # マスキング(任意)
  resources/
  main.py

2) キー管理(ローカル完結の“正しい落としどころ”)

完全ローカルでは「ユーザー端末にキーを置く」前提なので、漏洩対策が重要です。

  • 環境変数 OPENAI_API_KEY を基本にする(SDKもこれを読む) (OpenAI Platform)
  • キーを コード/設定ファイルに直書きしない (OpenAI Help Center)
  • 可能なら OS の資格情報ストア(Windows Credential Manager / macOS Keychain 等)も検討(運用で差が出ます)

注意:OpenAI の公式でも「クライアント(ブラウザ/モバイル)にキーを埋め込むな」は強く言っています。デスクトップも“配布形態”によっては同じリスクになります。社内配布ならまだ現実的ですが、外部配布なら プロキシ方式(B) を再検討が安全です。 (OpenAI Help Center)


3) OpenAI 呼び出しは Responses API を“1箇所”に閉じ込める

  • Responses は 最も進んだ生成インターフェースとして位置付けられています (OpenAI Platform)
  • Python SDK の導入と基本呼び出しは Quickstart / Libraries に準拠 (OpenAI Platform)

ストリーミングを標準にする理由

「対話型」体験の差が大きいです。OpenAI 公式も SSE で逐次イベントが出る形をガイドしています。 (OpenAI Platform)


4) UI(Tkinter/PySide)側の鉄則:UI更新はメインスレッド

推奨パターン

  • Tkinterthreading + queue.Queue + after() で UI 反映
  • PySideQThread/QRunnable + Signal で UI 反映

ここが守れていれば、OpenAI 呼び出し方式(通常/ストリーミング)やモデル切替を後からいくらでも変えられます。


5) “キャンセル”は最初から入れる(運用で効く)

ローカルアプリは「ユーザーが連打する」ので、最低限これを入れると安定します。

  • 送信ボタン無効化(多重送信防止)
  • 生成中は キャンセルボタンを有効化
  • キャンセル時は
    • ストリーム読み取りを止める(フラグで break)
    • UI に「中断しました」を追記
    • 履歴には “中断” として扱う(次ターンの破綻を避ける)

6) ローカル永続化(SQLite)—実務向けの最小スキーマ

最低でもこの3つがあると「アプリ」になります。

  • conversations(id, title, created_at, updated_at, system_prompt_version, model)
  • messages(id, conversation_id, role, content, created_at)
  • settings(key, value)(JSONでOK)

そして起動時に

  • 前回の会話一覧表示
  • クリックで復元
    ができるようにする。

7) 履歴肥大対策(長期利用の必須)

ローカルで使い続けると、必ず履歴が重くなります。

現実的な戦略は2つ:

  1. 直近Nターンのみ送る(簡単・堅牢)
  2. 古い履歴は 要約して圧縮し、systemメモに格納して送る

“まず1)”で十分です。必要になったら2)を追加。


8) 構造化出力(業務アプリで事故を減らす)

対話アプリでも「ボタン操作」「フォーム入力」「設定変更」などをモデルに頼むなら、自由文より スキーマ固定が強いです。OpenAI は Structured Outputs(JSON Schema / Pydantic など)を案内しています。 (OpenAI Platform)

例:

  • 「次のアクションは search / summarize / draft / noop のどれか」
  • 「必要パラメータを必ず揃える」
    みたいな事故防止ができます。

9) 配布(完全ローカルなら最終的にここが壁)

  • PyInstaller(1ファイル/1フォルダ配布)
  • 設定ファイルとDBの置き場所(%APPDATA% / ~/.config
  • ログローテート(肥大化対策)

配布先が社内端末なら、ここまで整えると運用が回ります。


10) ここまでの“最短ロードマップ”

  1. CLIで疎通(SDK導入・Responsesで1回応答) (OpenAI Platform)
  2. UIだけ作る(入力欄/送信/ログ)
  3. ワーカー化してUIフリーズ解消
  4. ストリーミング対応(体験が完成) (OpenAI Platform)
  5. SQLite 永続化(会話一覧/復元)
  6. キャンセル/エラー処理/ログ
  7. 構造化出力(必要なら) (OpenAI Platform)

もしよければ、次のどちらで **“完成形のひな形一式”**を出します(コード+構成+SQLite+キャンセル+ストリーミングまで含める):

  • Tkinter 版(標準ライブラリ中心で軽い)
  • PySide6 版(業務アプリっぽいUIにしやすい)

どっちに寄せます?(迷うなら PySide6 推しですが、まず動かすなら Tkinter が速いです)

Tkinter 版(標準ライブラリ中心で軽い)

了解です。Tkinter(標準ライブラリ中心)+OpenAI Responses APIで「完全ローカル(APIだけ外部)」の対話アプリを作るなら、次の“最小で実務耐性がある”構成が鉄板です。

  • OpenAI 呼び出しは Responses API に一本化(output_text が使える/statefulにも拡張しやすい) (OpenAI Platform)
  • UI を固めない:worker thread → Queue → after()でUI更新
  • 永続化:SQLite(会話一覧・復元)
  • ストリーミング:SSEイベント(response.output_text.delta)で逐次表示 (OpenAI Platform)

以下、そのままコピペで動く「ひな形一式」です。


0) 依存関係と起動

pip install openai
setx OPENAI_API_KEY "YOUR_KEY"   # Windows例(PowerShellなら $env:OPENAI_API_KEY="...")
python main.py

OpenAI SDK は OPENAI_API_KEY を読むのが標準です。 (OpenAI Platform)


1) ディレクトリ構成

desktop_chat/
  main.py
  core/
    chat_service.py
    storage.py
    models.py

2) コード一式

core/models.py

from __future__ import annotations
from dataclasses import dataclass
from typing import Literal

Role = Literal["system", "user", "assistant"]

@dataclass
class Message:
    role: Role
    content: str

core/storage.py(SQLite 永続化)

from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable, Optional, Tuple

SCHEMA = """
PRAGMA journal_mode=WAL;

CREATE TABLE IF NOT EXISTS conversations (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  title TEXT NOT NULL,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  conversation_id INTEGER NOT NULL,
  role TEXT NOT NULL,
  content TEXT NOT NULL,
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
  FOREIGN KEY(conversation_id) REFERENCES conversations(id)
);

CREATE INDEX IF NOT EXISTS idx_messages_conv_time
  ON messages(conversation_id, created_at);
"""

class Storage:
    def __init__(self, db_path: str = "chat.db"):
        self.path = Path(db_path)
        self.conn = sqlite3.connect(self.path, check_same_thread=False)
        self.conn.execute("PRAGMA foreign_keys=ON;")
        self.conn.executescript(SCHEMA)
        self.conn.commit()

    def create_conversation(self, title: str) -> int:
        cur = self.conn.cursor()
        cur.execute("INSERT INTO conversations(title) VALUES(?)", (title,))
        self.conn.commit()
        return int(cur.lastrowid)

    def touch_conversation(self, conversation_id: int) -> None:
        self.conn.execute(
            "UPDATE conversations SET updated_at=datetime('now') WHERE id=?",
            (conversation_id,),
        )
        self.conn.commit()

    def list_conversations(self, limit: int = 50) -> Iterable[Tuple[int, str, str]]:
        # id, title, updated_at
        cur = self.conn.cursor()
        cur.execute(
            "SELECT id, title, updated_at FROM conversations ORDER BY updated_at DESC LIMIT ?",
            (limit,),
        )
        return cur.fetchall()

    def add_message(self, conversation_id: int, role: str, content: str) -> None:
        self.conn.execute(
            "INSERT INTO messages(conversation_id, role, content) VALUES(?,?,?)",
            (conversation_id, role, content),
        )
        self.touch_conversation(conversation_id)

    def load_messages(self, conversation_id: int) -> list[tuple[str, str]]:
        cur = self.conn.cursor()
        cur.execute(
            "SELECT role, content FROM messages WHERE conversation_id=? ORDER BY id ASC",
            (conversation_id,),
        )
        return cur.fetchall()

    def close(self) -> None:
        self.conn.close()

core/chat_service.py(Responses API 呼び出し + ストリーミング)

from __future__ import annotations
import os
from typing import Iterable, List, Optional

from openai import OpenAI
from .models import Message

DEFAULT_SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"

class ChatService:
    """
    UIから独立した“LLM層”。
    - history: [Message(...), ...] を受け取り
    - streamならテキストdeltaを逐次yield
    - non-streamなら全文を返す
    """
    def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None):
        self.client = OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
        self.model = model

    def ensure_system(self, history: List[Message]) -> List[Message]:
        if not history or history[0].role != "system":
            return [Message("system", DEFAULT_SYSTEM)] + history
        return history

    def create_once(self, history: List[Message]) -> str:
        history = self.ensure_system(history)
        resp = self.client.responses.create(
            model=self.model,
            input=[{"role": m.role, "content": m.content} for m in history],
        )
        # output_text はSDKの便利プロパティ(output配列のテキストを集約) :contentReference[oaicite:3]{index=3}
        return (resp.output_text or "").strip()

    def stream_text(self, history: List[Message]) -> Iterable[str]:
        """
        SSEイベントを逐次処理。代表的には event.type == 'response.output_text.delta' で event.delta が増分。 :contentReference[oaicite:4]{index=4}
        """
        history = self.ensure_system(history)
        stream = self.client.responses.create(
            model=self.model,
            input=[{"role": m.role, "content": m.content} for m in history],
            stream=True,
        )

        for event in stream:
            etype = getattr(event, "type", None)
            if etype == "response.output_text.delta":
                delta = getattr(event, "delta", "")
                if delta:
                    yield delta
            # 他イベント(開始/完了/エラー等)は必要に応じて拾う :contentReference[oaicite:5]{index=5}

main.py(Tkinter UI:非ブロッキング + キャンセル + 会話一覧)

from __future__ import annotations
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText

from core.models import Message
from core.storage import Storage
from core.chat_service import ChatService, DEFAULT_SYSTEM

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Desktop Chat (Tkinter + OpenAI)")
        self.geometry("980x650")

        self.storage = Storage("chat.db")
        self.svc = ChatService(model="gpt-4o-mini")

        self.event_q: "queue.Queue[tuple[str, str | None]]" = queue.Queue()
        self.cancel_flag = threading.Event()
        self.worker: threading.Thread | None = None

        self.current_conv_id = self.storage.create_conversation("New chat")
        self.history: list[Message] = [Message("system", DEFAULT_SYSTEM)]

        self._build_ui()
        self._load_conversation_list()
        self.after(30, self._pump_events)

    def _build_ui(self):
        outer = ttk.Frame(self)
        outer.pack(fill=tk.BOTH, expand=True)

        # 左:会話一覧
        left = ttk.Frame(outer, width=260)
        left.pack(side=tk.LEFT, fill=tk.Y)

        ttk.Label(left, text="Conversations").pack(anchor="w", padx=8, pady=(8,2))

        self.conv_list = tk.Listbox(left, height=20)
        self.conv_list.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
        self.conv_list.bind("<<ListboxSelect>>", self.on_select_conversation)

        btns = ttk.Frame(left)
        btns.pack(fill=tk.X, padx=8, pady=6)

        ttk.Button(btns, text="New", command=self.on_new_chat).pack(side=tk.LEFT, fill=tk.X, expand=True)
        ttk.Button(btns, text="Reload", command=self._load_conversation_list).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6,0))

        # 右:チャット
        right = ttk.Frame(outer)
        right.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        self.log = ScrolledText(right, wrap=tk.WORD)
        self.log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

        bottom = ttk.Frame(right)
        bottom.pack(fill=tk.X, padx=8, pady=(0,8))

        self.entry = ttk.Entry(bottom)
        self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
        self.entry.bind("<Return>", lambda e: self.on_send())

        self.send_btn = ttk.Button(bottom, text="Send", command=self.on_send)
        self.send_btn.pack(side=tk.LEFT, padx=(8,0))

        self.cancel_btn = ttk.Button(bottom, text="Cancel", command=self.on_cancel, state=tk.DISABLED)
        self.cancel_btn.pack(side=tk.LEFT, padx=(8,0))

    def _append(self, text: str):
        self.log.insert(tk.END, text)
        self.log.see(tk.END)

    def _load_conversation_list(self):
        self.conv_list.delete(0, tk.END)
        self.conv_rows = list(self.storage.list_conversations(limit=100))
        for (cid, title, updated_at) in self.conv_rows:
            self.conv_list.insert(tk.END, f"[{cid}] {title}  ({updated_at})")

        # いま開いてる会話を選択状態に
        for i, (cid, _, _) in enumerate(self.conv_rows):
            if cid == self.current_conv_id:
                self.conv_list.selection_set(i)
                self.conv_list.see(i)
                break

    def on_new_chat(self):
        if self._busy():
            return
        self.current_conv_id = self.storage.create_conversation("New chat")
        self.history = [Message("system", DEFAULT_SYSTEM)]
        self.log.delete("1.0", tk.END)
        self._load_conversation_list()

    def on_select_conversation(self, _evt=None):
        if self._busy():
            return
        sel = self.conv_list.curselection()
        if not sel:
            return
        idx = int(sel[0])
        cid = self.conv_rows[idx][0]
        self.current_conv_id = cid

        # DBから復元
        self.log.delete("1.0", tk.END)
        self.history = [Message("system", DEFAULT_SYSTEM)]
        rows = self.storage.load_messages(cid)
        for role, content in rows:
            if role == "user":
                self._append(f"you> {content}\n")
                self.history.append(Message("user", content))
            elif role == "assistant":
                self._append(f"ai> {content}\n")
                self.history.append(Message("assistant", content))

    def _busy(self) -> bool:
        return self.worker is not None and self.worker.is_alive()

    def on_cancel(self):
        self.cancel_flag.set()

    def on_send(self):
        if self._busy():
            return
        user = self.entry.get().strip()
        if not user:
            return
        self.entry.delete(0, tk.END)

        self._append(f"you> {user}\n")
        self.history.append(Message("user", user))
        self.storage.add_message(self.current_conv_id, "user", user)

        self.send_btn.configure(state=tk.DISABLED)
        self.cancel_btn.configure(state=tk.NORMAL)
        self.cancel_flag.clear()

        self.worker = threading.Thread(target=self._worker_stream, daemon=True)
        self.worker.start()

    def _worker_stream(self):
        try:
            self.event_q.put(("ai_start", None))

            buf = []
            for delta in self.svc.stream_text(self.history):
                if self.cancel_flag.is_set():
                    self.event_q.put(("ai_cancelled", None))
                    return
                buf.append(delta)
                self.event_q.put(("ai_delta", delta))

            full = "".join(buf).strip()
            if full:
                self.history.append(Message("assistant", full))
                self.storage.add_message(self.current_conv_id, "assistant", full)

            self.event_q.put(("ai_end", None))
        except Exception as e:
            self.event_q.put(("error", str(e)))

    def _pump_events(self):
        try:
            while True:
                kind, payload = self.event_q.get_nowait()
                if kind == "ai_start":
                    self._append("ai> ")
                elif kind == "ai_delta":
                    self._append(payload or "")
                elif kind == "ai_end":
                    self._append("\n")
                    self._finish_turn()
                elif kind == "ai_cancelled":
                    self._append("\n[Cancelled]\n")
                    self._finish_turn()
                elif kind == "error":
                    self._append(f"\n[ERROR] {payload}\n")
                    self._finish_turn()
        except queue.Empty:
            pass

        self.after(30, self._pump_events)

    def _finish_turn(self):
        self.send_btn.configure(state=tk.NORMAL)
        self.cancel_btn.configure(state=tk.DISABLED)
        self._load_conversation_list()

    def destroy(self):
        try:
            self.storage.close()
        finally:
            super().destroy()

if __name__ == "__main__":
    App().mainloop()

3) このひな形が “実務で効く”ポイント

  • UIが固まらない:OpenAI呼び出しはスレッド、UI更新は after() だけ
  • ストリーミングresponse.output_text.delta を拾って逐次描画(体感が大幅に良くなる) (OpenAI Platform)
  • output_text 前提の設計output 配列の構造に依存せず安全(SDK側が集約) (OpenAI Platform)
  • SQLiteで会話が残る:再起動しても続きから使える

4) 次に足すとさらに安定する(優先度順)

  1. タイムアウト/リトライ(ネットワーク瞬断対策)
  2. 履歴肥大の制御(直近Nターンのみ送信、または要約圧縮)
  3. 設定画面(モデル名、system prompt、保存先、ログレベル)
  4. Structured Outputs(ボタン操作やフォーム入力をモデルにやらせるなら事故防止に強い) (OpenAI Platform)

このままでも動きますが、あなたがベテランSEなら次に決めると設計が一気に締まります:

  • **「会話履歴は直近Nターン」**にしますか、それとも **「要約圧縮」**も最初から入れますか?(Nターン方式が運用楽です)

要約圧縮

了解です。要約圧縮を入れると「長期利用でも破綻しにくい」アプリになります。やり方はシンプルで、

  • 古い履歴をまとめて “会話サマリ” に圧縮
  • 次回以降はそのサマリを system(またはdeveloper相当) として先頭に付ける
  • DBにはサマリも保存(再起動しても継続)

という流れにします。

以下、いまの Tkinter ひな形に 最小改造で入れる実装を出します。


1) 方針(実装の肝)

いつ要約する?

  • 送信のたびに history の総文字数(またはメッセージ数)が閾値を超えたら要約
    • 例:合計 12,000 文字超で要約
  • あるいは「最新 N ターンは残し、残りを要約」にする
    • 例:直近 8 ターンは残す(user/assistant を 8 往復)

何を要約する?

  • system はそのまま
  • 直近 N ターンより前の user/assistant を要約対象にして、
    • summary_message = Message("system", "【会話要約】...") を 1 つ作る
  • 既存の要約があれば、新しい要約を 追記・統合(“要約の要約”)

2) core/chat_service.py に「要約関数」を追加

ChatServicesummarize() を足します(モデルは軽量でOK)。

# core/chat_service.py に追記

SUMMARY_PROMPT = """あなたは会話履歴を圧縮する要約エンジンです。
次の会話ログを、今後の会話を継続するための「保持すべき情報」に圧縮してください。

要件:
- 事実/前提/決定事項/制約/未解決タスク/ユーザーの意図 を優先
- 余談や重複は捨てる
- 日本語で簡潔に
- 出力形式:
  1) 前提・背景:
  2) 決定事項:
  3) 未解決/次のアクション:
  4) 用語・固有名詞:
"""

class ChatService:
    # 既存...

    def summarize_text(self, existing_summary: str, chunk_text: str) -> str:
        """
        existing_summary: 既にある要約(無ければ空)
        chunk_text: 今回まとめたい古い履歴(プレーンテキスト)
        """
        input_messages = [
            {"role": "system", "content": SUMMARY_PROMPT},
        ]
        if existing_summary.strip():
            input_messages.append({"role": "user", "content": f"既存の要約:\n{existing_summary}\n"})
        input_messages.append({"role": "user", "content": f"新しい会話ログ:\n{chunk_text}\n"})

        resp = self.client.responses.create(
            model="gpt-4o-mini",  # 要約は軽量で十分なことが多い
            input=input_messages,
        )
        return (resp.output_text or "").strip()

3) main.py に「要約圧縮」を組み込む

3-1) 閾値と直近保持ターン数を追加

App.__init__ のあたりに定数を置きます。

self.SUMMARY_MAX_CHARS = 12000   # 閾値(適宜調整)
self.KEEP_LAST_TURNS = 8         # 直近何往復残すか
self.summary_text = ""           # 現在の要約(DBから復元できるように後で保存)

3-2) DBに要約を保存する(最小は messages に role=”summary” を入れる)

いちばん改造が少ない方法は、messagesrole=”summary” を保存する方式です。
(スキーマ変更なしで動きます)

  • 保存:storage.add_message(conversation_id, "summary", summary_text)
  • 復元:load_messages() で role==”summary” を拾う

on_select_conversation() の復元処理を少し変更

# on_select_conversation 内の rows ループに追記
self.summary_text = ""
for role, content in rows:
    if role == "summary":
        self.summary_text = content  # 最新だけ使う(複数あるなら最後のものを採用)

そして、通常ログ表示は user/assistant だけでOK(summaryは表示しない or 表示するなら別UI)。

3-3) history の先頭に「要約」を挿入する関数を追加

main.py にユーティリティを作ります。

def build_effective_history(self) -> list[Message]:
    """
    OpenAIに送る履歴(system + summary + 直近メッセージ)
    """
    h = [Message("system", DEFAULT_SYSTEM)]
    if self.summary_text.strip():
        h.append(Message("system", "【会話要約】\n" + self.summary_text.strip()))
    # self.history は system含みで持っていたが、ここでは user/assistant のみでもOK
    for m in self.history:
        if m.role in ("user", "assistant"):
            h.append(m)
    return h

3-4) 送信前に要約圧縮を走らせる

on_send() で user を history に append したあと、worker を起動する前に要約判定を入れます。

def total_chars_of_history(self) -> int:
    return sum(len(m.content) for m in self.history if m.role in ("user","assistant"))

def maybe_summarize(self):
    # 閾値未満なら何もしない
    if self.total_chars_of_history() <= self.SUMMARY_MAX_CHARS:
        return

    # 直近 KEEP_LAST_TURNS 往復を残す: user/assistant のペアを数える
    ua = [m for m in self.history if m.role in ("user","assistant")]
    # 往復=2メッセージなので keep_msgs = KEEP_LAST_TURNS*2
    keep_msgs = self.KEEP_LAST_TURNS * 2
    if len(ua) <= keep_msgs:
        return

    old = ua[:-keep_msgs]
    recent = ua[-keep_msgs:]

    # old をプレーンテキスト化
    chunk = []
    for m in old:
        prefix = "USER" if m.role == "user" else "ASSISTANT"
        chunk.append(f"{prefix}: {m.content}")
    chunk_text = "\n".join(chunk)

    # 既存要約と統合して更新
    new_summary = self.svc.summarize_text(self.summary_text, chunk_text)
    self.summary_text = new_summary

    # DB保存(role=summary で1件追加)
    self.storage.add_message(self.current_conv_id, "summary", self.summary_text)

    # history は recent だけ残す(古い分を圧縮したため)
    self.history = recent

on_send() の中で、userメッセージ保存後に呼びます:

self.history.append(Message("user", user))
self.storage.add_message(self.current_conv_id, "user", user)

self.maybe_summarize()   # ← 追加

3-5) OpenAI呼び出しは「effective_history」を使う

_worker_stream()self.svc.stream_text(self.history) となっている部分を差し替えます:

effective = self.build_effective_history()
for delta in self.svc.stream_text(effective):
    ...

assistant全文を self.history.append(Message("assistant", full)) するのはそのままでOKです。


4) 運用で効くチューニング(最初からおすすめ)

  • **閾値は「メッセージ数」より「文字数」**が安定(日本語はトークン換算がブレやすいので)
  • KEEP_LAST_TURNS は 6〜12 くらいが扱いやすい
  • 要約は軽量モデルでOK(コスト/速度面)
  • 要約文は「曖昧さ」を減らすために、制約・決定・未決を必ず分ける(上のSUMMARY_PROMPTがそれ)

5) これで起きがちな罠(先に潰す)

  • 要約が肥大化する
    → 要約も一定長を超えたら「要約の要約」を作る(同じ summarize_text で existing_summary に突っ込むだけでOK)
  • 要約が“仕様”を改変する
    → systemの「不変ルール」は要約に混ぜない。systemは system のまま固定、要約は“事実と決定”中心に。

必要なら、ここまでの変更を反映した「完成版ファイル一式」をそのまま貼れる形でまとめ直します。
次はどっちを優先します?

  1. 要約の要約(summary肥大対策) も最初から入れる
  2. まずは今回の最小版で動かして、挙動を見て調整

まずは今回の最小版で動かして、挙動を見て調整

了解です。では「今回の最小版(要約圧縮つき)」として、そのままコピペで動く状態にまとめます。
(前回のひな形に対して、差分が少ない構成です)


1) 変更点まとめ(最小)

  • ChatServicesummarize_text() を追加(要約用)
  • main.py
    • summary_text(現在の要約)
    • maybe_summarize()(閾値超過時に要約して履歴を圧縮)
    • build_effective_history()(OpenAIに送る履歴=system + 要約 + 直近ログ)
  • SQLite は既存テーブルのまま、messages.role"summary" を追加して保存(スキーマ変更不要

2) コード一式(最小版)

core/chat_service.py(差し替え)

from __future__ import annotations
import os
from typing import Iterable, List, Optional

from openai import OpenAI
from .models import Message

DEFAULT_SYSTEM = "あなたは業務システム開発を支援するアシスタント。結論→理由→手順→注意点で簡潔に。"

SUMMARY_PROMPT = """あなたは会話履歴を圧縮する要約エンジンです。
次の会話ログを、今後の会話を継続するための「保持すべき情報」に圧縮してください。

要件:
- 事実/前提/決定事項/制約/未解決タスク/ユーザーの意図 を優先
- 余談や重複は捨てる
- 日本語で簡潔に
- 出力形式:
  1) 前提・背景:
  2) 決定事項:
  3) 未解決/次のアクション:
  4) 用語・固有名詞:
"""

class ChatService:
    def __init__(self, model: str = "gpt-4o-mini", api_key: Optional[str] = None):
        self.client = OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
        self.model = model

    def ensure_system(self, history: List[Message]) -> List[Message]:
        if not history or history[0].role != "system":
            return [Message("system", DEFAULT_SYSTEM)] + history
        return history

    def create_once(self, history: List[Message]) -> str:
        history = self.ensure_system(history)
        resp = self.client.responses.create(
            model=self.model,
            input=[{"role": m.role, "content": m.content} for m in history],
        )
        return (resp.output_text or "").strip()

    def stream_text(self, history: List[Message]) -> Iterable[str]:
        """
        event.type == 'response.output_text.delta' の delta を逐次拾う方式(代表例)。
        SDKのイベント形は更新され得るので、動かしながら必要に応じて調整してください。
        """
        history = self.ensure_system(history)
        stream = self.client.responses.create(
            model=self.model,
            input=[{"role": m.role, "content": m.content} for m in history],
            stream=True,
        )

        for event in stream:
            etype = getattr(event, "type", None)
            if etype == "response.output_text.delta":
                delta = getattr(event, "delta", "")
                if delta:
                    yield delta

    def summarize_text(self, existing_summary: str, chunk_text: str) -> str:
        """
        既存要約(existing_summary)があればそれを踏まえて統合しつつ、
        chunk_text(古い会話ログ)を圧縮して返す。
        """
        input_messages = [{"role": "system", "content": SUMMARY_PROMPT}]
        if existing_summary.strip():
            input_messages.append({"role": "user", "content": f"既存の要約:\n{existing_summary}\n"})
        input_messages.append({"role": "user", "content": f"新しい会話ログ:\n{chunk_text}\n"})

        resp = self.client.responses.create(
            model=self.model,   # 要約も同じモデルでOK。軽量モデルにしても良い
            input=input_messages,
        )
        return (resp.output_text or "").strip()

main.py(差し替え)

from __future__ import annotations
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText

from core.models import Message
from core.storage import Storage
from core.chat_service import ChatService, DEFAULT_SYSTEM

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Desktop Chat (Tkinter + OpenAI) - Summary Compression")
        self.geometry("980x650")

        self.storage = Storage("chat.db")
        self.svc = ChatService(model="gpt-4o-mini")

        self.event_q: "queue.Queue[tuple[str, str | None]]" = queue.Queue()
        self.cancel_flag = threading.Event()
        self.worker: threading.Thread | None = None

        self.current_conv_id = self.storage.create_conversation("New chat")

        # 履歴(systemは固定、user/assistantのみここに積む)
        self.history: list[Message] = []
        self.summary_text: str = ""

        # 要約トリガ(最小設定)
        self.SUMMARY_MAX_CHARS = 12000   # 古い履歴の合計文字数がこれを超えたら要約
        self.KEEP_LAST_TURNS = 8         # 直近何往復残すか(user/assistantで1往復)

        self._build_ui()
        self._load_conversation_list()
        self.after(30, self._pump_events)

    def _build_ui(self):
        outer = ttk.Frame(self)
        outer.pack(fill=tk.BOTH, expand=True)

        # 左:会話一覧
        left = ttk.Frame(outer, width=260)
        left.pack(side=tk.LEFT, fill=tk.Y)

        ttk.Label(left, text="Conversations").pack(anchor="w", padx=8, pady=(8,2))

        self.conv_list = tk.Listbox(left, height=20)
        self.conv_list.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
        self.conv_list.bind("<<ListboxSelect>>", self.on_select_conversation)

        btns = ttk.Frame(left)
        btns.pack(fill=tk.X, padx=8, pady=6)

        ttk.Button(btns, text="New", command=self.on_new_chat).pack(side=tk.LEFT, fill=tk.X, expand=True)
        ttk.Button(btns, text="Reload", command=self._load_conversation_list).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6,0))

        # 右:チャット
        right = ttk.Frame(outer)
        right.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        self.log = ScrolledText(right, wrap=tk.WORD)
        self.log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

        bottom = ttk.Frame(right)
        bottom.pack(fill=tk.X, padx=8, pady=(0,8))

        self.entry = ttk.Entry(bottom)
        self.entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
        self.entry.bind("<Return>", lambda e: self.on_send())

        self.send_btn = ttk.Button(bottom, text="Send", command=self.on_send)
        self.send_btn.pack(side=tk.LEFT, padx=(8,0))

        self.cancel_btn = ttk.Button(bottom, text="Cancel", command=self.on_cancel, state=tk.DISABLED)
        self.cancel_btn.pack(side=tk.LEFT, padx=(8,0))

    def _append(self, text: str):
        self.log.insert(tk.END, text)
        self.log.see(tk.END)

    def _busy(self) -> bool:
        return self.worker is not None and self.worker.is_alive()

    # ---- 会話一覧 / 復元 ----
    def _load_conversation_list(self):
        self.conv_list.delete(0, tk.END)
        self.conv_rows = list(self.storage.list_conversations(limit=100))
        for (cid, title, updated_at) in self.conv_rows:
            self.conv_list.insert(tk.END, f"[{cid}] {title}  ({updated_at})")

        for i, (cid, _, _) in enumerate(self.conv_rows):
            if cid == self.current_conv_id:
                self.conv_list.selection_set(i)
                self.conv_list.see(i)
                break

    def on_new_chat(self):
        if self._busy():
            return
        self.current_conv_id = self.storage.create_conversation("New chat")
        self.history = []
        self.summary_text = ""
        self.log.delete("1.0", tk.END)
        self._load_conversation_list()

    def on_select_conversation(self, _evt=None):
        if self._busy():
            return
        sel = self.conv_list.curselection()
        if not sel:
            return
        idx = int(sel[0])
        cid = self.conv_rows[idx][0]
        self.current_conv_id = cid

        self.log.delete("1.0", tk.END)
        self.history = []
        self.summary_text = ""

        rows = self.storage.load_messages(cid)

        # summaryは最後の1個を採用(複数保存されていても良い)
        for role, content in rows:
            if role == "summary":
                self.summary_text = content

        # user/assistant を表示・復元
        for role, content in rows:
            if role == "user":
                self._append(f"you> {content}\n")
                self.history.append(Message("user", content))
            elif role == "assistant":
                self._append(f"ai> {content}\n")
                self.history.append(Message("assistant", content))

    # ---- 要約圧縮ロジック ----
    def total_chars_of_history(self) -> int:
        return sum(len(m.content) for m in self.history if m.role in ("user", "assistant"))

    def build_effective_history(self) -> list[Message]:
        h = [Message("system", DEFAULT_SYSTEM)]
        if self.summary_text.strip():
            h.append(Message("system", "【会話要約】\n" + self.summary_text.strip()))
        h.extend([m for m in self.history if m.role in ("user", "assistant")])
        return h

    def maybe_summarize(self):
        # 閾値未満なら何もしない
        if self.total_chars_of_history() <= self.SUMMARY_MAX_CHARS:
            return

        ua = [m for m in self.history if m.role in ("user", "assistant")]
        keep_msgs = self.KEEP_LAST_TURNS * 2  # 1往復=2メッセージ
        if len(ua) <= keep_msgs:
            return

        old = ua[:-keep_msgs]
        recent = ua[-keep_msgs:]

        chunk_lines = []
        for m in old:
            prefix = "USER" if m.role == "user" else "ASSISTANT"
            chunk_lines.append(f"{prefix}: {m.content}")
        chunk_text = "\n".join(chunk_lines)

        # 既存要約と統合して更新
        new_summary = self.svc.summarize_text(self.summary_text, chunk_text)
        self.summary_text = new_summary

        # DB保存(role=summary で追加)
        self.storage.add_message(self.current_conv_id, "summary", self.summary_text)

        # メモリ上の履歴は recent だけ残す(古いのは要約に圧縮済み)
        self.history = recent

    # ---- 送信/キャンセル ----
    def on_cancel(self):
        self.cancel_flag.set()

    def on_send(self):
        if self._busy():
            return
        user = self.entry.get().strip()
        if not user:
            return
        self.entry.delete(0, tk.END)

        self._append(f"you> {user}\n")
        self.history.append(Message("user", user))
        self.storage.add_message(self.current_conv_id, "user", user)

        # ここで要約判定(最小版)
        self.maybe_summarize()

        self.send_btn.configure(state=tk.DISABLED)
        self.cancel_btn.configure(state=tk.NORMAL)
        self.cancel_flag.clear()

        self.worker = threading.Thread(target=self._worker_stream, daemon=True)
        self.worker.start()

    def _worker_stream(self):
        try:
            self.event_q.put(("ai_start", None))

            buf = []
            effective = self.build_effective_history()

            for delta in self.svc.stream_text(effective):
                if self.cancel_flag.is_set():
                    self.event_q.put(("ai_cancelled", None))
                    return
                buf.append(delta)
                self.event_q.put(("ai_delta", delta))

            full = "".join(buf).strip()
            if full:
                self.history.append(Message("assistant", full))
                self.storage.add_message(self.current_conv_id, "assistant", full)

            self.event_q.put(("ai_end", None))
        except Exception as e:
            self.event_q.put(("error", str(e)))

    # ---- UIイベントポンプ ----
    def _pump_events(self):
        try:
            while True:
                kind, payload = self.event_q.get_nowait()
                if kind == "ai_start":
                    self._append("ai> ")
                elif kind == "ai_delta":
                    self._append(payload or "")
                elif kind == "ai_end":
                    self._append("\n")
                    self._finish_turn()
                elif kind == "ai_cancelled":
                    self._append("\n[Cancelled]\n")
                    self._finish_turn()
                elif kind == "error":
                    self._append(f"\n[ERROR] {payload}\n")
                    self._finish_turn()
        except queue.Empty:
            pass

        self.after(30, self._pump_events)

    def _finish_turn(self):
        self.send_btn.configure(state=tk.NORMAL)
        self.cancel_btn.configure(state=tk.DISABLED)
        self._load_conversation_list()

    def destroy(self):
        try:
            self.storage.close()
        finally:
            super().destroy()

if __name__ == "__main__":
    App().mainloop()

3) 動作確認の観点(最小版で見るべき挙動)

  1. しばらく会話して、履歴が増えたら
    • 返答が遅くなりにくいか(要約で軽くなるか)
  2. 要約が走った後でも
    • 文脈が維持されるか(決定事項・制約が残るか)
  3. chat.db が増える(role=summary が入る)
    • 会話を切り替えても summary が復元されるか

4) 次の調整ポイント(挙動を見てからでOK)

  • SUMMARY_MAX_CHARS を上げ下げ(要約の頻度)
  • KEEP_LAST_TURNS を増減(直近文脈の保持)
  • SUMMARY_PROMPT の出力フォーマットをあなた好みに(“仕様書メモ”風にするなど)

動かしてみて、もし

  • 「要約が早すぎる/遅すぎる」
  • 「要約後に会話がズレる」
  • 「要約が長くなりすぎる」
    のどれかが出たら、その症状に合わせて 最小差分の改善案(要約の要約/要約を構造化/要約に“固定ルール”を入れる等)を出します。
よかったらシェアしてね!
目次