Building Custom Connectors
ThinkWork ships with Slack, GitHub, and Google Workspace connectors. When you need to integrate a service that isn’t covered, you can build a custom connector using the connector recipe pattern.
A connector has four parts:
- Webhook handler: A Lambda function that receives events from the external service, validates them, and creates threads
- Credential vault integration: Secure storage for OAuth tokens or API keys
- Outbound skill pack: A SKILL.md with tools that call back to the external service
- Terraform module: Infrastructure for the Lambda, API Gateway route, and connector record
The connector recipe
The examples/connector-recipe/ directory contains a complete, working connector template. Copy it and fill in the service-specific details.
examples/connector-recipe/
├── handler/
│   ├── main.py           # Lambda handler for inbound events
│   ├── auth.py           # Webhook signature verification
│   ├── thread.py         # Thread creation helpers
│   └── requirements.txt
├── skill/
│   └── SKILL.md          # Outbound tools (optional)
├── terraform/
│   ├── main.tf           # Lambda, route, connector record
│   ├── variables.tf
│   └── outputs.tf
└── README.md

Step 1: Build the webhook handler
The handler receives raw HTTP requests from the external service, validates the signature, and calls the ThinkWork API to create threads.
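One detail worth handling before signature verification: API Gateway can deliver the request body base64-encoded (flagged by isBase64Encoded in the Lambda proxy event), and header-name casing can differ depending on how the API is configured. Since the signature is computed over the exact bytes the sender transmitted, both should be normalized first. The following is a minimal sketch; the helper names raw_body and normalized_headers are hypothetical and not part of the recipe.

```python
import base64


def raw_body(event: dict) -> str:
    """Return the request body as the sender transmitted it."""
    body = event.get("body") or ""
    if event.get("isBase64Encoded"):
        # API Gateway sets this flag when it has base64-encoded the payload.
        body = base64.b64decode(body).decode("utf-8")
    return body


def normalized_headers(event: dict) -> dict:
    """Lower-case header names so lookups such as "x-signature" always match."""
    headers = event.get("headers") or {}
    return {name.lower(): value for name, value in headers.items()}
```

In the handler, these would stand in for the direct event.get("body") and event.get("headers") lookups.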
import json
import os

from auth import verify_signature
from thread import create_thread, send_initial_message

THINKWORK_API_URL = os.environ["THINKWORK_API_URL"]
THINKWORK_API_KEY = os.environ["THINKWORK_API_KEY"]  # Internal service key
CONNECTOR_ID = os.environ["CONNECTOR_ID"]
DEFAULT_AGENT_ID = os.environ["DEFAULT_AGENT_ID"]

# Placeholder: replace with the event types your service actually sends.
HANDLED_EVENT_TYPES = {"message.created"}

# extract_message, extract_metadata, route_to_agent, and build_title are
# service-specific helpers that you implement for your payload format.


def handler(event, context):
    body = event.get("body") or ""
    headers = event.get("headers") or {}

    # Step 1: Verify the webhook signature
    try:
        verify_signature(body, headers)
    except ValueError as e:
        return {"statusCode": 401, "body": json.dumps({"error": str(e)})}

    # Step 2: Parse the event payload
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return {"statusCode": 400, "body": json.dumps({"error": "Invalid JSON"})}
    event_type = headers.get("x-your-service-event", "")

    # Step 3: Decide whether to handle this event type
    if event_type not in HANDLED_EVENT_TYPES:
        return {"statusCode": 200, "body": json.dumps({"ignored": True})}

    # Step 4: Extract the message content and metadata
    message_body = extract_message(payload, event_type)
    metadata = extract_metadata(payload, event_type)

    # Step 5: Determine which agent should handle this
    agent_id = route_to_agent(payload, event_type) or DEFAULT_AGENT_ID

    # Step 6: Create the thread and send the initial message
    thread = create_thread(
        api_url=THINKWORK_API_URL,
        api_key=THINKWORK_API_KEY,
        channel="CUSTOM",  # Register your channel type in the connector record
        title=build_title(payload, event_type),
        agent_id=agent_id,
        metadata=metadata,
    )

    send_initial_message(
        api_url=THINKWORK_API_URL,
        api_key=THINKWORK_API_KEY,
        thread_id=thread["id"],
        body=message_body,
    )

    return {"statusCode": 200, "body": json.dumps({"threadId": thread["id"]})}

Signature verification
Always verify webhook signatures before processing. The scheme varies by service; this example uses HMAC-SHA256 over the timestamp and body, with a timestamp check to prevent replay:
import hashlib
import hmac
import os
import time

SIGNING_SECRET = os.environ["WEBHOOK_SIGNING_SECRET"]


def verify_signature(body: str, headers: dict) -> None:
    """
    Verify the webhook signature. Raise ValueError if invalid.

    Example: HMAC-SHA256 with timestamp anti-replay.
    """
    timestamp = headers.get("x-timestamp", "")
    signature = headers.get("x-signature", "")
    if not timestamp or not signature:
        raise ValueError("Missing signature headers")

    # Anti-replay: reject requests more than 5 minutes away from the current time.
    # int() raises ValueError on a non-numeric timestamp, which also rejects the request.
    if abs(time.time() - int(timestamp)) > 300:
        raise ValueError("Request timestamp outside the allowed window")

    expected = hmac.new(
        SIGNING_SECRET.encode(),
        f"{timestamp}:{body}".encode(),
        hashlib.sha256,
    ).hexdigest()

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str input.
    if not hmac.compare_digest(f"sha256={expected}".encode(), signature.encode()):
        raise ValueError("Invalid signature")

Step 2: Integrate with the credential vault
If your connector uses OAuth, store tokens in the credential vault.
Storing credentials (at OAuth callback time)
import json
import os

import boto3


def store_oauth_tokens(connector_id: str, user_id: str, tokens: dict) -> None:
    """Store OAuth tokens in the ThinkWork credential vault.

    As written, this example stores them as a KMS-encrypted SecureString
    parameter in SSM Parameter Store.
    """
    ssm = boto3.client("ssm")

    key = f"/thinkwork/connectors/{connector_id}/tokens/{user_id}"
    ssm.put_parameter(
        Name=key,
        Value=json.dumps(tokens),
        Type="SecureString",
        KeyId=os.environ["KMS_KEY_ID"],  # KMS key for credential vault
        Overwrite=True,
    )


def get_oauth_tokens(connector_id: str, user_id: str) -> dict:
    """Retrieve OAuth tokens from the credential vault."""
    ssm = boto3.client("ssm")

    key = f"/thinkwork/connectors/{connector_id}/tokens/{user_id}"
    response = ssm.get_parameter(Name=key, WithDecryption=True)
    return json.loads(response["Parameter"]["Value"])

OAuth callback handler
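The callback handler in this section calls parse_state, which the excerpt does not define. Here is a minimal sketch of one way to build and parse the state value, assuming it carries the connector ID and user ID and is signed with HMAC-SHA256 so that it cannot be forged. The build_state helper, the STATE_SIGNING_SECRET variable, and the "connector_id:user_id:signature" layout are all assumptions, not the recipe's actual format.

```python
import hashlib
import hmac
import os

# Hypothetical secret used only to sign the OAuth state parameter.
STATE_SECRET = os.environ.get("STATE_SIGNING_SECRET", "dev-only-secret").encode()


def _sign(payload: str) -> str:
    """Hex HMAC-SHA256 of the payload under the state secret."""
    return hmac.new(STATE_SECRET, payload.encode(), hashlib.sha256).hexdigest()


def build_state(connector_id: str, user_id: str) -> str:
    """Create the state value sent along with the authorization request."""
    payload = f"{connector_id}:{user_id}"
    return f"{payload}:{_sign(payload)}"


def parse_state(state: str) -> tuple[str, str]:
    """Return (connector_id, user_id); raise ValueError if the state is invalid."""
    payload, _, signature = state.rpartition(":")
    if not payload or not hmac.compare_digest(
        _sign(payload).encode(), signature.encode()
    ):
        raise ValueError("Invalid OAuth state")
    connector_id, _, user_id = payload.partition(":")
    if not connector_id or not user_id:
        raise ValueError("Malformed OAuth state")
    return connector_id, user_id
```

Signing the state ties the callback to an authorization request that your own service started, which guards against a third party replaying a callback URL with someone else's user ID.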
import os

import httpx

# store_oauth_tokens is defined in the credential-storage example above;
# parse_state is a helper you implement to decode and validate the state value.

CLIENT_ID = os.environ["OAUTH_CLIENT_ID"]
CLIENT_SECRET = os.environ["OAUTH_CLIENT_SECRET"]
REDIRECT_URI = os.environ["OAUTH_REDIRECT_URI"]


def handle_oauth_callback(code: str, state: str) -> dict:
    """Exchange authorization code for tokens."""
    response = httpx.post(
        "https://your-service.com/oauth/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": REDIRECT_URI,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
        },
    )
    response.raise_for_status()
    tokens = response.json()

    # Parse state to get connector_id + user_id
    connector_id, user_id = parse_state(state)
    store_oauth_tokens(connector_id, user_id, tokens)

    return {"status": "connected"}

Step 3: Build the outbound skill pack
For the agent to send responses back to the originating service, create a skill pack with outbound tools.
The skill’s header and instructions (skill/SKILL.md, first two sections):
# my-service
Connector for My Service. Allows posting responses and looking up data.
## Instructions
When responding to a My Service event, always use send_reply to post your
response back to the originating conversation. Do not just return text;
the user won't see it unless you call send_reply.

The skill’s Tools section (Python code block in the SKILL.md):
## Tools
import json
import os

import boto3
import httpx


def _get_token(user_id: str) -> str:
    ssm = boto3.client("ssm")
    key = f"/thinkwork/connectors/my-service/tokens/{user_id}"
    response = ssm.get_parameter(Name=key, WithDecryption=True)
    return json.loads(response["Parameter"]["Value"])["access_token"]


def send_reply(thread_id: str, message: str) -> dict:
    """
    Post a reply back to the originating My Service conversation.

    Args:
        thread_id: The ThinkWork thread ID (used to look up the original message metadata)
        message: The reply text to send

    Returns:
        dict with 'success' and 'message_id' fields
    """
    # Look up thread metadata to get original conversation IDs
    # ... implementation specific to your service
    pass


def lookup_user(email: str) -> dict:
    """
    Look up a user in My Service by email address.

    Args:
        email: User email address

    Returns:
        dict with user id, name, email, and role
    """
    pass

Step 4: Terraform module
# The IAM role, archive_file data source, and API Gateway integration
# referenced below must also be defined in the module (not shown here).

# Lambda function for webhook handler
resource "aws_lambda_function" "connector_handler" {
  function_name = "${var.stage}-thinkwork-connector-my-service"
  role          = aws_iam_role.connector.arn
  handler       = "main.handler"
  runtime       = "python3.12"
  timeout       = 30
  memory_size   = 256

  filename         = data.archive_file.handler.output_path
  source_code_hash = data.archive_file.handler.output_base64sha256

  environment {
    variables = {
      THINKWORK_API_URL      = var.thinkwork_api_url
      # The handler reads THINKWORK_API_KEY at import time; the variable name is an assumption.
      THINKWORK_API_KEY      = var.thinkwork_api_key
      CONNECTOR_ID           = "my-service"
      DEFAULT_AGENT_ID       = var.default_agent_id
      WEBHOOK_SIGNING_SECRET = var.webhook_signing_secret
      KMS_KEY_ID             = var.credential_vault_kms_key_id
      OAUTH_CLIENT_ID        = var.oauth_client_id
      OAUTH_CLIENT_SECRET    = var.oauth_client_secret
      OAUTH_REDIRECT_URI     = "${var.api_gateway_url}/connectors/my-service/callback"
    }
  }
}

# API Gateway route for inbound webhooks
resource "aws_apigatewayv2_route" "webhook" {
  api_id    = var.api_gateway_id
  route_key = "POST /connectors/my-service/webhook"
  target    = "integrations/${aws_apigatewayv2_integration.connector.id}"
}

# API Gateway route for OAuth callback
resource "aws_apigatewayv2_route" "oauth_callback" {
  api_id    = var.api_gateway_id
  route_key = "GET /connectors/my-service/callback"
  target    = "integrations/${aws_apigatewayv2_integration.connector.id}"
}

Registering the connector
After deploying the infrastructure, register the connector in ThinkWork:
mutation RegisterConnector {
  createConnector(input: {
    id: "my-service"
    name: "My Service"
    description: "Integration with My Service"
    webhookUrl: "https://api.example.com/connectors/my-service/webhook"
    oauthCallbackUrl: "https://api.example.com/connectors/my-service/callback"
    skillPackId: "my-service"
    defaultAgentId: "agent-abc123"
    enabled: true
  }) {
    id
    status
  }
}

Testing your connector
- Deploy the connector infrastructure with terraform apply
- Configure the webhook URL in the external service’s settings
- Send a test event from the external service (most services have a “send test” button)
- Check the AWS Lambda logs: aws logs tail /aws/lambda/<stage>-thinkwork-connector-my-service --follow
- Check the ThinkWork admin app for the new thread
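When the external service has no "send test" button, a signed request can be built locally instead. The sketch below signs a payload the way the verify_signature example expects: HMAC-SHA256 over "timestamp:body", sent in the x-timestamp and x-signature headers. The function name, event type, and payload are placeholders, and a real service's signing scheme may differ.

```python
import hashlib
import hmac
import json
import time


def build_signed_request(secret: str, payload: dict, event_type: str) -> tuple[str, dict]:
    """Return (body, headers) for a test webhook using the example signature scheme."""
    body = json.dumps(payload)
    timestamp = str(int(time.time()))
    digest = hmac.new(
        secret.encode(),
        f"{timestamp}:{body}".encode(),
        hashlib.sha256,
    ).hexdigest()
    headers = {
        "content-type": "application/json",
        "x-timestamp": timestamp,
        "x-signature": f"sha256={digest}",
        "x-your-service-event": event_type,  # placeholder header from the handler example
    }
    return body, headers
```

Send the returned body and headers to the webhook URL with any HTTP client, for example httpx.post(url, content=body, headers=headers), using the same secret that was deployed as WEBHOOK_SIGNING_SECRET. The body must be sent exactly as returned, since re-serializing the JSON would change the bytes the signature covers.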