Azure Durable Functions - Fan-out/Fan-in Parallel Processing with .NET 8 Isolated
What is the Fan-out/Fan-in Pattern?
The Fan-out/Fan-in pattern in Azure Durable Functions executes multiple functions in parallel (fan-out) and then aggregates their results once they all complete (fan-in). It is well suited to large workloads where each task can run independently, since parallelizing the work improves throughput and reduces total execution time.
Use Case: Processing Large CSV Files in Chunks
In this post, we’ll demonstrate how to use the Fan-out/Fan-in pattern to process a large CSV file in chunks. The file is split every 5000 lines, and each chunk is handed to its own activity function invocation so the chunks can be processed in parallel.
Why Use Fan-out/Fan-in for CSV Processing?
When processing a large file, doing everything sequentially can be slow and inefficient. The Fan-out/Fan-in pattern lets you:
- Fan-out: Split the CSV file into chunks and process each chunk in parallel.
- Fan-in: Aggregate the results once all chunks are processed, allowing you to manage large workloads without hitting performance bottlenecks.
Simple Sample: Processing a Large CSV File in Chunks
DurableFunctionFanOutFanIn.cs
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Net;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;

public class DurableFunctionFanOutFanIn
{
    private readonly ILogger _logger;

    public DurableFunctionFanOutFanIn(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<DurableFunctionFanOutFanIn>();
    }

    [Function("FanOutHttpStart")]
    public async Task<HttpResponseData> HttpStart(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client)
    {
        // In the isolated worker model, orchestrations are scheduled through DurableTaskClient.
        string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(RunOrchestrator));
        _logger.LogInformation("Started orchestration with ID = '{InstanceId}'.", instanceId);

        var response = req.CreateResponse(HttpStatusCode.OK);
        await response.WriteStringAsync($"Orchestration started with ID: {instanceId}");
        return response;
    }

    [Function("RunOrchestrator")]
    public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        // Orchestrator code replays, so use a replay-safe logger instead of the injected one.
        ILogger logger = context.CreateReplaySafeLogger(nameof(RunOrchestrator));

        // Simulate reading and splitting the CSV into chunks.
        // In a real app, read the file in an activity function to keep the orchestrator deterministic.
        var csvData = GetCsvData();
        const int chunkSize = 5000; // Process every 5000 lines

        var chunks = csvData
            .Select((line, index) => new { line, index })
            .GroupBy(x => x.index / chunkSize)
            .Select(g => g.Select(x => x.line).ToList())
            .ToList();

        // Fan-out: start a parallel activity task for each chunk.
        var tasks = new List<Task<List<string>>>();
        foreach (var chunk in chunks)
        {
            tasks.Add(context.CallActivityAsync<List<string>>(nameof(ProcessChunk), chunk));
        }

        // Fan-in: wait for all chunks to be processed.
        var results = await Task.WhenAll(tasks);

        // Combine the results.
        var aggregatedResults = results.SelectMany(r => r).ToList();
        logger.LogInformation("Processed {Count} lines in total.", aggregatedResults.Count);
    }

    [Function("ProcessChunk")]
    public List<string> ProcessChunk([ActivityTrigger] List<string> chunk)
    {
        // Simulate processing each chunk.
        return chunk.Select(line => $"Processed: {line}").ToList();
    }

    private List<string> GetCsvData()
    {
        // Simulate reading a large CSV file.
        return Enumerable.Range(1, 20000).Select(i => $"Line {i}").ToList();
    }
}
Program.cs
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureLogging(logging =>
    {
        logging.AddConsole();
    })
    .Build();

host.Run();
Key Points:
- Fan-out: The large CSV file is split into chunks, and each chunk is processed in parallel by the ProcessChunk activity function.
- Fan-in: Once all chunks are processed, the results are aggregated and combined at the end.
- Chunking: The CSV file is processed in chunks of 5000 lines each, ensuring that the workload is distributed efficiently across multiple tasks.
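How many of those activity tasks actually run at once per worker is configurable. As a sketch, Durable Functions reads concurrency limits from host.json; the value below is illustrative, not a recommendation:

```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 10
    }
  }
}
```

Lowering this cap throttles fan-out on a single instance, which can help when each chunk is memory- or CPU-heavy.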
How It Works:
- CSV Splitting: The CSV file is split into chunks of 5000 lines, and each chunk is passed to the ProcessChunk activity function.
- Parallel Processing: The chunks are processed in parallel, with each chunk handled by a separate invocation of the ProcessChunk function.
- Aggregation: Once all chunks are processed, the results are combined into a single list.
- Scalability: By breaking the workload into smaller chunks and processing them in parallel, the function can handle large files efficiently without bottlenecks.
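The splitting step can be sanity-checked outside of Azure. Here is the same Select/GroupBy logic from the orchestrator, extracted into a plain helper (ChunkingDemo and Chunk are names invented for this sketch):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChunkingDemo
{
    // Same grouping the orchestrator uses: integer division of the line
    // index by the chunk size assigns each line to a chunk.
    public static List<List<string>> Chunk(List<string> lines, int chunkSize) =>
        lines
            .Select((line, index) => new { line, index })
            .GroupBy(x => x.index / chunkSize)
            .Select(g => g.Select(x => x.line).ToList())
            .ToList();

    public static void Main()
    {
        var lines = Enumerable.Range(1, 20000).Select(i => $"Line {i}").ToList();
        var chunks = Chunk(lines, 5000);
        Console.WriteLine(chunks.Count);      // 20000 / 5000 = 4 chunks
        Console.WriteLine(chunks[0].Count);   // 5000 lines in the first chunk
    }
}
```

On .NET 6 and later, the built-in Enumerable.Chunk(5000) extension achieves the same split more directly.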
Conclusion
The Fan-out/Fan-in pattern in Azure Durable Functions is a powerful tool for parallelizing tasks and processing large workloads efficiently. By splitting a large CSV file into manageable chunks and processing each in parallel, you can dramatically reduce the time required to process the data. With the .NET 8 isolated worker model, your code runs out of process from the Functions host, keeping this pattern compatible with current and future .NET releases.