Service Bus Durable Task Framework (Preview)
Developer Guide

Contents

1 Overview
2 Problem Statement
3 Service Bus Durable Task Framework
    3.1 Core Concepts
        3.1.1 Task Hub
        3.1.2 Task Activities
        3.1.3 Task Orchestrations
        3.1.4 Task Hub Worker
        3.1.5 Task Hub Client
        3.1.6 Concrete Example – Video Encoding Orchestration
    3.2 Writing Task Orchestrations
    3.3 Writing Task Activities
    3.4 Orchestration Instance Management
    3.5 Error Handling & Compensation
    3.6 Task Hub Management
    3.7 Orchestration Features
        3.7.1 Automatic Retries
        3.7.2 Waiting on Timers
        3.7.3 Waiting on External Events
        3.7.4 Supporting Infinite Loops (or really long lived Orchestrations)
        3.7.5 Reusing existing Orchestrations
4 Service Considerations
    4.1 Logging

Overview

The Service Bus Durable Task Framework provides developers a means to write code orchestrations in C# using the .Net Task framework and the async/await keywords added in .Net 4.5.

Here are the key features of the durable task framework:

- Definition of code orchestrations in simple C# code
- Automatic persistence and check-pointing of program state
- Versioning of orchestrations and activities
- Async timers, orchestration composition, user aided checkpointing

The framework itself is very light weight and only requires an Azure Service Bus namespace and optionally an Azure Storage account.
Running instances of the orchestration and worker nodes are completely hosted by the user. No user code is executing ‘inside’ Service Bus.

Problem Statement

Many scenarios involve updating state or executing actions in multiple places in a transactional manner. For example, debiting some amount of money from an account in database A and crediting it to another account in database B needs to be done atomically. This consistency can be achieved with a distributed transaction that spans the debit and credit operations against databases A and B respectively. However, for strict consistency, transactions imply locks, and locks are detrimental to scale because subsequent operations that require the same lock are blocked until the lock is released. This becomes a big scale bottleneck for cloud services which are designed to be highly available as well as consistent. Furthermore, even if we decided that we could take the hit of a distributed transaction, we’d find that almost none of the cloud services actually support distributed transactions (or even simple locking for that matter).

The alternate model for achieving consistency is to execute the business logic for debit and credit within a durable workflow. In this case the workflow will do something like this in pseudo-code:

1. debit from some account in DB A
2. if the debit was successful then:
    a. credit to some account in DB B
    b. if the above failed then keep retrying until some threshold
    c. if the credit still failed then undo the debit from DB A as well and send notification email

In the happy path, this will give us ‘eventual’ consistency. That is, after step 1 the overall system state becomes inconsistent, but it will become consistent eventually once the workflow is completed. However, in the unhappy path a number of things can go wrong: the node executing the pseudo-code can crash at an arbitrary point, the debit from DB A can fail or the credit to DB B can fail.
In these cases, to maintain consistency we must ensure the following:

a. The debit and credit operations are idempotent, i.e. re-executing the same debit or credit operation is a no-op.
b. If the executing node crashes, it restarts from the last place where we did a successful durable operation (e.g. #1 or #2a above).

Of these two items, (a) can only be supplied by the debit/credit activity implementation. Item (b) can also be done in code by keeping track of the current position in some database. But this state management becomes a hassle, especially when the number of durable operations grows. This is where a framework that does automatic state management would greatly simplify the experience of building a code-based workflow.

Service Bus Durable Task Framework

The Service Bus Durable Task Framework allows users to write C# code and encapsulate it within ‘durable’ .Net Tasks. These durable tasks can then be composed with other durable tasks to build complex task orchestrations.

Core Concepts

There are a few fundamental concepts in the framework.

Task Hub

The Task Hub is a logical container for Service Bus entities within a namespace. These entities are used by the Task Hub Worker to pass messages reliably between the code orchestrations and the activities that they are orchestrating.

Task Activities

Task Activities are pieces of code that perform specific steps of the orchestration. A Task Activity can be ‘scheduled’ from within some Task Orchestration code. This scheduling yields a plain vanilla .Net Task which can be (asynchronously) awaited on and composed with other similar Tasks to build complex orchestrations.

Task Orchestrations

Task Orchestrations schedule Task Activities and build code orchestrations around the Tasks that represent the activities.

Task Hub Worker

The worker is the host for Task Orchestrations and Activities. It also contains APIs to perform CRUD operations on the Task Hub itself.

Task Hub Client

The Task Hub Client provides:

- APIs for creating and managing Task Orchestration instances
- APIs for querying the state of Task Orchestration instances from an Azure Table

Both the Task Hub Worker and Task Hub Client are configured with connection strings for Service Bus and optionally with connection strings for a storage account. Service Bus is used for storing the control flow state of the execution and for message passing between the task orchestration instances and task activities. However, Service Bus is not meant to be a database, so when a code orchestration is completed, its state is removed from Service Bus. If an Azure Table storage account was configured, this state remains available for querying for as long as the user keeps it there.

The framework provides TaskOrchestration and TaskActivity base classes which users can derive from to specify their orchestrations and activities. They can then use the TaskHubWorker APIs to load these orchestrations and activities into the process and then start the worker, which starts processing requests for creating new orchestration instances. The TaskHubClient APIs are used to create new orchestration instances, query for existing instances and terminate those instances if required.

Concrete Example – Video Encoding Orchestration

Assume that a user wants to build a code orchestration that will encode a video and then send an email to the user when it is done.

To implement this using the Service Bus Durable Task Framework, the user will write two Task Activities, one for encoding a video and one for sending email, and one Task Orchestration that orchestrates between these two.

    public class EncodeVideoOrchestration : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            string encodedUrl = await context.ScheduleTask<string>(typeof (EncodeActivity), input);
            await context.ScheduleTask<object>(typeof (EmailActivity), input);
            return encodedUrl;
        }
    }

In this orchestration, the user is scheduling the Encode Video activity, waiting for the response and then scheduling the Send Email activity. The framework will ensure that the state of the execution is preserved durably. E.g., if the node hosting the task orchestration above crashed before scheduling the Encode Video activity, on restart it will know to schedule this activity. If the node crashed after it had scheduled the activity but before the response came back, on restart it will be smart enough to know that the activity was already scheduled and it will directly start waiting for the response of the EncodeVideo activity.

    public class EncodeActivity : TaskActivity<string, string>
    {
        protected override string Execute(TaskContext context, string input)
        {
            Console.WriteLine("Encoding video " + input);
            // TODO : actually encode the video to a destination
            return "";
        }
    }

    public class EmailActivity : TaskActivity<string, object>
    {
        protected override object Execute(TaskContext context, string input)
        {
            // TODO : actually send email to user
            return null;
        }
    }

The user code above (EncodeVideoOrchestration, EncodeActivity and EmailActivity) needs to be hosted and available somewhere to be useful.
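
The crash-recovery behaviour described for EncodeVideoOrchestration can be pictured with a small toy model. The sketch below is not the framework's implementation: ToyContext, ReplayDemo and the crashAfterEncode flag are hypothetical names invented for illustration, the model is synchronous, and it matches recorded results purely by call order. It only shows the idea that a restarted orchestration reuses recorded activity results instead of running those activities again.

```csharp
using System;
using System.Collections.Generic;

// Toy model of history-based replay. This is NOT the framework's implementation.
public sealed class ToyContext
{
    private readonly List<string> history; // results recorded by earlier runs
    private int position;                  // index of the next ScheduleTask call in this run

    public ToyContext(List<string> history)
    {
        this.history = history;
    }

    // Number of activities that really ran (rather than being replayed) in this run.
    public int ActivitiesExecuted { get; private set; }

    public string ScheduleTask(Func<string, string> activity, string input)
    {
        if (position < history.Count)
        {
            // Already completed in an earlier run: return the recorded result.
            return history[position++];
        }

        string result = activity(input);
        history.Add(result);
        position++;
        ActivitiesExecuted++;
        return result;
    }
}

public static class ReplayDemo
{
    public static string Encode(string input) { return input + ".encoded"; }
    public static string Email(string input) { return "sent"; }

    // Mirrors the shape of EncodeVideoOrchestration; crashAfterEncode is a hook
    // used only to simulate a node failure between the two activities.
    public static string Orchestrate(ToyContext context, string input, bool crashAfterEncode)
    {
        string encodedUrl = context.ScheduleTask(Encode, input);
        if (crashAfterEncode)
        {
            throw new InvalidOperationException("simulated node crash");
        }
        context.ScheduleTask(Email, input);
        return encodedUrl;
    }

    public static void Main()
    {
        var history = new List<string>(); // stands in for the durable state

        var firstRun = new ToyContext(history);
        try { Orchestrate(firstRun, "video.mp4", true); }
        catch (InvalidOperationException) { /* the "node" died after encoding */ }

        var secondRun = new ToyContext(history);
        string output = Orchestrate(secondRun, "video.mp4", false);

        Console.WriteLine(output);                       // video.mp4.encoded
        Console.WriteLine(firstRun.ActivitiesExecuted);  // 1 (encode)
        Console.WriteLine(secondRun.ActivitiesExecuted); // 1 (email only; encode was replayed)
    }
}
```

In the second run the encode step returns immediately from the recorded history and only the email step actually executes, which is the behaviour the framework provides durably across real process restarts.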
This is how users can load these orchestration and activity classes in a worker and start processing requests to create new orchestration instances.

    string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

    TaskHubWorker hubWorker = new TaskHubWorker("myvideohub", serviceBusConnString)
        .AddTaskOrchestrations(typeof (EncodeVideoOrchestration))
        .AddTaskActivities(typeof (EncodeActivity), typeof (EmailActivity))
        .Start();

Multiple instances of these workers can run concurrently against the same task hub to provide load balancing as required. The framework guarantees that the code of a particular orchestration instance executes on only a single worker at any one time.

The TaskHubWorker also exposes methods to stop the worker instance.

The last remaining piece is creation and management of orchestration instances, i.e. how to actually trigger the code orchestrations that the user has loaded and how to monitor or terminate them.

    string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

    TaskHubClient client = new TaskHubClient("myvideohub", serviceBusConnString);
    client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "");

This snippet starts an instance of the Encode Video orchestration and passes it the input parameter.

Other APIs on the TaskHubClient allow querying, termination and other controls over a running orchestration instance.

Writing Task Orchestrations

Task orchestrations invoke Task Activities and define how control flows from one activity to another. The code that can be written within an orchestration is plain C# but with a few constraints. These constraints exist because of how the framework replays the orchestration code, which is described in a nutshell below.

Every time new work needs to be processed by an orchestration (e.g. a Task Activity finished or a timer fired), the framework replays the user’s TaskOrchestration code from scratch. Whenever this user code attempts to schedule a TaskActivity, the framework intercepts the call and consults the ‘execution history’ of the orchestration. If it finds that the particular TaskActivity has already been executed and yielded some result, it replays that Activity’s result immediately and the TaskOrchestration continues. This keeps happening until the user code has executed to a point where either it is finished or it has scheduled a new Activity. In the latter case the framework actually schedules and executes the specified Activity. After this Activity is completed, its result also becomes part of the execution history and the value is used in subsequent replays.

With this in mind, here are the constraints on the type of code we can write within a TaskOrchestration:

- Code must be deterministic. Since it is going to be replayed multiple times, it must yield the same result every time. E.g. there cannot be any direct calls to get the current date/time, random numbers, Guids or remote service invocations etc.
    - There is a helper API on the OrchestrationContext object passed to the TaskOrchestration.RunTask() method that provides a deterministic way to get the current date/time. This should be used instead of System.DateTime.
    - Users can make non-deterministic operations deterministic by wrapping them within TaskActivities, e.g. GenerateGuidActivity, GenerateRandomNumberActivity etc. Since Task Activity results are replayed by the framework, the non-deterministic value is generated once on first execution and the same value is replayed on subsequent executions.
    - In the future, other helper APIs will also be added to the OrchestrationContext.
- Code should be non-blocking, i.e. no thread sleep or Task.WaitXXX() methods. The framework provides helper methods to set up async timers which should be used instead.
- The execution history of an orchestration has a record of all scheduled task activities and their results. This history is bounded by the size limitations of Service Bus, so infinite loops are not possible without user-aided checkpointing (described under Supporting Infinite Loops below).

The Task Orchestration code is always executed in a single thread. This means that if the code was awaiting multiple tasks and one of them completed followed immediately by another one, the framework is guaranteed to run the continuations for both of these tasks serially.

Writing Task Activities

Task Activities are the ‘leaf’ nodes of an orchestration. This is the code which actually performs a unit of operation within the orchestration. It is plain C# code with no constraints.

Task Activity code is guaranteed to be called at least once. However, in error cases it might be invoked multiple times, so idempotence is desirable.

Note: In a future version of the framework, users would be able to switch the guarantee to at-most-once instead of at-least-once, thus giving them more control in the orchestration code.

Orchestration Instance Management

The TaskHubClient API allows users to create new orchestration instances, query for the state of created orchestration instances and terminate these instances.

The API for creating an orchestration instance will return the instance information. This information can be used in subsequent APIs to query for the state of the instance.

    OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "");
    OrchestrationState state = client.GetOrchestrationState(instance);
    Console.WriteLine(state.Name + " " + state.OrchestrationStatus + " " + state.Output);

The returned instance can also be used to terminate the orchestration:

    OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "");
    // something bad happened
    client.TerminateInstance(instance);

Note that the instance querying methods require the Task Hub to have been created with an Azure Storage connection string. If the connection string has not been supplied then all instance querying methods will throw an InvalidOperationException.

Error Handling & Compensation

Any exception that is thrown in the TaskActivity code is marshalled back and thrown as a TaskFailedException in the TaskOrchestration code. Users can write the error handling and compensation code that suits their needs around this.

    public class DebitCreditOrchestration : TaskOrchestration<object, DebitCreditOperation>
    {
        public override async Task<object> RunTask(OrchestrationContext context, DebitCreditOperation operation)
        {
            bool failed = false;
            bool debited = false;
            try
            {
                await context.ScheduleTask<object>(typeof (DebitAccount),
                    new Tuple<string, float>(operation.SourceAccount, operation.Amount));
                debited = true;
                await context.ScheduleTask<object>(typeof(CreditAccount),
                    new Tuple<string, float>(operation.TargetAccount, operation.Amount));
            }
            catch (TaskFailedException exception)
            {
                failed = true;
            }

            if (failed)
            {
                if (debited)
                {
                    // compensate by crediting the debited amount back to the source account;
                    // can build a try-catch around this as well, in which case the
                    // orchestration may either retry a few times or log the inconsistency for review
                    await context.ScheduleTask<object>(typeof(CreditAccount),
                        new Tuple<string, float>(operation.SourceAccount, operation.Amount));
                }
            }
            return null;
        }
    }

Note that in C# 5 the await keyword cannot be used within a catch block (this restriction was lifted in C# 6). This is the reason why the snippet above uses a flag to signal failure to the code outside the catch block.

Task Hub Management

The TaskHubWorker has APIs that can be used to perform CRUD operations on the TaskHub itself.

    string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";
    string tableConnectionString = "UseDevelopmentStorage=true;DevelopmentStorageProxyUri=";

    TaskHubWorker hubWorker = new TaskHubWorker("mytesthub", serviceBusConnString, tableConnectionString);

    // creates the required underlying entities in Service Bus and Azure Storage for the task hub
    hubWorker.CreateHub();

    // creates the required underlying entities in Service Bus and Azure Storage for the task hub
    // only if they don't already exist
    hubWorker.CreateHubIfNotExists();

    // deletes the underlying entities in Service Bus and Azure Storage for the task hub
    hubWorker.DeleteHub();

    // existence check
    bool hubExists = hubWorker.HubExists();

The Azure Storage connection string is optional. If it is not supplied then the instance storage will not be created and consequently the instance data will not be available for querying.

Orchestration Features

The framework contains a number of features that are designed to make it easy to implement common patterns in code orchestrations. In most cases, these features are accessible via the OrchestrationContext that is passed to the TaskOrchestration.RunTask() method override.

Automatic Retries

Any application that consumes cloud services should be resilient to failures to some extent, and hence client-side retries become an important part of the implementation. The framework provides alternative scheduling methods that perform retries on TaskActivity failures as per the supplied policy. This is useful if you want automatic retries, e.g. for a TaskActivity that reads data from a web service or performs an idempotent write to a database.

    public class GetQuoteOrchestration : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            // retry every 10 seconds up to 5 times before giving up and bubbling up the exception
            RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(10), 5);
            await context.ScheduleWithRetry<object>(typeof (GetQuote), retryOptions, null);
            return null;
        }
    }

Waiting on Timers

Users can wait on async timer events within the orchestration code.

    public class EncodeVideoOrchestration : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            string encodedUrl = await context.ScheduleTask<string>(typeof (EncodeActivity), input);
            await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
            await context.ScheduleTask<object>(typeof (EmailActivity), input);
            return encodedUrl;
        }
    }

The CreateTimer line will cause the orchestration to sleep for one day between the encode video and the email activity.

Timers can be used to do periodic work as well as timeouts.

    public class BillingOrchestration : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            for (int i = 0; i < 10; i++)
            {
                await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
                await context.ScheduleTask<object>(typeof (BillingActivity));
            }
            return null;
        }
    }

In the snippet above, the billing orchestration will be signaled every day and will invoke some billing activity on waking up.

    public class GetQuoteOrchestration : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            Task timer = context.CreateTimer(
                context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(5)), "timer1");
            Task getQuote = context.ScheduleTask<object>(typeof(GetQuote));

            // wait until either the timer fires or the activity completes
            Task winner = await Task.WhenAny(timer, getQuote);
            if (timer.IsCompleted)
            {
                // request timed out, do some compensating action
            }
            else
            {
                // use getQuote task result
            }
            return null;
        }
    }

In this snippet, we schedule the GetQuote activity and also create a timer to fire in 5 seconds. If the timer fires before the activity returns then we run some compensation, otherwise we use the returned quote.

Waiting on External Events

Often orchestrations need to wait for external events like a human being entering some input or some other external trigger. The framework provides a mechanism for the orchestration to asynchronously wait for an external event.

    public class GetQuoteOrchestration : TaskOrchestration<string, string>
    {
        TaskCompletionSource<object> getPermission = new TaskCompletionSource<object>();

        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
            // blocks (asynchronously) until OnEvent completes the task
            await getPermission.Task;
            await context.ScheduleTask<object>(typeof (GetQuote), null);
            return null;
        }

        public override void OnEvent(OrchestrationContext context, string name, string input)
        {
            getPermission.SetResult(null);
        }
    }

To trigger the event from the outside, the user can call the TaskHubClient.RaiseEvent method.

    TaskHubClient client = new TaskHubClient("test", serviceBusConnString);
    OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (GetQuoteOrchestration), "");
    client.RaiseEvent(instance, "dummyEvent", "dummyData");

Supporting Infinite Loops (or really long lived Orchestrations)

As mentioned above, the framework replays the execution history to recreate program state for the user’s TaskOrchestration instance. This history is bounded by size, so it is not possible to have TaskOrchestrations with infinite loops. Using the generation feature, users can ‘checkpoint’ the orchestration instance and create a new one.

    public class CronOrchestration : TaskOrchestration<string, int>
    {
        public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
        {
            // bounded loop
            for (int i = 0; i < 10; i++)
            {
                await context.CreateTimer<object>(
                    context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);
                // TODO : do something interesting
            }

            // create a new instance of self with the same input (or different if needed)
            context.ContinueAsNew(intervalHours);
            return null;
        }
    }

In this snippet, the user is telling the framework to create a brand new instance of itself (i.e. a new generation or execution) and forwards the input it received as the input to the new instance. This orchestration can run indefinitely without running into the history size limitations.

Reusing existing Orchestrations

Orchestrations can also start and wait on other orchestrations using the sub orchestration feature. This is useful for cases where you have a library of orchestrations and you want to build a larger orchestration around these.

    public class PeriodicBillingJob : TaskOrchestration<string, int>
    {
        // hardcoded list of apps to run billing orchestrations on
        static readonly string[] ApplicationList = new string[] { "app1", "app2" };

        public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
        {
            // bounded loop
            for (int i = 0; i < 10; i++)
            {
                await context.CreateTimer<object>(
                    context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);

                List<Task> billingTasks = new List<Task>();
                foreach (string appName in PeriodicBillingJob.ApplicationList)
                {
                    billingTasks.Add(
                        context.CreateSubOrchestrationInstance<bool>(typeof (BillingOrchestration), appName));
                }
                await Task.WhenAll(billingTasks);
            }

            // create a new instance of self with the same input (or different if needed)
            context.ContinueAsNew(intervalHours);
            return null;
        }
    }

    // a reusable orchestration which can either be triggered directly by the admin or via
    // some master recurring periodic billing orchestration
    public class BillingOrchestration : TaskOrchestration<bool, string>
    {
        public override async Task<bool> RunTask(OrchestrationContext context, string applicationName)
        {
            // TODO : process billing information for 'applicationName'
            return true;
        }
    }

Service Considerations

Logging

All components of the framework log to the TraceSource “Microsoft.ServiceBus.DurableTask”. Listeners can be attached to this trace source to get the framework traces.

The framework ships with a TraceListener that logs to the Console and Debug streams. The class name of the listener is Microsoft.ServiceBus.DurableTask.Tracing.OrchestrationConsoleTraceListener.
Here is a snippet of an app.config file that shows how to load the console trace listener:

    <system.diagnostics>
      <trace autoflush="true"/>
      <sources>
        <source name="Microsoft.ServiceBus.Orchestration"
                switchName="traceSwitch"
                switchType="System.Diagnostics.SourceSwitch">
          <listeners>
            <clear/>
            <add name="configConsoleListener"
                 type="Microsoft.ServiceBus.DurableTask.Tracing.OrchestrationConsoleTraceListener, Microsoft.ServiceBus.DurableTask"
                 traceOutputOptions="DateTime" />
          </listeners>
        </source>
      </sources>
      <switches>
        <add name="traceSwitch" value="Verbose" />
      </switches>
    </system.diagnostics>
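
Any class deriving from System.Diagnostics.TraceListener can be named in the type attribute of the <add> element in place of the console listener. The following is a minimal sketch of such a listener that keeps trace lines in memory. InMemoryTraceListener, ListenerDemo and the "Demo.Source" trace source name are hypothetical, chosen so the sketch runs without the framework; to receive framework traces, the listener would be registered through app.config as shown above, using its own type and assembly name.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical listener for illustration; it is not part of the framework.
public sealed class InMemoryTraceListener : TraceListener
{
    private readonly List<string> lines = new List<string>();
    private string pending = string.Empty; // text written since the last line break

    public IList<string> Lines
    {
        get { return lines; }
    }

    public override void Write(string message)
    {
        pending += message;
    }

    public override void WriteLine(string message)
    {
        lines.Add(pending + message);
        pending = string.Empty;
    }
}

public static class ListenerDemo
{
    public static void Main()
    {
        var listener = new InMemoryTraceListener();

        // "Demo.Source" is a stand-in name so the sketch runs without the framework.
        var source = new TraceSource("Demo.Source", SourceLevels.Verbose);
        source.Listeners.Clear();
        source.Listeners.Add(listener);

        source.TraceEvent(TraceEventType.Information, 1, "orchestration started");
        source.Flush();

        // Print whatever the listener captured.
        foreach (string line in listener.Lines)
        {
            Console.WriteLine(line);
        }
    }
}
```

A listener like this can be handy in tests, where asserting on captured trace lines is easier than reading console output.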