Adapters

An Adapter is any source that yields Conversation objects: a local file, a third-party API, a database, or a custom enterprise system. Detectors are source-agnostic — they operate on the Conversation schema and don't know or care where the data came from.

The contract

from collections.abc import Iterator
from typing import ClassVar

from chatbot_auditor import Conversation

class Adapter:
    name: ClassVar[str]

    def fetch(self) -> Iterator[Conversation]: ...
    def collect(self) -> list[Conversation]: ...  # provided by base

Implementations yield conversations lazily. Pipelines can stream large datasets without holding everything in memory; .collect() materializes to a list when you need one.
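
To make the lazy/eager split concrete, here is a minimal sketch of how a base class might provide collect() on top of fetch(). The Conversation dataclass, the Adapter base, and CountingAdapter are stand-ins written for this illustration, and the one-line collect() body is an assumption about the base class rather than the library's actual source.

```python
from collections.abc import Iterator
from dataclasses import dataclass
from typing import ClassVar


@dataclass
class Conversation:
    """Stand-in for chatbot_auditor.Conversation (illustration only)."""
    id: str


class Adapter:
    """Stand-in base class; the real one lives in chatbot_auditor."""
    name: ClassVar[str]

    def fetch(self) -> Iterator[Conversation]:
        raise NotImplementedError

    def collect(self) -> list[Conversation]:
        # Assumed implementation: drain the lazy iterator into a list.
        return list(self.fetch())


class CountingAdapter(Adapter):
    """Hypothetical adapter that records how many items it has produced."""
    name: ClassVar[str] = "counting"

    def __init__(self, total: int) -> None:
        self.total = total
        self.produced = 0

    def fetch(self) -> Iterator[Conversation]:
        for i in range(self.total):
            self.produced += 1
            yield Conversation(id=f"conv-{i}")


adapter = CountingAdapter(total=1000)
stream = adapter.fetch()
first = next(stream)                      # only one conversation has been built so far
produced_after_first = adapter.produced   # still 1, not 1000

everything = CountingAdapter(total=3).collect()  # materializes all three
```

Because fetch() is a generator, nothing is produced until the caller asks for it, which is what lets a pipeline stop early or process one conversation at a time.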

The built-in adapters

| Adapter | Source | Needs extra | Notes |
| --- | --- | --- | --- |
| JSONAdapter | .json / .jsonl file | none | auto-detects single / list / JSONL |
| CSVAdapter | .csv / .tsv file | none | flexible headers, custom role mapping |
| IntercomAdapter | Intercom REST API | [intercom] | cursor pagination, HTML cleaning, 429 retry |
| ZendeskAdapter | Zendesk REST API | [zendesk] | OAuth or email+token, bot user IDs |

Writing your own

Any data source can be wrapped as an adapter:

from collections.abc import Iterator
from typing import ClassVar

from chatbot_auditor import Adapter, Conversation, Message, Role


class MyDatabaseAdapter(Adapter):
    name: ClassVar[str] = "my_db"

    def __init__(self, connection_string: str, days: int = 30) -> None:
        self.connection_string = connection_string
        self.days = days

    def fetch(self) -> Iterator[Conversation]:
        # query rows from your DB grouped by conversation...
        for conv_id, messages in self._query_conversations():
            yield Conversation(
                id=conv_id,
                platform="my_db",
                messages=[
                    Message(role=Role(m.role), content=m.content, timestamp=m.ts)
                    for m in messages
                ],
                reported_resolved=True,
            )

Then plug it into the audit:

from chatbot_auditor import audit

adapter = MyDatabaseAdapter(connection_string="postgres://...", days=7)
detections = audit(adapter.fetch())

Best practices

Be lazy

Large exports can contain millions of messages. Yield conversations one at a time rather than loading everything up front:

def fetch(self) -> Iterator[Conversation]:
    for raw in self._stream_source():
        yield self._parse(raw)
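
As a fuller illustration of the same idea, the sketch below streams a JSONL export line by line. The Conversation dataclass is a stand-in, and the record layout (an "id" key and an optional "messages" key) is an assumption made for the example, not a format the library prescribes.

```python
import io
import json
from collections.abc import Iterator
from dataclasses import dataclass
from typing import TextIO


@dataclass
class Conversation:
    """Stand-in for chatbot_auditor.Conversation (illustration only)."""
    id: str
    messages: list[dict]


def stream_conversations(handle: TextIO) -> Iterator[Conversation]:
    """Yield one Conversation per non-empty JSONL line."""
    for line in handle:  # file objects iterate line by line, not all at once
        line = line.strip()
        if not line:
            continue
        raw = json.loads(line)
        yield Conversation(id=raw["id"], messages=raw.get("messages", []))


# io.StringIO stands in for an open export file.
export = io.StringIO(
    '{"id": "a", "messages": [{"role": "user", "content": "hi"}]}\n'
    "\n"
    '{"id": "b"}\n'
)
ids = [conv.id for conv in stream_conversations(export)]
```

Only one parsed line is held in memory at a time, so the same function should behave the same way on a three-line fixture and a multi-gigabyte export.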

Normalize roles

Every source uses different terminology: "customer", "visitor", "requester", and "end-user" all refer to the same person, your real customer. Map them to Role.USER at the adapter boundary so detectors don't need to know the source's vocabulary.
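
One way to do this is a small lookup table inside the adapter. The sketch below is illustrative: the Role enum is a stand-in (only Role.USER is named on this page, so BOT and AGENT are assumed members), and ROLE_ALIASES and normalize_role are hypothetical names.

```python
from enum import Enum


class Role(Enum):
    """Stand-in for chatbot_auditor.Role (illustration only)."""
    USER = "user"
    BOT = "bot"      # assumed member
    AGENT = "agent"  # assumed member


# Hypothetical vocabulary; extend it per source.
ROLE_ALIASES: dict[str, Role] = {
    "customer": Role.USER,
    "visitor": Role.USER,
    "requester": Role.USER,
    "end-user": Role.USER,
    "bot": Role.BOT,
    "admin": Role.AGENT,
}


def normalize_role(raw: str) -> Role:
    """Map a source-specific label to a Role, failing loudly on unknown labels."""
    key = raw.strip().lower()
    try:
        return ROLE_ALIASES[key]
    except KeyError:
        raise ValueError(f"unmapped role label: {raw!r}") from None


normalized = [normalize_role(label) for label in ("Customer", " visitor ", "end-user")]
```

Raising on an unknown label is a design choice: silently defaulting to one role would hide mapping gaps that could skew detector results.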

Preserve provenance

Set Conversation.platform and stash source-specific fields in metadata rather than losing them. A detection's metadata can then link back to the original ticket / conversation in the source system.
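
A minimal sketch of that idea follows. The Conversation dataclass is a stand-in, the metadata field is assumed to be a plain dict, and the ticket fields, the "my_helpdesk" platform name, and the example URL are all invented for illustration.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Conversation:
    """Stand-in for chatbot_auditor.Conversation (illustration only)."""
    id: str
    platform: str
    metadata: dict[str, Any] = field(default_factory=dict)  # assumed to be a dict


# Keys the adapter maps onto first-class fields; everything else is kept as metadata.
MAPPED_KEYS = {"ticket_id"}


def parse_ticket(raw: dict[str, Any]) -> Conversation:
    """Build a Conversation and keep unmapped source fields for later lookup."""
    extras = {k: v for k, v in raw.items() if k not in MAPPED_KEYS}
    return Conversation(id=str(raw["ticket_id"]), platform="my_helpdesk", metadata=extras)


conv = parse_ticket(
    {
        "ticket_id": 4812,
        "url": "https://helpdesk.example/tickets/4812",
        "priority": "high",
    }
)
```

Keeping the unmapped fields means a reviewer can follow conv.metadata["url"] back to the original ticket without re-querying the source.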

Handle authentication gracefully

API adapters should accept credentials from a constructor argument first and fall back to an environment variable, failing with a clear error if neither is set:

import os

def __init__(self, token: str | None = None) -> None:
    # An explicit argument wins; otherwise fall back to the environment.
    self.token = token or os.environ.get("MY_SOURCE_TOKEN")
    if not self.token:
        raise ValueError("token required (pass token=... or set MY_SOURCE_TOKEN)")

Respect rate limits

For API sources, handle 429 responses with exponential backoff. Both IntercomAdapter and ZendeskAdapter do this — see their source for reference.
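
For a custom adapter, the retry loop can be sketched roughly as below. This is a generic illustration, not the built-in adapters' implementation: send_with_backoff, the Response stand-in, and the default retry count and delay are all assumptions. The request and the sleep function are injected so the logic can be exercised without a network or real waiting.

```python
import time
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Response:
    """Minimal stand-in for an HTTP response."""
    status_code: int
    body: str = ""


def send_with_backoff(
    send: Callable[[], Response],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Response:
    """Call send() until it returns a non-429 response or retries run out."""
    for attempt in range(max_retries + 1):
        response = send()
        if response.status_code != 429:
            return response
        if attempt < max_retries:
            sleep(base_delay * 2 ** attempt)  # delay doubles on each retry
    raise RuntimeError(f"still rate limited after {max_retries} retries")


# Simulate two 429s followed by a success, recording delays instead of sleeping.
replies = iter([Response(429), Response(429), Response(200, "ok")])
delays: list[float] = []
result = send_with_backoff(lambda: next(replies), sleep=delays.append)
```

A production version would likely also honor a Retry-After header when the server sends one and add random jitter so that parallel workers do not retry in lockstep.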

Injecting a test client

Every built-in API adapter accepts a pre-configured httpx.Client. This is how their tests run without hitting real APIs:

import httpx
from chatbot_auditor.adapters.intercom import IntercomAdapter

def handler(request: httpx.Request) -> httpx.Response:
    return httpx.Response(200, json={"conversations": [...], "pages": {}})

adapter = IntercomAdapter(
    access_token="test",
    client=httpx.Client(
        base_url=IntercomAdapter.BASE_URL,
        transport=httpx.MockTransport(handler),
    ),
)

The same pattern works for corporate proxies, custom retry policies, or VCR-style recorded replay.