# Data Factory

The `Arcus.Testing.Integration.DataFactory` package provides test fixtures related to Azure DataFactory. Following the common 'clean environment' test practice, it offers, among other things, an automatic temporary DataFlow debug session to help with testing DataFlow pipelines.
## Installation

The following functionality is available when installing this package:

```powershell
PM> Install-Package -Name Arcus.Testing.Integration.DataFactory
```
## Temporary DataFlow debug session

The `TemporaryDataFlowDebugSession` test fixture takes care of automatically tracking a debug session for the DataFactory DataFlow under test. More information on DataFlow debugging can be found in 'Mapping data flow Debug Mode'.

The test fixture instance is meant to be shared across tests. By having all tests use the same instance, the performance of the test suite is greatly improved.
> 💡 Several testing frameworks provide the concept of 'singleton test fixtures':
> - NUnit uses `[OneTimeSetUp/TearDown]` attributes.
> - xUnit uses injectable collection fixtures.
> - MSTest uses `[Assembly/ClassInitialize]` attributes.
> - Expecto uses higher-order functions for both tests and fixtures.
```csharp
using Arcus.Testing;
using Azure.Core;

var dataFactoryResourceId = new ResourceIdentifier(
    ".../Microsoft.DataFactory/factories/<dataFactoryName>");

await using var session =
    await TemporaryDataFlowDebugSession.StartDebugSessionAsync(dataFactoryResourceId, logger);
```
> ⚡ Uses the `DefaultAzureCredential` by default, but other types of authentication mechanisms are supported with overloads that take in the `DataFactoryResource` directly.
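For example, a minimal sketch of such an overload, assuming a service-principal credential (the `ClientSecretCredential` and the placeholder IDs are illustrative assumptions):

```csharp
using Arcus.Testing;
using Azure.Core;
using Azure.Identity;
using Azure.ResourceManager;
using Azure.ResourceManager.DataFactory;

// Illustrative: any Azure.Identity credential type could be used here.
var credential = new ClientSecretCredential("<tenant-id>", "<client-id>", "<client-secret>");
var arm = new ArmClient(credential);

// Resolve the DataFactory resource with the custom credential...
DataFactoryResource dataFactory = arm.GetDataFactoryResource(
    new ResourceIdentifier(".../Microsoft.DataFactory/factories/<dataFactoryName>"));

// ...and pass it to the overload directly.
await using var session =
    await TemporaryDataFlowDebugSession.StartDebugSessionAsync(dataFactory, logger);
```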
### Customization

The setup of the `TemporaryDataFlowDebugSession` test fixture can be customized with the following options:
```csharp
using Arcus.Testing;

await TemporaryDataFlowDebugSession.StartDebugSessionAsync(..., options =>
{
    // The time-to-live setting of the cluster in the debug session, in minutes (default: 90 minutes).
    options.TimeToLiveInMinutes = 60;

    // The session ID of an already active debug session.
    // Default: empty, meaning a new debug session will be started.
    // This also happens when no matching active session is found.
    options.ActiveSessionId = new Guid("3B0E4AF5-AA5C-4BB3-9CDB-06442EE2F2E3");
});
```
> 💡 The `ActiveSessionId` is useful during local development, when you do not want to start and stop the debug session on every run. Note that when an active session is found, it will not be torn down when the test fixture is disposed. This follows the 'clean environment' principle: test fixtures should only be responsible for the things they set up themselves.
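A possible local-development setup, assuming a hypothetical `DATAFACTORY_DEBUG_SESSION_ID` environment variable that is only set on developer machines:

```csharp
await TemporaryDataFlowDebugSession.StartDebugSessionAsync(..., options =>
{
    // Hypothetical variable: set locally to reuse a long-running debug session;
    // left unset on CI so that a fresh session is started (and torn down afterwards).
    string existingSessionId = Environment.GetEnvironmentVariable("DATAFACTORY_DEBUG_SESSION_ID");
    if (Guid.TryParse(existingSessionId, out Guid sessionId))
    {
        options.ActiveSessionId = sessionId;
    }
});
```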
### Full example

The following snippets provide a full example of how the `TemporaryDataFlowDebugSession` test fixture can be used as a singleton test fixture across tests, for each of the supported testing frameworks.

#### xUnit
```csharp
using Arcus.Testing;
using Xunit;

public class DataFactoryFixture : IAsyncLifetime
{
    public TemporaryDataFlowDebugSession Session { get; private set; }

    public async Task InitializeAsync()
    {
        Session = await TemporaryDataFlowDebugSession.StartDebugSessionAsync(...);
    }

    public async Task DisposeAsync()
    {
        await Session.DisposeAsync();
    }
}

[CollectionDefinition("DataFactory")]
public class DataFactoryFixtureCollection : ICollectionFixture<DataFactoryFixture>
{
}

[Collection("DataFactory")]
public class MyDataFactoryTests
{
    public MyDataFactoryTests(DataFactoryFixture fixture)
    {
    }
}
```
#### NUnit

```csharp
using Arcus.Testing;
using NUnit.Framework;

[TestFixture]
public class MyDataFactoryTests
{
    private TemporaryDataFlowDebugSession _session;

    [OneTimeSetUp]
    public async Task InitAsync()
    {
        _session = await TemporaryDataFlowDebugSession.StartDebugSessionAsync(...);
    }

    [OneTimeTearDown]
    public async Task CleanupAsync()
    {
        await _session.DisposeAsync();
    }
}
```
#### MSTest

```csharp
using Arcus.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class MyDataFactoryTests
{
    private static TemporaryDataFlowDebugSession _session;

    [ClassInitialize]
    public static async Task InitializeAsync(TestContext context)
    {
        _session = await TemporaryDataFlowDebugSession.StartDebugSessionAsync(...);
    }

    [ClassCleanup]
    public static async Task CleanupAsync()
    {
        await _session.DisposeAsync();
    }
}
```
#### Expecto

```fsharp
open Arcus.Testing
open Expecto

let myDataFactoryTests session = testList "datafactory tests" []

[<EntryPoint>]
let main args =
    task {
        use! session = TemporaryDataFlowDebugSession.StartDebugSessionAsync(...)
        let tests = TestList ([ myDataFactoryTests session ], Normal)
        return runTestsWithCLIArgs [] args tests
    }
    |> Async.AwaitTask
    |> Async.RunSynchronously
```
## Run DataFlow within Debug Session

To run a specific DataFlow separately within a DataFactory resource, the manual process would be to start the debug session yourself, start the flow, and wait for the result in the preview window.

The `TemporaryDataFlowDebugSession` provides an automated approach to activating and reusing debug sessions across tests. Running DataFlows on this automated debug session requires you to provide the name of the DataFlow and the target sink to which the result of the DataFlow will be sent.
```csharp
using Arcus.Testing;

await using TemporaryDataFlowDebugSession session = ...

DataFlowRunResult result =
    await session.RunDataFlowAsync("<dataflow-name>", "<target-sink-name>");

// The run status of data preview, statistics or expression preview.
string status = result.Status;

// The raw run data of data preview, statistics or expression preview.
BinaryData data = result.Data;

// Parse the data as a specific format (CSV or JSON).
CsvTable csv = result.GetDataAsCsv();
JsonNode json = result.GetDataAsJson();
```
> ⚠️ IMPORTANT to note on the `GetDataAs...()` calls: the DataFactory data preview functionality does not support the full format of all file types. Therefore, take these warnings into consideration.
> - For CSV: only the upper header names are considered part of the CSV table. Objects or arrays expressed in the data preview will end up in a single cell.
> - For JSON:
>   - Based on the `SingleDocument` or `ArrayOfDocuments` setting, DataFactory can load one or more documents in one run, but there is no distinction in the data preview. Arcus therefore assumes that a single row is a `JsonObject` and that multiple rows are a `JsonArray`.
>   - The data preview does not support the full JSON format: only objects and arrays of objects at the root level are supported, so Arcus also only supports these two formats when parsing the data preview.
>   - The data preview does not support the full JSON format: an array with objects that have different property names is valid JSON, but is not supported in the data preview.
>   - The data preview does not support the full JSON format: be careful with `null`s for JSON nodes (objects and arrays), as these are also not fully supported.
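As a usage sketch, the parsed preview data can be compared against an expected table, reusing the `result` from the snippet above. Here, `AssertCsv` is assumed to come from the separate `Arcus.Testing.Assert` package, and the expected contents are illustrative:

```csharp
using Arcus.Testing;

// Illustrative expected output of the DataFlow under test.
CsvTable expected = AssertCsv.Load(
    "ProductName;Price" + Environment.NewLine +
    "printer;12.34");

CsvTable actual = result.GetDataAsCsv();

// Fails the test with a descriptive message when the tables differ.
AssertCsv.Equal(expected, actual);
```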
### Customization

The process of running a DataFlow can be manipulated with several options, described here:
```csharp
using Arcus.Testing;

await session.RunDataFlowAsync(..., options =>
{
    // Adds a parameter to the DataFlow upon starting.
    options.AddDataFlowParameter("<name>", "<value>");

    // Adds additional linked services to the debug session.
    // 💡 This can be useful to add, for example, an additional Key vault linked service
    // for certain authentication types of datasets.
    options.AddLinkedService("datafactory_sales_keyvaultLS");

    // The maximum amount of rows to include in the preview response (default: 100 rows).
    options.MaxRows = 100;
});
```
> 🚩 When the `RunDataFlowAsync` method gives obscure Microsoft failures, it might be a problem with missing linked services that should be passed to the debug session. By default, all datasets are loaded automatically, but additional dependent linked services might not be.
## Run a Data pipeline

Arcus does not provide any additional functionality to run a pipeline and wait for its result, as all of this can easily be done with the `Azure.ResourceManager.DataFactory` and `Arcus.Testing.Core` packages:
```csharp
using System;
using Arcus.Testing;
using Azure.ResourceManager.DataFactory;
using Azure.ResourceManager.DataFactory.Models;

DataFactoryResource resource = ...

DataFactoryPipelineResource pipeline =
    await resource.GetDataFactoryPipelineAsync("<pipeline-name>");

PipelineCreateRunResult run = await pipeline.CreateRunAsync();

DataFactoryPipelineRunInfo finalStatus =
    await Poll.Target(async () => (await resource.GetPipelineRunAsync(run.RunId.ToString())).Value)
              .Until(current => current.Status == "Succeeded")
              .Every(TimeSpan.FromSeconds(5))
              .Timeout(TimeSpan.FromMinutes(1))
              .FailWith("DataFactory pipeline did not succeed within the expected time frame");
```