Add Scheduling and Triggers to Your AI Call Agent
In [Part 1](/blog/tutorial-ai-agent-phone-number) and [Part 2](/blog/tutorial-agent-crm-integration), you built an agent that makes warm calls with CRM context. But you're still triggering them manually. In this post, you'll wire up event-driven triggers so that when a new lead comes in, a call goes out automatically — within 60 seconds. 78% of buyers go with the first vendor to respond.
What you're building
An event-driven system that listens for new form submissions, immediately triggers a Spix call with the lead's context, retries once after 30 minutes if unanswered, and logs everything to your CRM.
The architecture
Two moving parts: a Flask webhook listener and a Celery worker. Redis ties them together. Form submission hits your Flask endpoint, which fires call_lead.delay() via Celery. Celery calls the Spix API, stores a session-to-lead mapping in Redis, and schedules a retry check 30 minutes later. When Spix fires a call.ended webhook, your handler cancels the retry if the call connected.
Redis key patterns
Two keys per call. The session mapping lets your webhook handler look up which lead a call belongs to. The retry key acts as a flag — if it still exists when the retry task fires, the call wasn't answered.
# Python
# Session-to-lead mapping (who is this call for?)
# Key: spix:session:{session_id}
# Value: lead_id
# TTL: 86400 (24 hours)
# Retry flag (should we retry this lead?)
# Key: spix:retry:{lead_id}
# Value: 1
# TTL: 3600 (1 hour)Celery task: call_lead
This is the core task. It pulls lead data from HubSpot, builds a dynamic briefing, fires the call via the Spix API, stores the Redis mappings, and schedules a retry check in 30 minutes.
# Python
import requests
import redis
from celery import Celery
from datetime import timedelta
from config import SPIX_API_KEY, SPIX_PHONE, SPIX_PLAYBOOK_ID, HUBSPOT_TOKEN
app = Celery('tasks', broker='redis://localhost:6379/0')
r = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)
@app.task(bind=True, max_retries=1)
def call_lead(self, lead_id: str):
"""Pull lead data, build briefing, fire Spix call, schedule retry."""
# 1. Pull lead from HubSpot
lead = requests.get(
f"https://api.hubapi.com/crm/v3/objects/contacts/{lead_id}",
headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
).json()
props = lead.get("properties", {})
name = f'{props.get("firstname", "")} {props.get("lastname", "")}'.strip()
phone = props.get("phone")
company = props.get("company", "Unknown")
source = props.get("hs_analytics_source", "direct")
if not phone:
return {"status": "skipped", "reason": "no_phone"}
# 2. Build dynamic briefing
briefing = (
f"You are calling {name} at {company}. "
f"They came in via {source}. "
f"Goal: qualify interest and book a 15-minute demo. "
f"Be warm, mention their company by name, keep it under 3 minutes."
)
# 3. Fire the call via Spix API
resp = requests.post(
"https://api.spix.sh/v1/calls",
headers={"Authorization": f"Bearer {SPIX_API_KEY}"},
json={
"to": phone,
"playbook_id": SPIX_PLAYBOOK_ID,
"sender": SPIX_PHONE,
"briefing_override": briefing,
},
)
resp.raise_for_status()
session_id = resp.json()["data"]["session_id"]
# 4. Store Redis mappings
r.setex(f"spix:session:{session_id}", 86400, lead_id)
r.setex(f"spix:retry:{lead_id}", 3600, "1")
# 5. Schedule retry check in 30 minutes
check_and_retry.apply_async(
args=[lead_id],
countdown=1800,
)
return {"status": "calling", "session_id": session_id, "lead": name}Celery task: check_and_retry
This task fires 30 minutes after the first call. If the retry key still exists in Redis (meaning the call wasn't answered or the webhook didn't clear it), it fires a second call attempt.
# Python
@app.task
def check_and_retry(lead_id: str):
"""If the retry key still exists, the first call wasn't answered."""
if not r.exists(f"spix:retry:{lead_id}"):
# Call was answered — webhook handler already cleared the key
return {"status": "no_retry_needed", "lead_id": lead_id}
# Clear the retry key so we don't loop
r.delete(f"spix:retry:{lead_id}")
# Fire second attempt
call_lead.delay(lead_id)
return {"status": "retrying", "lead_id": lead_id}Webhook handler: call.ended
When Spix fires a call.ended webhook, your handler looks up the lead from the session ID and checks the outcome. If the call connected, it clears the retry key so the check_and_retry task becomes a no-op.
# Python
from flask import Flask, request, jsonify
import redis
import requests
from config import HUBSPOT_TOKEN
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)
@app.route("/webhooks/spix", methods=["POST"])
def spix_webhook():
payload = request.json
event = payload.get("event")
session_id = payload.get("session_id")
if event != "call.ended":
return jsonify({"ok": True}), 200
lead_id = r.get(f"spix:session:{session_id}")
if not lead_id:
return jsonify({"ok": True}), 200
outcome = payload.get("outcome", "unknown")
summary = payload.get("summary", "")
duration = payload.get("duration", 0)
if outcome != "no_answer":
r.delete(f"spix:retry:{lead_id}")
# Log outcome to HubSpot
requests.post(
f"https://api.hubapi.com/crm/v3/objects/notes",
headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
json={
"properties": {
"hs_note_body": (
f"Spix call [{session_id}]\n"
f"Outcome: {outcome}\n"
f"Duration: {duration}s\n"
f"Summary: {summary}"
),
},
"associations": [{
"to": {"id": lead_id},
"types": [{"associationCategory": "HUBSPOT_DEFINED", "associationTypeId": 202}],
}],
},
)
return jsonify({"ok": True, "outcome": outcome}), 200Form submission endpoint
This endpoint receives Typeform (or any form provider) webhooks. It extracts the lead data, creates a contact in HubSpot, and fires the call_lead task immediately.
# Python
@app.route("/webhooks/form", methods=["POST"])
def form_submission():
"""Handle Typeform/HubSpot form webhook."""
data = request.json
answers = {a["field"]["ref"]: a for a in data.get("form_response", {}).get("answers", [])}
name = answers.get("name", {}).get("text", "Unknown")
email = answers.get("email", {}).get("email", "")
phone = answers.get("phone", {}).get("phone_number", "")
company = answers.get("company", {}).get("text", "")
if not phone:
return jsonify({"error": "no_phone"}), 400
hs_resp = requests.post(
"https://api.hubapi.com/crm/v3/objects/contacts",
headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
json={
"properties": {
"firstname": name.split()[0] if name else "",
"lastname": " ".join(name.split()[1:]) if name else "",
"email": email,
"phone": phone,
"company": company,
"hs_lead_status": "NEW",
}
},
)
lead_id = hs_resp.json()["id"]
from tasks import call_lead
call_lead.delay(lead_id)
return jsonify({"status": "calling", "lead_id": lead_id}), 202Timing breakdown
From form submission to ringing phone, the total latency is under 5 seconds:
- Form submitted: 0s
- Typeform webhook fires: ~1s
- Flask handler processes + HubSpot create: ~500ms
- Celery task queued + picked up: <500ms
- Spix API receives call request: <100ms
- Phone starts ringing: ~2-4s total
Business logic considerations
- Time-of-day filtering — don't call leads at 2am. Check the lead's timezone and schedule for 9am instead using Celery's eta parameter.
- Concurrency — Agent plan allows 2 concurrent calls. Rate-limit or upgrade to Operator ($99/mo, 10 concurrent).
- Do Not Call lists — check before firing any call. Legal requirement in most jurisdictions.
- Idempotency — use the lead ID as a Celery task ID to prevent duplicate calls from double-submitted forms.
What's next
In the final part, Part 4, you'll add email follow-ups — so when a prospect doesn't pick up after two calls, your agent sends a personalized email instead.