Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 35 additions & 144 deletions core/business_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Business (project) manager with hot-reload and CRUD operations."""
"""""Business (project) manager with hot-reload and CRUD operations."""

import os
import time
Expand All @@ -11,7 +11,6 @@

logger = logging.getLogger(__name__)


class BusinessManager:
"""Manages project YAML files with hot-reload capability.

Expand Down Expand Up @@ -89,153 +88,45 @@ def reload(self):

# ── File Watcher ──────────────────────────────────────────────────

def start_watching(self, interval: float = 5.0):
"""Start file watcher thread (daemon, polls every interval seconds)."""
if self._watching:
return
self._watching = True
self._watcher_thread = threading.Thread(
target=self._watch_loop, args=(interval,), daemon=True
)
def start_watching(self, projects_dir: str = "projects/"):
"""Start watching for file changes in the projects directory."""
self.projects_dir = Path(projects_dir)
self.projects_dir.mkdir(parents=True, exist_ok=True)
self._watcher_thread = threading.Thread(target=self._watch_files)
self._watcher_thread.start()
logger.info(f"Project file watcher started (interval={interval}s)")

def _watch_files(self):
"""Thread target to watch for file changes and reload projects."""
while True:
time.sleep(5) # Check every 5 seconds
new_mtimes = {}
for f in self.projects_dir.glob("*.yaml"):
new_mtimes[str(f)] = f.stat().st_mtime
if new_mtimes != self._file_mtimes:
self.reload()

def stop_watching(self):
"""Stop file watcher thread."""
self._watching = False
"""Stop the file watcher thread."""
if self._watcher_thread and self._watcher_thread.is_alive():
self._watcher_thread.join()
self._watcher_thread = None

def _watch_loop(self, interval: float):
"""Poll for file changes in projects/ directory."""
while self._watching:
time.sleep(interval)
try:
current_mtimes = {}
for f in self.projects_dir.glob("*.yaml"):
current_mtimes[str(f)] = f.stat().st_mtime
# ── CRUD Operations ─────────────────────────────────────────────

if current_mtimes != self._file_mtimes:
logger.info("Project files changed, reloading...")
self.reload()
except Exception as e:
logger.error(f"File watcher error: {e}")

def on_reload(self, callback: Callable):
"""Register a callback for when projects are reloaded.

Callback receives: callback(projects: List[Dict])
"""
self._on_reload_callbacks.append(callback)

# ── CRUD Operations ───────────────────────────────────────────────

def add_project(
self,
name: str,
url: str,
description: str,
project_type: str = "SaaS",
**kwargs,
) -> str:
"""Create a new project YAML file.

Returns the filepath of the created file.
Raises ValueError if file already exists.
"""
slug = name.lower().replace(" ", "_").replace("-", "_")
filepath = self.projects_dir / f"{slug}.yaml"

if filepath.exists():
raise ValueError(f"Project file already exists: {filepath}")

data = {
"project": {
"name": name,
"url": url,
"type": project_type,
"description": description,
"tagline": kwargs.get("tagline", ""),
"weight": kwargs.get("weight", 1.0),
"enabled": True,
"selling_points": kwargs.get("selling_points", []),
"target_audiences": kwargs.get("target_audiences", []),
"business_profile": {
"socials": {
"twitter": "",
"website": url,
},
"features": [],
"pricing": {
"model": "unknown",
"free_tier": "",
"paid_plans": [],
},
"faqs": [],
"competitors": [],
"rules": {
"never_say": [],
"always_accurate": [
f"Product name is exactly '{name}'",
f"URL is {url}",
],
},
},
},
"reddit": {
"target_subreddits": {"primary": [], "secondary": []},
"keywords": [],
"min_post_score": 3,
"max_post_age_hours": 24,
},
"twitter": {
"keywords": [],
"hashtags": [],
},
"tone": {
"style": "helpful_casual",
"language": "en",
"formality": "casual",
},
}

with open(filepath, "w") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
def create_project(self, project_data: Dict):
"""Create a new project."""
with open(f"{self.projects_dir}/{project_data['name']}.yaml", "w") as fh:
yaml.dump(project_data, fh)
self.reload()

def update_project(self, project_name: str, project_data: Dict):
"""Update an existing project."""
with open(f"{self.projects_dir}/{project_name}.yaml", "w") as fh:
yaml.dump(project_data, fh)
self.reload()
return str(filepath)

def delete_project(self, name: str) -> bool:
"""Delete a project by name. Returns True if found and deleted."""
for f in self.projects_dir.glob("*.yaml"):
try:
with open(f) as fh:
data = yaml.safe_load(fh) or {}
if data.get("project", {}).get("name", "").lower() == name.lower():
f.unlink()
self.reload()
return True
except Exception:
continue
return False

def get_project(self, name: str) -> Optional[Dict]:
"""Get a project by name (case-insensitive)."""
for p in self.projects:
if p.get("project", {}).get("name", "").lower() == name.lower():
return p
return None

def list_projects(self) -> List[str]:
"""List all project names."""
return [p["project"]["name"] for p in self.projects]

def get_project_filepath(self, name: str) -> Optional[str]:
"""Get the YAML file path for a project."""
for f in self.projects_dir.glob("*.yaml"):
try:
with open(f) as fh:
data = yaml.safe_load(fh) or {}
if data.get("project", {}).get("name", "").lower() == name.lower():
return str(f)
except Exception:
continue
return None
def delete_project(self, project_name: str):
"""Delete a project."""
os.remove(f"{self.projects_dir}/{project_name}.yaml")
self.reload()
""