import os, requests, time
from collections import defaultdict
# Both API credentials are mandatory: a missing variable raises KeyError
# as soon as this module runs.
SEG_TOKEN = os.environ["SEGMENT_TOKEN"]
MV = os.environ["MAVERA_API_KEY"]

# Base URL that Mavera endpoint paths are appended to.
MB = "https://app.mavera.io/api/v1"

# Bearer-token JSON headers: MH carries the Mavera key, SH the Segment token.
MH = {
    "Authorization": "Bearer " + MV,
    "Content-Type": "application/json",
}
SH = {
    "Authorization": "Bearer " + SEG_TOKEN,
    "Content-Type": "application/json",
}

# Segment space to query; a placeholder ID is used when the variable is unset.
SPACE_ID = os.getenv("SEGMENT_SPACE_ID", "spa_xxxxx")
# Request user profiles (limit=200, traits included) from the Segment space.
# The explicit timeout matters: without one, requests waits indefinitely on a
# stalled connection and the script would hang instead of failing.
r = requests.get(
    f"https://api.segmentapis.com/spaces/{SPACE_ID}/collections/users/profiles",
    headers=SH,
    params={"limit": 200, "include": "traits"},
    timeout=30,
)
r.raise_for_status()
profiles_data = r.json()

# "data" or "profiles" may be present but null; `or` keeps the chained lookup
# from raising AttributeError and normalizes the result to a list.
profiles = (profiles_data.get("data") or {}).get("profiles") or []

if not profiles:
    # Nothing usable came back: fall back to 60 synthetic profiles (3 templates
    # repeated 20 times) so the rest of the pipeline can still run. Say so
    # explicitly, because the summary line below does not distinguish the two.
    print("No profiles returned by Segment; using built-in sample profiles")
    profiles = [
        {"traits": {"ltv": 1200, "purchase_count": 8, "engagement_tier": "high", "churn_risk": 0.05, "plan": "pro", "industry": "SaaS"}},
        {"traits": {"ltv": 450, "purchase_count": 3, "engagement_tier": "medium", "churn_risk": 0.22, "plan": "starter", "industry": "E-commerce"}},
        {"traits": {"ltv": 85, "purchase_count": 1, "engagement_tier": "low", "churn_risk": 0.65, "plan": "free", "industry": "Agency"}},
    ] * 20

print(f"Fetched {len(profiles)} profiles with traits\n")
def _numeric_trait(traits, primary, fallback, default):
    """Return a trait as a float, trying ``primary`` and then ``fallback``.

    A key counts as missing when it is absent, None, or an empty string.
    A real value of 0 is kept as 0 rather than replaced by ``default``.
    """
    for key in (primary, fallback):
        value = traits.get(key)
        if value is None or value == "":
            continue
        return float(value)
    return float(default)


# Assign every profile's traits to exactly one bucket. Rules are evaluated in
# order, so a profile matching several rules lands in the first one.
clusters = {"high_value": [], "growth": [], "at_risk": [], "new": []}
for p in profiles:
    # "traits" may be present but null; treat that like an empty trait set.
    traits = p.get("traits") or {}
    ltv = _numeric_trait(traits, "ltv", "lifetime_value", 0)
    purchases = int(_numeric_trait(traits, "purchase_count", "order_count", 0))
    # Unknown churn risk defaults to 0.5; a known risk of 0 must stay 0,
    # otherwise zero-risk users could never qualify as high value.
    churn_risk = _numeric_trait(traits, "churn_risk", "churn_probability", 0.5)

    if ltv > 500 and churn_risk < 0.15:
        clusters["high_value"].append(traits)
    elif ltv > 100 and purchases >= 2:
        clusters["growth"].append(traits)
    elif churn_risk > 0.4:
        clusters["at_risk"].append(traits)
    else:
        clusters["new"].append(traits)
def summarize_cluster(users, label):
    """Aggregate a cluster's trait dicts into summary statistics.

    Numeric traits are read under their primary name (``ltv``,
    ``purchase_count``, ``churn_risk``) and, when that is absent or None,
    under the alternate name (``lifetime_value``, ``order_count``,
    ``churn_probability``). A trait missing under both names counts as 0
    in the average.

    Args:
        users: List of trait dicts belonging to one cluster.
        label: Cluster name, copied into the result unchanged.

    Returns:
        None when ``users`` is empty. Otherwise a dict with keys ``label``,
        ``n``, ``avg_ltv``, ``avg_purchases``, ``avg_churn``, plus
        ``industries`` and ``plans``: up to three most frequent values each,
        most frequent first, ties kept in first-seen order.
    """
    if not users:
        return None

    def trait(user, primary, fallback):
        # Fall back to the alternate key so users carrying only the alternate
        # trait names are not averaged in as zeros.
        value = user.get(primary)
        if value is None:
            value = user.get(fallback)
        return value or 0

    n = len(users)
    avg_ltv = sum(float(trait(u, "ltv", "lifetime_value")) for u in users) / n
    avg_purchases = sum(int(trait(u, "purchase_count", "order_count")) for u in users) / n
    avg_churn = sum(float(trait(u, "churn_risk", "churn_probability")) for u in users) / n

    industries = defaultdict(int)
    plans = defaultdict(int)
    for u in users:
        # Falsy values (missing, None, "") are not counted.
        if u.get("industry"):
            industries[u["industry"]] += 1
        if u.get("plan"):
            plans[u["plan"]] += 1

    # A reverse sort is still stable, so equal counts keep insertion order.
    top_ind = sorted(industries, key=industries.get, reverse=True)[:3]
    top_plans = sorted(plans, key=plans.get, reverse=True)[:3]

    return {
        "label": label,
        "n": n,
        "avg_ltv": avg_ltv,
        "avg_purchases": avg_purchases,
        "avg_churn": avg_churn,
        "industries": top_ind,
        "plans": top_plans,
    }
# Create one Mavera persona per cluster that has at least three members.
created = []
for cluster_name, users in clusters.items():
    summary = summarize_cluster(users, cluster_name)
    # Skip empty clusters (summary is None) and clusters smaller than three.
    if not summary or summary["n"] < 3:
        continue

    label = cluster_name.replace("_", " ").title()
    payload = {
        "name": f"Segment CDP: {label}",
        "description": (
            f"{label} segment from Segment computed traits. N={summary['n']}. "
            f"Avg LTV: ${summary['avg_ltv']:.0f}. Avg purchases: {summary['avg_purchases']:.1f}. "
            f"Avg churn risk: {summary['avg_churn']:.0%}. "
            f"Industries: {', '.join(summary['industries'])}. "
            f"Plans: {', '.join(summary['plans'])}."
        ),
        "demographic": {
            "industries": summary["industries"],
            "plans": summary["plans"],
        },
        "psychographic": {
            "cluster": cluster_name,
            "avg_ltv": summary["avg_ltv"],
            "avg_churn_risk": summary["avg_churn"],
            # Tier is derived from the cluster's average LTV, not from the
            # per-user engagement_tier trait.
            "engagement_tier": "high" if summary["avg_ltv"] > 500 else "medium" if summary["avg_ltv"] > 100 else "low",
        },
    }

    # A timeout keeps a stalled connection from hanging the run, and
    # raise_for_status() turns a 4xx/5xx reply into an HTTPError here rather
    # than an opaque KeyError on persona["id"] below.
    response = requests.post(f"{MB}/personas", headers=MH, json=payload, timeout=30)
    response.raise_for_status()
    persona = response.json()

    created.append({"cluster": label, "id": persona["id"], "n": summary["n"]})
    print(f" {label}: {persona['id']} (N={summary['n']}, LTV=${summary['avg_ltv']:.0f}, churn={summary['avg_churn']:.0%})")
    # Brief pause between consecutive persona-creation requests.
    time.sleep(0.3)

print(f"\nCreated {len(created)} CDP-enriched personas")