You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

860 lines
35 KiB

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import create_engine, text
from sentence_transformers import SentenceTransformer
from pydantic import BaseModel
from dotenv import load_dotenv
from app.bootstrap import bootstrap_domain
from app.db_schema import ensure_schema
from app.normalizer import normalize_query
from app.embedding_cache import get_or_encode
from app.semantic_cache import get_semantic_cache, set_semantic_cache, create_index_if_not_exists
import threading
import requests as http_requests
import os
import re
import time
import json as json_module
import hashlib
import redis as redis_lib
from app.vertex_client import get_access_token
from app.logger import log_event
from fastapi import Request
# Load .env before any os.getenv() below reads configuration.
load_dotenv()

app = FastAPI()
# CORS is wide open (any origin/method/header) — acceptable for a demo,
# should be restricted before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Postgres connection (queries below use pgvector operators) and the local
# sentence-embedding model used for domain/attribute similarity.
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
model = SentenceTransformer("all-MiniLM-L6-v2")

# Vertex AI (Gemini) and Google Custom Search configuration.
GOOGLE_PROJECT_ID = "sylvan-deck-387207"
LOCATION = "us-central1"
GOOGLE_SEARCH_API_KEY = os.getenv("GOOGLE_SEARCH_API_KEY")
GOOGLE_SEARCH_CX = os.getenv("GOOGLE_SEARCH_CX")
# Gemini URL
GEMINI_URL = f"https://{LOCATION}-aiplatform.googleapis.com/v1/projects/{GOOGLE_PROJECT_ID}/locations/{LOCATION}/publishers/google/models/gemini-2.5-flash-lite:generateContent"

# ── Startup ──────────────────────────────────────────
# Runs at import time: ensure tables and the semantic-cache index exist
# before the first request is served.
ensure_schema(engine)
create_index_if_not_exists()
print("System ready")
# ── Redis helper ──────────────────────────────────────
def get_redis():
    """Return a connected Redis client (localhost:6379), or None if unreachable.

    The ping() verifies the server is actually up, so callers can simply
    test the return value for truthiness.
    """
    try:
        client = redis_lib.Redis(host='localhost', port=6379, decode_responses=True)
        client.ping()
    except Exception:
        return None
    return client
def get_live_cache_key(query: str) -> str:
    """Build the Redis key for the live-suggestion cache.

    Queries of three or more words live under the "live:full" namespace,
    shorter ones under "live:short"; the suffix is the MD5 hex digest of
    the lowercased, stripped query so equivalent queries share a key.
    """
    cleaned = query.strip()
    namespace = "live:full" if len(cleaned.split()) >= 3 else "live:short"
    digest = hashlib.md5(cleaned.lower().encode()).hexdigest()
    return f"{namespace}:{digest}"
# ── Async cache writer ────────────────────────────────
def write_cache_async(query: str, embedding: list, suggestions: list, domain: str = None):
    """Persist a semantic-cache entry on a daemon thread (fire-and-forget).

    Failures are printed and swallowed — a missed cache write must never
    affect the request that triggered it.
    """
    def _task():
        try:
            set_semantic_cache(query, embedding, suggestions, domain=domain)
        except Exception as exc:
            print(f"⚠️ Cache write failed: {exc}")

    threading.Thread(target=_task, daemon=True).start()
# ── Gibberish detector ────────────────────────────────
def is_gibberish(text: str) -> bool:
    """Heuristically decide whether *text* looks like keyboard mashing.

    A word longer than 3 characters counts as suspicious when it has a run
    of 6+ non-vowels, a vowel ratio below 10% (for words longer than 5), or
    a letters-around-digits pattern like "abc1234x". The text is gibberish
    when more than half of its words are suspicious; empty input is
    gibberish by definition.
    """
    tokens = text.lower().split()
    if not tokens:
        return True
    suspicious = 0
    for token in tokens:
        if len(token) <= 3:
            # Short tokens are too ambiguous to judge; skip them.
            continue
        consonant_run = re.search(r'[^aeiou]{6,}', token) is not None
        vowel_ratio = len(re.findall(r'[aeiou]', token)) / len(token)
        digit_mix = re.search(r'[a-z]\d{3,}[a-z]|[0-9]{4,}[a-z]', token) is not None
        if consonant_run or (len(token) > 5 and vowel_ratio < 0.1) or digit_mix:
            suspicious += 1
    return suspicious > len(tokens) / 2
# ── Middleware ────────────────────────────────────────
@app.middleware("http")
async def log_requests(request, call_next):
    """Log every request's path, query, latency, status and cache outcome.

    Handlers communicate their cache result via request.state.cache_status;
    "missing" is recorded when the handler never set it.
    """
    start_time = time.time()
    response = await call_next(request)
    latency = int((time.time() - start_time) * 1000)
    cache_status = getattr(request.state, "cache_status", "missing")
    try:
        query = request.query_params.get("query", "")
        category = request.query_params.get("category", "general")
        log_event({
            "path": request.url.path,
            "query": query,
            "latency_ms": latency,
            "status_code": response.status_code,
            "category": category,
            "cache_status": cache_status
        })
        print("CACHE:", cache_status)
    except Exception as e:
        # Logging must never break the request/response cycle.
        print(f"Logging middleware error: {e}")
    # Debug trace of what the handler stored on request.state.
    # NOTE(review): original indentation was lost; this print may have been
    # inside the except block — confirm intended placement.
    print("MIDDLEWARE sees:", getattr(request.state, "cache_status", "NOT_SET"))
    return response
# ── Fast Gemini call ──────────────────────────────────
def call_gemini_fast(query: str, limit: int = 15) -> list:
    """First-pass Gemini call: up to *limit* short evaluation criteria for *query*.

    Tuned for latency (temperature 0.1, 250-token budget, 6s timeout).
    Makes up to 2 attempts, backing off on HTTP 429. Returns a list of
    suggestion strings, or [] on any failure.
    """
    prompt = f"""List {limit} specific evaluation criteria for: "{query}"
Rules:
- Specific to "{query}" only — no competitors
- 2-5 words each
- Most important first
Return ONLY a JSON array. Example: ["Battery Life", "On Road Price", "Fuel Efficiency"]
Generate for: "{query}" """
    for attempt in range(2):
        try:
            res = http_requests.post(
                GEMINI_URL,
                headers={
                    "Authorization": f"Bearer {get_access_token()}",
                    "Content-Type": "application/json"
                },
                json={
                    "contents": [{"role": "user", "parts": [{"text": prompt}]}],
                    "generationConfig": {
                        "temperature": 0.1,
                        "maxOutputTokens": 250  # small = fast
                    }
                },
                timeout=6
            )
            if res.status_code == 429:
                # Rate limited — exponential backoff then retry.
                time.sleep(2 ** attempt)
                continue
            res.raise_for_status()
            raw = res.json()["candidates"][0]["content"]["parts"][0]["text"]
            # The model may wrap the JSON in prose/markdown; take the first [...] span.
            match = re.search(r'\[.*\]', raw, re.DOTALL)
            if match:
                suggestions = json_module.loads(match.group(0))
                print(f"✅ Fast Gemini: {len(suggestions)} suggestions for '{query}'")
                return suggestions
        except http_requests.exceptions.Timeout:
            print(f"⏰ Gemini timeout attempt {attempt+1}")
        except Exception as e:
            print(f"⚠️ Gemini attempt {attempt+1}: {type(e).__name__}")
            time.sleep(1)
    return []
def call_gemini_more(query: str, existing: list) -> list:
    """Second background call — 25 more suggestions, different ones.

    Single attempt with a longer timeout (15s) and a larger token budget;
    returns only items not already present in *existing* (case-insensitive),
    or [] on any failure.
    """
    existing_str = ", ".join(existing[:15])  # send first 15 so Gemini avoids them
    prompt = f"""List 25 more specific evaluation criteria for: "{query}"
Rules:
- Specific to "{query}" only
- 2-5 words each
- Do NOT repeat any of these: {existing_str}
- Different aspects not yet covered
- Most important first
Return ONLY a JSON array.
Generate for: "{query}" """
    try:
        res = http_requests.post(
            GEMINI_URL,
            headers={
                "Authorization": f"Bearer {get_access_token()}",
                "Content-Type": "application/json"
            },
            json={
                "contents": [{"role": "user", "parts": [{"text": prompt}]}],
                # Higher temperature/budget than the fast call: variety over speed.
                "generationConfig": {
                    "temperature": 0.3,
                    "maxOutputTokens": 600
                }
            },
            timeout=15
        )
        res.raise_for_status()
        raw = res.json()["candidates"][0]["content"]["parts"][0]["text"]
        # Extract the first [...] span in case the model adds prose around it.
        match = re.search(r'\[.*\]', raw, re.DOTALL)
        if match:
            more = json_module.loads(match.group(0))
            # filter out any duplicates (case-insensitive) against *existing*
            existing_lower = [e.lower() for e in existing]
            return [s for s in more if s.lower() not in existing_lower]
    except Exception as e:
        print(f"⚠️ call_gemini_more failed: {type(e).__name__}")
    return []
# ── Background store in Redis + DB ───────────────────
def store_in_background(query: str, suggestions: list, r=None, cache_key: str = None):
    """Cache *suggestions* in Redis and bootstrap the DB domain, off-thread.

    The Redis entry (24h TTL for 3+ word queries, 30min otherwise) serves
    reads while bootstrap_domain() populates the database; once that
    succeeds the Redis key is deleted so the DB becomes the source of truth.
    All failures are printed and swallowed.
    """
    def _persist():
        if r and cache_key:
            try:
                expiry = 86400 if len(query.split()) >= 3 else 1800
                r.setex(cache_key, expiry, json_module.dumps(suggestions))
                print(f"💾 Redis cached: '{query}' ({len(suggestions)} items)")
            except Exception:
                pass
        try:
            bootstrap_domain(query)
            print(f"✅ DB bootstrapped: '{query}'")
            if r and cache_key:
                try:
                    r.delete(cache_key)
                    print(f"🗑️ Redis cleared → DB now serves '{query}'")
                except Exception:
                    pass
        except Exception as exc:
            print(f"⚠️ Bootstrap failed: {type(exc).__name__}")

    threading.Thread(target=_persist, daemon=True).start()
# ── Main get suggestions function ────────────────────
def get_suggestions_fast(query: str, limit: int = 15, existing: list = None) -> dict:
    """Three-tier suggestion lookup: Redis → pgvector DB → Gemini.

    Returns {"suggestions": [...], "cache": <tier label>}, plus "domain" on
    a DB hit. Items already present in *existing* are filtered out so the
    load-more flow never repeats suggestions.
    """
    existing = existing or []
    # ── L1: Redis cache ──────────────────────────────
    r = get_redis()
    cache_key = get_live_cache_key(query)
    if r:
        try:
            cached = r.get(cache_key)
            if cached:
                all_suggestions = json_module.loads(cached)
                new_only = [s for s in all_suggestions if s not in existing]
                print(f"⚡ Redis HIT: '{query}'{len(new_only)} available")
                return {"suggestions": new_only[:limit], "cache": "redis_hit"}
        except Exception:
            pass
    # ── L2: DB (pgvector) ────────────────────────────
    try:
        normalized = normalize_query(query.strip())
        embedding = get_or_encode(normalized, model)
        # pgvector accepts the embedding serialized as its string literal form.
        emb_param = str(embedding)
        with engine.begin() as conn:
            # Nearest known domain by embedding distance.
            domain_row = conn.execute(text("""
                SELECT id, name,
                       embedding <-> CAST(:emb AS vector) AS distance
                FROM domains ORDER BY distance LIMIT 1
            """), {"emb": emb_param}).fetchone()
            if domain_row and domain_row.distance <= 0.8:
                # Rank attributes: 70% similarity + capped click-feedback boost.
                # Fetch 2x limit so filtering against *existing* still fills a page.
                results = conn.execute(text("""
                    SELECT a.name,
                           (1 - (a.embedding <-> CAST(:emb AS vector))) * 0.7
                           + LEAST(COALESCE(af.click_count, 0), 100) * 0.003 AS score
                    FROM attributes a
                    JOIN dimension_groups g ON a.group_id = g.id
                    LEFT JOIN attribute_feedback af
                           ON af.attribute_name = a.name
                          AND af.domain = :domain_name
                    WHERE g.domain_id = :domain_id
                    ORDER BY score DESC
                    LIMIT :limit
                """), {"emb": emb_param, "domain_id": domain_row.id,
                       "domain_name": domain_row.name, "limit": limit * 2})
                db_suggestions = [row[0] for row in results]
                new_only = [s for s in db_suggestions if s not in existing]
                if new_only:
                    print(f"⚡ DB HIT: '{query}'{len(new_only)} suggestions")
                    if r:
                        try:
                            # Warm Redis (1h) so the next call skips the DB.
                            r.setex(cache_key, 3600, json_module.dumps(db_suggestions))
                        except Exception:
                            pass
                    return {"suggestions": new_only[:limit], "cache": "db_hit",
                            "domain": domain_row.name}
    except Exception as e:
        print(f"⚠️ DB check failed: {type(e).__name__}")
    # ── L3: Gemini fast (new query) ──────────────────
    print(f"🔄 New query — calling Gemini fast: '{query}'")
    suggestions = call_gemini_fast(query, limit=15)
    if suggestions:
        # Background: fetch 25 more + store everything in Redis
        def fetch_more_and_store():
            more = call_gemini_more(query, suggestions)
            all_suggestions = suggestions + more
            print(f"✅ Total suggestions after background fetch: {len(all_suggestions)}")
            store_in_background(query, all_suggestions, r, cache_key)
        threading.Thread(target=fetch_more_and_store, daemon=True).start()
        new_only = [s for s in suggestions if s not in existing]
        return {"suggestions": new_only[:limit], "cache": "gemini_fast"}
    return {"suggestions": [], "cache": "gemini_failed"}
# ── Category suggestions via Gemini ──────────────────
def generate_category_suggestions(query: str, category: str, existing: list, limit: int = 15):
    """Generate tab-specific (shopping/images/videos/news/places) suggestions via Gemini.

    Up to 3 attempts, backing off on HTTP 429 and connection errors.
    Returns {"suggestions": [...], "cache": <status>}. NOTE(review):
    *existing* is accepted but not used in this function.
    """
    # Hints injected into the prompt so suggestions match the tab the user is on.
    category_context = {
        "shopping": "buying options, best price, where to buy, deals, discounts, EMI, cashback, online vs offline, delivery, sellers",
        "images": "exterior design, interior photos, color options, visual features, styling, aesthetics, dimensions look, photo gallery",
        "videos": "video reviews, test drive videos, owner reviews, comparison videos, YouTube channels, expert opinions, unboxing",
        "news": "latest news, recent updates, new model announcement, price change, upcoming launch, recalls, awards won",
        "places": "best dealers, showrooms near me, service centers, test drive locations, authorized dealers, city availability"
    }
    context = category_context.get(category, "general evaluation")
    prompt = f"""The user is searching for "{query}" in {category.upper()} category.
Generate {limit} specific keyword suggestions related to "{query}" for the {category} context.
Focus on: {context}
Rules:
- Each suggestion must be 2-5 words
- Must be directly related to "{query}" in {category} context
- Order by most searched/relevant first
Return ONLY a JSON array of strings. No explanation."""
    for attempt in range(3):
        try:
            res = http_requests.post(
                GEMINI_URL,
                headers={
                    "Authorization": f"Bearer {get_access_token()}",
                    "Content-Type": "application/json"
                },
                json={
                    "contents": [{"role": "user", "parts": [{"text": prompt}]}],
                    "generationConfig": {"temperature": 0.4, "maxOutputTokens": 512}
                },
                timeout=30
            )
            if res.status_code == 429:
                # Rate limited — exponential backoff then retry.
                time.sleep(2 ** attempt)
                continue
            res.raise_for_status()
            raw = res.json()["candidates"][0]["content"]["parts"][0]["text"]
            # Pull the first [...] span out of the (possibly prose-wrapped) reply.
            match = re.search(r'\[.*\]', raw, re.DOTALL)
            if match:
                suggestions = json_module.loads(match.group(0))
                return {"suggestions": suggestions[:limit], "cache": "category_ai"}
            return {"suggestions": [], "cache": "parse_error"}
        except http_requests.exceptions.ConnectionError as e:
            print(f"⚠️ Connection error attempt {attempt + 1}: {type(e).__name__}")
            if attempt < 2:
                time.sleep(2)
                continue
            return {"suggestions": [], "cache": "connection_error"}
        except Exception as e:
            print(f"❌ Category suggestions failed: {type(e).__name__}")
            return {"suggestions": [], "cache": "error"}
    return {"suggestions": [], "cache": "failed"}
# ── serve_from_db ─────────────────────────────────────
def serve_from_db(conn, domain_row, emb_param, limit, offset):
    """Rank a domain's attributes by embedding similarity plus click feedback.

    Score = 70% cosine-style similarity + a feedback boost capped at 100
    clicks (0.003 per click), matching the inline ranking queries used by
    /suggest and get_suggestions_fast.

    Fix: the attribute_feedback join previously had no domain filter, so an
    attribute name shared across domains could match multiple feedback rows,
    duplicating result rows and mixing other domains' click counts. The join
    now restricts feedback to domain_row.name, consistent with the other
    ranking queries in this module.

    Args:
        conn: active SQLAlchemy connection.
        domain_row: row with .id and .name of the matched domain.
        emb_param: query embedding serialized for pgvector CAST.
        limit/offset: pagination window.
    Returns:
        List of attribute names, best score first.
    """
    results = conn.execute(text("""
        SELECT a.name,
               (1 - (a.embedding <-> CAST(:emb AS vector))) * 0.7
               + LEAST(COALESCE(af.click_count, 0), 100) * 0.003 AS score
        FROM attributes a
        JOIN dimension_groups g ON a.group_id = g.id
        LEFT JOIN attribute_feedback af
               ON af.attribute_name = a.name
              AND af.domain = :domain_name
        WHERE g.domain_id = :domain_id
        ORDER BY score DESC
        LIMIT :limit OFFSET :offset
    """), {"emb": emb_param, "domain_id": domain_row.id,
           "domain_name": domain_row.name,
           "limit": limit, "offset": offset})
    return [row[0] for row in results]
# ── /suggest endpoint ─────────────────────────────────
@app.get("/suggest")
def suggest(request: Request, query: str, offset: int = 0, limit: int = 15, category: str = "general"):
    """Main suggestion endpoint.

    Pipeline: input guards → category AI (non-general tabs) → semantic cache
    (first page only) → pgvector DB ranking → fast-Gemini fallback. Each
    response records its cache outcome on request.state for the logging
    middleware and echoes it in the payload's "cache" field.
    """
    def build_response(data, status):
        # Stash the cache outcome for the middleware and echo it to the client.
        request.state.cache_status = status
        data["cache"] = status
        return data
    start = time.time()
    if len(query.strip()) < 2:
        return build_response({"suggestions": []}, "skip")
    if is_gibberish(query.strip()):
        return build_response({"suggestions": []}, "gibberish")
    # Category → AI directly
    if category != "general":
        result = generate_category_suggestions(query.strip(), category, [], limit)
        return build_response(result, "category_ai")
    normalized = normalize_query(query.strip())
    print(f"Normalized: '{query}''{normalized}'")
    # ── Semantic cache check ──────────────────────────
    try:
        embedding = get_or_encode(normalized, model)
        emb_param = str(embedding)
        with engine.begin() as conn:
            # Closest known domain for this query embedding.
            domain_row = conn.execute(text("""
                SELECT id, name,
                       embedding <-> CAST(:emb AS vector) AS distance
                FROM domains ORDER BY distance LIMIT 1
            """), {"emb": emb_param}).fetchone()
            if domain_row and domain_row.distance <= 0.8:
                domain_name = domain_row.name
                # The semantic cache only applies to the first page.
                if offset == 0:
                    cached = get_semantic_cache(embedding, domain=domain_name)
                    if cached:
                        distance = cached["distance"]
                        cached_query = cached["query"]
                        suggestions = cached["suggestions"]
                        print(f"Cache found → distance: {distance:.4f}, query: {cached_query}")
                        def is_relevant(q, cq):
                            # Word-overlap guard: >50% of the query's words must
                            # appear in the cached query before trusting the
                            # embedding-distance match.
                            q_words = set(q.lower().split())
                            c_words = set(cq.lower().split())
                            overlap = len(q_words & c_words) / max(len(q_words), 1)
                            return overlap > 0.5
                        if distance < 0.05 and is_relevant(normalized, cached_query):
                            print("✅ Strong semantic cache HIT")
                            return build_response(
                                {"suggestions": suggestions[:limit], "domain": domain_name},
                                "strong_hit"
                            )
                        elif distance < 0.1 and is_relevant(normalized, cached_query):
                            # Near match: blend cached items with a fresh DB ranking.
                            print("⚡ Medium match → refining")
                            fresh_results = conn.execute(text("""
                                SELECT a.name,
                                       1 - (a.embedding <-> CAST(:emb AS vector)) AS score
                                FROM attributes a
                                JOIN dimension_groups g ON a.group_id = g.id
                                WHERE g.domain_id = :domain_id
                                ORDER BY score DESC
                                LIMIT :limit
                            """), {"emb": emb_param, "domain_id": domain_row.id, "limit": limit})
                            fresh = [r[0] for r in fresh_results]
                            # Cached first, then fresh, de-duplicated case-insensitively.
                            merged = []
                            seen = set()
                            for item in suggestions + fresh:
                                if item.lower() not in seen:
                                    seen.add(item.lower())
                                    merged.append(item)
                            return build_response(
                                {"suggestions": merged[:limit], "domain": domain_name},
                                "refined_hit"
                            )
                        else:
                            print("❌ Cache ignored (low relevance)")
                # Full DB ranking: 70% similarity + capped click-feedback boost.
                results = conn.execute(text("""
                    SELECT a.name,
                           (1 - (a.embedding <-> CAST(:emb AS vector))) * 0.7
                           + LEAST(COALESCE(af.click_count, 0), 100) * 0.003 AS score
                    FROM attributes a
                    JOIN dimension_groups g ON a.group_id = g.id
                    LEFT JOIN attribute_feedback af
                           ON af.attribute_name = a.name
                          AND af.domain = :domain_name
                    WHERE g.domain_id = :domain_id
                    ORDER BY score DESC
                    LIMIT :limit OFFSET :offset
                """), {"emb": emb_param, "domain_id": domain_row.id,
                       "domain_name": domain_name, "limit": limit, "offset": offset})
                suggestions = [r[0] for r in results]
                # Case-insensitive de-duplication, preserving rank order.
                seen = set()
                ranked = []
                for name in suggestions:
                    if name.lower() not in seen:
                        seen.add(name.lower())
                        ranked.append(name)
                if ranked:
                    # Seed the semantic cache with first-page, multi-word queries.
                    if offset == 0 and len(normalized.split()) >= 2:
                        write_cache_async(normalized, embedding, ranked, domain=domain_name)
                    print(f"⚡ DB HIT: '{query}'{len(ranked)} suggestions in {int((time.time()-start)*1000)}ms")
                    return build_response(
                        {"suggestions": ranked, "domain": domain_name},
                        "db_hit"
                    )
    except Exception as e:
        print(f"⚠️ DB/cache check failed: {type(e).__name__}")
    # ── Fast Gemini fallback ──────────────────────────
    print(f"🔄 Fast Gemini for: '{query}'")
    result = get_suggestions_fast(query, limit=40)
    return build_response(result, result.get("cache", "gemini_fast"))
# ── /suggest/more endpoint ────────────────────────────
@app.get("/suggest/more")
def suggest_more(query: str, category: str = "general", existing: str = ""):
    """Load-more endpoint: returns suggestions the client does not already have.

    *existing* is a comma-separated list of suggestions already shown.
    General category tries Redis first, then the full fast pipeline; other
    categories go straight to Gemini with a tab-focused prompt.
    """
    existing_list = [e.strip() for e in existing.split(",") if e.strip()]
    if category == "general":
        r = get_redis()
        if r:
            try:
                cache_key = get_live_cache_key(query)
                cached = r.get(cache_key)
                if cached:
                    all_suggestions = json_module.loads(cached)
                    new_only = [s for s in all_suggestions if s not in existing_list]
                    print(f"⚡ Instant more: {len(new_only)} from cache")
                    return {"suggestions": new_only[:12], "cache": "live_hit"}
            except Exception:
                pass
        result = get_suggestions_fast(query, limit=12, existing=existing_list)
        return result
    # Non-general categories: prompt Gemini with a tab-specific focus.
    category_context = {
        "shopping": "focus on buying, pricing, deals, offers, sellers, delivery, payment options",
        "images": "focus on visual aspects, design, appearance, colors, aesthetics, photos",
        "videos": "focus on reviews, test drives, comparisons, tutorials, unboxing, demos",
        "news": "focus on latest updates, recent changes, announcements, trends, events",
        "places": "focus on locations, dealers, service centers, showrooms, nearby options",
    }
    context = category_context.get(category, "general evaluation")
    prompt = f"""Generate 12 more keyword suggestions for "{query}" in {category.upper()} context.
Focus on: {context}
Do NOT repeat: {', '.join(existing_list) if existing_list else 'none'}
Each suggestion 2-5 words max.
Return ONLY a JSON array."""
    try:
        res = http_requests.post(
            GEMINI_URL,
            headers={"Authorization": f"Bearer {get_access_token()}", "Content-Type": "application/json"},
            json={"contents": [{"role": "user", "parts": [{"text": prompt}]}],
                  "generationConfig": {"temperature": 0.5, "maxOutputTokens": 300}},
            timeout=15)
        res.raise_for_status()
        raw = res.json()["candidates"][0]["content"]["parts"][0]["text"]
        # Take the first [...] span in case the reply is prose-wrapped.
        match = re.search(r'\[.*\]', raw, re.DOTALL)
        if match:
            suggestions = json_module.loads(match.group(0))
            return {"suggestions": [s for s in suggestions if s not in existing_list][:12],
                    "cache": "ai_generated"}
        return {"suggestions": [], "cache": "parse_error"}
    except Exception as e:
        print(f"❌ suggest/more failed: {type(e).__name__}")
        return {"suggestions": [], "cache": "error"}
# ── /search/videos endpoint ───────────────────────────
@app.get("/search/videos")
def search_videos(query: str):
    """Search YouTube (Data API v3) for videos matching *query*; up to 6 results."""
    if not GOOGLE_SEARCH_API_KEY:
        return {"results": [], "error": "API key not configured"}
    try:
        resp = http_requests.get(
            "https://www.googleapis.com/youtube/v3/search",
            params={
                "part": "snippet",
                "q": query,
                "type": "video",
                "maxResults": 6,
                "order": "relevance",
                "key": GOOGLE_SEARCH_API_KEY,
            },
            timeout=10,
        )
        resp.raise_for_status()
        formatted = []
        for entry in resp.json().get("items", []):
            info = entry["snippet"]
            formatted.append({
                "title": info["title"],
                "channel": info["channelTitle"],
                "thumbnail": info["thumbnails"]["medium"]["url"],
                "url": f"https://www.youtube.com/watch?v={entry['id']['videoId']}",
                "published": info["publishedAt"][:10],
                "description": info.get("description", "")[:120],
            })
        return {"results": formatted, "query": query}
    except Exception as e:
        print(f"❌ YouTube search failed: {type(e).__name__}")
        return {"results": [], "error": "Search unavailable. Please try again."}
# ── /search/shopping endpoint ─────────────────────────
@app.get("/search/shopping")
def search_shopping(query: str):
    """Google Custom Search for buying/pricing pages about *query*; up to 6 results."""
    if not GOOGLE_SEARCH_API_KEY or not GOOGLE_SEARCH_CX:
        return {"results": [], "error": "Google Search API not configured"}
    try:
        resp = http_requests.get(
            "https://www.googleapis.com/customsearch/v1",
            params={
                "q": f"{query} buy price",
                "cx": GOOGLE_SEARCH_CX,
                "key": GOOGLE_SEARCH_API_KEY,
                "num": 6,
            },
            timeout=10,
        )
        resp.raise_for_status()
        formatted = [
            {
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "snippet": entry.get("snippet", "")[:150],
                "source": entry.get("displayLink", ""),
                "image": entry.get("pagemap", {}).get("cse_image", [{}])[0].get("src", ""),
            }
            for entry in resp.json().get("items", [])
        ]
        return {"results": formatted, "query": query}
    except Exception as e:
        print(f"❌ Shopping search failed: {type(e).__name__}")
        return {"results": [], "error": "Search unavailable. Please try again."}
# ── /search/images endpoint ───────────────────────────
@app.get("/search/images")
def search_images(query: str):
    """Google Custom Search in image mode (safe search on); up to 6 results."""
    if not GOOGLE_SEARCH_API_KEY or not GOOGLE_SEARCH_CX:
        return {"results": [], "error": "Google Search API not configured"}
    try:
        resp = http_requests.get(
            "https://www.googleapis.com/customsearch/v1",
            params={
                "q": query,
                "cx": GOOGLE_SEARCH_CX,
                "key": GOOGLE_SEARCH_API_KEY,
                "searchType": "image",
                "num": 6,
                "safe": "active",
            },
            timeout=10,
        )
        resp.raise_for_status()
        formatted = [
            {
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "source": entry.get("displayLink", ""),
                "thumbnail": entry.get("image", {}).get("thumbnailLink", ""),
                "context_link": entry.get("image", {}).get("contextLink", ""),
            }
            for entry in resp.json().get("items", [])
        ]
        return {"results": formatted, "query": query}
    except Exception as e:
        print(f"❌ Image search failed: {type(e).__name__}")
        return {"results": [], "error": "Search unavailable. Please try again."}
# ── /search/news endpoint ─────────────────────────────
@app.get("/search/news")
def search_news(query: str):
    """Google Custom Search for recent news about *query*, sorted by date; up to 6 results."""
    if not GOOGLE_SEARCH_API_KEY or not GOOGLE_SEARCH_CX:
        return {"results": [], "error": "Google Search API not configured"}
    try:
        resp = http_requests.get(
            "https://www.googleapis.com/customsearch/v1",
            params={
                "q": f"{query} news 2025",
                "cx": GOOGLE_SEARCH_CX,
                "key": GOOGLE_SEARCH_API_KEY,
                "num": 6,
                "sort": "date",
            },
            timeout=10,
        )
        resp.raise_for_status()
        formatted = [
            {
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "snippet": entry.get("snippet", "")[:200],
                "source": entry.get("displayLink", ""),
                "image": entry.get("pagemap", {}).get("cse_image", [{}])[0].get("src", ""),
            }
            for entry in resp.json().get("items", [])
        ]
        return {"results": formatted, "query": query}
    except Exception as e:
        print(f"❌ News search failed: {type(e).__name__}")
        return {"results": [], "error": "Search unavailable. Please try again."}
# ── /search/places endpoint ───────────────────────────
@app.get("/search/places")
def search_places(query: str):
    """Google Custom Search for dealers/showrooms near the user; up to 6 results."""
    if not GOOGLE_SEARCH_API_KEY or not GOOGLE_SEARCH_CX:
        return {"results": [], "error": "Google Search API not configured"}
    try:
        resp = http_requests.get(
            "https://www.googleapis.com/customsearch/v1",
            params={
                "q": f"{query} near me dealers showroom location",
                "cx": GOOGLE_SEARCH_CX,
                "key": GOOGLE_SEARCH_API_KEY,
                "num": 6,
            },
            timeout=10,
        )
        resp.raise_for_status()
        formatted = [
            {
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "snippet": entry.get("snippet", "")[:200],
                "source": entry.get("displayLink", ""),
                "image": entry.get("pagemap", {}).get("cse_image", [{}])[0].get("src", ""),
            }
            for entry in resp.json().get("items", [])
        ]
        return {"results": formatted, "query": query}
    except Exception as e:
        print(f"❌ Places search failed: {type(e).__name__}")
        return {"results": [], "error": "Search unavailable. Please try again."}
# ── /generate endpoint ────────────────────────────────
class GenerateRequest(BaseModel):
    # Request body for POST /generate.
    query: str                       # the user's question
    selected_attributes: list[str]   # criteria chips the user selected
    category: str = "general"        # search tab: general/shopping/images/videos/news/places
    chat_history: list[dict] = []    # prior turns, each with "query", "chips", "answer" keys
@app.post("/generate")
def generate(request: GenerateRequest):
    """Answer the user's query via Gemini, structured around the selected criteria.

    Builds a prompt from chat history + selected chips + a per-category focus,
    then calls Vertex AI with up to 3 attempts (exponential backoff on HTTP
    429, retry on timeout). Always returns {"answer": <text>}, even on failure.
    """
    if not request.query.strip():
        return {"answer": "Please enter a query."}
    # Fold prior turns into the prompt so follow-up questions keep context.
    history_text = ""
    if request.chat_history:
        history_text = "\n".join([
            f"User: {h['query']}\nCriteria: {', '.join(h.get('chips', []))}\nAnswer: {h['answer']}"
            for h in request.chat_history
        ])
        history_text = f"Previous conversation:\n{history_text}\n\n"
    attributes = ", ".join(request.selected_attributes) if request.selected_attributes else "general evaluation"
    # Extra instruction per category tab; empty string for "general"/unknown.
    category_focus = {
        "shopping": "Focus on buying options, best prices, deals, where to buy, payment plans, EMI options.",
        "images": "Focus on visual design, appearance, color options, exterior/interior aesthetics.",
        "videos": "Focus on video reviews, test drives, comparisons, what reviewers say.",
        "news": "Focus on latest news, recent updates, upcoming changes, current trends.",
        "places": "Focus on best places to buy, dealers, showrooms, service centers.",
        "general": ""
    }.get(request.category, "")
    prompt = f"""{history_text}USER QUESTION: "{request.query}"
CATEGORY: {request.category.upper()}
{category_focus}
EVALUATION CRITERIA: {attributes}
You are an expert advisor. Answer "{request.query}" directly and analyze through each selected criterion.
Format:
## About: {request.query}
[Direct answer in 2-3 sentences]
---
**[Criterion Name]**
- How it applies to "{request.query}"
- Specific facts or numbers
- Recommendation
---
## Bottom Line
[2-3 sentence summary]
RULES:
- Every sentence must be about "{request.query}" specifically
- Use real numbers where confident
- If unsure say "verify on official website"
- Total under 400 words"""
    print(f"🔄 Calling Vertex AI for: {request.query}")
    for attempt in range(3):
        try:
            res = http_requests.post(
                GEMINI_URL,
                headers={
                    "Authorization": f"Bearer {get_access_token()}",
                    "Content-Type": "application/json"
                },
                json={
                    "contents": [{"role": "user", "parts": [{"text": prompt}]}],
                    "generationConfig": {"temperature": 0.3, "maxOutputTokens": 800}
                },
                timeout=30
            )
            if res.status_code == 429:
                # Rate limited — exponential backoff (1s, 2s, 4s) then retry.
                wait = 2 ** attempt
                print(f"⏳ Rate limited, waiting {wait}s...")
                time.sleep(wait)
                continue
            print(f"✅ Vertex response: {res.status_code}")
            res.raise_for_status()
            answer = res.json()["candidates"][0]["content"]["parts"][0]["text"]
            return {"answer": answer}
        except http_requests.exceptions.Timeout:
            print(f"⏰ Timeout on attempt {attempt + 1}")
            if attempt == 2:
                return {"answer": "Request timed out. Please try again."}
            time.sleep(2)
        except Exception as e:
            # Non-timeout errors are not retried.
            print(f"❌ Error: {type(e).__name__}")
            return {"answer": "Error getting response. Please try again."}
    return {"answer": "Could not get response after 3 attempts. Please try again."}
# ── /feedback endpoint ────────────────────────────────
class FeedbackRequest(BaseModel):
    # Request body for POST /feedback.
    query: str                 # the search query the chips were shown for
    selected_chips: list[str]  # attribute names the user clicked
    domain: str = ""           # resolved domain name; "" when unknown
@app.post("/feedback")
def feedback(request: FeedbackRequest):
    """Record chip clicks: upsert one attribute_feedback row per selected chip.

    On conflict of (query, attribute_name) the click counter is incremented
    and last_clicked refreshed. Returns {"status": "ok"} on success,
    {"status": "error"} on any database failure.

    Improvements over the original: an empty chip list short-circuits
    (SQLAlchemy rejects an empty parameter list), and all chips are sent as
    one executemany batch instead of one round trip per chip.
    """
    if not request.selected_chips:
        # Nothing to record — avoid an empty executemany call.
        return {"status": "ok"}
    rows = [
        {"query": request.query, "attr": chip, "domain": request.domain}
        for chip in request.selected_chips
    ]
    try:
        with engine.begin() as conn:
            # Passing a list of parameter dicts triggers executemany.
            conn.execute(text("""
                INSERT INTO attribute_feedback (query, attribute_name, domain, click_count)
                VALUES (:query, :attr, :domain, 1)
                ON CONFLICT (query, attribute_name)
                DO UPDATE SET click_count = attribute_feedback.click_count + 1,
                              last_clicked = NOW()
            """), rows)
        return {"status": "ok"}
    except Exception as e:
        print(f"Feedback write failed: {type(e).__name__}")
        return {"status": "error"}