Knowledge Agent — unified access to mail, OneDrive and SharePoint
This page documents a recommended ingestion → embedding → vector store → retrieval → response workflow for a Knowledge Agent that needs unified access to mail, OneDrive and SharePoint. It also includes C# code sketches for indexing and querying, plus operational guidance on privacy, scopes, re-indexing, cost and security controls.
1) Workflow: ingestion → embedding → vector store → retrieval → response
- Ingestion: collect items from sources (mail folders, OneDrive files, SharePoint lists/libraries). For each item capture: source id, path/folder, modified_at, ETag/version, and metadata (sender, recipients, permissions). Use incremental harvests (see last-run section) and respect filters (excluded folders, PII tags).
- Preprocessing: normalize text, strip boilerplate (email signatures, repeated quoted text), extract structured metadata (dates, attachments, MIME types), and optionally chunk long documents into coherent passages, preserving the chunk-to-source mapping (see the chunking sketch after this list).
- Embedding: compute semantic vectors for each chunk using a stable embed model (versioned). Record embed_model_id and embed_timestamp for each vector.
- Vector store: upsert vectors with document id, chunk id, metadata (source, path, permissions, modified_at, etag, embed_model_id). Use metadata fields to filter retrieval by scope and access rights.
- Retrieval: run a semantic search (k-NN) against the vector store, apply metadata and permission filters, then optionally run a re-ranking step using a cross-encoder / LLM reranker to improve precision.
- Response synthesis: assemble retrieved passages, add citations (source link, last modified, and confidence), and run the LLM with an instruction to ground answers in the retrieved content and avoid hallucinations. Present the result with actionable traceability info (sources, exact snippets used).
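The chunking step above determines retrieval granularity. A minimal sketch using a word-window approximation of token counts with overlap (the `Chunk` record, sizes and naming are illustrative; a production chunker should prefer semantic boundaries such as paragraphs):

// Illustrative chunker: splits normalized text into overlapping word-window chunks.
// Word counts only approximate tokens; tune sizes for your embedding model.
record Chunk(string Id, string Text);

static IEnumerable<Chunk> ChunkText(string docId, string text, int maxWords = 400, int overlapWords = 50)
{
    var words = text.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
    if (words.Length == 0) yield break;
    for (int start = 0, n = 0; ; n++)
    {
        var take = Math.Min(maxWords, words.Length - start);
        yield return new Chunk($"{docId}-c{n}", string.Join(' ', words, start, take));
        if (start + take >= words.Length) yield break;
        start += maxWords - overlapWords; // overlap preserves context across chunk boundaries
    }
}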
2) Minimal contract (inputs/outputs, success criteria)
- Inputs: user query Q, caller identity & permissions, optional scope filters (mail/onedrive/sharepoint), K (number of results), last_run timestamp for incremental queries.
- Outputs: final text response, a list of cited sources [{source, path, chunk_id, score, snippet}], telemetry (latency, vector-store counts), and a flag indicating whether content was truncated or partial.
- Success: the returned answer cites >=1 trusted source when expected, no PII leaks outside the allowed scope, and operations complete under the cost SLA.
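As a sketch, the contract expressed as C# records (names are illustrative, not a fixed API):

// Illustrative contract types for the Knowledge Agent boundary (hypothetical names).
record AgentQuery(string Query, string CallerId, string[]? ScopeFilters, int K, string? LastRunIso);
record CitedSource(string Source, string Path, string ChunkId, double Score, string Snippet);
record AgentResponse(string Text, CitedSource[] Sources, bool Truncated);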
3) C# indexing + query snippets (Azure OpenAI embeddings + Azure Cognitive Search)
Below are compact C# snippets you can paste into a small tool. They show the main flow: incremental listing, preprocessing/chunking, calling an embedding API (Azure OpenAI or an OpenAI-compatible endpoint), and upserting to Azure Cognitive Search as documents with a vector field. The examples use HttpClient for embeddings and the Search Documents REST API for upserts/search; for production prefer the Azure SDKs (Azure.AI.OpenAI, Azure.Search.Documents) with proper retries and pagination.
// Minimal: get an embedding from Azure OpenAI / an OpenAI-compatible endpoint.
// Requires: using System.Net.Http.Json; using System.Text.Json;
async Task<float[]> GetEmbeddingAsync(HttpClient http, string endpoint, string apiKey, string deploymentOrModel, string text)
{
    http.DefaultRequestHeaders.Clear();
    http.DefaultRequestHeaders.Add("api-key", apiKey); // or Authorization: Bearer <key> for OpenAI
    var body = new { input = text };
    var url = $"{endpoint}/openai/deployments/{deploymentOrModel}/embeddings?api-version=2023-06-01-preview";
    var resp = await http.PostAsJsonAsync(url, body);
    resp.EnsureSuccessStatusCode();
    using var doc = await JsonDocument.ParseAsync(await resp.Content.ReadAsStreamAsync());
    // JSON shape: { data: [ { embedding: [..] } ] }
    var embedding = doc.RootElement.GetProperty("data")[0].GetProperty("embedding");
    var result = new float[embedding.GetArrayLength()];
    for (int i = 0; i < result.Length; i++) result[i] = embedding[i].GetSingle();
    return result;
}
// Simple upsert to Azure Cognitive Search: mergeOrUpload a document with a vector field `contentVector`.
async Task UpsertDocumentToSearchAsync(HttpClient http, string searchEndpoint, string indexName, string apiKey, IDictionary<string, object> document)
{
    http.DefaultRequestHeaders.Clear();
    http.DefaultRequestHeaders.Add("api-key", apiKey);
    // Vector fields require a vector-capable api-version (2023-11-01 GA or a recent preview).
    var url = $"{searchEndpoint}/indexes/{indexName}/docs/index?api-version=2023-11-01";
    // Copy the document and add the action verb the Search Documents API expects.
    var payload = new Dictionary<string, object>(document) { ["@search.action"] = "mergeOrUpload" };
    var body = new { value = new[] { payload } };
    var resp = await http.PostAsJsonAsync(url, body);
    resp.EnsureSuccessStatusCode();
}
// Indexing flow (conceptual): list changed items, preprocess, embed chunks, upsert.
async Task<(string newLastRunIso, int totalUpserted)> RunIndexingCycleAsync(
    IEnumerable<ISource> sources,
    string lastRunIso,
    HttpClient http,
    string oaEndpoint,
    string oaApiKey,
    string oaModel,
    string searchEndpoint,
    string searchIndex,
    string searchApiKey)
{
    // Capture the new watermark before listing so items modified mid-run are re-seen next cycle.
    var newLast = DateTime.UtcNow.ToString("o");
    int total = 0;
    foreach (var src in sources)
    {
        await foreach (var item in src.ListModifiedSinceAsync(lastRunIso))
        {
            var chunks = PreprocessAndChunk(item); // implement: normalize, remove signatures, chunk
            foreach (var chunk in chunks)
            {
                var vec = await GetEmbeddingAsync(http, oaEndpoint, oaApiKey, oaModel, chunk.Text);
                var doc = new Dictionary<string, object>
                {
                    // Search document keys allow only letters, digits, '_', '-' and '=', so avoid ':'.
                    ["id"] = $"{item.Id}_{chunk.Id}",
                    ["source"] = src.Id,
                    ["path"] = item.Path,
                    ["modified_at"] = item.ModifiedAt,
                    ["etag"] = item.ETag,
                    ["contentVector"] = vec,
                    ["chunk_text_sample"] = chunk.Text.Length > 200 ? chunk.Text[..200] : chunk.Text,
                    ["embed_model_id"] = oaModel,
                    ["embed_ts"] = DateTime.UtcNow.ToString("o")
                };
                await UpsertDocumentToSearchAsync(http, searchEndpoint, searchIndex, searchApiKey, doc);
                total++;
            }
        }
    }
    return (newLast, total);
}
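The flow above assumes a small source abstraction. A hypothetical shape for `ISource` and its item type (illustrative, not a real SDK interface), reusing the `ChunkText` sketch from section 1:

// Hypothetical source abstraction assumed by RunIndexingCycleAsync.
record SourceItem(string Id, string Path, DateTimeOffset ModifiedAt, string ETag, string Body);

interface ISource
{
    string Id { get; }
    // Stream items whose modified timestamp is later than the ISO-8601 watermark.
    IAsyncEnumerable<SourceItem> ListModifiedSinceAsync(string lastRunIso);
}

// Placeholder preprocessing: normalize, strip signatures/quoted text, then chunk.
IEnumerable<Chunk> PreprocessAndChunk(SourceItem item) => ChunkText(item.Id, item.Body);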
// Query-time: vector search with metadata filters (Azure Search vector query body example).
// Note: vector queries need api-version 2023-11-01 (GA) or a recent preview; older versions reject them.
async Task<JsonDocument> VectorSearchAsync(HttpClient http, string searchEndpoint, string indexName, string apiKey, float[] queryVector, int k = 5, string? filter = null)
{
    http.DefaultRequestHeaders.Clear();
    http.DefaultRequestHeaders.Add("api-key", apiKey);
    var url = $"{searchEndpoint}/indexes/{indexName}/docs/search?api-version=2023-11-01";
    var body = new
    {
        vectorQueries = new[] { new { kind = "vector", vector = queryVector, k = k, fields = "contentVector" } },
        filter = filter, // e.g. "source eq 'mail' and permissions/any(p: p eq 'shared')"
        select = "id, path, chunk_text_sample" // @search.score is returned automatically, not selectable
    };
    var resp = await http.PostAsJsonAsync(url, body);
    resp.EnsureSuccessStatusCode();
    return JsonDocument.Parse(await resp.Content.ReadAsStreamAsync());
}
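Putting the query-time pieces together (a sketch that assumes the helpers above plus caller-supplied variables such as `userQuery` and the endpoint/key settings):

// End-to-end query sketch: embed the user's query, then run a filtered vector search.
var queryVec = await GetEmbeddingAsync(http, oaEndpoint, oaApiKey, oaModel, userQuery);
using var results = await VectorSearchAsync(
    http, searchEndpoint, searchIndex, searchApiKey, queryVec,
    k: 5,
    filter: "source eq 'mail'"); // scope filter; combine with ACL trimming (section 7)
foreach (var hit in results.RootElement.GetProperty("value").EnumerateArray())
    Console.WriteLine($"{hit.GetProperty("@search.score").GetDouble():F3} {hit.GetProperty("path").GetString()}");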
Notes:
- The vector field name above is `contentVector`; it must be defined in your Azure Cognitive Search index as a `Collection(Edm.Single)` field with a dimensions setting matching your embedding model's output size (the exact schema property name depends on the api-version you target).
- Use a transactional/state store to persist `last_run_iso` and delta tokens per source so indexing is idempotent and resumable.
- In production prefer the Azure SDKs (Azure.AI.OpenAI, Azure.Search.Documents) for convenience, paging, retries, and type safety.
4) Privacy, scopes and least privilege
- Scopes: request only Graph scopes needed for the features you expose. Example minimums:
- Mail read: Mail.Read (or Mail.Read.Shared) for reading content, Mail.Send only when sending is explicit and confirmed.
- Files: Files.Read.All or Files.Read for user-delegated OneDrive; SharePoint scopes like Sites.Read.All must be narrowly constrained and use tenant admin consent only when required.
- Prefer incremental consent and delegated flows (on-behalf-of) over broad application permissions where possible; a minimal MSAL sketch follows this list.
- PII handling: tag sensitive fields during preprocessing. Options:
- redact high-sensitivity items from vectors and text used for LLM prompts
- encrypt vectors at rest and restrict access to vector-store admin APIs
- provide per-user opt-out toggles and folder-level exclusions
- Consent & disclosure: show a clear consent screen describing what goes into the Knowledge Agent (mail indexing, file content). Use incremental consent to add scopes only when user requests a feature.
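A minimal MSAL sketch of incremental, least-privilege consent (client id and redirect URI are illustrative; requires the Microsoft.Identity.Client package):

// Requires: using Microsoft.Identity.Client;
var app = PublicClientApplicationBuilder
    .Create("00000000-0000-0000-0000-000000000000") // your app registration's client id
    .WithRedirectUri("http://localhost")
    .Build();

// Baseline consent: read-only mail access only.
var mailResult = await app.AcquireTokenInteractive(new[] { "Mail.Read" }).ExecuteAsync();

// Incremental consent: request file scopes only when the user enables file indexing.
var fileResult = await app.AcquireTokenInteractive(new[] { "Files.Read" }).ExecuteAsync();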
5) Re-indexing strategies and operational cadence
- Incremental runs: use the last successful run timestamp to query changed items. For Graph sources prefer delta APIs where available (drive delta, mail delta) to avoid scanning everything; a delta-walk sketch follows this list.
- Full re-index: required when the embedding model changes, after major preprocessing updates, or when schema/metadata mappings change. Steps: 1) mark vectors with embed_model_id, 2) run a new embedding pass producing new vectors, 3) swap the alias/primary pointer to the new index atomically, or use versioning so queries can migrate gradually.
- Partial re-index: for a folder or site where changes indicate corruption or sensitive content removal.
- Canary runs: run reindex on a small subset first and run QA (retrieval precision, hallucination rate) before full rollout.
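For the incremental runs above, a sketch of walking a Microsoft Graph OneDrive delta feed and persisting the returned deltaLink as the source's delta token (auth refresh and error handling omitted; assumes a valid bearer token):

// Requires: using System.Net.Http.Headers; using System.Text.Json;
async Task<string?> WalkDriveDeltaAsync(HttpClient http, string accessToken, string? savedDeltaLink)
{
    http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
    // Resume from the persisted deltaLink, or start a full enumeration.
    var url = savedDeltaLink ?? "https://graph.microsoft.com/v1.0/me/drive/root/delta";
    string? deltaLink = null;
    while (url is not null)
    {
        using var page = JsonDocument.Parse(await http.GetStringAsync(url));
        foreach (var item in page.RootElement.GetProperty("value").EnumerateArray())
        {
            // TODO: enqueue the changed item for preprocessing/embedding.
        }
        url = page.RootElement.TryGetProperty("@odata.nextLink", out var next) ? next.GetString() : null;
        if (page.RootElement.TryGetProperty("@odata.deltaLink", out var dl))
            deltaLink = dl.GetString(); // present on the final page; persist for the next run
    }
    return deltaLink;
}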
6) Cost considerations
- Embedding costs scale with the number of chunks and the embedding model's price. Chunking strategy affects cost; prefer reasonably sized chunks (200–800 tokens) to balance semantic quality and cost. A back-of-envelope example follows this list.
- Storage costs: vector store charges depend on vector dimension and number. Implement retention policies: expire or archive old vectors for stale, low-value content.
- Query cost: reranking with cross-encoders or LLM-based re-ranking increases runtime cost. Use them only for high-value queries or on-demand.
- Monitor and budget: collect telemetry (embeddings per day, vectors total, queries per minute, average k) and set quotas/alerts.
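As a hedged back-of-envelope example (prices are illustrative; check your provider's current rates): indexing 100,000 documents at an average of 3 chunks of 500 tokens each is 150M tokens; at $0.10 per 1M tokens the embedding pass costs about $15. Storing the resulting 300,000 vectors at 1,536 float32 dimensions takes roughly 300,000 × 1,536 × 4 bytes ≈ 1.8 GB before index overhead.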
7) Security controls
- Authentication: use delegated auth (MSAL) for user access; prefer short-lived tokens and refresh tokens guarded by hardware security where possible.
- Authorization: at query time, enforce per-document ACLs via metadata filters and validate the user principal against source permissions before returning snippets (see the trimming sketch after this list).
- Encryption: encrypt vectors and indexes at rest, use TLS in transit. Control admin API access using granular roles.
- Audit: log indexing runs, items processed, who requested queries, and which sources were returned. Ensure logs redact sensitive content and store them in a secure SIEM.
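A sketch of the query-time ACL trimming above, assuming each indexed document carries a filterable `group_ids` collection populated at ingestion (the `search.in` any-filter is Azure Cognitive Search's documented security-trimming pattern):

// Build an OData filter restricting results to documents visible to the caller's groups.
// Assumes a filterable Collection(Edm.String) field named group_ids in the index.
static string BuildAclFilter(IEnumerable<string> userGroupIds) =>
    $"group_ids/any(g: search.in(g, '{string.Join(',', userGroupIds)}'))";

// Usage: combine with scope filters and pass as `filter` to VectorSearchAsync, e.g.
// var filter = BuildAclFilter(callerGroups) + " and source eq 'sharepoint'";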
8) Tracking load since last run & storing last run
Design:
- Persist a small indexing-state record per source with fields: `{source_id, last_run_iso, total_documents_indexed, total_vectors_upserted, last_run_duration_seconds}`.
- Prefer delta tokens where available (Graph drive/mail delta) instead of simple timestamps to avoid missed or duplicate processing.
JSON Schema for a per-source state record (example state-schema.json):
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "IndexingState",
"type": "object",
"properties": {
"source_id": { "type": "string" },
"last_run_iso": { "type": "string", "format": "date-time" },
"delta_token": { "type": ["string", "null"], "description": "Optional delta token if source supports it" },
"total_documents_indexed": { "type": "integer", "minimum": 0 },
"total_vectors_upserted": { "type": "integer", "minimum": 0 },
"last_run_duration_seconds": { "type": "number", "minimum": 0 }
},
"required": ["source_id", "last_run_iso"]
}
And a minimal C# snippet to read/write state as JSON (file-backed example):
// Requires: using System.IO; using System.Text.Json;
// SnakeCaseLower maps the PascalCase record properties to the snake_case fields in
// state-schema.json (.NET 8+; on older runtimes use [JsonPropertyName] attributes instead).
record IndexState(string SourceId, string LastRunIso, string? DeltaToken, int TotalDocumentsIndexed, int TotalVectorsUpserted, double LastRunDurationSeconds);

JsonSerializerOptions StateJsonOptions() => new()
{
    PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
    WriteIndented = true
};

IndexState ReadState(string path)
{
    // Epoch default forces a full first harvest when no state file exists yet.
    if (!File.Exists(path)) return new IndexState("unknown", "1970-01-01T00:00:00Z", null, 0, 0, 0);
    return JsonSerializer.Deserialize<IndexState>(File.ReadAllText(path), StateJsonOptions())!;
}

void WriteState(string path, IndexState state) =>
    File.WriteAllText(path, JsonSerializer.Serialize(state, StateJsonOptions()));
Reindexing (atomic swap) example (concept):
- Create a new index, e.g. `documents_v2`, with the updated vector field settings.
- Populate `documents_v2` fully.
- Atomically switch the alias `documents-active` to point to `documents_v2` so readers start using the new index with zero downtime.
Minimal alias swap via the Azure Cognitive Search REST API (conceptual HTTP call):
// PUT https://<searchService>.search.windows.net/aliases/documents-active?api-version=2021-04-30-Preview
// body: { "name": "documents-active", "indexes": ["documents_v2"] }
// Note: aliases are addressed at /aliases/{name}, not under /indexes; they are a preview
// feature, so verify the api-version your service supports.
async Task SetIndexAliasAsync(HttpClient http, string searchEndpoint, string aliasName, string indexName, string apiKey)
{
    http.DefaultRequestHeaders.Clear();
    http.DefaultRequestHeaders.Add("api-key", apiKey);
    var url = $"{searchEndpoint}/aliases/{aliasName}?api-version=2021-04-30-Preview";
    var body = new { name = aliasName, indexes = new[] { indexName } };
    var resp = await http.PutAsJsonAsync(url, body);
    resp.EnsureSuccessStatusCode();
}
Notes:
- Alias/atomic-swap support varies by vector store. Azure Cognitive Search supports index aliases that you can flip atomically; Pinecone offers namespace/versioning patterns. Choose the mechanism your store supports and plan rollbacks (keep the previous index for a grace period).
- Test the alias swap in a staging environment and include health-checks (sample queries, precision metrics) before flipping the alias in production.
9) Edge cases and resiliency
- Missing permissions: gracefully skip items that the agent lacks permission to read and record them in the run report for administrators.
- Large attachments: only index attachments allowed by policy; store pointer to attachment instead of content if attachments are binary or exceed thresholds.
- Concurrent modifications: use ETag/version to detect races and either requeue or skip depending on policy (see the sketch below).
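A sketch of that race check, reusing the illustrative `SourceItem` type from section 3 with an assumed per-source ETag lookup:

// Hypothetical helper: re-check the item's current ETag just before indexing and
// requeue it if the item changed after it was listed.
async Task<bool> TryIndexWithEtagCheckAsync(
    SourceItem item,
    Func<string, Task<string?>> getCurrentEtagAsync, // assumed per-source lookup by item id
    Func<SourceItem, Task> indexAsync,
    Queue<string> requeue)
{
    var currentEtag = await getCurrentEtagAsync(item.Id);
    if (currentEtag is null) return false;   // deleted mid-run: skip (handle tombstones elsewhere)
    if (currentEtag != item.ETag)
    {
        requeue.Enqueue(item.Id);            // modified mid-run: process the fresh version later
        return false;
    }
    await indexAsync(item);
    return true;
}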
10) Summary / checklist before production
- Use least privilege OAuth scopes and incremental/delta APIs.
- Tag vectors with embed model id; plan for safe reindexing when models change.
- Persist an atomic last-run state per source and use delta tokens when available.
- Monitor embed and query costs; implement quotas and retention to control storage and compute spend.
- Enforce ACL-based filters at query time and audit all indexing/query operations.