← Back to blog
TutorialMarch 12, 202612 min read

Add Scheduling and Triggers to Your AI Call Agent

In [Part 1](/blog/tutorial-ai-agent-phone-number) and [Part 2](/blog/tutorial-agent-crm-integration), you built an agent that makes warm calls with CRM context. But you're still triggering them manually. In this post, you'll wire up event-driven triggers so that when a new lead comes in, a call goes out automatically — within 60 seconds. 78% of buyers go with the first vendor to respond.

What you're building

An event-driven system that listens for new form submissions, immediately triggers a Spix call with the lead's context, retries once after 30 minutes if unanswered, and logs everything to your CRM.

The architecture

Two moving parts: a Flask webhook listener and a Celery worker. Redis ties them together. Form submission hits your Flask endpoint, which fires call_lead.delay() via Celery. Celery calls the Spix API, stores a session-to-lead mapping in Redis, and schedules a retry check 30 minutes later. When Spix fires a call.ended webhook, your handler cancels the retry if the call connected.

Redis key patterns

Two keys per call. The session mapping lets your webhook handler look up which lead a call belongs to. The retry key acts as a flag — if it still exists when the retry task fires, the call wasn't answered.

# Python
# Session-to-lead mapping (who is this call for?)
# Key:   spix:session:{session_id}
# Value: lead_id
# TTL:   86400 (24 hours)

# Retry flag (should we retry this lead?)
# Key:   spix:retry:{lead_id}
# Value: 1
# TTL:   3600 (1 hour)

Celery task: call_lead

This is the core task. It pulls lead data from HubSpot, builds a dynamic briefing, fires the call via the Spix API, stores the Redis mappings, and schedules a retry check in 30 minutes.

# Python
import requests
import redis
from celery import Celery
from datetime import timedelta
from config import SPIX_API_KEY, SPIX_PHONE, SPIX_PLAYBOOK_ID, HUBSPOT_TOKEN

app = Celery('tasks', broker='redis://localhost:6379/0')
r = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)


@app.task(bind=True, max_retries=1)
def call_lead(self, lead_id: str):
    """Pull lead data, build briefing, fire Spix call, schedule retry."""

    # 1. Pull lead from HubSpot
    lead = requests.get(
        f"https://api.hubapi.com/crm/v3/objects/contacts/{lead_id}",
        headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
    ).json()

    props = lead.get("properties", {})
    name = f'{props.get("firstname", "")} {props.get("lastname", "")}'.strip()
    phone = props.get("phone")
    company = props.get("company", "Unknown")
    source = props.get("hs_analytics_source", "direct")

    if not phone:
        return {"status": "skipped", "reason": "no_phone"}

    # 2. Build dynamic briefing
    briefing = (
        f"You are calling {name} at {company}. "
        f"They came in via {source}. "
        f"Goal: qualify interest and book a 15-minute demo. "
        f"Be warm, mention their company by name, keep it under 3 minutes."
    )

    # 3. Fire the call via Spix API
    resp = requests.post(
        "https://api.spix.sh/v1/calls",
        headers={"Authorization": f"Bearer {SPIX_API_KEY}"},
        json={
            "to": phone,
            "playbook_id": SPIX_PLAYBOOK_ID,
            "sender": SPIX_PHONE,
            "briefing_override": briefing,
        },
    )
    resp.raise_for_status()
    session_id = resp.json()["data"]["session_id"]

    # 4. Store Redis mappings
    r.setex(f"spix:session:{session_id}", 86400, lead_id)
    r.setex(f"spix:retry:{lead_id}", 3600, "1")

    # 5. Schedule retry check in 30 minutes
    check_and_retry.apply_async(
        args=[lead_id],
        countdown=1800,
    )

    return {"status": "calling", "session_id": session_id, "lead": name}

Celery task: check_and_retry

This task fires 30 minutes after the first call. If the retry key still exists in Redis (meaning the call wasn't answered or the webhook didn't clear it), it fires a second call attempt.

# Python
@app.task
def check_and_retry(lead_id: str):
    """If the retry key still exists, the first call wasn't answered."""

    if not r.exists(f"spix:retry:{lead_id}"):
        # Call was answered — webhook handler already cleared the key
        return {"status": "no_retry_needed", "lead_id": lead_id}

    # Clear the retry key so we don't loop
    r.delete(f"spix:retry:{lead_id}")

    # Fire second attempt
    call_lead.delay(lead_id)
    return {"status": "retrying", "lead_id": lead_id}

Webhook handler: call.ended

When Spix fires a call.ended webhook, your handler looks up the lead from the session ID and checks the outcome. If the call connected, it clears the retry key so the check_and_retry task becomes a no-op.

# Python
from flask import Flask, request, jsonify
import redis
import requests
from config import HUBSPOT_TOKEN

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)


@app.route("/webhooks/spix", methods=["POST"])
def spix_webhook():
    payload = request.json
    event = payload.get("event")
    session_id = payload.get("session_id")

    if event != "call.ended":
        return jsonify({"ok": True}), 200

    lead_id = r.get(f"spix:session:{session_id}")
    if not lead_id:
        return jsonify({"ok": True}), 200

    outcome = payload.get("outcome", "unknown")
    summary = payload.get("summary", "")
    duration = payload.get("duration", 0)

    if outcome != "no_answer":
        r.delete(f"spix:retry:{lead_id}")

    # Log outcome to HubSpot
    requests.post(
        f"https://api.hubapi.com/crm/v3/objects/notes",
        headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
        json={
            "properties": {
                "hs_note_body": (
                    f"Spix call [{session_id}]\n"
                    f"Outcome: {outcome}\n"
                    f"Duration: {duration}s\n"
                    f"Summary: {summary}"
                ),
            },
            "associations": [{
                "to": {"id": lead_id},
                "types": [{"associationCategory": "HUBSPOT_DEFINED", "associationTypeId": 202}],
            }],
        },
    )

    return jsonify({"ok": True, "outcome": outcome}), 200

Form submission endpoint

This endpoint receives Typeform (or any form provider) webhooks. It extracts the lead data, creates a contact in HubSpot, and fires the call_lead task immediately.

# Python
@app.route("/webhooks/form", methods=["POST"])
def form_submission():
    """Handle Typeform/HubSpot form webhook."""
    data = request.json

    answers = {a["field"]["ref"]: a for a in data.get("form_response", {}).get("answers", [])}
    name = answers.get("name", {}).get("text", "Unknown")
    email = answers.get("email", {}).get("email", "")
    phone = answers.get("phone", {}).get("phone_number", "")
    company = answers.get("company", {}).get("text", "")

    if not phone:
        return jsonify({"error": "no_phone"}), 400

    hs_resp = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts",
        headers={"Authorization": f"Bearer {HUBSPOT_TOKEN}"},
        json={
            "properties": {
                "firstname": name.split()[0] if name else "",
                "lastname": " ".join(name.split()[1:]) if name else "",
                "email": email,
                "phone": phone,
                "company": company,
                "hs_lead_status": "NEW",
            }
        },
    )
    lead_id = hs_resp.json()["id"]

    from tasks import call_lead
    call_lead.delay(lead_id)

    return jsonify({"status": "calling", "lead_id": lead_id}), 202

Timing breakdown

From form submission to ringing phone, the total latency is under 5 seconds:

  • Form submitted: 0s
  • Typeform webhook fires: ~1s
  • Flask handler processes + HubSpot create: ~500ms
  • Celery task queued + picked up: <500ms
  • Spix API receives call request: <100ms
  • Phone starts ringing: ~2-4s total

Business logic considerations

  • Time-of-day filtering — don't call leads at 2am. Check the lead's timezone and schedule for 9am instead using Celery's eta parameter.
  • Concurrency — Agent plan allows 2 concurrent calls. Rate-limit or upgrade to Operator ($99/mo, 10 concurrent).
  • Do Not Call lists — check before firing any call. Legal requirement in most jurisdictions.
  • Idempotency — use the lead ID as a Celery task ID to prevent duplicate calls from double-submitted forms.

What's next

In the final part, Part 4, you'll add email follow-ups — so when a prospect doesn't pick up after two calls, your agent sends a personalized email instead.