Skip to content

Building Custom Connectors

ThinkWork ships with Slack, GitHub, and Google Workspace connectors. When you need to integrate a service that isn’t covered, you can build a custom connector using the connector recipe pattern.

A connector has four parts:

  1. Webhook handler — A Lambda function that receives events from the external service, validates them, and creates threads
  2. Credential vault integration — Secure storage for OAuth tokens or API keys
  3. Outbound skill pack — A SKILL.md with tools that call back to the external service
  4. Terraform module — Infrastructure for the Lambda, API Gateway route, and connector record

The examples/connector-recipe/ directory contains a complete, working connector template. Copy it and fill in the service-specific details.

examples/connector-recipe/
├── handler/
│ ├── main.py # Lambda handler for inbound events
│ ├── auth.py # Webhook signature verification
│ ├── thread.py # Thread creation helpers
│ └── requirements.txt
├── skill/
│ └── SKILL.md # Outbound tools (optional)
├── terraform/
│ ├── main.tf # Lambda, route, connector record
│ ├── variables.tf
│ └── outputs.tf
└── README.md

The handler receives raw HTTP requests from the external service, validates the signature, and calls the ThinkWork API to create threads.

handler/main.py
import json
import os
import boto3
import httpx
from auth import verify_signature
from thread import create_thread, send_initial_message
THINKWORK_API_URL = os.environ["THINKWORK_API_URL"]
THINKWORK_API_KEY = os.environ["THINKWORK_API_KEY"] # Internal service key
CONNECTOR_ID = os.environ["CONNECTOR_ID"]
DEFAULT_AGENT_ID = os.environ["DEFAULT_AGENT_ID"]
def handler(event, context):
body = event.get("body", "")
headers = event.get("headers", {})
# Step 1: Verify the webhook signature
try:
verify_signature(body, headers)
except ValueError as e:
return {"statusCode": 401, "body": json.dumps({"error": str(e)})}
# Step 2: Parse the event payload
payload = json.loads(body)
event_type = headers.get("x-your-service-event", "")
# Step 3: Decide whether to handle this event type
if event_type not in HANDLED_EVENT_TYPES:
return {"statusCode": 200, "body": json.dumps({"ignored": True})}
# Step 4: Extract the message content and metadata
message_body = extract_message(payload, event_type)
metadata = extract_metadata(payload, event_type)
# Step 5: Determine which agent should handle this
agent_id = route_to_agent(payload, event_type) or DEFAULT_AGENT_ID
# Step 6: Create the thread and send the initial message
thread = create_thread(
api_url=THINKWORK_API_URL,
api_key=THINKWORK_API_KEY,
channel="CUSTOM", # Register your channel type in the connector record
title=build_title(payload, event_type),
agent_id=agent_id,
metadata=metadata
)
send_initial_message(
api_url=THINKWORK_API_URL,
api_key=THINKWORK_API_KEY,
thread_id=thread["id"],
body=message_body
)
return {"statusCode": 200, "body": json.dumps({"threadId": thread["id"]})}

Always verify webhook signatures before processing. The pattern varies by service:

auth.py
import hashlib
import hmac
import os
import time
SIGNING_SECRET = os.environ["WEBHOOK_SIGNING_SECRET"]
def verify_signature(body: str, headers: dict) -> None:
"""
Verify the webhook signature. Raise ValueError if invalid.
Example: HMAC-SHA256 with timestamp anti-replay.
"""
timestamp = headers.get("x-timestamp", "")
signature = headers.get("x-signature", "")
# Anti-replay: reject requests older than 5 minutes
if abs(time.time() - int(timestamp)) > 300:
raise ValueError("Request timestamp too old")
expected = hmac.new(
SIGNING_SECRET.encode(),
f"{timestamp}:{body}".encode(),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(f"sha256={expected}", signature):
raise ValueError("Invalid signature")

Step 2: Integrate with the credential vault

Section titled “Step 2: Integrate with the credential vault”

If your connector uses OAuth, store tokens in the credential vault.

Storing credentials (at OAuth callback time)

Section titled “Storing credentials (at OAuth callback time)”
import boto3
import json
def store_oauth_tokens(connector_id: str, user_id: str, tokens: dict) -> None:
"""Store OAuth tokens in the ThinkWork credential vault (DynamoDB + KMS)."""
ssm = boto3.client("ssm")
key = f"/thinkwork/connectors/{connector_id}/tokens/{user_id}"
ssm.put_parameter(
Name=key,
Value=json.dumps(tokens),
Type="SecureString",
KeyId=os.environ["KMS_KEY_ID"], # KMS key for credential vault
Overwrite=True
)
def get_oauth_tokens(connector_id: str, user_id: str) -> dict:
"""Retrieve OAuth tokens from the credential vault."""
ssm = boto3.client("ssm")
key = f"/thinkwork/connectors/{connector_id}/tokens/{user_id}"
response = ssm.get_parameter(Name=key, WithDecryption=True)
return json.loads(response["Parameter"]["Value"])
oauth_callback.py
import httpx
CLIENT_ID = os.environ["OAUTH_CLIENT_ID"]
CLIENT_SECRET = os.environ["OAUTH_CLIENT_SECRET"]
REDIRECT_URI = os.environ["OAUTH_REDIRECT_URI"]
def handle_oauth_callback(code: str, state: str) -> dict:
"""Exchange authorization code for tokens."""
response = httpx.post(
"https://your-service.com/oauth/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET
}
)
response.raise_for_status()
tokens = response.json()
# Parse state to get connector_id + user_id
connector_id, user_id = parse_state(state)
store_oauth_tokens(connector_id, user_id, tokens)
return {"status": "connected"}

For the agent to send responses back to the originating service, create a skill pack with outbound tools.

The skill’s header and instructions (skill/SKILL.md, first two sections):

# my-service
Connector for My Service. Allows posting responses and looking up data.
## Instructions
When responding to a My Service event, always use send_reply to post your
response back to the originating conversation. Do not just return text —
the user won't see it unless you call send_reply.

The skill’s Tools section (Python code block in the SKILL.md):

## Tools
import httpx
import os
import json
import boto3
def _get_token(user_id: str) -> str:
ssm = boto3.client("ssm")
key = f"/thinkwork/connectors/my-service/tokens/{user_id}"
response = ssm.get_parameter(Name=key, WithDecryption=True)
return json.loads(response["Parameter"]["Value"])["access_token"]
def send_reply(thread_id: str, message: str) -> dict:
"""
Post a reply back to the originating My Service conversation.
Args:
thread_id: The ThinkWork thread ID (used to look up the original message metadata)
message: The reply text to send
Returns:
dict with 'success' and 'message_id' fields
"""
# Look up thread metadata to get original conversation IDs
# ... implementation specific to your service
pass
def lookup_user(email: str) -> dict:
"""
Look up a user in My Service by email address.
Args:
email: User email address
Returns:
dict with user id, name, email, and role
"""
pass
terraform/main.tf
# Lambda function for webhook handler
resource "aws_lambda_function" "connector_handler" {
function_name = "${var.stage}-thinkwork-connector-my-service"
role = aws_iam_role.connector.arn
handler = "main.handler"
runtime = "python3.12"
timeout = 30
memory_size = 256
filename = data.archive_file.handler.output_path
source_code_hash = data.archive_file.handler.output_base64sha256
environment {
variables = {
THINKWORK_API_URL = var.thinkwork_api_url
CONNECTOR_ID = "my-service"
DEFAULT_AGENT_ID = var.default_agent_id
WEBHOOK_SIGNING_SECRET = var.webhook_signing_secret
KMS_KEY_ID = var.credential_vault_kms_key_id
OAUTH_CLIENT_ID = var.oauth_client_id
OAUTH_CLIENT_SECRET = var.oauth_client_secret
OAUTH_REDIRECT_URI = "${var.api_gateway_url}/connectors/my-service/callback"
}
}
}
# API Gateway route for inbound webhooks
resource "aws_apigatewayv2_route" "webhook" {
api_id = var.api_gateway_id
route_key = "POST /connectors/my-service/webhook"
target = "integrations/${aws_apigatewayv2_integration.connector.id}"
}
# API Gateway route for OAuth callback
resource "aws_apigatewayv2_route" "oauth_callback" {
api_id = var.api_gateway_id
route_key = "GET /connectors/my-service/callback"
target = "integrations/${aws_apigatewayv2_integration.connector.id}"
}

After deploying the infrastructure, register the connector in ThinkWork:

mutation RegisterConnector {
createConnector(input: {
id: "my-service"
name: "My Service"
description: "Integration with My Service"
webhookUrl: "https://api.example.com/connectors/my-service/webhook"
oauthCallbackUrl: "https://api.example.com/connectors/my-service/callback"
skillPackId: "my-service"
defaultAgentId: "agent-abc123"
enabled: true
}) {
id
status
}
}
  1. Deploy the connector infrastructure with terraform apply
  2. Configure the webhook URL in the external service’s settings
  3. Send a test event from the external service (most services have a “send test” button)
  4. Check the AWS Lambda logs: aws logs tail /aws/lambda/<stage>-thinkwork-connector-my-service --follow
  5. Check the ThinkWork admin app for the new thread