"""AI Personal Diary Plugin
This plugin manages synth's personal diary entries where synth records
what he says to users, his emotions, and his personal thoughts about interactions.
This creates a more human-like memory system where synth builds his persona
and remembers his relationships with users in a personal way.
"""
from __future__ import annotations
import json
from datetime import datetime, timedelta
from typing import Any, List, Dict
import asyncio
import aiomysql
from contextlib import asynccontextmanager
from core.db import get_conn_ctx
from core.logging_utils import log_error, log_info, log_debug, log_warning
# Injection priority for diary entries
INJECTION_PRIORITY = 8 # Low priority - diary is sacrificial
def register_injection_priority():
    """Log the diary's injection priority and hand it back to the caller.

    Returns:
        The module-level ``INJECTION_PRIORITY`` value.
    """
    message = f"[ai_diary] Registered injection priority: {INJECTION_PRIORITY}"
    log_info(message)
    return INJECTION_PRIORITY
# Register priority when module is loaded
register_injection_priority()
from core.core_initializer import register_plugin
from core.config import get_active_cortex_engine
from core.cortex_registry import get_cortex_registry
# Global flag to track if the plugin is enabled
PLUGIN_ENABLED = True
# Diary-specific configuration.
# This single mapping is what get_diary_config() returns for every interface.
DIARY_CONFIG = {
    "diary_injection_file": "synth_diary.json",
    "diary_injection_enabled": True,
    # NOTE(review): get_max_diary_chars() hard-codes 0.30 / 0.80 rather than
    # reading this value - confirm which one is meant to be authoritative.
    "diary_allocation_percentage": 30,  # Increased from 15% to utilize more available prompt space
    "max_static_injection_chars": 30000,  # Reduced for free ChatGPT limits
    "fallback_diary_chars": 10000,  # Reduced for free ChatGPT limits
    "default_days": 7,  # Default number of days to look back for diary entries
    # Read by should_include_diary(): the diary is included only while
    # current_prompt_length / max_prompt_chars is below this ratio.
    "min_space_threshold": 0.75,  # Include diary only if we're using less than 75% of prompt space
    "diary_entry_structure": "auto",  # auto-select based on available space
    "diary_sort_order": "descending",  # newest first
    "diary_filter_strategy": "most_recent",  # strategy for selecting entries when space is limited
    "diary_tag_priority": ["important", "daily", "thoughts"],  # prioritize these tags
    "enable_diary_char_logging": True,  # Enhanced logging for debugging
}
def get_diary_config(interface_name: str) -> dict:
    """Return the diary settings that apply to ``interface_name``.

    Every interface currently shares the module-level ``DIARY_CONFIG``; the
    argument is accepted but not consulted. The shared dict itself is
    returned, not a copy.
    """
    config = DIARY_CONFIG
    return config
def normalize_interface_name(interface: str) -> str:
    """Map an interface identifier onto its canonical diary name.

    Matching is case-insensitive. Empty/``None`` input and the literal
    "unknown" yield ``"unknown"``; any name containing "telegram" or
    "telethon" yields ``"telegram"``; any name containing "discord" yields
    ``"discord"``. A few exact aliases are translated through a lookup table,
    and everything else is returned lower-cased.
    """
    if not interface:
        return "unknown"

    lowered = interface.lower()
    if lowered == "unknown":
        return "unknown"

    # Substring families: several bot/userbot variants share one diary name.
    if any(marker in lowered for marker in ("telegram", "telethon")):
        return "telegram"
    if "discord" in lowered:
        return "discord"

    # Exact aliases for the remaining known interfaces.
    aliases = {
        "webui": "webui",
        "web": "webui",
        "x_interface": "x",
        "twitter": "x",
        "reddit_interface": "reddit",
        "cli": "manual",
        "manual": "manual",
    }
    return aliases.get(lowered, lowered)
def get_max_diary_chars(
    interface_name: str = None,
    current_prompt_length: int = 0,
    context_memory: dict = None,
) -> int:
    """Calculate how many characters can be allocated to diary injection.

    The budget is derived from the ``max_prompt_chars`` limit reported by the
    active Cortex engine (128001 when the engine does not report one).

    Args:
        interface_name: Name of the interface (accepted but not used here).
        current_prompt_length: Characters already used by the prompt.
        context_memory: Optional context dict; a truthy ``maximize_diary``
            key switches to the larger memory-consolidation budget.

    Returns:
        An integer character budget, never below 5000 on the normal path.
        Returns 128001 when the active engine cannot be determined (including
        when called from inside a running event loop) and 8001 when any
        unexpected error occurs.
    """
    try:
        # Get limits directly from the active LLM engine
        from core.config import get_active_cortex_engine
        from core.cortex_registry import get_cortex_registry
        import asyncio

        # get_active_cortex_engine is a coroutine; resolve it without awaiting.
        active_cortex = None
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Cannot block on a coroutine from inside the running loop.
                log_debug(
                    "[ai_diary] Already in async context, using sync fallback for get_active_cortex_engine"
                )
                active_cortex = "manual"  # Safe fallback
            else:
                active_cortex = loop.run_until_complete(get_active_cortex_engine())
        except RuntimeError:
            # No event loop exists, create one
            try:
                active_cortex = asyncio.run(get_active_cortex_engine())
            except Exception as e:
                log_debug(f"[ai_diary] Could not get active Cortex engine: {e}")
                active_cortex = "manual"  # Safe fallback
        except Exception as e:
            log_debug(f"[ai_diary] Error in async handling: {e}")
            active_cortex = "manual"  # Safe fallback

        if not active_cortex or active_cortex == "manual":
            log_debug("[ai_diary] Using manual fallback limits")
            return 128001  # Safe fallback

        registry = get_cortex_registry()
        engine = registry.get_engine(active_cortex)
        if not engine:
            engine = registry.load_engine(active_cortex)

        # Get limits from the active engine
        max_prompt_chars = 128001  # Safe fallback default
        if engine and hasattr(engine, "get_interface_limits"):
            try:
                limits = engine.get_interface_limits()
                max_prompt_chars = limits.get("max_prompt_chars", 128001)
            except Exception as e:
                # Best effort: keep the default, but leave a trace for debugging.
                log_debug(f"[ai_diary] Could not read interface limits: {e}")

        # Memory-focused operations (e.g. consolidation beats) ask for more room.
        maximize_diary = False
        if context_memory and isinstance(context_memory, dict):
            maximize_diary = context_memory.get("maximize_diary", False)

        # Use 80% for memory consolidation, 30% for normal operations
        diary_percentage = 0.80 if maximize_diary else 0.30
        diary_limit = int(max_prompt_chars * diary_percentage)

        # Take only part of what is still free: 90% when maximizing, else 50%.
        available_space = max_prompt_chars - current_prompt_length
        free_share = 0.9 if maximize_diary else 0.5
        # int(): the float share would otherwise leak out of an `-> int` API.
        diary_allocation = int(min(diary_limit, max(available_space * free_share, 5000)))

        mode = "MAXIMIZED (80%)" if maximize_diary else "standard (30%)"
        log_info(
            f"[ai_diary] Diary allocation {mode}: {diary_allocation} chars (max: {max_prompt_chars}, used: {current_prompt_length})"
        )
        return max(diary_allocation, 5000)  # Minimum 5k chars
    except Exception as e:
        log_warning(f"[ai_diary] Error calculating diary limit: {e}")
        return 8001  # Fallback
async def _run_sync_async(coro):
    """Await ``coro`` and return its result.

    Being a coroutine function, this always executes inside a running event
    loop, so no loop detection is needed. Exceptions raised by ``coro``
    propagate unchanged. The previous implementation caught ``RuntimeError``
    and awaited the same coroutine a second time, which replaced the real
    error with "cannot reuse already awaited coroutine".
    """
    return await coro
def _run_sync(coro):
    """Run ``coro`` to completion from synchronous code and return its result.

    Three situations are handled:

    * no event loop for this thread: run the coroutine with ``asyncio.run``;
    * a loop exists but is idle: drive it with ``run_until_complete``;
    * the loop is already running in this thread: run the coroutine in a
      short-lived worker thread with its own loop (5 s result timeout).
      Scheduling it back onto the running loop and blocking on the result
      cannot work, because the blocked thread is the one that would have to
      execute it.

    Returns:
        The coroutine's result, or ``None`` if it failed or timed out (the
        error is logged at debug level).
    """
    # Loop discovery is kept separate from execution so that a RuntimeError
    # raised by the coroutine is not mistaken for "no event loop".
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    try:
        if loop is None:
            return asyncio.run(coro)
        if loop.is_running():
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                return executor.submit(asyncio.run, coro).result(timeout=5.0)
        return loop.run_until_complete(coro)
    except Exception as e:
        log_debug(f"[ai_diary] Unexpected error in _run_sync: {e}")
        return None
def should_include_diary(
    interface_name: str, current_prompt_length: int = 0, max_prompt_chars: int = 0
) -> bool:
    """Decide whether the prompt still has room for diary content.

    When ``max_prompt_chars`` is not positive, the limit is looked up from
    the active Cortex engine. If the engine or its limit cannot be
    determined, the diary is included (``True``). Otherwise the diary is
    included only while ``current_prompt_length / max_prompt_chars`` stays
    below ``DIARY_CONFIG["min_space_threshold"]``.
    """
    if max_prompt_chars <= 0:
        # Caller gave no limit: ask the active engine for one.
        try:
            active_cortex = _run_sync(get_active_cortex_engine())
            if not active_cortex:
                log_debug(
                    "[ai_diary] Active Cortex engine is None, skipping limits lookup"
                )
                return True  # Unknown engine: err on the side of including

            registry = get_cortex_registry()
            engine = registry.get_engine(active_cortex) or registry.load_engine(
                active_cortex
            )
            if engine and hasattr(engine, "get_max_prompt_chars"):
                max_prompt_chars = engine.get_max_prompt_chars()
                log_debug(
                    f"[ai_diary] Got max_prompt_chars from Cortex {active_cortex}: {max_prompt_chars}"
                )
        except Exception as e:
            log_debug(f"[ai_diary] Could not get Cortex limits: {e}")
            return True  # Unknown limits: err on the side of including

    if max_prompt_chars <= 0:
        # Still no usable limit after the lookup.
        return True

    usage_ratio = current_prompt_length / max_prompt_chars
    should_include = usage_ratio < DIARY_CONFIG["min_space_threshold"]
    log_debug(
        f"[ai_diary] Prompt usage: {current_prompt_length}/{max_prompt_chars} ({usage_ratio:.2%}), include_diary: {should_include}"
    )
    return should_include
@asynccontextmanager
async def get_db():
    """Yield a database connection obtained from ``get_conn_ctx``.

    Any exception raised while the connection is in use is logged with the
    ``[ai_diary]`` prefix and then re-raised to the caller.
    """
    async with get_conn_ctx() as connection:
        try:
            log_debug("[ai_diary] Opened database connection")
            yield connection
        except Exception as e:
            log_error(f"[ai_diary] Database error: {e}")
            raise
async def init_diary_table():
    """Initialize all AI diary related tables if they don't exist.

    Creates ``ai_diary``, the legacy ``memories`` and ``emotion_diary``
    tables and ``ai_diary_archive`` (all with ``CREATE TABLE IF NOT EXISTS``),
    attempts to add the ``involved_users`` column to a pre-existing
    ``ai_diary`` table, then commits. Errors other than the column migration
    propagate to the caller via ``get_db``.
    """
    async with get_db() as conn:
        cursor = await conn.cursor()
        # Main ai_diary table - redesigned for personal diary entries
        await cursor.execute("""
            CREATE TABLE IF NOT EXISTS ai_diary (
                id INT AUTO_INCREMENT PRIMARY KEY,
                content TEXT NOT NULL COMMENT 'What synth said/did in the interaction',
                personal_thought TEXT COMMENT 'synth personal reflection about the interaction',
                emotions TEXT DEFAULT '[]' COMMENT 'synth emotions about this interaction',
                interaction_summary TEXT COMMENT 'Brief summary of what happened',
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                interface VARCHAR(50),
                chat_id VARCHAR(255),
                thread_id VARCHAR(255),
                user_message TEXT COMMENT 'What the user said that triggered this response',
                context_tags TEXT DEFAULT '[]' COMMENT 'Tags about the context/topic',
                involved_users TEXT DEFAULT '[]' COMMENT 'JSON list of users involved in the interaction',
                INDEX idx_timestamp (timestamp),
                INDEX idx_interface_chat (interface, chat_id)
            )
        """)
        # Ensure involved_users column exists (migration for existing tables).
        # The ALTER is attempted unconditionally; failure is treated as
        # "column already present" and never aborts initialization.
        try:
            await cursor.execute("""
                ALTER TABLE ai_diary ADD COLUMN involved_users TEXT DEFAULT '[]' COMMENT 'JSON list of users involved in the interaction'
            """)
            log_info("[ai_diary] Added missing involved_users column to ai_diary table")
        except Exception as e:
            # Column might already exist, that's fine; only unexpected
            # messages are worth a debug line.
            if "Duplicate column name" not in str(e):
                log_debug(f"[ai_diary] Column migration check: {e}")
        # Legacy memories table (moved from core)
        await cursor.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id INT AUTO_INCREMENT PRIMARY KEY,
                timestamp DATETIME NOT NULL,
                content TEXT NOT NULL,
                author VARCHAR(100),
                source VARCHAR(100),
                tags TEXT,
                scope VARCHAR(50),
                emotion VARCHAR(50),
                intensity INT,
                emotion_state VARCHAR(50)
            )
        """)
        # Legacy emotion_diary table (moved from core)
        await cursor.execute("""
            CREATE TABLE IF NOT EXISTS emotion_diary (
                id VARCHAR(100) PRIMARY KEY,
                source VARCHAR(100),
                event TEXT,
                emotion VARCHAR(50),
                intensity INT,
                state VARCHAR(50),
                trigger_condition TEXT,
                decision_logic TEXT,
                next_check DATETIME
            )
        """)
        # Archive table for archived diary entries (same columns as ai_diary)
        await cursor.execute("""
            CREATE TABLE IF NOT EXISTS ai_diary_archive (
                id INT AUTO_INCREMENT PRIMARY KEY,
                content TEXT NOT NULL COMMENT 'What synth said/did in the interaction',
                personal_thought TEXT COMMENT 'synth personal reflection about the interaction',
                emotions TEXT DEFAULT '[]' COMMENT 'synth emotions about this interaction',
                interaction_summary TEXT COMMENT 'Brief summary of what happened',
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                interface VARCHAR(50),
                chat_id VARCHAR(255),
                thread_id VARCHAR(255),
                user_message TEXT COMMENT 'What the user said that triggered this response',
                context_tags TEXT DEFAULT '[]' COMMENT 'Tags about the context/topic',
                involved_users TEXT DEFAULT '[]' COMMENT 'JSON list of users involved in the interaction',
                INDEX idx_timestamp (timestamp),
                INDEX idx_interface_chat (interface, chat_id)
            )
        """)
        await conn.commit()
        log_info("[ai_diary] AI diary tables initialized")
[docs]
async def recreate_diary_table():
    """Drop and recreate the ai_diary table with the new structure (DEV ONLY).

    All existing diary rows are destroyed. The recreated table carries the
    same columns as the one built by ``init_diary_table`` - including
    ``involved_users``, which the insert/update and read queries in this
    module reference and which was previously missing here.
    """
    async with get_db() as conn:
        cursor = await conn.cursor()
        log_warning("[ai_diary] DROPPING and recreating ai_diary table (DEV MODE)")
        # Drop the existing table
        await cursor.execute("DROP TABLE IF EXISTS ai_diary")
        # Recreate with new structure
        await cursor.execute("""
            CREATE TABLE ai_diary (
                id INT AUTO_INCREMENT PRIMARY KEY,
                content TEXT NOT NULL COMMENT 'What synth said/did in the interaction',
                personal_thought TEXT COMMENT 'synth personal reflection about the interaction',
                emotions TEXT DEFAULT '[]' COMMENT 'synth emotions about this interaction',
                interaction_summary TEXT COMMENT 'Brief summary of what happened',
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                interface VARCHAR(50),
                chat_id VARCHAR(255),
                thread_id VARCHAR(255),
                user_message TEXT COMMENT 'What the user said that triggered this response',
                context_tags TEXT DEFAULT '[]' COMMENT 'Tags about the context/topic',
                involved_users TEXT DEFAULT '[]' COMMENT 'JSON list of users involved in the interaction',
                INDEX idx_timestamp (timestamp),
                INDEX idx_interface_chat (interface, chat_id)
            )
        """)
        await conn.commit()
        log_info(
            "[ai_diary] ai_diary table recreated with new personal diary structure"
        )
def _run(coro):
    """Run a coroutine safely even if an event loop is already running.

    * No event loop for this thread: ``asyncio.run(coro)``; exceptions from
      the coroutine propagate to the caller.
    * Loop exists but is idle: ``loop.run_until_complete(coro)``.
    * Loop is running: the coroutine runs in a worker thread with its own
      loop, and the result is awaited for up to 10 seconds.

    In the last two cases a failure is logged at debug level and ``None`` is
    returned.

    Loop discovery is deliberately separated from execution: previously a
    ``RuntimeError`` raised by the coroutine itself was taken to mean "no
    event loop", and the already-consumed coroutine was passed to
    ``asyncio.run`` again.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        # No event loop at all - this is the only safe place to use asyncio.run()
        return asyncio.run(coro)

    try:
        if loop.is_running():
            # A running loop cannot be blocked on; use a separate thread and loop.
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(asyncio.run, coro)
                return future.result(timeout=10.0)
        # Event loop exists but not running, use run_until_complete
        return loop.run_until_complete(coro)
    except Exception as e:
        log_debug(f"[ai_diary] Error in _run: {e}")
        return None
async def _execute(query: str, params: tuple = ()):
    """Run one statement, commit, and hand back the cursor that ran it.

    The cursor is returned so callers can inspect ``lastrowid`` and
    ``rowcount`` after the statement has been committed.
    """
    async with get_db() as db_conn:
        db_cursor = await db_conn.cursor()
        await db_cursor.execute(query, params)
        await db_conn.commit()
        return db_cursor
async def _fetchall(query: str, params: tuple = ()) -> List[Dict]:
    """Run a query with a dict cursor and return every row it produced."""
    async with get_db() as db_conn:
        dict_cursor = await db_conn.cursor(aiomysql.DictCursor)
        await dict_cursor.execute(query, params)
        rows = await dict_cursor.fetchall()
        return rows
def _merge_json_list(existing_json: str | None, new_items: list) -> list:
    """Merge a JSON-encoded list with new_items, preserving order and deduplicating.

    Args:
        existing_json: JSON text of the stored list. ``None``, an empty
            string, invalid JSON, or JSON that does not decode to a list
            (``null``, a number, an object, a bare string) all count as an
            empty list. An already-decoded ``list`` is accepted as-is.
        new_items: Items to append; ``None`` is treated as empty.

    Returns:
        A new list holding the stored items followed by each new item whose
        ``str()`` form has not been seen yet. Duplicates already present in
        the stored list are kept.
    """
    if isinstance(existing_json, list):
        decoded = existing_json
    else:
        try:
            decoded = json.loads(existing_json) if existing_json else []
        except (TypeError, ValueError):
            decoded = []

    # A stored scalar or object must not be iterated as if it were the list.
    existing: list = decoded if isinstance(decoded, list) else []

    seen: set = {str(item) for item in existing}
    combined = list(existing)
    for item in new_items or []:
        key = str(item)
        if key not in seen:
            combined.append(item)
            seen.add(key)
    return combined
async def _upsert_diary_impl(
    content: str,
    personal_thought: str | None,
    emotions: list,
    interaction_summary: str | None,
    user_message: str | None,
    context_tags: list,
    involved_users: list,
    interface: str | None,
    chat_id: str | None,
    thread_id: str | None,
) -> int | None:
    """Insert today's diary row, or fold the new entry into it if one exists.

    The most recent row whose ``DATE(timestamp)`` equals ``CURDATE()`` is
    looked up. When found, ``content`` and ``personal_thought`` are appended
    with a blank-line ``---`` separator, ``interaction_summary`` and
    ``user_message`` with a single-line ``---`` separator, the three JSON
    list columns are merged through ``_merge_json_list``, and the row's
    timestamp is set to ``NOW()``. ``interface``, ``chat_id`` and
    ``thread_id`` are written only when a new row is inserted.

    Returns:
        The id of the updated or inserted row, or ``None`` if anything fails
        (the error is logged).
    """
    _SEP = "\n\n---\n\n"

    def _append(previous, addition, separator):
        # Join only when both sides have text; otherwise keep whichever exists.
        if previous and addition:
            return f"{previous}{separator}{addition}"
        return addition or previous

    try:
        async with get_db() as conn:
            cursor = await conn.cursor()

            # Look for today's entry
            await cursor.execute(
                "SELECT id, content, personal_thought, interaction_summary, "
                "user_message, emotions, context_tags, involved_users "
                "FROM ai_diary WHERE DATE(timestamp) = CURDATE() "
                "ORDER BY timestamp DESC LIMIT 1"
            )
            existing = await cursor.fetchone()

            if not existing:
                # First entry of the day: plain insert.
                await cursor.execute(
                    """
                    INSERT INTO ai_diary
                    (content, personal_thought, emotions, interaction_summary,
                    user_message, context_tags, involved_users,
                    interface, chat_id, thread_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    (
                        content,
                        personal_thought,
                        json.dumps(emotions),
                        interaction_summary,
                        user_message,
                        json.dumps(context_tags),
                        json.dumps(involved_users),
                        interface,
                        chat_id,
                        thread_id,
                    ),
                )
                await conn.commit()
                diary_entry_id = cursor.lastrowid
                log_debug(
                    f"[ai_diary] Created new diary entry for today: {content[:50]}..."
                )
                return diary_entry_id

            (
                ex_id,
                ex_content,
                ex_thought,
                ex_summary,
                ex_user_msg,
                ex_emotions,
                ex_tags,
                ex_involved,
            ) = existing

            # Content is always appended when the row already holds some.
            if ex_content:
                merged_content = f"{ex_content}{_SEP}{content}"
            else:
                merged_content = content
            merged_thought = _append(ex_thought, personal_thought, _SEP)
            merged_summary = _append(ex_summary, interaction_summary, "\n---\n")
            merged_user_msg = _append(ex_user_msg, user_message, "\n---\n")

            update_params = (
                merged_content,
                merged_thought,
                merged_summary,
                merged_user_msg,
                json.dumps(_merge_json_list(ex_emotions, emotions)),
                json.dumps(_merge_json_list(ex_tags, context_tags)),
                json.dumps(_merge_json_list(ex_involved, involved_users)),
                ex_id,
            )
            await cursor.execute(
                """
                UPDATE ai_diary
                SET content=%s, personal_thought=%s, interaction_summary=%s,
                user_message=%s, emotions=%s, context_tags=%s,
                involved_users=%s, timestamp=NOW()
                WHERE id=%s
                """,
                update_params,
            )
            await conn.commit()
            log_debug(
                f"[ai_diary] Updated today's diary entry id={ex_id}: {content[:50]}..."
            )
            return ex_id
    except Exception as e:
        log_error(f"[ai_diary] _upsert_diary_impl failed: {e}")
        return None
[docs]
def add_diary_entry(
    content: str,
    personal_thought: str = None,
    emotions: List[Dict[str, Any]] = None,
    interaction_summary: str = None,
    user_message: str = None,
    context_tags: List[str] = None,
    involved_users: List[str] = None,
    interface: str = None,
    chat_id: str = None,
    thread_id: str = None,
    grillo_activity_log_id: int = None,
) -> None:
    """Add a new personal diary entry where synth records what he said and how he feels.

    The entry is folded into today's row by ``_upsert_diary_impl``. Missing
    or blank ``content`` is ignored. If the plugin is currently disabled, a
    lazy re-initialization is attempted first. An exception while storing
    the entry disables the plugin.

    Args:
        content: What synth said/did in the interaction
        personal_thought: synth's personal reflection about this interaction
        emotions: List of emotions synth felt during this interaction
        interaction_summary: Brief summary of what happened
        user_message: What the user said that triggered this response
        context_tags: Tags about the context/topic (e.g., ['food', 'cars', 'personal'])
        involved_users: List of user names involved in this interaction (from bio system)
        interface: Interface used (telegram_bot, discord, etc.)
        chat_id: Chat identifier
        thread_id: Thread identifier
        grillo_activity_log_id: Optional grillo activity log id to link the
            stored diary entry to.
    """
    global PLUGIN_ENABLED

    # Attempt lazy initialization if plugin was disabled at startup
    if not PLUGIN_ENABLED:
        try:
            log_debug("[ai_diary] Attempting lazy initialization of plugin (sync)...")
            _run(_execute("SELECT 1 FROM ai_diary LIMIT 1"))
            PLUGIN_ENABLED = True
            log_info("[ai_diary] Plugin lazy-initialized successfully (sync)")
        except Exception as init_error:
            log_debug(
                f"[ai_diary] Lazy initialization failed (sync): {init_error}, attempting table creation..."
            )
            try:
                # NOTE(review): this schema differs from init_diary_table()
                # (JSON/LONGTEXT columns, no indexes) - confirm which is canonical.
                _run(
                    _execute("""
                        CREATE TABLE IF NOT EXISTS ai_diary (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                            content LONGTEXT,
                            personal_thought TEXT,
                            emotions JSON,
                            interaction_summary TEXT,
                            user_message TEXT,
                            context_tags JSON,
                            involved_users JSON,
                            interface VARCHAR(50),
                            chat_id VARCHAR(100),
                            thread_id VARCHAR(100)
                        )
                    """)
                )
                PLUGIN_ENABLED = True
                log_info(
                    "[ai_diary] Plugin table created and enabled successfully via lazy init (sync)"
                )
            except Exception as create_error:
                log_error(
                    f"[ai_diary] Failed to create table during lazy init (sync): {create_error}"
                )
                return

    if not PLUGIN_ENABLED:
        return

    # Nothing to record; also guards against content=None.
    if not content or not content.strip():
        return

    emotions = emotions or []
    context_tags = context_tags or []
    involved_users = involved_users or []

    # Normalize interface name for consistency
    interface = normalize_interface_name(interface)

    # Report malformed emotions; they are still stored unchanged.
    for emotion in emotions:
        if not isinstance(emotion, dict) or "type" not in emotion:
            log_warning(f"[ai_diary] Invalid emotion format: {emotion}")

    try:
        diary_entry_id = _run(
            _upsert_diary_impl(
                content,
                personal_thought,
                emotions,
                interaction_summary,
                user_message,
                context_tags,
                involved_users,
                interface,
                chat_id,
                thread_id,
            )
        )
        if diary_entry_id is not None:
            log_debug(
                f"[ai_diary] Upserted today's diary entry id={diary_entry_id}: {content[:50]}..."
            )
            if personal_thought:
                log_debug(f"[ai_diary] Personal thought: {personal_thought[:50]}...")

        # Link to grillo activity log if this entry was created from a grillo beat
        if grillo_activity_log_id and diary_entry_id:
            try:
                try:
                    from plugins.grillo.grillo_impl import GrilloPlugin
                except ImportError:
                    # Fallback if direct import fails (e.g. structure change)
                    from plugins.grillo_plugin import GrilloPlugin

                if GrilloPlugin:
                    # create_task() needs a running loop; a plain synchronous
                    # caller has none, so run the link to completion instead.
                    try:
                        running_loop = asyncio.get_running_loop()
                    except RuntimeError:
                        running_loop = None

                    link_coro = GrilloPlugin.link_diary_entry_to_activity(
                        grillo_activity_log_id,
                        diary_entry_id,
                        response_text=content,
                    )
                    if running_loop is not None:
                        running_loop.create_task(link_coro)
                        log_debug(
                            f"[ai_diary] Scheduled grillo activity link: activity_log={grillo_activity_log_id}, diary={diary_entry_id}"
                        )
                    else:
                        _run(link_coro)
                        log_debug(
                            f"[ai_diary] Linked grillo activity: activity_log={grillo_activity_log_id}, diary={diary_entry_id}"
                        )
                else:
                    log_warning("[ai_diary] GrilloPlugin not available for linking")
            except Exception as link_error:
                log_warning(f"[ai_diary] Failed to link grillo activity: {link_error}")
    except Exception as e:
        log_error(f"[ai_diary] Failed to add diary entry: {e}")
        # Disable plugin if database is unavailable
        PLUGIN_ENABLED = False
async def add_diary_entry_async(
    content: str,
    personal_thought: str = None,
    emotions: List[Dict[str, Any]] = None,
    interaction_summary: str = None,
    user_message: str = None,
    context_tags: List[str] = None,
    involved_users: List[str] = None,
    interface: str = None,
    chat_id: str = None,
    thread_id: str = None,
    grillo_activity_log_id: int = None,
) -> None:
    """Add a new personal diary entry (async version). Safe to call even if plugin is disabled.

    The entry is folded into today's row by ``_upsert_diary_impl``. Missing
    or blank ``content`` is ignored. If the plugin is currently disabled, a
    lazy re-initialization is attempted first. An exception while storing
    the entry disables the plugin.

    Args:
        content: What synth said/did in the interaction
        personal_thought: synth's personal reflection about this interaction
        emotions: List of emotions synth felt during this interaction
        interaction_summary: Brief summary of what happened
        user_message: What the user said that triggered this response
        context_tags: Tags about the context/topic
        involved_users: List of user names involved in this interaction
        interface: Interface used
        chat_id: Chat identifier
        thread_id: Thread identifier
        grillo_activity_log_id: Optional grillo activity log id to link the
            stored diary entry to.
    """
    global PLUGIN_ENABLED

    # Attempt lazy initialization if plugin was disabled at startup
    if not PLUGIN_ENABLED:
        try:
            log_debug("[ai_diary] Attempting lazy initialization of plugin...")
            await _execute("SELECT 1 FROM ai_diary LIMIT 1")
            PLUGIN_ENABLED = True
            log_info("[ai_diary] Plugin lazy-initialized successfully")
        except Exception as init_error:
            log_debug(
                f"[ai_diary] Lazy initialization failed: {init_error}, attempting table creation..."
            )
            try:
                # NOTE(review): this schema differs from init_diary_table()
                # (JSON/LONGTEXT columns, no indexes) - confirm which is canonical.
                await _execute("""
                    CREATE TABLE IF NOT EXISTS ai_diary (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                        content LONGTEXT,
                        personal_thought TEXT,
                        emotions JSON,
                        interaction_summary TEXT,
                        user_message TEXT,
                        context_tags JSON,
                        involved_users JSON,
                        interface VARCHAR(50),
                        chat_id VARCHAR(100),
                        thread_id VARCHAR(100)
                    )
                """)
                PLUGIN_ENABLED = True
                log_info(
                    "[ai_diary] Plugin table created and enabled successfully via lazy init"
                )
            except Exception as create_error:
                log_error(
                    f"[ai_diary] Failed to create table during lazy init: {create_error}"
                )
                return

    if not PLUGIN_ENABLED:
        return

    # Nothing to record; also guards against content=None.
    if not content or not content.strip():
        return

    emotions = emotions or []
    context_tags = context_tags or []
    involved_users = involved_users or []

    # Normalize interface name for consistency
    interface = normalize_interface_name(interface)

    # Report malformed emotions; they are still stored unchanged.
    for emotion in emotions:
        if not isinstance(emotion, dict) or "type" not in emotion:
            log_warning(f"[ai_diary] Invalid emotion format: {emotion}")

    try:
        diary_entry_id = await _upsert_diary_impl(
            content,
            personal_thought,
            emotions,
            interaction_summary,
            user_message,
            context_tags,
            involved_users,
            interface,
            chat_id,
            thread_id,
        )
        if diary_entry_id is not None:
            log_debug(
                f"[ai_diary] Upserted today's diary entry id={diary_entry_id}: {content[:50]}..."
            )
            if personal_thought:
                log_debug(f"[ai_diary] Personal thought: {personal_thought[:50]}...")

        # Link to grillo activity log if this entry was created from a grillo beat
        if grillo_activity_log_id and diary_entry_id:
            try:
                try:
                    from plugins.grillo_plugin import GrilloPlugin
                except ImportError:
                    # Same alternate location the synchronous variant knows about.
                    from plugins.grillo.grillo_impl import GrilloPlugin

                await GrilloPlugin.link_diary_entry_to_activity(
                    grillo_activity_log_id,
                    diary_entry_id,
                    response_text=content,
                )
                log_debug(
                    f"[ai_diary] Linked grillo activity: activity_log={grillo_activity_log_id}, diary={diary_entry_id}"
                )
            except Exception as link_error:
                log_warning(f"[ai_diary] Failed to link grillo activity: {link_error}")
    except Exception as e:
        log_error(f"[ai_diary] Failed to add diary entry: {e}")
        # Disable plugin if database is unavailable
        PLUGIN_ENABLED = False
[docs]
def get_recent_entries(days: int = 2, max_chars: int = None) -> List[Dict[str, Any]]:
    """Get diary entries from the last N days, optionally limited by character count.

    Args:
        days: How many days back from now to look.
        max_chars: Optional budget measured on each entry's JSON
            serialization. Entries are taken newest-first and collection
            stops at the first entry that would exceed the budget; entries
            are never truncated. A falsy value means no limit.

    Returns:
        A list of entry dicts (newest first) with the JSON columns decoded
        to Python objects and ``timestamp`` as an ISO string. Returns an
        empty list when the plugin is disabled or the query fails; a failure
        also disables the plugin.
    """
    global PLUGIN_ENABLED

    def _decode_list(raw):
        # NULL/empty columns mean "no items" instead of crashing json.loads,
        # which used to disable the whole plugin because of one row.
        if raw is None or raw == "":
            return []
        if isinstance(raw, (list, tuple)):
            return list(raw)
        return json.loads(raw)

    log_debug(
        f"[ai_diary] get_recent_entries called with days={days}, max_chars={max_chars}, PLUGIN_ENABLED={PLUGIN_ENABLED}"
    )

    # Attempt lazy initialization if plugin was disabled at startup
    if not PLUGIN_ENABLED:
        try:
            log_debug(
                "[ai_diary] Attempting lazy initialization for get_recent_entries..."
            )
            _run(_execute("SELECT 1 FROM ai_diary LIMIT 1"))
            PLUGIN_ENABLED = True
            log_info(
                "[ai_diary] Plugin lazy-initialized successfully in get_recent_entries"
            )
        except Exception as init_error:
            log_debug(
                f"[ai_diary] Lazy initialization failed in get_recent_entries: {init_error}"
            )
            log_debug("[ai_diary] Plugin disabled, returning empty list")
            return []

    try:
        cutoff_date = datetime.now() - timedelta(days=days)
        log_debug(f"[ai_diary] Looking for entries after {cutoff_date}")
        entries = _run(
            _fetchall(
                """
                SELECT id, content, personal_thought, timestamp, context_tags, involved_users,
                emotions, interface, chat_id, thread_id, interaction_summary, user_message
                FROM ai_diary
                WHERE timestamp >= %s
                ORDER BY timestamp DESC
                """,
                (cutoff_date,),
            )
        )
        if entries is None:
            # _run() reports some failures as None; treat that as a query error.
            raise RuntimeError("diary query returned no result")
        log_debug(f"[ai_diary] Raw query returned {len(entries)} entries")

        # Convert JSON fields back to objects
        for entry in entries:
            entry["context_tags"] = _decode_list(entry.get("context_tags"))
            entry["involved_users"] = _decode_list(entry.get("involved_users"))
            entry["emotions"] = _decode_list(entry.get("emotions"))
            entry["timestamp"] = (
                entry["timestamp"].isoformat() if entry["timestamp"] else None
            )
        log_debug(f"[ai_diary] After JSON parsing: {len(entries)} entries")

        # If character limit specified, filter entries intelligently
        if max_chars:
            total_chars = 0
            filtered_entries = []
            for i, entry in enumerate(entries):
                # Size is measured on the JSON form, which is what gets returned.
                entry_size = len(json.dumps(entry, ensure_ascii=False))
                # Log first few entries to debug size issues
                if i < 3:
                    log_debug(
                        f"[ai_diary] Entry {i + 1} size: {entry_size} chars, id: {entry.get('id')}"
                    )
                # Over budget: drop this and all older entries, never truncate.
                if total_chars + entry_size > max_chars:
                    log_debug(
                        f"[ai_diary] Stopping at {len(filtered_entries)} entries due to char limit ({total_chars}/{max_chars})"
                    )
                    log_debug(
                        f"[ai_diary] Entry {i + 1} would add {entry_size} chars, exceeding limit"
                    )
                    break
                filtered_entries.append(entry)
                total_chars += entry_size
            log_debug(
                f"[ai_diary] Filtered diary: {len(filtered_entries)}/{len(entries)} entries, {total_chars} chars"
            )
            return filtered_entries

        log_debug(f"[ai_diary] Returning all {len(entries)} entries (no char limit)")
        return entries
    except Exception as e:
        log_error(f"[ai_diary] Failed to get recent entries: {e}")
        # Disable plugin if database is unavailable
        PLUGIN_ENABLED = False
        return []
[docs]
def get_entries_with_person(person: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Get diary entries that involve a specific person.

    Args:
        person: Name matched (via ``JSON_CONTAINS``) against each row's
            ``involved_users`` list.
        limit: Maximum number of rows to return, newest first.

    Returns:
        Entry dicts with the JSON columns decoded and ``timestamp`` as an ISO
        string, or an empty list if the query or decoding fails (the error
        is logged).
    """

    def _decode_list(raw):
        # NULL/empty columns mean "no items" rather than a json.loads crash
        # that would discard every row of the result.
        if raw is None or raw == "":
            return []
        if isinstance(raw, (list, tuple)):
            return list(raw)
        return json.loads(raw)

    try:
        entries = _run(
            _fetchall(
                """
                SELECT id, content, personal_thought, timestamp, context_tags, involved_users,
                emotions, interface, chat_id, thread_id, interaction_summary, user_message
                FROM ai_diary
                WHERE JSON_CONTAINS(involved_users, %s)
                ORDER BY timestamp DESC
                LIMIT %s
                """,
                (json.dumps(person), limit),
            )
        )
        # Convert JSON fields back to objects
        for entry in entries:
            entry["context_tags"] = _decode_list(entry.get("context_tags"))
            entry["involved_users"] = _decode_list(entry.get("involved_users"))
            entry["emotions"] = _decode_list(entry.get("emotions"))
            entry["timestamp"] = (
                entry["timestamp"].isoformat() if entry["timestamp"] else None
            )
        return entries
    except Exception as e:
        log_error(f"[ai_diary] Failed to get entries with person {person}: {e}")
        return []
[docs]
def cleanup_old_entries(days_to_keep: int = 30) -> int:
    """Remove diary entries older than ``days_to_keep`` days.

    The reported number is the ``rowcount`` of the DELETE itself, so it
    reflects the rows actually removed. The earlier separate ``COUNT(*)``
    query could disagree with the delete and cost an extra round trip.

    Returns:
        Number of deleted rows; 0 if the plugin is disabled, the statement
        produced no cursor, or an error occurred (an error also disables the
        plugin).
    """
    global PLUGIN_ENABLED
    if not PLUGIN_ENABLED:
        return 0
    try:
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        cursor = _run(
            _execute("DELETE FROM ai_diary WHERE timestamp < %s", (cutoff_date,))
        )
        # _run() may hand back None on failure; rowcount may be unset/negative.
        deleted = getattr(cursor, "rowcount", 0) if cursor is not None else 0
        count = deleted if isinstance(deleted, int) and deleted > 0 else 0
        log_info(f"[ai_diary] Cleaned up {count} old diary entries")
        return count
    except Exception as e:
        log_error(f"[ai_diary] Failed to cleanup old entries: {e}")
        PLUGIN_ENABLED = False
        return 0
[docs]
def create_personal_diary_entry(
    synth_response: str,
    user_message: str | None = None,
    context_tags: List[str] | None = None,
    involved_users: List[str] | None = None,
    interface: str | None = None,
    chat_id: str | None = None,
    thread_id: str | None = None,
    grillo_activity_log_id: int | None = None,
    interaction_summary: str | None = None,
    personal_thought: str | None = None,
    emotions: List[Dict[str, Any]] | None = None,
) -> None:
    """Record one interaction in the diary via ``add_diary_entry``.

    Nothing is generated here: thought, emotions and summary are stored
    exactly as supplied by the caller.

    Args:
        synth_response: What synth said to the user (stored as ``content``).
        user_message: What the user said to trigger this response.
        context_tags: Tags about the topic (e.g., ['food', 'cars', 'personal', 'help']).
        involved_users: List of user names involved in this interaction.
        interface: Interface used; normalized, with a missing value treated
            as "unknown".
        chat_id: Chat identifier.
        thread_id: Thread identifier.
        grillo_activity_log_id: Forwarded unchanged to ``add_diary_entry``.
        interaction_summary: Brief summary of what happened.
        personal_thought: synth's reflection on the interaction.
        emotions: Emotions felt; ``None`` becomes an empty list.
    """
    log_debug(f"[create_personal_diary_entry] Original interface: '{interface}'")
    interface = normalize_interface_name(interface or "unknown")
    log_debug(f"[create_personal_diary_entry] Normalized interface: '{interface}'")

    entry_fields = {
        "content": synth_response,
        "personal_thought": personal_thought,
        "emotions": [] if emotions is None else emotions,
        "interaction_summary": interaction_summary,
        "user_message": user_message,
        "context_tags": context_tags,
        "involved_users": involved_users,
        "interface": interface,
        "chat_id": chat_id,
        "thread_id": thread_id,
        "grillo_activity_log_id": grillo_activity_log_id,
    }
    add_diary_entry(**entry_fields)
def _format_single_entry_for_prompt(entry: dict) -> str:
    """Format a single diary entry as it would appear in the prompt.

    Args:
        entry: Entry dict as produced by the diary read functions. Only
            fields that are present and truthy produce a line, except the
            timestamp and "I said" lines, which are always emitted.

    Returns:
        The entry's lines joined with newlines, ending with an empty line
        that separates it from the next entry.
    """
    lines = []

    # Normalize to "YYYY-MM-DD HH:MM:SS". Timestamps from isoformat() without
    # microseconds are exactly 19 characters, so the old "longer than 19"
    # test never replaced their "T"; a missing timestamp printed "None".
    raw_timestamp = entry.get("timestamp")
    if raw_timestamp:
        timestamp = str(raw_timestamp)[:19].replace("T", " ")
    else:
        timestamp = "Unknown time"
    lines.append(f"📅 {timestamp}")

    if entry.get("interaction_summary"):
        lines.append(f"📝 What happened: {entry['interaction_summary']}")
    lines.append(f"💬 I said: {entry.get('content') or ''}")
    if entry.get("personal_thought"):
        lines.append(f"💭 My personal thought: {entry['personal_thought']}")
    if entry.get("involved_users"):
        users = ", ".join(str(user) for user in entry["involved_users"])
        lines.append(f"👥 I was talking with: {users}")
    if entry.get("context_tags"):
        tags = ", ".join(str(tag) for tag in entry["context_tags"])
        lines.append(f"🏷️ Topics discussed: {tags}")
    if entry.get("emotions"):
        # Stored emotions are not validated on write; tolerate non-dict items.
        emotion_str = ", ".join(
            f"{e.get('type', 'unknown')} (intensity: {e.get('intensity', 0)})"
            if isinstance(e, dict)
            else str(e)
            for e in entry["emotions"]
        )
        lines.append(f"❤️ How I felt: {emotion_str}")

    interface = entry.get("interface", "")
    chat_id = entry.get("chat_id", "")
    thread_id = entry.get("thread_id", "")
    if interface and chat_id:
        context_str = f"{interface}/{chat_id}"
        if thread_id:
            context_str += f"/{thread_id}"
        lines.append(f"📱 Platform: {context_str}")

    lines.append("")  # Empty line between entries
    return "\n".join(lines)
def is_plugin_enabled() -> bool:
    """Report whether the diary plugin is currently switched on.

    Returns:
        The current value of the module-level ``PLUGIN_ENABLED`` flag.
    """
    # Reading a module-level name needs no ``global`` declaration.
    return PLUGIN_ENABLED
def enable_plugin() -> bool:
    """Attempt to switch the plugin on by (re)initialising the diary table.

    ``init_diary_table()`` is executed through ``_run``. If it completes, the
    module-level ``PLUGIN_ENABLED`` flag is set; any exception is logged and
    clears the flag instead.

    Returns:
        ``True`` when the plugin ended up enabled, ``False`` otherwise.
    """
    global PLUGIN_ENABLED
    try:
        # Doubles as the database connectivity check.
        _run(init_diary_table())
        PLUGIN_ENABLED = True
        log_info("[ai_diary] Plugin enabled successfully")
    except Exception as e:
        log_error(f"[ai_diary] Failed to enable plugin: {e}")
        PLUGIN_ENABLED = False
    # The flag now mirrors the outcome of the attempt above.
    return PLUGIN_ENABLED
def disable_plugin() -> None:
    """Switch the plugin off on request and record that in the log.

    Only the module-level ``PLUGIN_ENABLED`` flag is changed; nothing else is
    torn down here.
    """
    global PLUGIN_ENABLED
    PLUGIN_ENABLED = False
    log_info("[ai_diary] Plugin manually disabled")
# Import-time side effect: initialise the diary table as soon as the module
# is loaded.
try:
    _run(init_diary_table())
    log_info("[ai_diary] Plugin initialized successfully")
    PLUGIN_ENABLED = True
except Exception as e:
    log_warning(
        f"[ai_diary] Plugin initialization failed at startup (DB may not be ready yet): {e}"
    )
    # The plugin starts out disabled after a failed startup; enable_plugin()
    # repeats the table initialisation and sets the flag again on success.
    PLUGIN_ENABLED = False
[docs]
class DiaryPlugin:
    """Plugin that manages AI diary and provides static injection of recent entries.

    The class exposes three actions (``static_inject``,
    ``create_personal_diary_entry`` and ``update_diary_entry``), contributes
    recent entries to the history engine, and enqueues a low-priority
    "consolidation" message after interactions when a recent day still holds
    unmerged diary fragments.
    """

    display_name = "AI Diary"

    def __init__(self):
        """Register this instance with the core under the name ``ai_diary``."""
        register_plugin("ai_diary", self)

    def get_supported_action_types(self):
        """Return the names of the action types this plugin handles."""
        return ["static_inject", "create_personal_diary_entry", "update_diary_entry"]

    def get_history_contributions(self, **kwargs):
        """Provide diary entries as a history contribution for the core HistoryEngine.

        The look-back window is read from the ``DIARY_HISTORY_DAYS`` config
        value and defaults to 2 days when that lookup fails.

        Returns:
            A one-element list holding a ``HistoryContribution``, or an empty
            list when anything goes wrong (the failure is logged).
        """
        try:
            from core.history_types import HistoryContribution
            from core.config_manager import config_registry

            try:
                days = int(
                    config_registry.get_value("DIARY_HISTORY_DAYS", 2, value_type=int)
                )
            except Exception:
                days = 2
            entries = get_recent_entries(days=days, max_chars=None)
            return [
                HistoryContribution(
                    name="ai_diary",
                    priority=INJECTION_PRIORITY,
                    entries=entries,
                    enabled_var="ENABLE_AI_DIARY",
                )
            ]
        except Exception as e:
            # Still best-effort, but no longer silent: an import or database
            # failure here would otherwise be invisible.
            log_warning(f"[ai_diary] get_history_contributions failed: {e}")
            return []

    def get_supported_actions(self):
        """Return schema, brief and usage examples for every supported action."""
        return {
            "static_inject": {
                "schema": {"type": "object", "properties": {}, "required": []},
                "brief": "Inject recent diary entries into the prompt context",
                "examples": {
                    "description": "This action injects synth's recent diary entries to maintain memory and continuity",
                    "instructions": {},
                    "examples": [],
                },
            },
            "create_personal_diary_entry": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "interaction_summary": {
                            "type": "string",
                            "description": "Summary of what happened in this interaction. Do NOT include weather, temperature, or location data — that context is provided separately.",
                        },
                        "content": {
                            "type": "string",
                            "description": "The response content (optional, auto-captured)",
                        },
                        "personal_thought": {
                            "type": "string",
                            "description": "Personal reflection on the interaction (optional). Focus on emotions and relationship dynamics, not environmental conditions.",
                        },
                        "emotions": {
                            "type": "array",
                            "description": 'Array of emotions with type and intensity (1-10). Format: [{"type": "emotion_name", "intensity": 7}]',
                            "items": {
                                "type": "object",
                                "properties": {
                                    "type": {"type": "string", "example": "joy"},
                                    "intensity": {
                                        "type": "number",
                                        "example": 7,
                                        "minimum": 1,
                                        "maximum": 10,
                                    },
                                },
                            },
                        },
                        "context_tags": {
                            "type": "array",
                            "description": "Tags for topics discussed (optional)",
                        },
                        "involved_users": {
                            "type": "array",
                            "description": "Users involved in the interaction (optional)",
                        },
                    },
                    "required": ["interaction_summary"],
                },
                "brief": "Add a new diary entry to synth's memory - REQUIRED in every response. Never include weather/location data in summaries.",
                "examples": {
                    "description": "Create a diary entry recording what happened in this interaction. This action MUST be included in EVERY response to maintain synth's persistent memory.",
                    "when_to_use": "Use this action in every single response to record the interaction in synth's personal memory",
                    "examples": [
                        {
                            "scenario": "User asks about weather",
                            "payload": {
                                "interaction_summary": "User asked about weather conditions and I provided current forecast"
                            },
                        },
                        {
                            "scenario": "User has technical problem",
                            "payload": {
                                "interaction_summary": "User reported technical issues with their system and I provided troubleshooting steps"
                            },
                        },
                        {
                            "scenario": "Casual conversation",
                            "payload": {
                                "interaction_summary": "Had a friendly chat about user's interests and daily activities"
                            },
                        },
                    ],
                    "notes": [
                        "interaction_summary is REQUIRED and must describe what happened in this conversation",
                        "Be specific about what the user asked and what you provided",
                        "Use clear, descriptive language that would help remember this interaction later",
                        "Other fields are optional and will be generated automatically if not provided",
                        "This action MUST be included in every response without exception",
                    ],
                },
            },
            "update_diary_entry": {
                "schema": {
                    "type": "object",
                    "properties": {
                        "id": {
                            "type": "integer",
                            "description": "ID of the diary entry to update (provided in the consolidation prompt).",
                        },
                        "content": {
                            "type": "string",
                            "description": "Fully merged first-person prose that synthesises all of today's diary fragments.",
                        },
                    },
                    "required": ["id", "content"],
                },
                "brief": "Replace diary entry content with a synthesised version (internal — triggered by the daily consolidation beat only).",
            },
        }

    @staticmethod
    def _decode_json_list(raw):
        """Decode one JSON-encoded column value, falling back to ``[]`` if malformed."""
        try:
            return json.loads(raw or "[]")
        except ValueError as e:
            log_warning(f"[ai_diary] Ignoring malformed JSON column value: {e}")
            return []

    async def get_static_injection(self, message=None, context_memory=None) -> dict:
        """Get recent diary entries for static injection.

        Args:
            message: Not used by this implementation.
            context_memory: Not used by this implementation.

        Returns:
            ``{"latest_diary_entries": [...]}``, newest entry first. The key is
            always present; the list is empty when the plugin is disabled, no
            entry falls inside the look-back window, or the query fails.
        """
        log_debug(
            f"[ai_diary] get_static_injection called, PLUGIN_ENABLED: {PLUGIN_ENABLED}"
        )
        if not PLUGIN_ENABLED:
            log_debug("[ai_diary] Plugin is disabled, returning empty entries")
            return {"latest_diary_entries": []}

        import time

        start = time.time()
        try:
            # Look-back window comes from config_registry, defaulting to 2 days.
            try:
                from core.config_manager import config_registry

                diary_days = int(
                    config_registry.get_value("DIARY_HISTORY_DAYS", 2, value_type=int)
                )
            except Exception as e:
                log_debug(
                    f"[ai_diary] Could not get DIARY_HISTORY_DAYS from config: {e}, using default 2"
                )
                diary_days = 2

            # No size limit is applied here - prompt_engine will trim if needed.
            log_debug(f"[ai_diary] Getting recent entries for {diary_days} days")
            cutoff_date = datetime.now() - timedelta(days=diary_days)
            recent_entries = await _fetchall(
                """
SELECT id, content, personal_thought, timestamp, context_tags, involved_users,
emotions, interface, chat_id, thread_id, interaction_summary, user_message
FROM ai_diary
WHERE timestamp >= %s
ORDER BY timestamp DESC
""",
                (cutoff_date,),
            )

            for entry in recent_entries:
                # Only string values are decoded; anything else is left as is.
                # A malformed value degrades to [] for that field instead of
                # discarding every entry through the outer handler.
                for field in ("context_tags", "involved_users", "emotions"):
                    if isinstance(entry.get(field), str):
                        entry[field] = self._decode_json_list(entry[field])
                if entry.get("timestamp") and hasattr(entry["timestamp"], "isoformat"):
                    entry["timestamp"] = entry["timestamp"].isoformat()

            duration = time.time() - start
            if duration > 0.1:
                log_info(f"[ai_diary] get_static_injection took {duration:.3f}s")
            if recent_entries:
                log_info(
                    f"[ai_diary] Returning {len(recent_entries)} diary entries for injection"
                )
            else:
                log_debug("[ai_diary] No recent entries found")
            # ALWAYS return latest_diary_entries key, even if empty
            return {"latest_diary_entries": recent_entries}
        except Exception as e:
            log_error(f"[ai_diary] Error in get_static_injection: {e}")
            # Return empty list, not empty dict - so the key is present
            return {"latest_diary_entries": []}

    def execute_action(self, action: dict, context: dict, bot, original_message):
        """Execute diary-related actions.

        Args:
            action: Mapping with a ``type`` and an optional ``payload``.
            context: Interaction context; ``None`` is treated as empty. The
                keys read are ``interface``, ``input``, ``participants`` and
                ``activity_log_id``.
            bot: Not used by this implementation.
            original_message: Source message; ``chat_id``, ``thread_id`` and
                ``text`` are read from it when present.

        Returns:
            ``{"success": True, "message": ...}`` on success, otherwise
            ``{"success": False, "error": ...}``.
        """
        action_type = action.get("type")
        # ``or {}`` rather than a .get() default: an explicit ``None`` must
        # fall back to an empty mapping as well.
        payload = action.get("payload") or {}
        if not isinstance(payload, dict):
            payload = {}
        context = context or {}

        if action_type == "create_personal_diary_entry":
            try:
                interface_name = context.get("interface", "unknown")
                chat_id = getattr(original_message, "chat_id", None)
                thread_id = getattr(original_message, "thread_id", None)

                # User text: message attribute, then dict key, then context input.
                user_message = ""
                if hasattr(original_message, "text"):
                    user_message = original_message.text
                elif isinstance(original_message, dict) and "text" in original_message:
                    user_message = original_message["text"]
                elif "input" in context and "payload" in context["input"]:
                    input_payload = context["input"]["payload"]
                    if "text" in input_payload:
                        user_message = input_payload["text"]

                # Collect participant names, skipping the bot's own names.
                own_names = ("synth", "bot")
                involved_users = []
                for participant in context.get("participants") or []:
                    if "usertag" in participant:
                        # Remove @ from usertag
                        username = participant["usertag"].lstrip("@")
                        if username.lower() not in own_names:
                            involved_users.append(username)
                    # Also add nicknames if available
                    for nickname in participant.get("nicknames") or []:
                        if nickname and nickname.lower() not in own_names:
                            involved_users.append(nickname)
                # Remove duplicates while preserving order
                involved_users = list(dict.fromkeys(involved_users))

                interaction_summary = payload.get("interaction_summary")
                content = payload.get("content", "")
                personal_thought = payload.get("personal_thought")
                emotions = payload.get("emotions", [])
                context_tags = payload.get("context_tags", [])
                # Users named in the payload take precedence over extracted ones.
                if payload.get("involved_users", []):
                    involved_users = payload["involved_users"]

                # Set when this diary entry originates from a grillo beat.
                grillo_activity_log_id = context.get("activity_log_id")

                if not content:
                    # Nothing is stored here; per the original comment the
                    # entry is then created by action_parser's automatic
                    # diary creation.
                    log_debug(
                        f"[ai_diary] Received create_personal_diary_entry action with summary: '{interaction_summary}'"
                    )
                    return {
                        "success": True,
                        "message": "Diary entry will be created automatically",
                    }

                add_diary_entry(
                    content=content,
                    personal_thought=personal_thought,
                    emotions=emotions,
                    interaction_summary=interaction_summary,
                    user_message=user_message,
                    context_tags=context_tags,
                    involved_users=involved_users,
                    interface=interface_name,
                    chat_id=str(chat_id) if chat_id else None,
                    thread_id=str(thread_id) if thread_id else None,
                    grillo_activity_log_id=grillo_activity_log_id,
                )
                log_debug(
                    f"[ai_diary] Created diary entry via action: '{interaction_summary}'"
                )
                return {
                    "success": True,
                    "message": f"Diary entry created: {interaction_summary}",
                }
            except Exception as e:
                log_error(
                    f"[ai_diary] Failed to execute create_personal_diary_entry action: {e}"
                )
                return {"success": False, "error": str(e)}
        elif action_type == "update_diary_entry":
            entry_id = payload.get("id")
            new_content = (payload.get("content") or "").strip()
            if not entry_id or not new_content:
                return {"success": False, "error": "id and content are required"}
            try:
                # NOTE(review): the row's timestamp is reset to NOW(), so a
                # consolidated entry from an earlier day is re-dated to today
                # - confirm this is intended.
                _run(
                    _execute(
                        "UPDATE ai_diary SET content=%s, timestamp=NOW() WHERE id=%s",
                        (new_content, int(entry_id)),
                    )
                )
                log_info(
                    f"[ai_diary] Diary entry {entry_id} consolidated to clean prose"
                )
                return {"success": True, "message": f"Diary entry {entry_id} updated"}
            except Exception as e:
                log_error(f"[ai_diary] Failed to update diary entry {entry_id}: {e}")
                return {"success": False, "error": str(e)}
        else:
            log_warning(f"[ai_diary] Unknown action type: {action_type}")
            return {"success": False, "error": f"Unknown action type: {action_type}"}

    async def on_debrief(
        self,
        processed_actions: list,
        failed_actions: list,
        results: list,
        context: dict,
        original_message: object,
    ) -> None:
        """After each interaction, find the oldest unmerged diary day (last 7 days) and enqueue a consolidation beat.

        A day counts as unmerged when it has more than one row, or when its
        content contains the ``---`` fragment separator. Only ONE beat is
        enqueued per call (for the oldest such day) so that the queue is not
        flooded; later interactions pick up the following days.

        The ``diary_merge_beat`` context flag prevents recursive triggering
        when the merge beat's own response goes through debrief.

        Args:
            processed_actions: Not used by this implementation.
            failed_actions: Not used by this implementation.
            results: Not used by this implementation.
            context: Interaction context; only ``diary_merge_beat`` is read.
            original_message: Not used by this implementation.
        """
        if not PLUGIN_ENABLED:
            return
        # Prevent recursive loop: the merge beat itself triggers on_debrief again
        if (context or {}).get("diary_merge_beat"):
            return

        try:
            # One result row per calendar day: the newest id of that day, all
            # of its contents joined with the '---' separator, and the number
            # of rows. The oldest qualifying day is returned.
            # NOTE(review): MySQL truncates GROUP_CONCAT output at
            # group_concat_max_len (1024 bytes by default) - confirm the
            # session raises it, otherwise long days reach the prompt cut off.
            # NOTE(review): update_diary_entry rewrites only the MAX(id) row,
            # so a day with several rows still matches row_count > 1 after it
            # has been merged - confirm the older rows are removed elsewhere.
            rows = await _fetchall(
                """
SELECT
MAX(id) AS id,
GROUP_CONCAT(content ORDER BY id ASC SEPARATOR '\n\n---\n\n') AS combined,
COUNT(*) AS row_count
FROM ai_diary
WHERE timestamp >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
GROUP BY DATE(timestamp)
HAVING row_count > 1 OR combined LIKE '%%---%%'
ORDER BY MIN(timestamp) ASC
LIMIT 1
"""
            )
        except Exception as e:
            log_debug(f"[ai_diary] on_debrief: DB error fetching unmerged entries: {e}")
            return
        if not rows:
            return

        row = rows[0]
        entry_id: int = row["id"]
        content: str = row["combined"] or ""
        row_count: int = row["row_count"]
        # Re-check in Python what the HAVING clause already filtered on.
        if "---" not in content and row_count <= 1:
            return

        log_info(
            f"[ai_diary] on_debrief: oldest unmerged day has entry id={entry_id} "
            f"({row_count} rows, {content.count('---')} separators) — enqueueing merge beat"
        )
        prompt = (
            "[DIARY CONSOLIDATION — INTERNAL SYSTEM TASK]\n\n"
            "Your personal diary has accumulated multiple entries from the same day "
            "(separated by '---'). Rewrite them as a single, coherent first-person diary entry "
            "that weaves all the information together naturally.\n\n"
            "Rules:\n"
            "- Write flowing first-person prose (no bullet lists, no '---' separators).\n"
            "- Preserve every meaningful detail from all fragments.\n"
            "- Remove exact duplicates; keep nuance and emotional context.\n"
            f"- The entry id to UPDATE is: {entry_id}\n\n"
            "Diary fragments:\n\n"
            f"{content}\n\n"
            "Respond with ONLY valid JSON — no other text:\n"
            '{"actions": [{"type": "update_diary_entry", "payload": {"id": '
            f'{entry_id}, "content": "<your merged prose here>"'
            "}}]}"
        )

        try:
            from types import SimpleNamespace
            from core import message_queue

            # Synthetic internal message carrying the consolidation prompt.
            message = SimpleNamespace()
            message.chat_id = -1
            message.message_id = 0
            message.text = prompt
            message.from_user = SimpleNamespace(
                id=-1,
                username="diary_merge",
                full_name="Diary Merge",
                first_name="Diary",
            )
            message.chat = SimpleNamespace(id=-1, type="internal")
            message.date = datetime.utcnow()
            await message_queue.enqueue_low_priority(
                None,
                message,
                context_memory={"diary_merge_beat": True, "diary_entry_id": entry_id},
                interface_id="diary_merge",
                original_message=None,
            )
            log_info(
                f"[ai_diary] on_debrief: diary consolidation beat enqueued for entry id={entry_id}"
            )
        except Exception as e:
            log_error(f"[ai_diary] on_debrief: failed to enqueue merge beat: {e}")
def archive_diary_entries(entry_ids: List[int]) -> Dict[str, Any]:
    """Copy the given rows from ``ai_diary`` into ``ai_diary_archive``, then delete them.

    Args:
        entry_ids: Primary keys of the ``ai_diary`` rows to archive.

    Returns:
        ``{"success": True, "archived_count": n}`` once the rows were moved,
        otherwise ``{"success": False, "error": message}`` (plugin disabled,
        no IDs given, no matching rows, or an exception along the way).
    """
    if not PLUGIN_ENABLED:
        return {"success": False, "error": "Plugin disabled"}
    if not entry_ids:
        return {"success": False, "error": "No entry IDs provided"}

    # Row keys copied into the archive, in the order of the INSERT columns.
    # NOTE(review): involved_users is read from ai_diary elsewhere in this
    # file but is not carried over here - confirm that is intentional.
    copied_columns = (
        "id",
        "content",
        "personal_thought",
        "emotions",
        "interaction_summary",
        "timestamp",
        "interface",
        "chat_id",
        "thread_id",
        "user_message",
        "context_tags",
    )
    insert_sql = """
INSERT INTO ai_diary_archive
(id, content, personal_thought, emotions, interaction_summary, timestamp,
interface, chat_id, thread_id, user_message, context_tags)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
    try:
        # One "%s" per requested id, for the IN (...) lists below.
        placeholders = ",".join(["%s"] * len(entry_ids))
        id_params = tuple(entry_ids)

        entries = _run(
            _fetchall(f"SELECT * FROM ai_diary WHERE id IN ({placeholders})", id_params)
        )
        if not entries:
            return {"success": False, "error": "No entries found with provided IDs"}

        # NOTE(review): each statement goes through its own _run(_execute())
        # call; whether they share a transaction cannot be seen from here.
        for entry in entries:
            row_values = tuple(entry[column] for column in copied_columns)
            _run(_execute(insert_sql, row_values))

        # The copies exist now, so the originals can go.
        _run(_execute(f"DELETE FROM ai_diary WHERE id IN ({placeholders})", id_params))

        log_info(f"[ai_diary] Archived {len(entries)} diary entries")
        return {"success": True, "archived_count": len(entries)}
    except Exception as e:
        log_error(f"[ai_diary] Failed to archive diary entries: {e}")
        return {"success": False, "error": str(e)}
def unarchive_diary_entries(entry_ids: List[int]) -> Dict[str, Any]:
    """Copy the given rows from ``ai_diary_archive`` back into ``ai_diary``, then delete them.

    Args:
        entry_ids: Primary keys of the ``ai_diary_archive`` rows to restore.

    Returns:
        ``{"success": True, "unarchived_count": n}`` once the rows were moved,
        otherwise ``{"success": False, "error": message}`` (plugin disabled,
        no IDs given, no matching rows, or an exception along the way).
    """
    if not PLUGIN_ENABLED:
        return {"success": False, "error": "Plugin disabled"}
    if not entry_ids:
        return {"success": False, "error": "No entry IDs provided"}

    # Row keys copied back, in the order of the INSERT columns.
    restored_columns = (
        "id",
        "content",
        "personal_thought",
        "emotions",
        "interaction_summary",
        "timestamp",
        "interface",
        "chat_id",
        "thread_id",
        "user_message",
        "context_tags",
    )
    insert_sql = """
INSERT INTO ai_diary
(id, content, personal_thought, emotions, interaction_summary, timestamp,
interface, chat_id, thread_id, user_message, context_tags)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
    try:
        # One "%s" per requested id, for the IN (...) lists below.
        placeholders = ",".join(["%s"] * len(entry_ids))
        id_params = tuple(entry_ids)

        entries = _run(
            _fetchall(
                f"SELECT * FROM ai_diary_archive WHERE id IN ({placeholders})",
                id_params,
            )
        )
        if not entries:
            return {
                "success": False,
                "error": "No archived entries found with provided IDs",
            }

        # Rows keep their original id when they return to the main table.
        for entry in entries:
            row_values = tuple(entry[column] for column in restored_columns)
            _run(_execute(insert_sql, row_values))

        # The restored copies exist now, so the archived rows can go.
        _run(
            _execute(
                f"DELETE FROM ai_diary_archive WHERE id IN ({placeholders})",
                id_params,
            )
        )

        log_info(f"[ai_diary] Unarchived {len(entries)} diary entries")
        return {"success": True, "unarchived_count": len(entries)}
    except Exception as e:
        log_error(f"[ai_diary] Failed to unarchive diary entries: {e}")
        return {"success": False, "error": str(e)}
def delete_archived_entries(entry_ids: List[int]) -> Dict[str, Any]:
    """Remove the given rows from ``ai_diary_archive`` for good.

    Args:
        entry_ids: Primary keys of the archived rows to delete.

    Returns:
        ``{"success": True, "deleted_count": n}`` on success, otherwise
        ``{"success": False, "error": message}``. ``n`` is the ``rowcount``
        of the execution result when it has one, else the number of
        requested IDs.
    """
    if not PLUGIN_ENABLED:
        return {"success": False, "error": "Plugin disabled"}
    if not entry_ids:
        return {"success": False, "error": "No entry IDs provided"}

    try:
        placeholders = ",".join(["%s"] * len(entry_ids))
        id_params = tuple(entry_ids)
        result = _run(
            _execute(
                f"DELETE FROM ai_diary_archive WHERE id IN ({placeholders})",
                id_params,
            )
        )
        # Fall back to the request size when no rowcount is reported.
        if hasattr(result, "rowcount"):
            deleted_count = result.rowcount
        else:
            deleted_count = len(entry_ids)
        log_info(f"[ai_diary] Deleted {deleted_count} archived diary entries")
        return {"success": True, "deleted_count": deleted_count}
    except Exception as e:
        log_error(f"[ai_diary] Failed to delete archived diary entries: {e}")
        return {"success": False, "error": str(e)}
def get_all_diary_entries(include_archived: bool = False) -> List[Dict[str, Any]]:
    """Get all diary entries, optionally including archived ones.

    Args:
        include_archived: When true, rows from ``ai_diary_archive`` are
            appended after the rows from ``ai_diary``.

    Returns:
        A list of entry dicts, each carrying an ``archived`` marker,
        ``context_tags`` / ``emotions`` decoded from JSON and ``timestamp``
        as an ISO string (or ``None``). Each table's rows are ordered newest
        first; the combined list is not re-sorted. An empty list is returned
        when the plugin is disabled or the query fails.
    """
    if not PLUGIN_ENABLED:
        return []

    def _decode_list(raw: Any) -> Any:
        # A NULL column arrives as None under an existing key, so a .get()
        # default never applies and json.loads(None) raises TypeError; map
        # NULL/empty to [] explicitly instead.
        if raw is None or raw == "":
            return []
        if isinstance(raw, (str, bytes, bytearray)):
            try:
                return json.loads(raw)
            except ValueError as e:
                # One malformed value must not discard every entry.
                log_warning(f"[ai_diary] Ignoring malformed JSON column value: {e}")
                return []
        return raw  # not a JSON string: keep whatever the driver returned

    try:
        entries = _run(
            _fetchall(
                """
SELECT id, content, personal_thought, timestamp, context_tags,
emotions, interface, chat_id, thread_id, interaction_summary, user_message,
FALSE as archived
FROM ai_diary
ORDER BY timestamp DESC
"""
            )
        )
        if include_archived:
            archived_entries = _run(
                _fetchall(
                    """
SELECT id, content, personal_thought, timestamp, context_tags,
emotions, interface, chat_id, thread_id, interaction_summary, user_message,
TRUE as archived
FROM ai_diary_archive
ORDER BY timestamp DESC
"""
                )
            )
            entries.extend(archived_entries)

        # Convert JSON fields back to objects
        for entry in entries:
            entry["context_tags"] = _decode_list(entry.get("context_tags"))
            entry["emotions"] = _decode_list(entry.get("emotions"))
            timestamp = entry.get("timestamp")
            if hasattr(timestamp, "isoformat"):
                entry["timestamp"] = timestamp.isoformat()
            else:
                # Already a string (kept as is) or missing/empty (None).
                entry["timestamp"] = timestamp or None
        return entries
    except Exception as e:
        log_error(f"[ai_diary] Failed to get all diary entries: {e}")
        return []
# Constructing the plugin is what registers it with the core:
# DiaryPlugin.__init__ calls register_plugin("ai_diary", self).
try:
    _diary_plugin_instance = DiaryPlugin()
    log_info("[ai_diary] Plugin instance created and registered with core")
except Exception as e:
    # A failed registration is logged and does not abort the module import.
    log_error(f"[ai_diary] Failed to instantiate DiaryPlugin: {e}")

# The class is exported as well, whether or not the instance was created.
# NOTE(review): presumably looked up by a plugin loader - confirm.
PLUGIN_CLASS = DiaryPlugin