


Change Data Capture for Specified Interval Package Sample

SQL Server Technical Article

Writer: Sandra Ward
Published: November 2008
Applies To: SQL Server 2008

Summary: These are the accompanying notes describing the Change Data Capture for Specified Interval Package Sample available on CodePlex. The sample demonstrates the use of CDC technology in support of SSIS incremental load packages.

Copyright

The information contained in this document represents the current view of Microsoft Corporation on the issues discussed as of the date of publication. Because Microsoft must respond to changing market conditions, it should not be interpreted to be a commitment on the part of Microsoft, and Microsoft cannot guarantee the accuracy of any information presented after the date of publication.

This White Paper is for informational purposes only. MICROSOFT MAKES NO WARRANTIES, EXPRESS, IMPLIED OR STATUTORY, AS TO THE INFORMATION IN THIS DOCUMENT.

Complying with all applicable copyright laws is the responsibility of the user. Without limiting the rights under copyright, no part of this document may be reproduced, stored in or introduced into a retrieval system, or transmitted in any form or by any means (electronic, mechanical, photocopying, recording, or otherwise), or for any purpose, without the express written permission of Microsoft Corporation.

Microsoft may have patents, patent applications, trademarks, copyrights, or other intellectual property rights covering subject matter in this document. Except as expressly provided in any written license agreement from Microsoft, the furnishing of this document does not give you any license to these patents, trademarks, copyrights, or other intellectual property.

© 2008 Microsoft Corporation.
All rights reserved.

Microsoft is a registered trademark of Microsoft Corporation in the United States and/or other countries.

The names of actual companies and products mentioned herein may be the trademarks of their respective owners.

Table of Contents

Getting Started
Sample Structure
The Sample Environment Test Harness
Package Architecture for Incremental Load
SetupCDCSample Package
Initializing the Environment and Enabling Change Data Capture
Execute SQL Task - Create Tables and Enable Change Data Capture
Creating Capture Instances for the Source Tables
Generating Wrapper TVFs for the Query Functions
Generating the Sample Workload
Script Tasks – Apply Inserts and Updates to CDC Enabled Tables
Script Task – Mark Workload Completion
Initializing the Target Tables
Execute SQL Task – Create Database Snapshot
Data Flow Tasks to Perform the Initial Load
Setting Up the Periodic Requests for Change Data
Execute SQL Task – Verify Capture Process is Started
Execute SQL Task – Determine Datetime Base for Initial Extraction Interval
Script Task – Log Capture Not Started Message
Extracting and Processing Change Data
For Loop Container - Cycle Master at 10 Second Intervals
Validating Incremental Load and Reporting Completion Status
Execute SQL Task – Check for Mismatch in Replicas
VB Script Task – Output Run Completion Status
SetupCDCSample Package Variables
MasterCDC Package
The Change Data Capture Database Validity Interval
The Change Data Capture Validity Interval for a Capture Instance
A Closer Look at an SSIS Wrapper Function
Master Package Configurations and Variables
Package Configurations for Initializing Package Variables
Master Package Variables
Master Package Tasks
Execute SQL Task – Check for Data
Execute Package Tasks – Extract Change Data for CDC Enabled Tables
Script Task – Log Extract Error
Script Task – Delay
Script Task – Log Extraction Complete
Child Packages for the Change Data Capture for Specified Interval Package Sample
Child Package Variables
Child Package Tasks
Script Task - Generate SQL Data Query
Data Flow Task – Process Change Data
Error Logging in the Child Packages
Wrappers for CDC TVFs
Instantiated Multi-Statement TVF Wrappers
A Closer Look at an SSIS Wrapper Function
Use of datetime Values in the Wrapper Signature
Returning Column Information in the Wrapper Function
Extracting Information from the CDC Update Mask
Customizing Wrapper Functions for the First Extraction Interval
Running the Change Data Capture for Specified Interval Package Sample in BI Studio
Conclusion
Getting Started

This document goes through the Change Data Capture for Specified Interval Package Sample in detail, describing how the Change Data Capture feature in SQL Server 2008 can be used to support ETL from an SSIS package. Code for the sample is available through CodePlex. The README supplied with the sample provides detailed information on installing the sample files locally, along with the requirements needed to run the sample. This accompanying document is available on MSDN.

The sample makes use of the databases AdventureWorks2008 and AdventureWorksDW2008, both of which are available for download on CodePlex. Follow the instructions at the download site to ensure they are properly loaded in your environment.

NOTE: After loading AdventureWorks2008, execute sp_helpuser to determine whether the database user 'dbo' has an associated login. If the LoginName for the returned 'dbo' entry is NULL, the database user 'dbo' has been orphaned. Run the following command to associate the database user 'dbo' with the login 'sa':

    exec sp_changedbowner 'sa'

The database user 'dbo' must be associated with a valid login in order for the Change Data Capture capture process to successfully harvest changes from the log and deposit them in the database change tables.

There are two package variables in the SetupCDCSample package that are key to successfully running the sample: SQLServerInstallPath and BasePath. SQLServerInstallPath, which defaults to c:\program files\Microsoft SQL Server\, identifies the install path for SQL Server on the local machine. If this is different in your environment, modify the package variable appropriately.
BasePath, which defaults to @[SQLServerInstallPath] + "100\Samples\Integration Services\Package Samples\Change Data Capture for Specified Interval Package Sample\Change Data Capture Sample\", identifies the standard install path of the sample relative to the SQL Server install path. If you have installed the sample elsewhere, set this path appropriately.

Sample Structure

The Change Data Capture feature of SQL Server 2008 allows the DML activity against database tables to be captured in change tables. The change data is made available to applications through table-valued functions that query the change tables. This sample demonstrates the use of CDC technology from within SSIS incremental load packages to obtain source table changes to be applied to a Data Mart.

The Sample Environment Test Harness

A setup package is provided that both initializes the sample environment and provides a test harness for driving the data extraction process. The package begins by resetting the source tables to a known initial state. It then configures the database for Change Data Capture and creates a capture instance for each table to be tracked. The package then launches several tasks to generate DML activity against the source tables. While DML is being applied to the source tables, the setup package takes a database snapshot. The snapshot is then used to provide data for an initial load of the target tables within the Data Mart. Metadata maintained in the snapshot is used to identify the starting point of the first extraction for the incremental load.

Once the initial load completes, the setup package enters a loop that launches the master package to harvest changes in 10-second intervals. The principal focus of the sample is the work done by the master package and its associated child packages: obtaining change data for a specified interval and applying the changes to the target environment.
The loop logic continues to monitor the progress of the DML tasks through global variables. When the loop logic determines that the DML tasks have all completed and that the window of the next extraction interval has moved beyond the period of time when the workload was applied, the loop terminates. On completion, the SQL CHECKSUM function is used to compare the source tables to the replicas. Table differences, if present, are noted in an event log entry that records the status of the completed run.

Package Architecture for Incremental Load

The core of the sample consists of four packages: one master package and three child packages. The master package obtains the extraction interval for the incremental load from the setup package, and then verifies that the interval lies within the current Change Data Capture validity interval for the database.

If the low end-point of the extraction interval is earlier than the minimum commit time for change table entries, the master package logs an error to the application event log and terminates. This is an indication that change table cleanup has been too aggressive, and the needed change data is no longer available in the change tables.

If the high end-point of the extraction interval is later than the maximum commit time associated with a change table entry, the capture process has not yet finished populating the change tables for the interval. In this case, the master package delays, giving the capture process the opportunity to catch up before checking again. The delay loop continues to execute until either the capture process has caught up, or an iteration limit is reached and the package terminates, depositing an error message in the event log.

If both end-points of the extraction interval are in range, the master package launches the child packages to retrieve the change data and update the Data Mart.
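The end-point checks described above can be sketched in T-SQL. This is a minimal illustration and not the sample's own script: the variables @ExtractStartTime and @ExtractEndTime and their literal values are placeholders for the interval passed in by the setup package, and using the minimum and maximum tran_end_time in cdc.lsn_time_mapping as the bounds of the available change data is a simplifying assumption.

```sql
USE AdventureWorks2008;

-- Placeholder interval; in the sample these values come from the setup package.
DECLARE @ExtractStartTime datetime = '2008-11-01T10:00:00';
DECLARE @ExtractEndTime   datetime = '2008-11-01T10:00:10';

-- Commit times bounding the change data currently recorded by the capture process.
-- Assumption: cdc.lsn_time_mapping is used as a stand-in for the validity interval.
DECLARE @min_time datetime, @max_time datetime;
SELECT @min_time = MIN(tran_end_time),
       @max_time = MAX(tran_end_time)
FROM cdc.lsn_time_mapping;

SELECT CASE
         WHEN @min_time IS NULL
              THEN 'No entries: the capture process may not be running'
         WHEN @ExtractStartTime < @min_time
              THEN 'Error: change data for the start of the interval has been cleaned up'
         WHEN @ExtractEndTime > @max_time
              THEN 'Wait: the capture process has not yet reached the end of the interval'
         ELSE 'OK: launch the child packages'
       END AS IntervalStatus;
```

In a package, the 'Wait' outcome would correspond to the delay loop and the 'Error' outcome to the event log entry followed by termination.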
The master package waits for notification from all child packages before writing an informational message to the application event log at the conclusion of the extraction cycle. The log entry identifies the start and end times of the extraction interval as well as other package variables.

SetupCDCSample Package

The package SetupCDCSample.dtsx sets up the test environment for the sample. It uses AdventureWorks2008 as its source database, and AdventureWorksDW2008 as the target. The database snapshot AdventureWorks2008_dbss is created for AdventureWorks2008 to provide a consistent view of the source tables for the initial load. The diagram below shows the control flow for the setup package.

Figure 1: SetupCDCSample Package

Initializing the Environment and Enabling Change Data Capture

Execute SQL Task - Create Tables and Enable Change Data Capture

The SetupCDCSample package begins with a single SSIS Execute SQL Task. Its purpose is to run the T-SQL script CDCSetupTables.sql to set up the environment for Change Data Capture.

The script begins by cleaning up the sample environment, removing any objects created in previous runs. This makes it straightforward to rerun the sample while still leaving the created objects in place at the end of each run.

The script then enables Change Data Capture for the database AdventureWorks2008. Before DML activity against database tables can be captured in change tables, Change Data Capture must first be enabled at the database level by a member of the sysadmin fixed server role.

You can use the following T-SQL query to determine whether Change Data Capture is already enabled for a database:

    SELECT is_cdc_enabled FROM sys.databases
    WHERE name = 'AdventureWorks2008'
    AND is_cdc_enabled = 1

Within the sample, this query is used to first determine whether Change Data Capture has already been enabled for the database.
If it is, the following stored procedure is run to disable it.

    exec sys.sp_cdc_disable_db

Disabling Change Data Capture at the database level cleans up all of the Change Data Capture metadata for the database, including the metadata associated with capture instances that have previously been created for tracked tables. This allows each execution of the sample to start from a clean test environment.

The following stored procedure is then executed to enable Change Data Capture for AdventureWorks2008:

    exec sys.sp_cdc_enable_db

The script then removes preexisting tables and functions from the schema CDCSample in AdventureWorks2008. The schema CDCSample is used for the new tables that will be created to serve as the source tables for the SSIS packages used to extract change data.

The following three tables are created in the CDCSample schema of AdventureWorks2008, each mirroring an existing AdventureWorks2008 table:

    CDCSample.Customer
    CDCSample.CreditCard
    CDCSample.WorkOrder

Three multi-statement table-valued functions are generated and instantiated at this time: CDCSample.fn_net_changes_Customer, CDCSample.fn_net_changes_CreditCard and CDCSample.fn_net_changes_WorkOrder. Three additional custom table-valued functions are also created to handle the initial extraction interval when synchronizing to the initial load. Both sets of functions serve as wrapper functions for the CDC-generated functions used to query for change data. They will be discussed in detail when the SSIS packages for extracting change data are examined.

A portion of the table data in the original AdventureWorks2008 tables is then used to initialize the source tables in the CDCSample schema. The remaining data will be used later to generate a dynamic workload when tracking is enabled.

With the source tables initialized, the script now creates three destination tables in the database AdventureWorksDW2008, one for each source table.
For the purposes of the sample, the column structure of the destination tables is identical to that of the source tables. The function of the sample is simply to apply the changes made to the source to the destination, so that the destination reflects changes to the source in a timely fashion.

Creating Capture Instances for the Source Tables

Once the source tables are initialized, the setup script creates capture instances for each of the source tables. While the database itself must be enabled for Change Data Capture by a member of the sysadmin server role, capture instances for individual tables can be created by members of the db_owner database role. The stored procedure sys.sp_cdc_enable_table is used to create a capture instance for a source table. The following calls are made within the setup script to create capture instances for the three source tables.

    exec sys.sp_cdc_enable_table 'CDCSample', 'Customer', 'Customer',
        @supports_net_changes = 1, @role_name = null

    exec sys.sp_cdc_enable_table 'CDCSample', 'CreditCard', 'CreditCard',
        @supports_net_changes = 1, @role_name = null

    exec sys.sp_cdc_enable_table 'CDCSample', 'WorkOrder', 'WorkOrder',
        @supports_net_changes = 1, @role_name = null

The first two parameters to the stored procedure are the schema and table name of the source table to be tracked. The third parameter is the name chosen for the associated capture instance. Any name can be chosen, but within a given database, the capture instance name must be unique. Since it is used to identify the change data associated with a given source table, it usually makes sense to name the capture instance in a manner that provides cues to its associated source table. If not specified, it will default to the schema name followed by the table name, separated by an underscore.
In the sample, the table name has been used as the capture instance name.

The parameter @supports_net_changes is used to indicate that functions to query for net changes should be generated for the capture instance. The source table must have a primary key or a defined unique index if this parameter is set to 1. Here, the term 'net changes' is used in a very specific way. For a given query window, the function returning net changes returns only a single row for each changed source row, representing the final state of the row at the end of the interval. The operation returned with the row is the one needed to correctly apply the row to the destination. In contrast, when a query for all changes is requested, a row is returned in the result set for each committed change to a row of the table.

The parameter @role_name = null is used to indicate that no gating role is being used to restrict access to change data. In order to have access to change data, the requestor must have select access to the captured columns of the source table. In addition, if the caller is not sysadmin or db_owner and a gating role has been defined, the caller must also be a member of the gating role. By default, a gating role must be defined. If, however, the @role_name parameter is explicitly set to null when the capture instance is created, no gating role is used and select access alone is sufficient to gain access to the change data.

Generating Wrapper TVFs for the Query Functions

At the time Change Data Capture is enabled for a source table, the single-statement Table Valued Functions (TVFs) needed to query the change table for change data are automatically generated. Functions to query for all changes are always generated, while those used to extract net changes are generated only if the @supports_net_changes parameter is set to 1.
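To make the difference between the two generated functions concrete, the following sketch queries both for the Customer capture instance over the full range of LSNs currently available. It is an illustration under stated assumptions rather than part of the sample: the function names follow the standard CDC naming convention (cdc.fn_cdc_get_net_changes_ and cdc.fn_cdc_get_all_changes_ followed by the capture instance name), and the choice of the widest available LSN range is made only for demonstration.

```sql
USE AdventureWorks2008;

-- LSN bounds of the change data available for the Customer capture instance.
DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('Customer');
SET @to_lsn   = sys.fn_cdc_get_max_lsn();

-- Net changes: at most one row per changed source row, reflecting its final state.
SELECT *
FROM cdc.fn_cdc_get_net_changes_Customer(@from_lsn, @to_lsn, N'all');

-- All changes: one row for each committed change to a source row.
SELECT *
FROM cdc.fn_cdc_get_all_changes_Customer(@from_lsn, @to_lsn, N'all');
```

A row that is inserted and then updated several times within the range appears once in the first result set and once per change in the second.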
The principal drawback of the generated TVFs for SSIS developers is that the query interval used to identify the range for which change data is needed is Log Sequence Number (LSN) based rather than time based. Change Data Capture does, however, provide a stored procedure that will script wrapper functions for the generated TVFs that use time-based rather than LSN-based parameters as interval boundaries. The sample setup script makes use of this capability and instantiates wrapper functions for each of the defined net changes functions after the capture instances are created.

The following simple stored procedure is created to generate the wrappers. When called with no parameters, sp_cdc_generate_wrapper_function returns a result set that includes scripts to generate wrappers for all of the capture instances that the caller is authorized to access. The instantiation loop then selects from among those wrappers only those that wrap the net changes queries.

    create procedure [CDCSample].[generate_wrappers]
    as
    begin
        -- Collect the wrapper creation scripts for all accessible capture instances
        declare @wrapper_functions table
            (function_name sysname, create_stmt nvarchar(max))
        insert into @wrapper_functions
        exec sys.sp_cdc_generate_wrapper_function

        -- Instantiate only the wrappers for the net changes functions
        declare @stmt nvarchar(max)
        declare #hfunctions cursor local fast_forward for
            select create_stmt from @wrapper_functions
            where function_name like 'fn_net_changes%'
        open #hfunctions
        fetch #hfunctions into @stmt
        while (@@fetch_status <> -1)
        begin
            exec sp_executesql @stmt
            fetch #hfunctions into @stmt
        end
        close #hfunctions
        deallocate #hfunctions
    end

The above procedure is then called after the capture instances have been created.

    exec CDCSample.generate_wrappers

Generating the Sample Workload

Script Tasks – Apply Inserts and Updates to CDC Enabled Tables

Once the environment has been enabled for Change Data Capture, three Execute SQL Tasks are launched in parallel to generate DML activity against the source tables. After each task completes, a second task is launched to generate additional activity.
In total, there are six tasks that generate load: one applying inserts and one applying updates for each of the three source tables. The update load only targets table rows populated as part of the initial load. A seeded random number generator is used so that the results are reproducible. The insert load is generated from the portion of the table rows not included within the initial load. By periodically enforcing a 10-second delay between batches of inserts and updates, the load is spread across several minutes. This enforced delay makes it easier to demonstrate techniques for systematically moving the query window when data needs to be extracted periodically. Where the period for an actual ETL system is most typically 24 hours, the period used in this sample is 10 seconds. The principle used to walk the change tracking timeline, however, is the same irrespective of the size of the extraction interval.

Structurally, all of the Execute SQL Tasks that are used to generate workload are identical. They take no parameters and do not generate result sets. Each runs a SQL script to apply changes to CDC source tables.
The table below shows the common SQL Statement attributes of the workload tasks.

    SQL Statement Attribute    Value
    -----------------------    ------------------
    ConnectionType             OLE DB
    Connection                 AdventureWorks2008
    SQLSourceType              File connection

Below, the script files for generating workload are paired with their corresponding tasks.

    Execute SQL Task for Workload    SQLStatementSource Expressions - ConnectionString
    -----------------------------    -------------------------------------------------
    Insert Customer Table            @[ScriptPath] + "CDCCustomerInsert.sql"
    Modify Customer Table            @[ScriptPath] + "CDCCustomerModify.sql"
    Insert CreditCard Table          @[ScriptPath] + "CDCCreditCardInsert.sql"
    Modify CreditCard Table          @[ScriptPath] + "CDCCreditCardModify.sql"
    Insert WorkOrder Table           @[ScriptPath] + "CDCWorkOrderInsert.sql"
    Modify WorkOrder Table           @[ScriptPath] + "CDCWorkOrderModify.sql"

Script Task – Mark Workload Completion

The Script Task, Mark Workload Completion, runs after all the DML workload has been applied to the source tables. It is used to set package variables flagging the completion of the workload tasks and noting the completion time. The loop that periodically launches the master package to harvest change data monitors these package variables and terminates sample execution after the entire workload has been applied to the target environment.

The EntryPoint property, which defines the first method that executes when the script task runs, is MarkWorkloadCompletion. The ReadWriteVariables property allows the following package variables to be set by the script:

    User::WorkloadCompleted, User::WorkloadEndTime

The VB script code to support setting these package variables is shown below.
    Public Sub MarkWorkloadCompletion()
        Dim varWorkloadEndTime As Variable = Dts.Variables("User::WorkloadEndTime")
        Dim varWorkloadCompleted As Variable = Dts.Variables("User::WorkloadCompleted")

        Dts.VariableDispenser.LockForWrite("User::WorkloadEndTime")
        Dts.VariableDispenser.LockForWrite("User::WorkloadCompleted")

        ' Record when the workload finished and flag its completion
        varWorkloadEndTime.Value() = Now
        varWorkloadCompleted.Value() = 1

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

Initializing the Target Tables

One of the principal issues to address when applying change data to a target environment is the synchronization of the stream of change data to the initial load. Simply put, the initial load of the target reflects a snapshot of the source at some point in time. The challenge is to determine the last change represented within the snapshot in order to have a basis for determining the first incremental change that needs to be applied.

For Change Data Capture, the recommended strategy for synchronization makes use of a database snapshot created after the source tables are enabled. Use of the snapshot as the source for the initial load ensures the cross-table consistency of the tracked tables. More importantly, however, the metadata maintained for the snapshot allows you to precisely determine the LSN to use when you apply the first incremental load. The sample treats the first extraction interval differently from the others, using an LSN value for the low end-point and a datetime value for the high end-point. This is needed to ensure that no changes are missed or repeated as a result of the synchronization process. A custom wrapper function is created to deal with this special interval.

Execute SQL Task – Create Database Snapshot

As described above, a database snapshot is created to be used as the source for the initial load of the target tables.
The snapshot is created concurrently with the execution of the workload tasks to demonstrate that the content of the target tables subsequent to the initial load is not explicitly predetermined. Snapshot metadata will be used to determine the appropriate starting point for the incremental loads that follow.

The package variable User::CreateSnapshot contains the statement used to create the database snapshot. The content of the variable is derived from an expression that references the package variable User::BasePath to construct the path where the snapshot file is to be located.

    "CREATE DATABASE AdventureWorks2008_dbss ON
    (NAME = AdventureWorks2008_Data, FILENAME = '" +
    @[BasePath] + "AdventureWorks2008_data.ss')
    AS SNAPSHOT OF AdventureWorks2008;"

The SQL statement has no input parameters and no result set is returned.

Data Flow Tasks to Perform the Initial Load

Once the snapshot is created, it can be queried concurrently to obtain a consistent initial load for the target tables. The sample makes use of data flow tasks to perform the initial load, but any technology that uses the snapshot as the source for the data can be used. Use of the snapshot as the source guarantees the cross-table consistency of the target tables that are loaded.

The three Data Flow Tasks used to perform the initial load of the target tables are all structured identically. What differs among the three tasks are the names of the source and destination tables that provide the end-points of the data flow. Each consists of an OLE DB source referencing the database snapshot, and an OLE DB destination that references AdventureWorksDW2008, the target database. All of the columns of the source table are output to the data flow. In general, column names associated with the source are mapped to identical names in the destination.
Columns defined as computed columns, however, are ignored.

The tables below show the common attributes of the OLE DB source and destination Data Flow Components:

    OLE DB Source Attribute         Value
    ----------------------------    -----------------------
    OLE DB connection manager       AdventureWorks2008_dbss
    Data access mode                Table or view

    OLE DB Destination Attribute    Value
    ----------------------------    -----------------------
    OLE DB connection manager       AdventureWorksDW2008
    Data access mode                Table or view

Below is the association between individual Data Flow Tasks and the source and destination tables.

    Data Flow Task                              Source/Destination Table
    ----------------------------------------    ------------------------
    Load Target Customer Table From Snapshot    CDCSample.Customer
    Load CreditCard Table From Snapshot         CDCSample.CreditCard
    Load WorkOrder Table From Snapshot          CDCSample.WorkOrder

Setting Up the Periodic Requests for Change Data

Once the initial load of the target tables has completed, the sample verifies that the capture process responsible for harvesting changes from the transaction log and depositing them in the change tables is active. If there are no entries in cdc.lsn_time_mapping, this is an indication that the capture process did not auto-start and that SQL Agent is not running. If an active capture process is not detected, the sample logs an error in the event log and terminates. If the presence of an active capture process is verified, metadata from the database snapshot is used to determine the LSN that will anchor the initial incremental load. A base time for computing the high end-point for the initial load is also determined at this time.

Execute SQL Task – Verify Capture Process is Started

If there are no entries in cdc.lsn_time_mapping, this is an indication that the capture process did not auto-start when Change Data Capture was enabled. This typically occurs because SQL Agent is not running. This condition is detected here so that an event log message can be posted to indicate the failure to auto-start and the run can be terminated.
The package variable User::CaptureStarted is used to report this condition.

The following SQL Statement is defined as Direct input:

    declare @start_time datetime, @capturestarted bit
    select @start_time = min(tran_end_time)
    from cdc.lsn_time_mapping
    if @start_time is null
    begin
        select @capturestarted = 0
    end
    else
    begin
        select @capturestarted = 1
    end
    select @capturestarted as CaptureStarted

The SQL statement has no input parameters. The result set is defined as a single row with the column CaptureStarted:

    Result Name       Variable Name
    --------------    --------------------
    CaptureStarted    User::CaptureStarted

If there are no entries in the cdc.lsn_time_mapping table, the script task Log Capture Not Started Message is called to log an informational message indicating that the capture process, which requires SQL Agent to be running, did not auto-start.

Execute SQL Task – Determine Datetime Base for Initial Extraction Interval

After the initial load of the target tables, the sample uses an Execute SQL Task to determine the end-points for the first extraction interval. Subsequent extraction intervals follow the initial interval at ten-second intervals. For the initial extraction interval, the lower bound is expressed as an LSN value to ensure that no incremental changes are lost between the snapshot and the first incremental load. This LSN value is determined by using the stored procedure sys.sp_cdc_dbsnapshotLSN to retrieve the last commit LSN from the database snapshot. This value is saved as the package variable LastLSN.

Once the lower boundary expressed as an LSN value is determined, the sample uses information in the cdc.lsn_time_mapping table to assign an approximate datetime value to this LSN. It is this approximate datetime value that is used to compute the high end-point for the initial extraction interval. The script has a built-in delay to wait until the capture process has processed all LSN values through the determined LastLSN value.
After this is ensured, the minimum tran_end_time of the entries in cdc.lsn_time_mapping with a start_lsn value greater than LastLSN is determined. This value is saved in the package variable ExtractEndTime. Later, in the loop that cycles the master package every 10 seconds, this value will be used as the base from which the high end-point of the first extraction interval is determined.

The following SQL Statement is defined as Direct input:

    declare @command nvarchar(max), @database_name nvarchar(1000),
        @lastLSN binary(10), @max_lsn binary(10), @start_time datetime,
        @lastLSNstr nvarchar(42)

    -- Retrieve the last commit LSN recorded in the database snapshot
    exec sys.sp_cdc_dbsnapshotLSN 'AdventureWorks2008_dbss',
        @lastLSN output, @lastLSNstr output

    -- Wait until the capture process has moved past that LSN
    select @max_lsn = sys.fn_cdc_get_max_lsn()
    while (@lastLSN >= @max_lsn)
    begin
        waitfor delay '00:00:10'
        select @max_lsn = sys.fn_cdc_get_max_lsn()
    end

    -- Commit time of the first change that follows the snapshot
    select @start_time = min(tran_end_time)
    from cdc.lsn_time_mapping
    where start_lsn > @lastLSN

    select @start_time = convert(nvarchar(40),@start_time,20)
    select @start_time as ExtractStartTime, @start_time as ExtractEndTime,
        @lastLSNstr as LastLSN

The SQL statement has no input parameters. The result set is defined as a single row with the columns ExtractEndTime and LastLSN.

    Result Name       Variable Name
    --------------    --------------------
    ExtractEndTime    User::ExtractEndTime
    LastLSN           User::LastLSN

Script Task – Log Capture Not Started Message

The Script Task Log Capture Not Started Message is used to log an informational message to the Windows event log to indicate that the capture process did not auto-start. The precedence constraint that defines the workflow between the Execute SQL Task Determine Extraction Start Time and this task is the following:

    Completion and @CaptureStarted == false evaluates to TRUE

The EntryPoint property, which defines the first method that executes when the script task runs, is LogCaptureNotStartedMessage.
The ReadOnlyVariables property setting makes the following package variables available to the script:

    System::PackageName, System::StartTime, User::CaptureStarted

The VB script code to support writing to the event log is shown below.

    Public Sub LogCaptureProcessNotStartedMessage()
        Dim varPackageName As Variable = Dts.Variables("System::PackageName")
        Dim varStartTime As Variable = Dts.Variables("System::StartTime")
        Dim varCaptureStarted As Variable = Dts.Variables("User::CaptureStarted")
        Dim sLog As String
        Dim sEventMessage As String
        Dim sMachine As String
        Dim sSource As String

        Dts.VariableDispenser.LockForRead("System::PackageName")
        Dts.VariableDispenser.LockForRead("System::StartTime")
        Dts.VariableDispenser.LockForRead("User::CaptureStarted")

        sLog = "Application"
        sSource = varPackageName.Value().ToString
        sEventMessage = "The CDC Capture Process was not started." _
            & " Make certain that SQL Agent is running." _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "The Package: " + varPackageName.Value().ToString _
            & Chr(10) _
            & "Started: " & varStartTime.Value().ToString _
            & Chr(10) _
            & "Current Time:" & System.DateTime.Now _
            & Chr(10) _
            & "=============================================" _
            & Chr(10) _
            & "Capture Started: " & varCaptureStarted.Value().ToString
        sMachine = "."

        Dim ELog As New EventLog(sLog, sMachine, sSource)
        ELog.WriteEntry(sEventMessage, EventLogEntryType.Information)

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

The event log message posted will be similar to that shown in Figure 2:

Figure 2: Event Log Message Signaling Capture Process Not Started

Extracting and Processing Change Data

After the boundaries of the first incremental load have been determined, the setup package enters a loop that invokes the master package to harvest change data at 10 second intervals until all the changes from the generated workload have been applied to the target.
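As a rough model of the loop this section introduces, the following Python sketch (not part of the sample) mirrors the For Loop properties and the Set Extract Interval arithmetic that the subsections below describe. The names should_continue, next_interval, and run_loop are hypothetical, the master package is replaced by a caller-supplied callback, and the workload is assumed to have completed before the loop starts.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(seconds=10)  # width of each extraction window


def should_continue(workload_completed, extract_start_time, workload_end_time):
    """Mirror of the loop's EvalExpression: loop while the negated stop test holds."""
    return not (workload_completed == 1 and extract_start_time > workload_end_time)


def next_interval(previous_end):
    """The previous high end-point becomes the new low end-point; add 10 seconds."""
    return previous_end, previous_end + INTERVAL


def run_loop(base_time, workload_end_time, run_master, workload_completed=1):
    """Cycle the (stand-in) master package until the stop test is met.

    Assumption: both end-points start at base_time, and workload_completed
    stays fixed; with 0 this simplified model would never terminate.
    """
    interval_id = 0  # InitExpression
    extract_start = extract_end = base_time
    while should_continue(workload_completed, extract_start, workload_end_time):
        extract_start, extract_end = next_interval(extract_end)
        run_master(interval_id, extract_start, extract_end)
        interval_id += 1  # AssignExpression
    return interval_id
```

Because the stop test reads the start time of the window that was just processed, this model runs until a window that begins after the workload end time has been extracted.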
For Loop Container - Cycle Master at 10 Second Intervals

The Loop Container logic itself is designed to execute the container tasks until all of the changes in the generated workload have been extracted and applied to the target and then terminate. Two conditions need to be satisfied in order for the loop to terminate. First, the workload generation tasks need to have all completed. This is determined by checking the package variable WorkloadCompleted, which is set by the Mark Workload Completion task that runs after all of the load generation tasks run. Second, all of the changes associated with the workload must have been applied to the target tables. This is determined by checking the proposed start time for the next extraction interval. If this value is greater than the time when the Workload Completion Task ran, then we are assured that the extraction intervals already processed fully cover the time when changes were applied to the source tables and the run can terminate.

The loop container also maintains a package variable that is used as an interval counter. It initializes the variable to 0 and then increments it by one during each iteration. Since the interval counter is a package variable, it can be passed to the master package launched from within the loop.

For Loop Properties

    Property Name       Property Value
    InitExpression      @IntervalID = 0
    EvalExpression      !((@WorkloadCompleted == 1) && (@ExtractStartTime > @WorkloadEndTime))
    AssignExpression    @IntervalID = @IntervalID + 1

Within the For Loop container are two tasks: a Script Task that computes the new end-points for the extraction interval and an Execute Package Task that invokes the master package driving the extraction cycle.

Script Task - Set Extract Interval

The Script Task Set Extract Interval is used to determine the end-points for the next query window. It uses the high end-point of the previous window as the low end-point of the current window.
It then adds 10 seconds to the new low end-point to obtain the new high end-point. The computed end-points are written back to the package variables User::ExtractStartTime and User::ExtractEndTime.

The EntryPoint property, which defines the first method that executes when the script task runs, is SetExtractInterval. The ReadWriteVariables property setting makes the following package variables available to the script:

    User::ExtractStartTime
    User::ExtractEndTime

The VB script code to support computing the query interval end-points is shown below.

    Public Sub SetExtractInterval()
        Dim varStartTime As Variable = Dts.Variables("User::ExtractStartTime")
        Dim varEndTime As Variable = Dts.Variables("User::ExtractEndTime")

        Dts.VariableDispenser.LockForWrite("User::ExtractStartTime")
        Dts.VariableDispenser.LockForWrite("User::ExtractEndTime")

        ' The previous high end-point becomes the new low end-point.
        varStartTime.Value() = varEndTime.Value()
        varEndTime.Value() = DateAdd(DateInterval.Second, 10, varStartTime.Value())

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

Execute Package Task – Run Master to Extract Data

The Execute Package Task Run Master to Extract Data functions as the test harness for the ETL extraction cycle. Its sole function is to launch the master package, which in turn launches the extraction packages for the individual tables.

Validating Incremental Load and Reporting Completion Status

Once the generated workload has been applied to the target environment, the state of the replicas should match that of the source tables. The Execute SQL Task Check for Mismatch in Replicas uses SQL CHECKSUM to verify that the contents of the source and target tables match, setting package variables to record the status of the run. Finally, the VB Script Task Output Run Completion Status is launched to output the run status to the event log.
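To make the validation idea concrete, the following Python sketch (not part of the sample) compares a source table and its replica by checksum and derives one mismatch flag per table, with any mismatch producing a failed run status. The function names and the in-memory row lists are hypothetical. Note that T-SQL CHECKSUM(*) is computed per row; the sketch instead folds per-row hashes into a single order-independent value for the whole table, which is an assumption about the intent of the check rather than a transcription of the sample's statement.

```python
import zlib


def table_checksum(rows):
    """Order-independent checksum: XOR of a CRC32 over each row's text form.

    Illustrative only: under XOR, pairs of identical rows cancel out.
    """
    total = 0
    for row in rows:
        total ^= zlib.crc32(repr(row).encode("utf-8"))
    return total


def mismatch_flags(source_tables, target_tables):
    """Return 1 for each table whose replica checksum differs, else 0."""
    return {
        name: int(table_checksum(rows) != table_checksum(target_tables[name]))
        for name, rows in source_tables.items()
    }


def run_status(flags):
    """Any mismatch marks the whole run as failed."""
    return "Failure" if any(flags.values()) else "Success"
```

Row order does not affect the result, so a replica loaded in a different order than the source still compares as identical.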
Execute SQL Task – Check for Mismatch in Replicas

The Execute SQL Task Check for Mismatch in Replicas is used to determine whether the replicas created match the source tables. CHECKSUM is used to compare the table contents, and package variables are set for each of the tracked tables to indicate whether differences were detected.

The following SQL statement is defined as Direct input:

    declare @CustomerMismatch int,
            @CreditCardMismatch int,
            @WorkOrderMismatch int,
            @Checksum bigint,
            @ChecksumDW bigint
    select @CustomerMismatch = 0,
           @CreditCardMismatch = 0,
           @WorkOrderMismatch = 0,
           @Checksum = 0,
           @ChecksumDW = 0

    select @Checksum = CHECKSUM(*)
    from AdventureWorks2008.CDCSample.Customer
    select @ChecksumDW = CHECKSUM(*)
    from AdventureWorksDW2008.CDCSample.Customer
    if (@Checksum <> @ChecksumDW)
    begin
        set @CustomerMismatch = 1
    end

    select @Checksum = CHECKSUM(*)
    from AdventureWorks2008.CDCSample.CreditCard
    select @ChecksumDW = CHECKSUM(*)
    from AdventureWorksDW2008.CDCSample.CreditCard
    if (@Checksum <> @ChecksumDW)
    begin
        set @CreditCardMismatch = 1
    end

    select @Checksum = CHECKSUM(*)
    from AdventureWorks2008.CDCSample.WorkOrder
    select @ChecksumDW = CHECKSUM(*)
    from AdventureWorksDW2008.CDCSample.WorkOrder
    if (@Checksum <> @ChecksumDW)
    begin
        set @WorkOrderMismatch = 1
    end

    select @CustomerMismatch as CustomerMismatch,
           @CreditCardMismatch as CreditCardMismatch,
           @WorkOrderMismatch as WorkOrderMismatch

The SQL statement has no input parameters. The result set is defined as a single row with the columns CustomerMismatch, CreditCardMismatch, and WorkOrderMismatch.

    Result Name           Variable Name
    CustomerMismatch      User::CustomerMismatch
    CreditCardMismatch    User::CreditCardMismatch
    WorkOrderMismatch     User::WorkOrderMismatch

VB Script Task – Output Run Completion Status

The VB Script Task Output Run Completion Status writes a status entry to the event log indicating the result of the comparisons made between the source tables and the target tables that were updated using Change Data Capture.
The EntryPoint property, which defines the first method that executes when the script task runs, is OutputRunCompletionStatus. The ReadOnlyVariables property setting makes the following package variables available to the script:

    System::PackageName
    System::StartTime
    User::CreditCardMismatch
    User::CustomerMismatch
    User::WorkOrderMismatch

The VB script code to log completion status is shown below. Note that a comparison failure for any of the tables will cause the task to complete with a failure status.

    Public Sub OutputRunCompletionStatus()
        Dim varPackageName As Variable = Dts.Variables("System::PackageName")
        Dim varStartTime As Variable = Dts.Variables("System::StartTime")
        Dim varCustomerMismatch As Variable = Dts.Variables("User::CustomerMismatch")
        Dim varCreditCardMismatch As Variable = Dts.Variables("User::CreditCardMismatch")
        Dim varWorkOrderMismatch As Variable = Dts.Variables("User::WorkOrderMismatch")
        Dim sLog As String
        Dim sEventMessage As String
        Dim sMachine As String
        Dim sSource As String
        Dim sCustomer As String
        Dim sCreditCard As String
        Dim sWorkOrder As String

        sCustomer = "Customer replica is identical."
        sCreditCard = "CreditCard replica is identical."
        sWorkOrder = "WorkOrder replica is identical."

        Dts.VariableDispenser.LockForRead("System::PackageName")
        Dts.VariableDispenser.LockForRead("System::StartTime")
        Dts.VariableDispenser.LockForRead("User::CustomerMismatch")
        Dts.VariableDispenser.LockForRead("User::CreditCardMismatch")
        Dts.VariableDispenser.LockForRead("User::WorkOrderMismatch")

        Dts.TaskResult = ScriptResults.Success
        If varCustomerMismatch.Value = 1 Then
            sCustomer = "Customer replica does not match source."
            Dts.TaskResult = ScriptResults.Failure
        End If
        If varCreditCardMismatch.Value = 1 Then
            sCreditCard = "CreditCard replica does not match source."
            Dts.TaskResult = ScriptResults.Failure
        End If
        If varWorkOrderMismatch.Value = 1 Then
            sWorkOrder = "WorkOrder replica does not match source."
            Dts.TaskResult = ScriptResults.Failure
        End If

        sLog = "Application"
        sSource = varPackageName.Value().ToString
        sEventMessage = "CDC SSIS Sample Completion Status" _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "The Package: " + varPackageName.Value().ToString _
            & Chr(10) _
            & "Started: " & varStartTime.Value().ToString _
            & Chr(10) _
            & "Current Time:" & System.DateTime.Now _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "Customer table: " & sCustomer _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "CreditCard table: " & sCreditCard _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "WorkOrder table: " & sWorkOrder _
            & Chr(10) _
            & "============================================="
        sMachine = "."

        Dim ELog As New EventLog(sLog, sMachine, sSource)
        ELog.WriteEntry(sEventMessage, EventLogEntryType.Information)

        Dts.Variables.Unlock()
    End Sub

SetupCDCSample Package Variables

The following package variables are defined for the setup package. Each entry gives the name and data type, followed by the description and default value.

    BasePath (String)
        Install path for Change Data Capture for Specified Interval Package Sample packages
        Default: @[SQLServerInstallPath] + "100\\Samples\\Integration Services\\Package Samples\\Change Data Capture for Specified Interval Package Sample\\Change Data Capture Sample\\"
    ScriptPath (String)
        Install path for Change Data Capture for Specified Interval Package Sample scripts
        Default: @[BasePath] + "Scripts\\"
    SQLServerInstallPath (String)
        Install path for SQL Server
        Default: c:\program files\Microsoft SQL Server\
    CaptureStarted (Boolean)
        Flag indicating capture process has started
        Default: False
    CreateSnapshot (String)
        SQL statement to create database snapshot
        Default: "CREATE DATABASE AdventureWorks2008_dbss ON ( NAME = AdventureWorks2008_Data, FILENAME = '" + @[BasePath] + "AdventureWorks2008_data.ss' ) AS SNAPSHOT OF AdventureWorks2008;"
    ExtractEndTime (datetime)
        End time of next extraction
        Default: 5/6/2007 8:54 AM
    ExtractStartTime (datetime)
        Start time of next extraction
        Default: 5/6/2007 8:54 AM
    IntervalID (Int32)
        Extraction interval identifier
        Default: 0
    LastLSN (String)
        LSN anchor for first extraction
        Default: 0x00000000000000000000
    WorkloadCompleted (Int32)
        0 until the workload generation tasks have completed; 1 once the Mark Workload Completion task has run
        Default: 0
    WorkloadEndTime (datetime)
        Time of workload completion
        Default: 2/21/2008 12:31 PM
    CustomerMismatch (Int32)
        0 if Customer replica is identical at the end of the run; 1 if validation detected a mismatch
        Default: 0
    CreditCardMismatch (Int32)
        0 if CreditCard replica is identical at the end of the run; 1 if validation detected a mismatch
        Default: 0
    WorkOrderMismatch (Int32)
        0 if WorkOrder replica is identical at the end of the run; 1 if validation detected a mismatch
        Default: 0

MasterCDC Package

The package MasterCDC is responsible for launching all of the individual packages used to extract change data for the SQL Server source tables and apply the changes to the Data Mart. The principal task of the master package is to verify that the extraction interval passed from the parent lies within the current Change Data Capture database validity interval.

The Change Data Capture Database Validity Interval

The Change Data Capture validity interval for a database is simply the time interval for which change data is currently available for its capture instances. In principle, it begins when the first capture instance is created for a database table, and extends forward in time to the present. In practice, the validity interval is a moving window just as the extraction interval is a moving window. Change data deposited in change tables would grow unmanageably if it were not periodically and systematically pruned. By default, only three days of data are retained in the change tables.
Hence, the period of time covered by the validity interval is typically 72 hours.

While the cleanup process works to move the low end-point of the validity interval to the right on the Change Data Capture timeline, the capture process does the same for the high end-point. Since the capture process extracts change data from the transaction log, there is a built-in latency between the time that a change is committed to a source table and the time that the change appears within its associated change table. While this latency is typically small, it is nevertheless important to remember that change data is not available until the capture process has processed the related log entries.

For each transaction that results in one or more entries appearing within database change tables, an entry is logged to the cdc.lsn_time_mapping table. Each entry in the mapping table contains both a commit Log Sequence Number, or commit LSN, and a transaction commit time (columns start_lsn and tran_end_time, respectively). Change Data Capture uses the table cdc.lsn_time_mapping to identify the current bounds of the database validity interval, with the smallest and largest commit times represented in the entries of the cdc.lsn_time_mapping table denoting the low and high end-points of the validity interval for the database.

The Change Data Capture Validity Interval for a Capture Instance

While it is often the case that the database validity interval and an individual capture instance's validity interval will coincide, this is not always true. The validity interval of the capture instance begins when the capture process recognizes the capture instance and begins logging associated changes to its change table. As a result, if capture instances are created at different times, each will initially have a different low end-point. The start_lsn column of the result set returned by sys.sp_cdc_help_change_data_capture shows the current low end-point for each defined capture instance.
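As a small illustration of how the database validity interval follows from the mapping table, the following Python sketch (not part of the sample) derives the low and high end-points from a list of mapping entries. MappingEntry, database_validity_interval, and is_available are hypothetical names, and LSNs are simplified to integers here, whereas the real start_lsn column is binary(10).

```python
from datetime import datetime
from typing import NamedTuple


class MappingEntry(NamedTuple):
    """Simplified stand-in for one row of cdc.lsn_time_mapping."""
    start_lsn: int            # assumption: integer instead of binary(10)
    tran_end_time: datetime   # transaction commit time


def database_validity_interval(entries):
    """Return (low, high) commit times, or None when nothing has been captured."""
    if not entries:
        return None
    times = [entry.tran_end_time for entry in entries]
    return min(times), max(times)


def is_available(commit_time, entries):
    """True when a commit time falls inside the current validity interval."""
    interval = database_validity_interval(entries)
    if interval is None:
        return False
    low, high = interval
    return low <= commit_time <= high
```

An empty mapping table yields no interval at all, which corresponds to the case in which the capture process has not yet started.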
When the cleanup process cleans up change table entries, it adjusts the start_lsn values for all capture instances to reflect the new low water mark for available change data. Only those capture instances with start_lsn values currently less than the new low water mark are adjusted. Over time, if no new capture instances are created, the validity intervals for all individual instances will coincide with the database validity interval.

The validity interval is important to consumers of change data because the extraction interval for a request must be fully covered by the current Change Data Capture validity interval for the capture instance. If the low end-point of the extraction interval is to the left of the low end-point of the validity interval, there could be missing change data due to aggressive cleanup. If the high end-point of the extraction interval is to the right of the high end-point of the validity interval, the capture process has not yet processed through the time period represented by the extraction interval and change data could also be missing. This relationship is illustrated in the diagram below.

Figure 3: Change Data Capture Validity Intervals

A Closer Look at an SSIS Wrapper Function

It is important to note that the CDC query functions will fail if the request interval is not fully covered by the validity interval. In this sample, validity checks are systematically performed up front in the master package, so that the recoverable case, where the capture process simply needs to catch up, can be dealt with automatically.

The first task of the master package is to verify the extraction interval that it is passed. If the low end-point of the extraction interval is outside the validity interval, the package logs an error to the event log and exits with error status.
If the high end-point of the extraction interval identifies a time that is ahead of current processing by the CDC capture process, the master package will delay to allow the capture process to catch up. Once the extraction interval is in range, the master package launches the child packages. Each child package picks up the datetime values defining the extraction interval from its parent through Package Configurations. When all child packages have completed, the master package logs an event message indicating completion.

The control flow diagram for the master package is shown below.

Figure 4: Master Package for Incremental Data Extraction

Master Package Configurations and Variables

Package Configurations for Initializing Package Variables

The master package uses Package Configurations to initialize several runtime parameters. The table below shows the parent variables that are used to initialize local package variables of the master package.

    Configuration Name            Configuration Type         Configuration String      Target Object       Target Property
    Configure ExtractStartTime    Parent Package Variable    User::ExtractStartTime    ExtractStartTime    Value
    Configure ExtractEndTime      Parent Package Variable    User::ExtractEndTime      ExtractEndTime      Value
    Configure IntervalID          Parent Package Variable    User::IntervalID          IntervalID          Value
    Configure Last LSN            Parent Package Variable    User::LastLSN             LastLSN             Value
    Configure Base Path           Parent Package Variable    User::BasePath            BasePath            Value

Master Package Variables

The following package variables are defined for the master package. Each entry gives the name, data type, and scope, followed by the description.

    BasePath (String, MasterCDC)
        Path where Change Data Capture for Specified Interval Package Sample packages are installed
    DataReady (Int32, MasterCDC)
        Code indicating query status
        0 = Need to wait for capture process
        1 = Start time predates validity interval
        2 = Ready for data query (interval > 1)
        3 = Ready for first data query
        5 = Timeout ceiling reached waiting for capture process
    Delay (Int32, MasterCDC)
        Milliseconds to delay before rechecking whether the capture process has caught up
    ExtractStartTime (datetime, MasterCDC)
        Start time of extraction interval (interval > 1)
    ExtractEndTime (datetime, MasterCDC)
        End time of extraction interval
    IntervalID (Int32, MasterCDC)
        Interval ID
    LastLSN (String, MasterCDC)
        LSN anchor used for first extraction interval
    TimeoutCeiling (Int32, MasterCDC)
        Number of cycles to delay waiting for capture process to catch up prior to terminating with error
    TimeoutCount (Int32, MasterCDC)
        Number of delays that have already been invoked

Master Package Tasks

Execute SQL Task – Check for Data

The Execute SQL Task Check for Data is used by the master package to determine how to proceed based upon the relationship of the requested extraction interval to the current Change Data Capture database validity interval and the current timeout count. It assigns one of five values to the package variable DataReady to indicate which of five possible conditions holds. It requires an OLE DB connection to the source database.

The following SQL statement is defined as Direct input:

    declare @DataReady int, @TimeoutCount int
    if not exists (select tran_end_time from cdc.lsn_time_mapping
                   where tran_end_time > ? )
        select @DataReady = 0
    else if ? = 0
        select @DataReady = 3
    else if not exists (select tran_end_time from cdc.lsn_time_mapping
                        where tran_end_time <= ? )
        select @DataReady = 1
    else
        select @DataReady = 2
    select @TimeoutCount = ?
    if (@DataReady = 0)
        select @TimeoutCount = @TimeoutCount + 1
    else
        select @TimeoutCount = 0
    if (@TimeoutCount > ?)
        select @DataReady = 5
    select @DataReady as DataReady, @TimeoutCount as TimeoutCount

The SQL statement has five input parameters:

    Variable Name             Direction    Data Type    Parameter Name
    User::ExtractEndTime      Input        DATE         0
    User::IntervalID          Input        SHORT        1
    User::ExtractStartTime    Input        DATE         2
    User::TimeoutCount        Input        SHORT        3
    User::TimeoutCeiling      Input        SHORT        4

The result set is defined as a single row with the columns DataReady and TimeoutCount:

    Result Name     Variable Name
    DataReady       User::DataReady
    TimeoutCount    User::TimeoutCount

The SQL query first determines whether there are any entries in the cdc.lsn_time_mapping table that are later than the requested end of the extraction interval. If there are none, the capture process has not yet processed all the changes in the request interval and DataReady is set to 0. If the capture process is caught up, the Interval ID is checked next. If it is 0, this is the first interval to be processed, which requires special treatment. DataReady in this case is assigned a value of 3 to indicate this is the first interval. If this is not the first interval, a check is then made to verify that the starting point for the extraction interval is not smaller than all existing entries in the cdc.lsn_time_mapping table. If it is smaller than all of them, DataReady is set to 1, indicating that the extraction interval is invalid and cannot be automatically corrected. Finally, if the starting point of the interval is not outside the validity interval, DataReady is set to 2, indicating that change data can be extracted from change tables for the interval.

Once DataReady has been determined, the query determines whether the master package is in a delay loop waiting for the capture process to catch up. If DataReady is 0, the timeout counter is incremented and then checked to see whether the configured number of wait intervals has been exhausted. If the counter exceeds the ceiling, currently set to 20, @DataReady is set to 5 to indicate that the wait for the capture process has timed out.
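The decision logic described above can be modeled outside SQL Server. The following Python sketch (not part of the sample) takes the tran_end_time values of the mapping table as a plain list and returns the same pair of values as the statement's result set. The function name check_for_data and its parameter names are hypothetical.

```python
from datetime import datetime
from typing import Sequence, Tuple


def check_for_data(
    mapping_times: Sequence[datetime],
    extract_start: datetime,
    extract_end: datetime,
    interval_id: int,
    timeout_count: int,
    timeout_ceiling: int,
) -> Tuple[int, int]:
    """Return (data_ready, timeout_count) following the Check for Data rules."""
    if not any(t > extract_end for t in mapping_times):
        data_ready = 0   # capture process has not passed the interval end yet
    elif interval_id == 0:
        data_ready = 3   # first interval: low end-point is given as an LSN
    elif not any(t <= extract_start for t in mapping_times):
        data_ready = 1   # start predates every entry in the mapping table
    else:
        data_ready = 2   # interval is covered; change data can be queried

    # Count consecutive waits; any other outcome resets the counter.
    if data_ready == 0:
        timeout_count += 1
    else:
        timeout_count = 0

    if timeout_count > timeout_ceiling:
        data_ready = 5   # waited longer than the configured ceiling
    return data_ready, timeout_count
```

Keeping the counter in the returned pair mirrors the way the task writes both DataReady and TimeoutCount back to package variables, so that the next pass through the check starts from the updated count.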
Note that whenever a non-zero value is determined for @DataReady, the timeout counter is reset to 0.

After the package executes the Execute SQL Task Check for Data, there are three possible paths that can be taken:

If DataReady is 0, the required changes have not yet been propagated to the change tables. In this case, the Script Task Delay executes to wait for a period of time to allow the capture process to catch up.

If DataReady is 1, the low end-point of the extraction interval is outside the Change Data Capture validity interval. In this case, an error is logged by the VB Script Task Log Extract Error and the package terminates, since there is no automatic recovery when data loss is possible. Similarly, if DataReady is 5, the allowable number of delays has been exhausted and the Script Task Log Extract Error is called to log the timeout error.

Finally, if DataReady is 2 or 3, all changes through the indicated ExtractEndTime have been deposited in change tables and the data extraction packages can gather change data. In this case, the three data extraction packages are launched.

Execute Package Tasks – Extract Change Data for CDC Enabled Tables

The Execute Package Tasks Extract Customer Data, Extract CreditCard Data, and Extract WorkOrder Data launch the extraction packages for the individual tables.

Script Task – Log Extract Error

The Script Task Log Extract Error is used to log an error to the Windows event log. The precedence constraint that defines the workflow between the Execute SQL Task Check for Data and this task is the following:

    Success Completion Code and @DataReady == 1 || @DataReady == 5 evaluates to TRUE

The EntryPoint property, which defines the first method that executes when the script task runs, is LogExtractError.
The ReadOnlyVariables property setting makes the following package variables available to the script:

    User::DataReady, System::ExecutionInstanceGUID, User::ExtractStartTime, System::PackageName, System::StartTime

The VB script code to support writing to the event log is shown below.

    Public Sub LogExtractError()
        Dim varPackageName As Variable = Dts.Variables("System::PackageName")
        Dim varStartTime As Variable = Dts.Variables("System::StartTime")
        Dim varInstanceID As Variable = Dts.Variables("System::ExecutionInstanceGUID")
        Dim varExtractStartTime As Variable = Dts.Variables("User::ExtractStartTime")
        Dim varDataReady As Variable = Dts.Variables("User::DataReady")
        Dim sLog As String
        Dim sEventMessage As String
        Dim sMachine As String
        Dim sSource As String
        Dim iDataReady As Integer

        Dts.VariableDispenser.LockForRead("System::PackageName")
        Dts.VariableDispenser.LockForRead("System::StartTime")
        Dts.VariableDispenser.LockForRead("System::ExecutionInstanceGUID")
        Dts.VariableDispenser.LockForRead("User::ExtractStartTime")
        Dts.VariableDispenser.LockForRead("User::DataReady")

        sLog = "Application"
        sSource = varPackageName.Value().ToString
        iDataReady = varDataReady.Value()
        If iDataReady = 1 Then
            sEventMessage = "Start Time Error"
        Else
            sEventMessage = "Timeout Error"
        End If
        sEventMessage = sEventMessage _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "The Package: " + varPackageName.Value().ToString _
            & Chr(10) _
            & "Started: " & varStartTime.Value().ToString _
            & Chr(10) _
            & "Current Time:" & System.DateTime.Now _
            & Chr(10) _
            & "=============================================" _
            & Chr(10) _
            & "Extract Start Time: " _
            & varExtractStartTime.Value().ToString _
            & Chr(10) _
            & "Execution GUID: " & varInstanceID.Value().ToString
        sMachine = "."

        Dim ELog As New EventLog(sLog, sMachine, sSource)
        ELog.WriteEntry(sEventMessage, EventLogEntryType.Error)

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Failure
    End Sub

Script Task – Delay

The Script Task Delay is used to delay for a given interval of time. The precedence constraint that defines the workflow between the Execute SQL Task Check for Data and this task is the following:

    Success Completion Code and @DataReady == 0 && @TimeoutCount <= @TimeoutCeiling evaluates to TRUE

The EntryPoint property, which defines the first method that executes when the script task runs, is Delay. The ReadOnlyVariables property setting makes the following package variable available to the script:

    User::Delay

The VB script code to support delaying for a period of time is shown below.

    Public Sub Delay()
        Dim varDelay As Variable = Dts.Variables("User::Delay")
        Dim iDelay As Integer

        Dts.VariableDispenser.LockForRead("User::Delay")
        iDelay = varDelay.Value()
        Threading.Thread.Sleep(iDelay)

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

Script Task – Log Extraction Complete

The Script Task Log Extraction Complete is used to log a completion message to the Windows event log. The precedence constraint that defines the workflow between each of the three Execute Package Tasks and this task is Completion. The EntryPoint property, which defines the first method that executes when the script task runs, is LogExtractionCompletion.
The ReadOnlyVariables property setting makes the following package variables available to the script:

    User::DataReady, System::ExecutionInstanceGUID, User::ExtractEndTime, User::ExtractStartTime, User::IntervalID, System::PackageName, System::StartTime

The VB script code to support writing the completion notification to the event log is shown below.

    Public Sub LogExtractionCompletion()
        Dim varPackageName As Variable = Dts.Variables("PackageName")
        Dim varStartTime As Variable = Dts.Variables("StartTime")
        Dim varInstanceID As Variable = Dts.Variables("ExecutionInstanceGUID")
        Dim varExtractStartTime As Variable = Dts.Variables("ExtractStartTime")
        Dim varExtractEndTime As Variable = Dts.Variables("ExtractEndTime")
        Dim varIntervalID As Variable = Dts.Variables("IntervalID")
        Dim varDataReady As Variable = Dts.Variables("DataReady")
        Dim sLog As String
        Dim sEventMessage As String
        Dim sMachine As String
        Dim sSource As String

        Dts.VariableDispenser.LockForRead("PackageName")
        Dts.VariableDispenser.LockForRead("StartTime")
        Dts.VariableDispenser.LockForRead("ExecutionInstanceGUID")
        Dts.VariableDispenser.LockForRead("ExtractStartTime")
        Dts.VariableDispenser.LockForRead("ExtractEndTime")
        Dts.VariableDispenser.LockForRead("IntervalID")
        Dts.VariableDispenser.LockForRead("DataReady")

        sLog = "Application"
        sSource = varPackageName.Value().ToString
        sEventMessage = "Extract Complete" _
            & Chr(10) _
            & "=============================================" & Chr(10) _
            & "The Package: " + varPackageName.Value().ToString _
            & Chr(10) _
            & "Started: " & varStartTime.Value().ToString _
            & Chr(10) _
            & "Current Time:" & System.DateTime.Now _
            & Chr(10) _
            & "=============================================" _
            & Chr(10) _
            & "Extract Start Time: " & varExtractStartTime.Value().ToString _
            & Chr(10) _
            & "Extract End Time: " & varExtractEndTime.Value().ToString _
            & Chr(10) _
            & "Interval ID: " & varIntervalID.Value().ToString _
            & Chr(10) _
            & "Data Ready: " & varDataReady.Value().ToString _
            & Chr(10) _
            & "Execution GUID: " & varInstanceID.Value().ToString
        sMachine = "."

        Dim ELog As New EventLog(sLog, sMachine, sSource)
        ELog.WriteEntry(sEventMessage, EventLogEntryType.Information)

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

Child Packages for the Change Data Capture for Specified Interval Package Sample

The sample child packages all use instantiated wrapper functions to query for change data for all extraction intervals after the first interval. For the first interval, they use a customized version of the generated wrapper function that allows the low end-point of the extraction interval to be expressed as an LSN value as opposed to a datetime value. The datetime boundaries of the extraction interval, as well as the LSN boundary used for the first interval, are obtained from parent package variables when the packages are launched. The packages also obtain a flag indicating whether this is the initial extraction interval so that the first interval can be handled as a special case.

The basic control flow for the packages is straightforward. A Script Task is called first to construct the SQL query that will be used to query for change data. Control is then passed to the Data Flow Task to request and process the change data. The data flow task uses an OLE DB source component to perform the query, directing the returned result set to a conditional split transformation. The conditional split uses the operation returned in each result set row to direct the rows to appropriate transformations: deletes and updates are sent to OLE DB command transformations, while inserts are directed to an OLE DB destination.
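The routing performed by the conditional split can be pictured with a short Python sketch (not part of the sample). The row layout, the key names operation, id, and data, and the one-letter codes "I", "U", and "D" are placeholders chosen for illustration and are not taken from the wrapper functions; the target table is modeled as a dictionary keyed by id.

```python
def split_by_operation(rows):
    """Route each change row to a bucket according to its operation code."""
    routed = {"I": [], "U": [], "D": []}
    for row in rows:
        routed[row["operation"]].append(row)
    return routed


def apply_changes(target, rows):
    """Apply routed changes to an in-memory stand-in for the target table."""
    routed = split_by_operation(rows)
    for row in routed["D"]:      # deletes: handled by a command per row
        target.pop(row["id"], None)
    for row in routed["U"]:      # updates: handled by a command per row
        target[row["id"]] = row["data"]
    for row in routed["I"]:      # inserts: loaded into the destination
        target[row["id"]] = row["data"]
    return target
```

Because the child packages query for net changes, each key is expected to appear at most once per interval, so the order in which the three buckets are applied does not matter in this model.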
The control and data flow for these child packages is shown in the figure below.

Figure 5: Child Packages Using Multi-Statement TVFs for Data Access

The Configurations used by the child extract packages use package variables from the launching master package to initialize their own package variables.

    Configuration Name      Configuration Type         Configuration String      Target Object    Target Property
    Configure Data Ready    Parent Package Variable    User::DataReady           DataReady        Value
    Configure End Time      Parent Package Variable    User::ExtractStartTime    StartTime        Value
    Configure Start Time    Parent Package Variable    User::ExtractEndTime      StartTime        Value
    Configure Last LSN      Parent Package Variable    User::LastLSN             LastLSN          Value

Child Package Variables

The following package variables are defined for the child packages. Each entry gives the name, data type, and scope, followed by the description.

    DataReady (Int32, CDCWorkOrderExtract)
        Code indicating query status
        0 = Need to wait for capture process
        1 = Start time predates validity interval
        2 = Ready for data query (interval > 1)
        3 = Ready for first data query
        5 = Timeout ceiling reached waiting for capture process
    ExtractStartTime (datetime, CDCWorkOrderExtract)
        Start time of extraction interval (interval > 1)
    ExtractEndTime (datetime, CDCWorkOrderExtract)
        End time of extraction interval
    LastLSN (String, CDCWorkOrderExtract)
        LSN anchor used for first extraction interval
    SQLDataQuery (String, CDCWorkOrderExtract)
        Query to use to obtain change data

Child Package Tasks

Script Task - Generate SQL Data Query

The Script Task Generate SQL Data Query is used to set up the query for change data. It allows us to work around the inability to pass parameters directly to table-valued functions in an OLE DB source. The parameters are passed to the VB script code, which composes the query string and returns the desired select statement in the package variable SQLDataQuery.

One additional issue bears mentioning.
Our general strategy for systematically processing a stream of change data is to use the high end-point of the previous interval to determine the low end-point of the subsequent interval, and to compute a new high end-point based upon the needs of the application environment. This strategy works well for all intervals except the initial interval, when there is no previous interval. In general, the destination and source will not be synchronized when a decision is made to use CDC technology to apply incremental loads, and the first task will be to identify an anchor for the destination. The anchor is defined as an LSN lying within the Change Data Capture validity interval of a capture instance such that (1) all changes with start LSN values up to and including that anchor are already reflected in the destination and (2) all changes with start LSN values greater than the anchor have yet to be applied. Once the anchor is determined, it is used to explicitly set the LSN boundary for the initial extraction.

For this sample, a database snapshot provides data for the initial load, allowing an appropriate anchor LSN for the first extraction to be determined directly from snapshot metadata. While use of a database snapshot in preparing the initial load is not a requirement, it greatly simplifies the synchronization process.

The Script Task Generate SQL Data Query generates the query used to extract change data. It uses the passed package variable DataReady to determine whether the query is for the first interval or for a subsequent interval. If it is the first interval, it constructs a call to the function fn_net_changes_WorkOrder_First, which takes an LSN value as its first parameter. Otherwise, it constructs a call to the function fn_net_changes_WorkOrder, which uses two datetime values as the interval end-points.

The EntryPoint property, which defines the first method that executes when the script task runs, is GenerateDataQuery.
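The anchor definition given above can be made concrete with a small model. The sketch below is plain Python rather than SSIS or T-SQL; the tuple layout (lsn, operation, key, value), the integer LSNs, and the function names apply_changes and incremental_load are all assumptions made for illustration. It shows that a destination loaded as of the anchor, followed by only those changes whose LSN is greater than the anchor, ends up matching the source.

```python
# Illustrative model only. Real LSNs are binary(10) values; integers are used here.
def apply_changes(table, changes):
    """Apply (lsn, operation, key, value) changes in LSN order to a dict keyed by primary key."""
    for _lsn, operation, key, value in sorted(changes, key=lambda c: c[0]):
        if operation == "D":
            table.pop(key, None)   # delete
        else:
            table[key] = value     # 'I' (insert) or 'UN' (update, new values)
    return table


def incremental_load(snapshot, changes, anchor_lsn):
    """Apply only the changes that lie strictly after the anchor LSN."""
    pending = [c for c in changes if c[0] > anchor_lsn]
    return apply_changes(dict(snapshot), pending)


changes = [
    (10, "I", 1, "a"),
    (20, "I", 2, "b"),
    (30, "UN", 1, "a2"),
    (40, "D", 2, None),
]
source_final = apply_changes({}, changes)           # state of the source after all changes
snapshot = apply_changes({}, changes[:2])           # initial load taken at anchor LSN 20
destination = incremental_load(snapshot, changes, anchor_lsn=20)
```

Choosing an anchor that is too low would reapply changes already present in the snapshot, and one that is too high would skip changes, which is why the sample derives the anchor directly from the snapshot metadata.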
The ReadOnlyVariables property setting makes the following package variables available to the script:

    User::DataReady, User::EndTime, User::LastLSN, User::StartTime

The ReadWriteVariables property setting allows the following package variable to be set by the script:

    User::SQLDataQuery

The VB script code to support constructing the data query is shown below.

    ' DateTimeFormatInfo requires "Imports System.Globalization" at the top of the script.
    Public Sub GenerateDataQuery()
        Dim varStartTime As Variable = Dts.Variables("User::StartTime")
        Dim varEndTime As Variable = Dts.Variables("User::EndTime")
        Dim varDataReady As Variable = Dts.Variables("User::DataReady")
        Dim varSQLDataQuery As Variable = Dts.Variables("User::SQLDataQuery")
        Dim varLastLSN As Variable = Dts.Variables("User::LastLSN")
        Dim iDataReady As Integer
        Dim sStartTime As String
        Dim sEndTime As String
        Dim sLastLSN As String
        Dim dStartTime As DateTime
        Dim dEndTime As DateTime

        ' Only SQLDataQuery is written; all other variables are read.
        Dts.VariableDispenser.LockForRead("User::StartTime")
        Dts.VariableDispenser.LockForRead("User::EndTime")
        Dts.VariableDispenser.LockForRead("User::DataReady")
        Dts.VariableDispenser.LockForRead("User::LastLSN")
        Dts.VariableDispenser.LockForWrite("User::SQLDataQuery")

        iDataReady = varDataReady.Value()
        dStartTime = varStartTime.Value()
        sStartTime = dStartTime.ToString("G", DateTimeFormatInfo.InvariantInfo)
        dEndTime = varEndTime.Value()
        sEndTime = dEndTime.ToString("G", DateTimeFormatInfo.InvariantInfo)
        sLastLSN = varLastLSN.Value().ToString

        ' DataReady = 2: subsequent interval, bounded by two datetime values.
        ' Otherwise: first interval, whose low end-point is the anchor LSN.
        If iDataReady = 2 Then
            varSQLDataQuery.Value = "select * from dbo.fn_net_changes_WorkOrder('" _
                & sStartTime & "','" & sEndTime & "', 'all')"
        Else
            varSQLDataQuery.Value = "select * from dbo.fn_net_changes_WorkOrder_First('" _
                & sLastLSN & "','" & sEndTime & "', 'all')"
        End If

        Dts.Variables.Unlock()
        Dts.TaskResult = ScriptResults.Success
    End Sub

Data Flow Task – Process Change Data

The Data Flow Task Process Change Data calls a table valued function to query the source for change data and then applies the change data returned in the result set to the destination.

OLE DB Source

The query used by these child processes is obtained from a package variable and makes use of an instantiated wrapper function. (Wrapper functions are discussed in detail at the end of this document.)

| Property         | Value                     |
|------------------|---------------------------|
| Data Access Mode | SQL Command from variable |
| Variable Name    | User::SQLDataQuery        |

The result set returned by the query includes the metadata column __CDC_OPERATION that identifies the operation to be used when applying the change to the destination. A conditional split transformation is used to direct the rows to one of three possible components based upon the following defined conditions:

| Order | Output Name | Condition               |
|-------|-------------|-------------------------|
| 1     | Inserts     | __CDC_OPERATION == "I"  |
| 2     | Updates     | __CDC_OPERATION == "UN" |
| 3     | Deletes     | __CDC_OPERATION == "D"  |

OLE DB Command Data Mart Deletes

The Delete flow is directed to the OLE DB Command Data Mart Deletes. The following command is used to apply the changes within this flow to the destination:

    delete from CDCSample.WorkOrder where WorkOrderID = ?

The command requires the primary key column or columns of the table from the result set in order to apply the delete. The delete command above is for the child package processing changes to the WorkOrder table, which has a single primary key column, WorkOrderID.

The mapping of result set columns to command parameters can be seen in the following diagram:

Figure 6: Result Set Mapping for Deletes

OLE DB Command Data Mart Updates

The Update flow is directed to the OLE DB Command Data Mart Updates. The following command is used to apply the changes within this flow to the destination:

    update CDCSample.WorkOrder set
        ProductID = ?,
        OrderQty = ?,
        ScrappedQty = ?,
        StartDate = ?,
        EndDate = ?,
        DueDate = ?,
        ScrapReasonID = ?,
        ModifiedDate = ?
    where WorkOrderID = ?

This command requires all of the source columns from the result set in order to successfully apply the update.
The mapping of result set columns to command parameters can be seen in the following diagram:

Figure 7: Result Set Mapping for Updates

OLE DB Destination Data Mart Inserts

The Insert flow is directed to the OLE DB Destination. The table name alone identifies the target for the insert. The following diagram shows how the result set columns are mapped to the table columns:

Figure 8: Result Set Mapping for Inserts

Error Logging in the Child Packages

Each of the child packages makes use of the SSIS Log Provider for Windows Event Log to allow logging for the OnError event. Logging is configured using the Configure SSIS Logs dialog box. This box appears in the designer when you right-click a selected package and choose Logging. This dialog box also allows the current logging settings to be examined. For the child packages, this dialog box shows the SSIS Log Provider for Windows Event Log as the Provider type on the Providers and Logs tab. Under the Details tab, the OnError condition is set.

Even when range validation is done up front, it is always possible that conditions will change between the time the check is performed and the time the TVF executes. Applications must always be prepared to deal with the possibility of range errors. The CDC TVFs check to ensure that the extraction interval defined is fully covered by the validity interval for the capture instance, and raise an error if this requirement is not met. In the sample, if range errors are encountered by any of the child packages, OLE DB will log the error to the event log, producing an entry similar to the following.

Figure 9: Range Error Posted from OLEDB

The error returned for range errors is error 313, "An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_net_changes_ …". The figure above shows this error.

Wrappers for CDC TVFs

Instantiated Multi-Statement TVF Wrappers

Wrapper functions for consuming CDC change data serve several important purposes.
First, and most importantly, they allow an SSIS Data Flow task to be used in a straightforward manner to query for change data. The inline TVFs that are generated to access CDC change tables do not allow the column structure of the returned result set to be determined by an OLE DB provider. This limitation, however, can be dealt with in a straightforward fashion by wrapping the inline TVFs with a multi-statement TVF that explicitly identifies the columns returned by the CDC function. The scripted wrapper functions do precisely this.

While the need to determine column information about the returned result set is the principal reason for using a wrapper function for the generated CDC TVF, the wrapper serves several other very useful purposes.

The generated CDC query functions used to gather change data use Log Sequence Numbers (LSNs) to mark the boundary of the query window. While LSNs are invaluable in ensuring that data can be retrieved from change tables in a systematic manner that guarantees no lost or repeated data, they have virtually no meaning to the application layer that wants to consume change data. Particularly for Data Mart applications, the request for change data is most typically bounded by datetime values. The CDC feature has built-in functionality to deal with systematically mapping between datetime values and LSN values. The wrapper function is an ideal place to address these mapping issues, allowing the application to define its request interval as a datetime pair, with the wrapper function performing the necessary translation between these values and the LSN values needed to query the generated CDC TVFs.

In addition to shielding the SSIS application from dealing with LSN values directly, the wrapper function can perform column filtering on the data returned from the CDC TVF more efficiently than column filtering done when querying the wrapper function.
The result set returned by the CDC TVF is expected to serve a variety of clients, and typically the captured columns included within a capture instance are not client specific. When the requesting package needs only a subset of the captured columns, this column filtering is best done within the wrapper code.

Finally, the wrapper code allows information returned from the generated TVF to be recoded in a form that is more easily consumed by the calling application. In its simplest form, recoding can be used to convert the integer based operation codes into more meaningful character codes that provide built-in cues to their meaning, that is, mapping 2 to 'I' for insert, 1 to 'D' for delete, and 4 to 'UN' for update new values. The more interesting case, however, is the recoding of column update information extracted from the CDC update mask.

When a CDC function returning net changes is called using the row filter option 'all with mask', an update mask is generated that identifies all of the column values that changed due to the update. If more than one update occurred during the interval, the mask represents the aggregate changes for all updates. This ability to easily determine the columns that will change, prior to applying an update, can be extremely useful during ETL dimension processing. Because this information is typically only needed for a small subset of columns, the wrapper is an appropriate place to extract this information from the mask and return it as simple flag columns to the SSIS application layer.

We will now look at a simple example of a wrapper function for the table WorkOrder in AdventureWorks2008.

A Closer Look at an SSIS Wrapper Function

Use of datetime Values in the Wrapper Signature

What we first note when examining a wrapper function is that its signature contains two datetime values: a start time and an end time.
Typically, it is most natural for an SSIS application to use a time interval to delimit a query range, since this allows the CDC change data to be easily related to other Data Mart data. While the CDC TVFs do not allow datetime values to be used directly as query end-points, mapping functions are provided that make it possible to systematically translate datetime values into corresponding LSNs. The wrapper functions provide an ideal location in which to embed this mapping logic.

In the generated wrapper functions scripted by the procedure sys.sp_cdc_generate_wrapper_function, the following convention is used to guarantee that no data is repeated or skipped. The caller of the wrapper agrees to pass the end time of the previous data query as the start time of the next query, without modification. Within the wrapper, the calls to the mapping functions are always constructed in exactly the same way. This ensures that, given the same time value that was passed in the previous call, the mapping function regenerates the same LSN value in the subsequent call. This is key to guaranteeing that the resulting LSN based intervals do not have breaks or overlaps. Finally, the function sys.fn_cdc_increment_lsn is used to increment the previous end LSN by one to obtain the next start LSN value. Note that it is guaranteed that there are no LSNs that lie between an LSN and that LSN value incremented by 1. Note also that the LSNs passed to the TVFs define a closed interval, so that all change entries having LSN values within the defined interval, including the boundary values, will be included in the returned result.

Below we have extracted the portion of the wrapper function that addresses the mapping of datetime values to LSN values to define the extraction interval. Note that passing a null value for the start time signals the wrapper function to use the current low end-point of the capture instance validity interval as the interval start.
Similarly, a null end time value will cause the high end-point of the capture instance validity interval to be used as the high end-point for the extraction interval. This serves a couple of useful purposes. When you are designing the package to process the result set returned by the wrapper function, you can set the default string that defines the call to use null parameters. This allows you to get needed column information for the result set without worrying too much about providing a valid extraction interval. The call to the wrapper function using null values will only fail if the cdc.lsn_time_mapping table has no entries.

Some of the additional checking performed by the wrapper function deserves explanation. We know that the CDC LSN based query functions will return a non-zero value in @@error if the LSN range does not fall within the validity interval of the capture instance. We would like the same to be true when the LSN based query functions are called from within a generated wrapper function. In order to ensure this, we need to check the datetime range explicitly, prior to mapping it to an LSN value. If a datetime value is not within bounds, the corresponding LSN is set to NULL, forcing the range error when the underlying TVF is called. Also note the following check:

    if @from_lsn is not null and @to_lsn is not null and
        (@from_lsn = sys.fn_cdc_increment_lsn(@to_lsn))
        return

This is a legitimate condition that can occur when there are no entries in cdc.lsn_time_mapping in the interval between the start and end time. In that situation the calls sys.fn_cdc_map_time_to_lsn('largest less than or equal', @start_time) and sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time) return the same LSN value. Incrementing the start LSN by one causes the condition to evaluate to true.
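The mapping convention and the empty interval check can be traced with a small model. The sketch below is plain Python, not T-SQL. The list LSN_TIME_MAPPING is a hypothetical stand-in for cdc.lsn_time_mapping, LSNs are modeled as small integers rather than binary(10) values, and the names map_time_to_lsn and extraction_interval are introduced only for this illustration. It assumes both time values fall inside the validity interval, so the range checks are omitted.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical stand-in for cdc.lsn_time_mapping: (commit time, LSN), sorted by time.
LSN_TIME_MAPPING = [
    (datetime(2008, 11, 1, 12, 0, 0), 100),
    (datetime(2008, 11, 1, 12, 0, 10), 105),
    (datetime(2008, 11, 1, 12, 0, 40), 112),
]


def map_time_to_lsn(t):
    """Model of 'largest less than or equal': the last LSN committed at or before t."""
    times = [commit_time for commit_time, _ in LSN_TIME_MAPPING]
    i = bisect_right(times, t)
    if i == 0:
        raise ValueError("time precedes the first mapping entry")
    return LSN_TIME_MAPPING[i - 1][1]


def extraction_interval(start_time, end_time):
    """Return the closed LSN interval (from_lsn, to_lsn), or None when it is empty."""
    from_lsn = map_time_to_lsn(start_time) + 1   # models sys.fn_cdc_increment_lsn
    to_lsn = map_time_to_lsn(end_time)
    if from_lsn == to_lsn + 1:                   # no commits between the two times
        return None
    return (from_lsn, to_lsn)


t0 = datetime(2008, 11, 1, 12, 0, 0)
t1 = datetime(2008, 11, 1, 12, 0, 20)
t2 = datetime(2008, 11, 1, 12, 0, 30)
t3 = datetime(2008, 11, 1, 12, 0, 50)
# Each call passes the previous end time, unmodified, as the next start time.
intervals = [extraction_interval(a, b) for a, b in [(t0, t1), (t1, t2), (t2, t3)]]
```

In this model the middle request covers a period with no commits, so both times map to the same LSN and the interval is empty, while the first and third intervals meet without a gap or an overlap.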
In this case, it is appropriate to return with an empty result set.

    create function [dbo].[fn_net_changes_WorkOrder]
    (   @start_time datetime = null,
        @end_time datetime = null,
        @row_filter_option nvarchar(30) = N'all'
    )
    …
    begin
        declare @from_lsn binary(10), @to_lsn binary(10)

        -- Low end-point: null start time means "start of the validity interval".
        if (@start_time is null)
            select @from_lsn = [sys].[fn_cdc_get_min_lsn]('WorkOrder')
        else
        begin
            if ([sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_min_lsn]('WorkOrder')) > @start_time) or
               ([sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_max_lsn]()) < @start_time)
                select @from_lsn = null
            else
                select @from_lsn = [sys].[fn_cdc_increment_lsn](
                    [sys].[fn_cdc_map_time_to_lsn]('largest less than or equal', @start_time))
        end

        -- High end-point: null end time means "end of the validity interval".
        if (@end_time is null)
            select @to_lsn = [sys].[fn_cdc_get_max_lsn]()
        else
        begin
            if [sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_max_lsn]()) < @end_time
                select @to_lsn = null
            else
                select @to_lsn = [sys].[fn_cdc_map_time_to_lsn]('largest less than or equal', @end_time)
        end

        -- Empty interval: nothing was committed between the start and end times.
        if @from_lsn is not null and @to_lsn is not null and
            (@from_lsn = [sys].[fn_cdc_increment_lsn](@to_lsn))
            return
        …
    end

Returning Column Information in the Wrapper Function

The wrapper function is structured as a multi-statement TVF. The result set returned by the function is defined explicitly, which allows the OLE DB provider to make column information available to the calling program. The need to include this information explicitly would make this one of the more tedious aspects of preparing wrapper functions manually. Note that the result from querying the CDC TVF is inserted into the defined table before exiting the function. The defined table includes all of the table columns plus a final column __CDC_OPERATION.
This column is a more user friendly recoding of the __$operation column returned by the CDC TVF.

    create function [dbo].[fn_net_changes_WorkOrder]
    (   @start_time datetime = null,
        @end_time datetime = null,
        @row_filter_option nvarchar(30) = N'all'
    )
    returns @resultset table (
        [WorkOrderID] int,
        [ProductID] int,
        [OrderQty] int,
        [ScrappedQty] smallint,
        [StartDate] datetime,
        [EndDate] datetime,
        [DueDate] datetime,
        [ScrapReasonID] smallint,
        [ModifiedDate] datetime,
        [__CDC_OPERATION] varchar(2)
    ) as
    begin
        …
        insert into @resultset
        select [WorkOrderID], [ProductID], [OrderQty], [ScrappedQty], [StartDate],
               [EndDate], [DueDate], [ScrapReasonID], [ModifiedDate],
               case [__$operation]
                   when 1 then 'D'
                   when 2 then 'I'
                   when 3 then 'UO'
                   when 4 then 'UN'
                   when 5 then 'M'
                   else null
               end as [__CDC_OPERATION]
        from [cdc].[fn_cdc_get_net_changes_WorkOrder](@from_lsn, @to_lsn, @row_filter_option)
        return
    end
    go

Extracting Information from the CDC Update Mask

CDC functionality includes the ability to identify all column values that changed for each identified update operation. When querying for all changes, this information is always returned. When querying for net changes, the update mask is only returned as a non-null value when the filter option 'all with mask' is selected. While this information can be extremely useful, its representation in mask form is awkward for SSIS to consume directly. CDC does, however, provide SQL functions to assist in extracting data from the update mask, making wrapper functions an ideal place for performing this extraction. While information from the update mask is not used within the current sample, it is nevertheless useful to show how logic to extract data from the update mask can be embedded within a simple wrapper function.

While the CDC supplied mask provides information on all captured columns, it is likely that the application will only be interested in this information for a handful of columns.
In the example below, the wrapper for the WorkOrder table is modified to include a final flag column that indicates, for an update, whether the column OrderQty has changed.

For this sample we made use of the default wrapper functions, so we were able to generate the wrappers for all defined, accessible capture instances with a single call. It is possible, however, to tailor the call to the scripting stored procedure to further customize the generated wrapper. In this case, we want to explicitly identify columns that need an update flag. We do this by coding the optional parameter @update_flag_list as a comma separated list of columns for which update information is needed.

The call below generates a wrapper with an additional output column that, for update operations, signals whether or not the column OrderQty was modified.

    exec sys.sp_cdc_generate_wrapper_function
        @capture_instance = 'WorkOrder',
        @update_flag_list = 'OrderQty'

When examining the code below, note that the returned table @resultset now includes an additional column, OrderQty_uflag, after __CDC_OPERATION. This column will hold the additional update flag. Next, note the call to the function sys.fn_cdc_get_column_ordinal to obtain the column ordinal for the column of interest. Once the column ordinal is known, the function sys.fn_cdc_is_bit_set can be applied in the select statement for each returned update to determine whether the bit corresponding to the column is set in the returned mask.
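The flag logic can be pictured with a small model before reading the T-SQL. The sketch below is plain Python and makes several assumptions that are not taken from the sample: the update mask is modeled as an integer in which ordinal 1 is the lowest bit, the column ordinals in ORDINALS are invented, and an aggregate mask is modeled as the bitwise OR of the individual update masks. Real code should rely on sys.fn_cdc_get_column_ordinal and sys.fn_cdc_is_bit_set rather than on any particular bit layout.

```python
# Illustrative model only; the bit layout and ordinals are assumptions.
ORDINALS = {"WorkOrderID": 1, "ProductID": 2, "OrderQty": 3}  # hypothetical ordinals
UPDATE_NEW_VALUES = 4  # __$operation code for the new values of an update


def is_bit_set(ordinal, update_mask):
    """Return True if the bit for a 1-based column ordinal is set in an integer mask."""
    return bool((update_mask >> (ordinal - 1)) & 1)


def order_qty_uflag(operation, update_mask):
    """Mirror the wrapper's CASE logic: a flag for updates that carry a mask, else None."""
    if operation != UPDATE_NEW_VALUES or update_mask is None:
        return None
    return is_bit_set(ORDINALS["OrderQty"], update_mask)


# Two updates within one interval: one changed ProductID, the other changed OrderQty.
product_mask = 1 << (ORDINALS["ProductID"] - 1)
order_qty_mask = 1 << (ORDINALS["OrderQty"] - 1)
aggregate_mask = product_mask | order_qty_mask
```

With these definitions, order_qty_uflag(4, aggregate_mask) reports that OrderQty changed, while an insert row or an update row without a mask yields no flag at all.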
Finally, note that when the TVF dbo.fn_net_changes_WorkOrder is invoked, the row filter option must be set to 'all with mask' to signal, in the net changes query, that the mask should be computed.

    create function [dbo].[fn_net_changes_WorkOrder]
    (   @start_time datetime = null,
        @end_time datetime = null,
        @row_filter_option nvarchar(30) = N'all'
    )
    returns @resultset table (
        [WorkOrderID] int,
        [ProductID] int,
        [OrderQty] int,
        [ScrappedQty] smallint,
        [StartDate] datetime,
        [EndDate] datetime,
        [DueDate] datetime,
        [ScrapReasonID] smallint,
        [ModifiedDate] datetime,
        [__CDC_OPERATION] varchar(2),
        [OrderQty_uflag] bit
    ) as
    begin
        declare @from_lsn binary(10), @to_lsn binary(10)
        declare @ordinal_1 int

        -- Ordinal of the column whose update flag is reported.
        select @ordinal_1 = [sys].[fn_cdc_get_column_ordinal]('WorkOrder', 'OrderQty')

        if (@start_time is null)
            select @from_lsn = [sys].[fn_cdc_get_min_lsn]('WorkOrder')
        else
        begin
            if ([sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_min_lsn]('WorkOrder')) > @start_time) or
               ([sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_max_lsn]()) < @start_time)
                select @from_lsn = null
            else
                select @from_lsn = [sys].[fn_cdc_increment_lsn](
                    [sys].[fn_cdc_map_time_to_lsn]('largest less than or equal', @start_time))
        end

        if (@end_time is null)
            select @to_lsn = [sys].[fn_cdc_get_max_lsn]()
        else
        begin
            if [sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_max_lsn]()) < @end_time
                select @to_lsn = null
            else
                select @to_lsn = [sys].[fn_cdc_map_time_to_lsn]('largest less than or equal', @end_time)
        end

        if @from_lsn is not null and @to_lsn is not null and
            (@from_lsn = [sys].[fn_cdc_increment_lsn](@to_lsn))
            return

        insert into @resultset
        select [WorkOrderID], [ProductID], [OrderQty], [ScrappedQty], [StartDate],
               [EndDate], [DueDate], [ScrapReasonID], [ModifiedDate],
               case [__$operation]
                   when 1 then 'D'
                   when 2 then 'I'
                   when 3 then 'UO'
                   when 4 then 'UN'
                   when 5 then 'M'
                   else null
               end as [__CDC_OPERATION],
               case [__$operation]
                   when 4 then
                       case [__$update_mask]
                           when null then null
                           else [sys].[fn_cdc_is_bit_set](@ordinal_1, [__$update_mask])
                       end
                   else null
               end as [OrderQty_uflag]
        from [cdc].[fn_cdc_get_net_changes_WorkOrder](@from_lsn, @to_lsn, @row_filter_option)
        return
    end

Customizing Wrapper Functions for the First Extraction Interval

Our strategy for synchronizing the initial load from a database snapshot forces us to deal with the first extraction interval specially, allowing the lower bound of the extraction interval to be expressed as an LSN value, while continuing to honor a datetime value as the upper bound. While these customized functions are not currently scripted when wrapper functions are generated, it is straightforward to hand modify the generated wrappers to produce a version designed specifically to handle the initial extraction interval.

In this sample, the code for these functions is included explicitly in the script file CDCSetupTables.sql. The modified WorkOrder script is shown below. The changes include a change to the function signature, the conversion of the string representation of the LSN to a binary(10), some range validation against the capture instance validity interval, and an incremental adjustment to get all changes after the defined anchor LSN.

    -- Generate custom wrapper function for initial incremental load of WorkOrder
    create function [dbo].[fn_net_changes_WorkOrder_First]
    (   @start_lsn_str varchar(40) = null,
        @end_time datetime = null,
        @row_filter_option nvarchar(30) = N'all'
    )
    returns @resultset table (
        [WorkOrderID] int,
        [ProductID] int,
        [OrderQty] int,
        [StockedQty] int,
        [ScrappedQty] smallint,
        [StartDate] datetime,
        [EndDate] datetime,
        [DueDate] datetime,
        [ScrapReasonID] smallint,
        [ModifiedDate] datetime,
        [__CDC_OPERATION] varchar(2)
    ) as
    begin
        declare @from_lsn binary(10), @to_lsn binary(10), @start_lsn binary(10)

        if (@start_lsn_str is null)
            select @from_lsn = [sys].[fn_cdc_get_min_lsn]('WorkOrder')
        else
        begin
            -- Convert the anchor LSN string and validate it against the validity interval.
            select @start_lsn = [dbo].[HexStrToVarBin](@start_lsn_str)
            if ([sys].[fn_cdc_get_min_lsn]('WorkOrder') > @start_lsn) or
               ([sys].[fn_cdc_get_max_lsn]() < @start_lsn)
                select @from_lsn = null
            else
                -- Start just after the anchor so only later changes are returned.
                select @from_lsn = [sys].[fn_cdc_increment_lsn](@start_lsn)
        end

        if (@end_time is null)
            select @to_lsn = [sys].[fn_cdc_get_max_lsn]()
        else
        begin
            if [sys].[fn_cdc_map_lsn_to_time]([sys].[fn_cdc_get_max_lsn]()) < @end_time
                select @to_lsn = null
            else
                select @to_lsn = [sys].[fn_cdc_map_time_to_lsn]('largest less than or equal', @end_time)
        end

        if @from_lsn is not null and @to_lsn is not null and
            (@from_lsn = [sys].[fn_cdc_increment_lsn](@to_lsn))
            return

        insert into @resultset
        select [WorkOrderID], [ProductID], [OrderQty], [StockedQty], [ScrappedQty],
               [StartDate], [EndDate], [DueDate], [ScrapReasonID], [ModifiedDate],
               case [__$operation]
                   when 1 then 'D'
                   when 2 then 'I'
                   when 3 then 'UO'
                   when 4 then 'UN'
                   when 5 then 'M'
                   else null
               end as [__CDC_OPERATION]
        from [cdc].[fn_cdc_get_net_changes_WorkOrder](@from_lsn, @to_lsn, @row_filter_option)
        return
    end
    GO

Running the Change Data Capture for Specified Interval Package Sample in BI Studio

To run the Change Data Capture for Specified Interval Package Sample in BI Studio, open the Change Data Capture for Specified Interval Package Sample project and execute the package SetupCDCSample.dtsx. It should take several minutes to run. Once the execution environment has been set up, three of the workload tasks are launched to generate DML against the AdventureWorks2008 tables that are being tracked. These workload tasks have built-in 10 second delays to guarantee that the load is spread over several minutes, regardless of the hardware that it is run on.

Figure 10: Setup Package Performing Initial Load and Workload Generation Concurrently

While the workload is being applied, a task to generate the database snapshot for AdventureWorks2008 is also launched. After it completes, the data flow tasks are allowed to run against the snapshot database to initially load the target tables in AdventureWorksDW2008. Once the initial load completes, the base time for the initial extraction interval is determined and the master incremental load package is invoked.
Each time the master package is invoked, it first verifies that the desired extraction interval is contained within the validity interval for the database. If necessary, it will delay and give the capture process time to process all the changes through the high end-point of the next extraction interval. Once the workload has been applied, the Mark Workload Completion task sets package variables to indicate that the workload has completed and to identify a completion time. When the start time of an extraction interval exceeds that time, the loop cycling the master package will terminate.

Figure 11: Setup Package at Successful Run Completion

The control flow diagram above shows the setup package at the end of a successful run.

Conclusion

The Change Data Capture feature of SQL Server 2008 makes change data available in a relational format. This document describes how SSIS packages can leverage this feature to handle incremental loads to a Data Mart, using a sample available on Codeplex as the vehicle to drive the discussion.

For more information:

SQL Server Web site
SQL Server TechCenter
SQL Server DevCenter

Did this paper help you? Please give us your feedback. Tell us on a scale of 1 (poor) to 5 (excellent), how would you rate this paper and why have you given it this rating? For example:

Are you rating it high due to having good examples, excellent screenshots, clear writing, or another reason?
Are you rating it low due to poor examples, fuzzy screenshots, unclear writing?

This feedback will help us improve the quality of white papers we release.