Adapters¶
An Adapter is any source that yields Conversation objects: a local file, a third-party API, a database, or a custom enterprise system. Detectors are source-agnostic — they operate on the Conversation schema and don't know or care where the data came from.
The contract¶
```python
from collections.abc import Iterator
from typing import ClassVar

from chatbot_auditor import Adapter, Conversation

class Adapter:
    name: ClassVar[str]

    def fetch(self) -> Iterator[Conversation]: ...
    def collect(self) -> list[Conversation]: ...  # provided by base
```
Implementations yield conversations lazily. Pipelines can stream large datasets without holding everything in memory; .collect() materializes to a list when you need one.
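Conceptually, `.collect()` is just a drain of `fetch()`. A minimal stand-in sketch of that behavior (not the library's actual base class):

```python
from collections.abc import Iterator

class LazyAdapter:
    """Stand-in illustrating the contract; not the real chatbot_auditor base."""

    def fetch(self) -> Iterator[int]:
        # Pretend these ints are Conversation objects yielded lazily
        yield from (1, 2, 3)

    def collect(self) -> list[int]:
        # Materialize the lazy stream only when a list is genuinely needed
        return list(self.fetch())

adapter = LazyAdapter()
```

Pipelines that only iterate should prefer `fetch()`; `collect()` trades memory for random access.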
The built-in adapters¶
| Adapter | Source | Needs extra | Notes |
|---|---|---|---|
| `JSONAdapter` | `.json` / `.jsonl` file | none | auto-detects single / list / JSONL |
| `CSVAdapter` | `.csv` / `.tsv` file | none | flexible headers, custom role mapping |
| `IntercomAdapter` | Intercom REST API | `[intercom]` | cursor pagination, HTML cleaning, 429 retry |
| `ZendeskAdapter` | Zendesk REST API | `[zendesk]` | OAuth or email+token, bot user IDs |
Writing your own¶
Any data source can be wrapped as an adapter:
```python
from collections.abc import Iterator
from typing import ClassVar

from chatbot_auditor import Adapter, Conversation, Message, Role

class MyDatabaseAdapter(Adapter):
    name: ClassVar[str] = "my_db"

    def __init__(self, connection_string: str, days: int = 30) -> None:
        self.connection_string = connection_string
        self.days = days

    def fetch(self) -> Iterator[Conversation]:
        # query rows from your DB grouped by conversation...
        for conv_id, messages in self._query_conversations():
            yield Conversation(
                id=conv_id,
                platform="my_db",
                messages=[
                    Message(role=Role(m.role), content=m.content, timestamp=m.ts)
                    for m in messages
                ],
                reported_resolved=True,
            )
```
Then plug it into the audit:
```python
from chatbot_auditor import audit

adapter = MyDatabaseAdapter(connection_string="postgres://...", days=7)
detections = audit(adapter.fetch())
```
Best practices¶
Be lazy¶
Large exports can be millions of messages. Yield conversations one at a time rather than loading everything up front.
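A self-contained sketch of the pattern (using a stand-in `Conversation` dataclass so it runs without a database):

```python
from collections.abc import Iterator
from dataclasses import dataclass

@dataclass
class Conversation:  # stand-in for chatbot_auditor.Conversation
    id: str

def fetch(n: int) -> Iterator[Conversation]:
    # Generator: each Conversation exists only while the consumer holds it,
    # so a million-row export never lives in memory all at once.
    for i in range(n):
        yield Conversation(id=f"conv-{i}")

stream = fetch(1_000_000)   # no rows built yet
first = next(stream)        # exactly one Conversation materialized
```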
Normalize roles¶
Every source uses different terminology — "customer", "visitor", "requester", "end-user" are all your real customer. Map them to Role.USER at adapter boundaries so detectors don't need to know about the source's vocabulary.
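For example, a lookup table applied at the boundary (the source vocabulary and `Role` stand-in here are illustrative):

```python
from enum import Enum

class Role(Enum):  # stand-in for chatbot_auditor.Role
    USER = "user"
    ASSISTANT = "assistant"

# Hypothetical vocabularies from different sources, all collapsed at the boundary
ROLE_MAP = {
    "customer": Role.USER,
    "visitor": Role.USER,
    "requester": Role.USER,
    "end-user": Role.USER,
    "agent": Role.ASSISTANT,
    "bot": Role.ASSISTANT,
}

def normalize_role(raw: str) -> Role:
    # Detectors downstream only ever see Role, never the source's terms
    return ROLE_MAP[raw.strip().lower()]
```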
Preserve provenance¶
Set Conversation.platform and stash source-specific fields in metadata rather than losing them. A detection's metadata can then link back to the original ticket / conversation in the source system.
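A sketch of the idea using plain dicts (the field names on the raw ticket are hypothetical):

```python
raw_ticket = {"ticket_id": 98765, "brand": "acme", "channel": "email"}  # hypothetical source record

conversation = {
    "id": f"zendesk-{raw_ticket['ticket_id']}",
    "platform": "zendesk",
    # Stash everything source-specific instead of discarding it; a detection's
    # metadata can then point straight back to the original ticket.
    "metadata": dict(raw_ticket),
}
```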
Handle authentication gracefully¶
API adapters should accept credentials via constructor arguments AND environment variables, in that order:
```python
import os

def __init__(self, token: str | None = None):
    self.token = token or os.environ.get("MY_SOURCE_TOKEN")
    if not self.token:
        raise ValueError("token required (pass token=... or set MY_SOURCE_TOKEN)")
```
Respect rate limits¶
For API sources, handle 429 responses with exponential backoff. Both IntercomAdapter and ZendeskAdapter do this — see their source for reference.
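A minimal sketch of such a retry loop (the `send` callable and parameters are illustrative, not the built-in adapters' actual code):

```python
import time
from collections.abc import Callable

def request_with_backoff(
    send: Callable[[], int],   # performs the request, returns an HTTP status code
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> int:
    for attempt in range(max_retries):
        status = send()
        if status != 429:
            return status
        # Exponential backoff: 1s, 2s, 4s, ... before the next attempt
        time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("still rate-limited after retries")
```

Honoring a `Retry-After` header, when the API sends one, is a common refinement on the fixed exponential schedule.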
Injecting a test client¶
Every built-in API adapter accepts a pre-configured httpx.Client. This is how their tests run without hitting real APIs:
```python
import httpx

from chatbot_auditor.adapters.intercom import IntercomAdapter

def handler(request: httpx.Request) -> httpx.Response:
    return httpx.Response(200, json={"conversations": [...], "pages": {}})

adapter = IntercomAdapter(
    access_token="test",
    client=httpx.Client(
        base_url=IntercomAdapter.BASE_URL,
        transport=httpx.MockTransport(handler),
    ),
)
```
The same pattern works for corporate proxies, custom retry policies, or VCR-style recorded replay.