Implement Data Import

Granit.DataExchange provides a mini-ETL pipeline for importing and exporting tabular data. The import pipeline runs in four steps (Extract, Map, Validate, Execute) with streaming IAsyncEnumerable processing, intelligent column-mapping suggestions, and upsert-style (INSERT vs UPDATE) resolution based on business keys.

Prerequisites:

  • A working Granit application with EF Core configured
  • A domain entity to import into (e.g., Patient)
  • Granit.Validation for row-level validation
Install the core packages:
dotnet add package Granit.DataExchange
dotnet add package Granit.DataExchange.EntityFrameworkCore
dotnet add package Granit.DataExchange.Endpoints

Then add one or both file parsers:

dotnet add package Granit.DataExchange.Csv
dotnet add package Granit.DataExchange.Excel
Register the services in Program.cs:

// Core import pipeline
services.AddGranitDataExchange();

// File parsers (at least one required)
services.AddGranitDataExchangeCsv();
services.AddGranitDataExchangeExcel();

// EF Core persistence (import jobs, saved mappings)
builder.AddGranitDataExchangeEntityFrameworkCore(opts =>
    opts.UseNpgsql(connectionString));

// Import definition and executor for your entity
services.AddImportDefinition<Patient, PatientImportDefinition>();
services.AddImportExecutor<Patient, AppDbContext>();
services.AddBusinessKeyResolver<Patient, AppDbContext>();

Each importable entity needs an ImportDefinition<TEntity> that declares which properties are importable (whitelist), their display names, aliases, and the business key used for INSERT vs UPDATE resolution:

public sealed class PatientImportDefinition : ImportDefinition<Patient>
{
    public override string Name => "Acme.PatientImport";

    protected override void Configure(ImportDefinitionBuilder<Patient> builder)
    {
        builder
            .HasBusinessKey(p => p.Niss)
            .Property(p => p.Niss, p => p
                .DisplayName("NISS")
                .Aliases("National ID", "Numéro national")
                .Required())
            .Property(p => p.FirstName, p => p
                .DisplayName("First name")
                .Aliases("Prénom", "Voornaam"))
            .Property(p => p.LastName, p => p
                .DisplayName("Last name")
                .Aliases("Nom", "Achternaam"))
            .Property(p => p.Email, p => p
                .DisplayName("Email")
                .Aliases("Courriel", "Mail", "E-mail"))
            .Property(p => p.BirthDate, p => p
                .DisplayName("Date of birth")
                .Format("dd/MM/yyyy"))
            .ExcludeOnUpdate(p => p.CreatedAt);
    }
}
| Method | Description |
| --- | --- |
| `Property(expr, config?)` | Declare an importable property (whitelist) |
| `HasBusinessKey(expr)` | Business key for INSERT vs UPDATE resolution |
| `HasCompositeKey(exprs)` | Composite key (multiple properties) |
| `HasExternalId()` | External ID resolution (Odoo pattern) |
| `ExcludeOnUpdate(expr)` | Never overwrite this property on UPDATE |
| `GroupBy(column)` | Group rows for parent/child imports |
| `HasMany(collection, config)` | Child collection (requires `GroupBy`) |
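
GroupBy and HasMany combine for parent/child imports, where several file rows collapse into one parent entity with a child collection. A hypothetical sketch (the Visit entity, its properties, and the exact nested-builder overloads are assumptions, not part of the documented API):

```csharp
// Hypothetical: rows sharing the same NISS collapse into one Patient,
// and each row contributes one Visit to the child collection.
public sealed class PatientWithVisitsImportDefinition : ImportDefinition<Patient>
{
    public override string Name => "Acme.PatientVisitImport";

    protected override void Configure(ImportDefinitionBuilder<Patient> builder)
    {
        builder
            .HasBusinessKey(p => p.Niss)
            .Property(p => p.Niss, p => p.DisplayName("NISS").Required())
            .Property(p => p.LastName, p => p.DisplayName("Last name"))
            .GroupBy("NISS")                    // rows with equal NISS form one parent
            .HasMany(p => p.Visits, v => v      // child collection (requires GroupBy)
                .Property(x => x.Date, x => x.Format("dd/MM/yyyy"))
                .Property(x => x.Reason));
    }
}
```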

The pipeline processes files through four stages, all streaming via IAsyncEnumerable. Only the current batch (default 500 entities) and accumulated errors are held in memory.

Upload --> Preview --> Confirm mappings --> Execute (async)

IFileParser reads the uploaded file and produces a stream of RawImportRow values. Each row contains a dictionary of column names to string values.

services.AddGranitDataExchangeCsv();
// Sep (MIT, SIMD AVX-512/NEON) handles RFC 4180 quoting,
// configurable separators, and UTF-8 with BOM detection.
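
A registered parser might be consumed like this. The ParseAsync method name, its signature, and the RawImportRow.Values property are assumptions for illustration; the documentation only guarantees a stream of RawImportRow values holding a column-name-to-string dictionary:

```csharp
// Hypothetical consumption sketch: ParseAsync and Values are assumed names.
await using FileStream stream = File.OpenRead("patients.csv");

await foreach (RawImportRow row in parser.ParseAsync(stream, cancellationToken))
{
    // Each raw row maps column names to raw string values.
    if (row.Values.TryGetValue("NISS", out string? niss))
        Console.WriteLine($"NISS: {niss}");
}
```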

CSV parsing options:

| Option | Default | Description |
| --- | --- | --- |
| `Separator` | `","` | Column separator |
| `Encoding` | `null` | File encoding (`null` = UTF-8 auto-detect) |
| `SkipRows` | `0` | Rows to skip before the header |
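
These options might be supplied at registration time; the options-callback overload shown here is an assumption, not a documented signature:

```csharp
// Hypothetical: assumes AddGranitDataExchangeCsv accepts an options callback.
services.AddGranitDataExchangeCsv(opts =>
{
    opts.Separator = ";";              // semicolon-separated export from Excel
    opts.Encoding = Encoding.Latin1;   // override the UTF-8 auto-detection
    opts.SkipRows = 2;                 // two banner rows before the header
});
```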

IMappingSuggestionService suggests column-to-property mappings using four strategies in decreasing confidence order:

| Level | Source | Confidence |
| --- | --- | --- |
| 1 | Saved mappings from previous imports | Saved |
| 2 | Exact match on property name, DisplayName, or alias | Exact |
| 3 | Fuzzy match via normalized Levenshtein distance | Fuzzy |
| 4 | Semantic match via AI (optional ISemanticMappingService) | Semantic |
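
The fuzzy level can be pictured as a normalized Levenshtein similarity over trimmed, lowercased strings, compared against the FuzzyMatchThreshold (default 0.8). A self-contained illustration, not the library's actual implementation:

```csharp
// Illustrative only: similarity in [0, 1], where 1.0 is an exact match
// after trimming and lowercasing.
static double Similarity(string a, string b)
{
    a = a.Trim().ToLowerInvariant();
    b = b.Trim().ToLowerInvariant();
    if (a.Length == 0 && b.Length == 0) return 1.0;

    // Classic dynamic-programming edit distance.
    int[,] d = new int[a.Length + 1, b.Length + 1];
    for (int i = 0; i <= a.Length; i++) d[i, 0] = i;
    for (int j = 0; j <= b.Length; j++) d[0, j] = j;

    for (int i = 1; i <= a.Length; i++)
        for (int j = 1; j <= b.Length; j++)
        {
            int cost = a[i - 1] == b[j - 1] ? 0 : 1;
            d[i, j] = Math.Min(Math.Min(d[i - 1, j] + 1, d[i, j - 1] + 1),
                               d[i - 1, j - 1] + cost);
        }

    // Normalize by the longer string so the score is length-independent.
    return 1.0 - (double)d[a.Length, b.Length] / Math.Max(a.Length, b.Length);
}

// Similarity("Birth Date", "BirthDate") is 0.9: one edit over ten
// characters, which clears the default 0.8 threshold.
```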

IRowValidator<T> validates each mapped entity using FluentValidation. Invalid rows are collected in the import report with their error codes and messages.
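
A row validator is a standard FluentValidation AbstractValidator; how IRowValidator<Patient> discovers it through DI is assumed here, and the error codes are examples:

```csharp
using FluentValidation;

// Plain FluentValidation validator for imported Patient rows.
// Error codes surface in the import report alongside the messages.
public sealed class PatientRowValidator : AbstractValidator<Patient>
{
    public PatientRowValidator()
    {
        RuleFor(p => p.Niss)
            .NotEmpty().WithErrorCode("NISS_REQUIRED")
            .Matches(@"^\d{11}$").WithErrorCode("NISS_FORMAT");

        RuleFor(p => p.Email)
            .EmailAddress().When(p => !string.IsNullOrEmpty(p.Email));
    }
}
```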

IImportExecutor<T> persists valid entities in batches. For each entity, the IRecordIdentityResolver<T> determines whether to INSERT or UPDATE based on the business key.
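
Conceptually, the resolver looks up the business key and treats a hit as UPDATE and a miss as INSERT. A sketch under assumed interface members, not the shipped implementation:

```csharp
// Illustrative resolver: the IRecordIdentityResolver<T> member shape
// is an assumption for this sketch.
public sealed class PatientIdentityResolver : IRecordIdentityResolver<Patient>
{
    private readonly AppDbContext _db;
    public PatientIdentityResolver(AppDbContext db) => _db = db;

    public async Task<Patient?> ResolveAsync(Patient incoming, CancellationToken ct)
    {
        // Business key declared via HasBusinessKey(p => p.Niss).
        return await _db.Patients
            .FirstOrDefaultAsync(p => p.Niss == incoming.Niss, ct);
        // null  => INSERT the incoming entity
        // match => copy importable properties onto it, skipping ExcludeOnUpdate ones
    }
}
```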

Map the import endpoints:

app.MapDataExchangeEndpoints();
| Method | Route | Description |
| --- | --- | --- |
| POST | `/data-exchange` | Upload file and create an import job |
| POST | `/{jobId}/preview` | Get headers, preview rows, mapping suggestions |
| PUT | `/{jobId}/mappings` | Confirm column mappings |
| POST | `/{jobId}/execute` | Execute import asynchronously (202 Accepted) |
| POST | `/{jobId}/dry-run` | Execute a synchronous dry run |
| GET | `/{jobId}` | Job status |
| GET | `/{jobId}/report` | Import report (statistics + errors) |
| GET | `/{jobId}/correction-file` | Download a file containing only the error rows |

The import report contains aggregate statistics and only the rows that failed — not the successful rows. For 100,000 rows with 50 errors, only ~50 error objects are held in memory.

ImportReport report = await executor.ExecuteAsync(entities, options);

report.TotalRows;     // 100000
report.SucceededRows; // 99950
report.FailedRows;    // 50
report.InsertedRows;  // 80000
report.UpdatedRows;   // 19950
report.Duration;      // TimeSpan

Granit.DataExchange also provides a full export pipeline with fluent field definitions, saveable presets, and background job support.

public sealed class PatientExportDefinition : ExportDefinition<Patient>
{
    public override string Name => "Acme.PatientExport";
    public override string? QueryDefinitionName => "Acme.Patients";

    protected override void Configure(ExportDefinitionBuilder<Patient> builder)
    {
        builder
            .IncludeBusinessKey()
            .Field(p => p.LastName, f => f.Header("Last name"))
            .Field(p => p.FirstName, f => f.Header("First name"))
            .Field(p => p.Email)
            .Field(p => p.BirthDate, f => f.Header("Date of birth").Format("dd/MM/yyyy"));
    }
}
Register the export services:

services.AddGranitDataExport();
services.AddExportDefinition<Patient, PatientExportDefinition>();
services.AddScoped<IExportDataSource<Patient>, PatientExportDataSource>();
services.AddSingleton<IExportWriter, CsvExportWriter>();
services.AddSingleton<IExportWriter, ClosedXmlExportWriter>();
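
The data source streams entities to the export writers. Its member shape is an assumption in this sketch; only the IExportDataSource<Patient> registration appears in the documentation:

```csharp
// Hypothetical data source: streams patients without change tracking,
// so large exports stay memory-friendly. QueryAsync is an assumed member.
public sealed class PatientExportDataSource : IExportDataSource<Patient>
{
    private readonly AppDbContext _db;
    public PatientExportDataSource(AppDbContext db) => _db = db;

    public IAsyncEnumerable<Patient> QueryAsync(CancellationToken ct = default)
        => _db.Patients.AsNoTracking().AsAsyncEnumerable();
}
```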
| Method | Route | Description |
| --- | --- | --- |
| GET | `/export/definitions` | List export definitions |
| GET | `/export/definitions/{name}/fields` | Available fields for a definition |
| POST | `/export/jobs` | Create and dispatch an export job |
| GET | `/export/jobs/{jobId}` | Job status |
| GET | `/export/jobs/{jobId}/download` | Download the exported file |
Both pipelines read their settings from appsettings.json:

{
  "DataExchange": {
    "DefaultMaxFileSizeMb": 50,
    "DefaultBatchSize": 500,
    "FuzzyMatchThreshold": 0.8
  },
  "DataExport": {
    "BackgroundThreshold": 1000
  }
}