Bulk data load using OLEDB Template Libraries
OLEDB provides a way to access data in a Component Object Model (COM)
environment. The OLEDB template library wraps the OLEDB interfaces and
simplifies data access. Broadly, the template library falls into two
categories:
- Provider Templates
- Consumer Templates
A provider is a component that exposes a set of COM interfaces for a data
store. If you are writing a provider, use the provider templates. A program
that accesses the data store consumes the provider interfaces; hence it is
called the consumer.
The sample in this article is a consumer of the OLEDB interfaces that are
exposed by the SQLOLEDB provider. SQLOLEDB is the native provider for
Microsoft SQL Server and ships with SQL Server as well as with the MDAC
libraries. The sample uses both the OLEDB template library and the OLEDB
interfaces directly, because some of the SQLOLEDB-specific interfaces are not
abstracted in the OLEDB template library.
Scenario
The scenario considered in this article is bulk loading data from a
pre-defined data source into a table whose schema is known only at run time.
The pre-defined data are hardwired in the sample.
Because the target table name is not known until the user enters it, you need
to use the CDynamicAccessor template. This template lets you access data when
the table schema is not known at compile time. The bulk load itself uses the
SQLOLEDB-specific IRowsetFastLoad interface.
Initialization
Before calling any OLEDB interface, the COM layer has to be initialized. To
initialize the COM layer, call the CoInitializeEx() function as shown below:
…………………………………………………………….
#if _WIN32_WINNT >= 0x0400
HRESULT hRes = CoInitializeEx(NULL, COINIT_MULTITHREADED);
#else
HRESULT hRes = CoInitialize(NULL);
#endif
…………………………………………………………….
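The snippet above stores the result in hRes but does not inspect it. The following is a minimal sketch of how such a result could be classified. It assumes stand-in names (HResultModel, kOk, kFalse, kFail, Failed, DescribeInit) that are hypothetical and exist only so the sketch builds without the Windows SDK; in a real consumer you would use HRESULT, S_OK, S_FALSE, E_FAIL and the FAILED() macro instead.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-ins; on Windows use HRESULT and FAILED() from the SDK headers.
typedef std::int32_t HResultModel;
const HResultModel kOk    = 0;                                      // same value as S_OK
const HResultModel kFalse = 1;                                      // same value as S_FALSE
const HResultModel kFail  = static_cast<HResultModel>(0x80004005u); // same value as E_FAIL

// Mirrors the FAILED() test: failure codes have the high (severity) bit set,
// so they are negative when read as a signed 32-bit value.
inline bool Failed(HResultModel hr) { return hr < 0; }

// Turns the result of the initialization call into a message for the caller.
std::string DescribeInit(HResultModel hr) {
    if (Failed(hr)) return "COM initialization failed";
    if (hr == kFalse) return "COM was already initialized on this thread";
    return "COM initialized";
}
```

The point of the sketch is that a success code other than S_OK is still a success, so comparing against S_OK alone would misreport the "already initialized" case.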
Opening the Database Connection
Before opening a database connection, you need to set the connection
properties such as the instance name, database name, user name and password.
The following code snippet uses Windows authentication, so it sets no user
name or password.
…………………………………………………………….
CDBPropSet dbconnection;
CDataSource m_cds;
dbconnection.SetGUID(DBPROPSET_DBINIT);
// "SSPI" selects integrated (Windows) authentication
dbconnection.AddProperty(DBPROP_AUTH_INTEGRATED, OLESTR("SSPI"));
dbconnection.AddProperty(DBPROP_INIT_DATASOURCE, OLESTR("instance name"));
dbconnection.AddProperty(DBPROP_INIT_CATALOG, OLESTR("db name"));
HRESULT hr = m_cds.Open(_T("SQLOLEDB.1"), &dbconnection);
…………………………………………………………….
Opening the Session
Before opening the session, enable the bulk load operation on the connection.
To do so, query the connection for its property interface and set the fast
load property. Note that although the connection was opened by using the OLEDB
template, the bulk load property for that connection has to be set through the
OLEDB interface directly.
…………………………………………………………….
CComPtr<IDBProperties> m_spProps;
hr = m_cds.m_spInit->QueryInterface(IID_IDBProperties, (void**)&m_spProps);
CDBPropSet m_dbinitProps;
m_dbinitProps.SetGUID(DBPROPSET_SQLSERVERDATASOURCE);
m_dbinitProps.AddProperty(SSPROP_ENABLEFASTLOAD, true);
hr = m_spProps->SetProperties(1, &m_dbinitProps);
…………………………………………………………….
You can also set the following properties to control the bulk load operation:
- SSPROP_FASTLOADKEEPIDENTITY, to preserve the identity values
- SSPROP_FASTLOADKEEPNULLS, to preserve the null values
Once the connection’s property is set, open the session as shown in the
following code snippet:
…………………………………………………………….
hr = m_session.Open(m_cds);
…………………………………………………………….
Bulk Load Interface
The bulk load interface is associated with a table; in this scenario the
target table name comes from the user input. To obtain the bulk load (or fast
load) interface, open a rowset on that table through the session as shown
below:
……………………………………………………………
#define DBINITCONSTANTS
…….
DBID tid;
tid.eKind = DBKIND_NAME;
tid.uName.pwszName=<table name>;
CComPtr<IRowsetFastLoad> m_pIFastLoad;
hr = m_session.m_spOpenRowset->OpenRowset(NULL, &tid, NULL, IID_IRowsetFastLoad, 0,
NULL, (LPUNKNOWN*) &m_pIFastLoad);
…………………………………………………………….
The IRowsetFastLoad interface is specific to the SQLOLEDB provider and is
defined in the sqloledb.h file. To use this interface, define the
DBINITCONSTANTS macro in your consumer application before including the
header; otherwise you will get a compilation error.
Binding Columns and Getting Accessor
The IRowsetFastLoad interface exposes the table schema in tabular form, and a
consumer of this interface can get the schema information and bind to that
schema.
What is binding?
A binding is a data structure used by the consumer to communicate data to the
provider. Each binding contains information about the consumer buffer for one
column, such as data length, precision and so on.
Since the table is not known until run time in this case, we need to use the
dynamic binding method that is implemented in the CDynamicAccessor template.
If you look into atldbcli.h, you will find the following method:
…………………………………………………………….
BindColumns(IUnknown* pUnk)
…………………………………………………………….
This method takes an IUnknown pointer, in this case the IRowsetFastLoad
interface. From the IRowsetFastLoad interface, the method gets the column
information and binds each column of the given table.
The method binds BLOB and string columns itself and calls the
CAccessorBase::Bind() method to bind the rest of the column types. After
binding, it allocates the buffer for CAccessorBase::m_pBuffer.
Finally, the BindColumns() method calls the CAccessorBase::BindEntries()
method to create the accessor.
So, what is an accessor?
An accessor is a data structure that contains the layout of a data row for a
data store. It contains one binding for each column; in other words, an
accessor is a collection of bindings.
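To make the relationship between bindings and an accessor concrete, here is a simplified model. BindingModel, AccessorModel and BuildAccessor are hypothetical names invented for this sketch; they are not the OLEDB DBBINDING structure or the ATL implementation, and the sketch ignores the alignment rules a real provider may require. It only illustrates the idea that each binding records where one column's value, length and status live inside a single row buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model of one binding: offsets of a column's parts in the row buffer.
struct BindingModel {
    std::size_t valueOffset;   // start of the value bytes
    std::size_t lengthOffset;  // start of the length field
    std::size_t statusOffset;  // start of the status field
    std::size_t maxValueBytes; // space reserved for the value
};

// Hypothetical model of an accessor: one binding per column plus the row size.
struct AccessorModel {
    std::vector<BindingModel> bindings;
    std::size_t rowBytes = 0;
};

// Lays the columns out back to back: value, then length, then status.
AccessorModel BuildAccessor(const std::vector<std::size_t>& columnSizes) {
    AccessorModel acc;
    std::size_t offset = 0;
    for (std::size_t size : columnSizes) {
        BindingModel b;
        b.maxValueBytes = size;
        b.valueOffset = offset;
        offset += size;
        b.lengthOffset = offset;
        offset += sizeof(std::uint32_t);
        b.statusOffset = offset;
        offset += sizeof(std::uint32_t);
        acc.bindings.push_back(b);
    }
    acc.rowBytes = offset;
    assert(acc.bindings.size() == columnSizes.size());
    return acc;
}
```

For a 4-byte integer column followed by a 12-byte string column, this model produces two bindings and a 32-byte row buffer.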
Laying out Data
The binding of each column comprises three parts: value, length and status.
The consumer needs to set all three parts for each column. The value part
holds the actual data value, the length part holds the data length, and the
status part must be one of the following:
- DBSTATUS_S_OK: The value is not NULL and the provider has to use the
consumer’s value.
- DBSTATUS_S_ISNULL: The value is NULL.
- DBSTATUS_S_IGNORE: The provider ignores the value and length and does not
change the column value.
- DBSTATUS_S_DEFAULT: When IRowsetFastLoad::InsertRow() is called, the provider
uses the default value for the column and ignores the value and length in the
consumer data buffer.
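The choice between these status values can be sketched as a small decision function. StatusModel and ChooseStatus are hypothetical names for this illustration, and the enumerators are stand-ins rather than the real DBSTATUS constants. The sketch assumes the convention used by the sample at the end of this article: a null pointer means a NULL column, while a non-null pointer with zero length is an empty (blank) value.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-ins for DBSTATUS_S_OK, DBSTATUS_S_ISNULL and DBSTATUS_S_DEFAULT.
enum class StatusModel { Ok, IsNull, Default };

// Picks the status for one column.
// value            : pointer to the consumer's data, or nullptr for a NULL column
// useColumnDefault : true when the provider should apply the column default
StatusModel ChooseStatus(const void* value, bool useColumnDefault) {
    if (useColumnDefault) return StatusModel::Default;
    if (value == nullptr) return StatusModel::IsNull;
    // A non-null pointer is a real value, even when its length is zero.
    return StatusModel::Ok;
}
```

Keeping the blank and NULL cases apart matters because an empty string and a NULL are different values in the target table.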
To lay out the data, CDynamicAccessor provides a set of methods as shown
below:
…………………………………………………………….
CDynamicAccessor cda;
…..
cda.SetLength(colOrdinal,colLength);
cda.SetStatus(colOrdinal, DBSTATUS_S_OK);
cda.SetValue(colOrdinal,colValue);
…………………………………………………………….
The CDynamicAccessor::SetLength() method takes the column ordinal and the
respective data length.
What is happening inside the SetLength() method?
The SetLength() method converts the column ordinal into an index of the column
info array m_pColumnInfo by calling the CDynamicAccessor::TranslateColumnNo()
method. The column info array is a member of CDynamicAccessor. So when you
pass the first column, whose ordinal is 1, it is translated into index 0.
After this translation, SetLength() calls the CDynamicAccessor::_GetDataPtr()
method to get the buffer pointer and copies the data length into the buffer.
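The ordinal-to-index translation can be modelled as follows. This is a simplified sketch and not the ATL source: TranslateOrdinal is a hypothetical function, and the firstOrdinal parameter encodes an assumption that the column info array starts at ordinal 0 when a bookmark column is present and at ordinal 1 otherwise. The sketch also shows the by-reference style, where the argument is overwritten with the index.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, simplified model of the ordinal-to-index translation.
// firstOrdinal : ordinal of the first entry in the column info array
//                (assumed 0 with a bookmark column, 1 without)
// columnCount  : number of entries in the column info array
// column       : in: column ordinal, out: array index (unchanged on failure)
bool TranslateOrdinal(std::size_t firstOrdinal, std::size_t columnCount,
                      std::size_t& column) {
    if (columnCount == 0 || column < firstOrdinal) return false;
    std::size_t index = column - firstOrdinal;
    if (index >= columnCount) return false; // ordinal past the last column
    column = index;                         // overwrite the ordinal in place
    return true;
}
```

With two columns and no bookmark, ordinal 1 becomes index 0, ordinal 2 becomes index 1, and ordinal 3 is rejected.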
What is happening inside the SetStatus() method?
Like SetLength(), this method translates the column ordinal into an index of
the column info array and gets the buffer pointer by calling
CDynamicAccessor::_GetDataPtr() to copy the status.
Why can SetValue() not be used to copy the data?
Microsoft recommends not using the SetValue() method to copy the actual data
value, because CDynamicAccessor::SetValue() does not handle string values
correctly. For more information, read KB article 201390 at http://support.microsoft.com/?id=201390.
As KB 201390 suggests, I changed the code as follows:
…………………………………………………………….
cda.TranslateColumnNo(colOrdinal);  // colOrdinal now holds the array index
void *pDataBuffer=(void*)cda._GetDataPtr(colOrdinal);
memcpy(pDataBuffer,colValue,colLength);
…………………………………………………………….
So, before laying out the value, you need to translate the column number into
a column info array index by calling the CDynamicAccessor::TranslateColumnNo()
method explicitly. After this translation, call the
CDynamicAccessor::_GetDataPtr() method to get the buffer and then copy the
data directly into that buffer.
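A direct memcpy() trusts the caller to pass a length that fits the column's slot in the row buffer. The following sketch shows one way to guard the copy. CopyValueBounded is a hypothetical helper, and the capacity argument is an assumption: it would have to come from the column metadata, which this sketch does not look up.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical helper: copies at most `capacity` bytes of the value into the
// column's slot and returns the number of bytes copied, so that the caller can
// store that number as the column length.
std::size_t CopyValueBounded(void* dest, std::size_t capacity,
                             const void* src, std::size_t length) {
    if (dest == nullptr || src == nullptr) return 0; // nothing to copy
    std::size_t n = length < capacity ? length : capacity;
    if (n > 0) std::memcpy(dest, src, n);
    return n;
}
```

Whether truncating silently is acceptable depends on the application; a stricter variant could report an error when length exceeds capacity.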
Inserting a Data Row
After you have laid out the data for each column, call the
IRowsetFastLoad::InsertRow() method as shown below:
…………………………………………………………….
hr=m_pIFastLoad->InsertRow(cda.GetHAccessor(0),cda.GetBuffer());
hr=m_pIFastLoad->Commit(FALSE);
…………………………………………………………….
Note that InsertRow() receives the accessor handle from
CDynamicAccessor::GetHAccessor() and the row buffer from
CDynamicAccessor::GetBuffer().
To insert in batches, call IRowsetFastLoad::Commit() with FALSE: the rows
inserted so far are written and the rowset remains usable for further inserts.
If TRUE is passed, the loaded data are committed and the bulk load ends. So to
finish the load, call the Commit() method as shown below:
…………………………………………………………….
hr=m_pIFastLoad->Commit(TRUE);
…………………………………………………………….
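The two uses of Commit() can be modelled with a small stand-in class. FastLoadModel and LoadInBatches are hypothetical names; the model only counts rows and does not talk to a database. It assumes that Commit(false) writes the pending rows and keeps the rowset usable, while Commit(true) writes the pending rows and ends the load.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the fast-load rowset; it only counts rows.
class FastLoadModel {
public:
    bool InsertRow(int row) {
        if (done_) return false;        // no inserts after the final commit
        pending_.push_back(row);
        return true;
    }
    bool Commit(bool done) {
        if (done_) return false;
        committed_ += pending_.size();  // pending rows are written
        pending_.clear();
        done_ = done;
        return true;
    }
    std::size_t committed() const { return committed_; }
    std::size_t pending() const { return pending_.size(); }
private:
    std::vector<int> pending_;
    std::size_t committed_ = 0;
    bool done_ = false;
};

// Inserts every row, commits after each full batch and once more at the end.
bool LoadInBatches(FastLoadModel& loader, const std::vector<int>& rows,
                   std::size_t batchSize) {
    if (batchSize == 0) return false;
    for (std::size_t i = 0; i < rows.size(); ++i) {
        if (!loader.InsertRow(rows[i])) return false;
        if ((i + 1) % batchSize == 0 && !loader.Commit(false)) return false;
    }
    return loader.Commit(true);         // final commit ends the load
}
```

The batch size is a tuning choice: larger batches mean fewer commits, while smaller batches limit how many rows are lost if a later insert fails.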
Un-Initialization
Finally, release the accessor handle and the fast load interface, and close
the accessor, session and connection objects by calling the respective Close()
method as shown below:
…………………………………………………………….
cda.Close();
m_pIFastLoad.Release();
m_session.Close();
m_cds.Close();
CoUninitialize();
…………………………………………………………….
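The teardown above releases resources in the reverse order of their creation. One way to make that order hard to get wrong is a small scope guard that records cleanup steps and runs them last-in, first-out. CleanupStack and DemoCleanupOrder are hypothetical names for this sketch, and the strings stand in for the real Close(), Release() and CoUninitialize() calls.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical scope guard: cleanup steps run in reverse order of registration.
class CleanupStack {
public:
    CleanupStack() = default;
    CleanupStack(const CleanupStack&) = delete;
    CleanupStack& operator=(const CleanupStack&) = delete;
    ~CleanupStack() { RunAll(); }

    void Push(std::function<void()> step) { steps_.push_back(std::move(step)); }

    void RunAll() {
        while (!steps_.empty()) {
            std::function<void()> step = std::move(steps_.back());
            steps_.pop_back();
            step();
        }
    }
private:
    std::vector<std::function<void()>> steps_;
};

// Records the order in which the modelled resources are released.
std::vector<std::string> DemoCleanupOrder() {
    std::vector<std::string> log;
    {
        CleanupStack cleanup;
        // Register each step right after the matching resource is acquired.
        cleanup.Push([&log] { log.push_back("CoUninitialize"); });
        cleanup.Push([&log] { log.push_back("connection"); });
        cleanup.Push([&log] { log.push_back("session"); });
        cleanup.Push([&log] { log.push_back("rowset"); });
        cleanup.Push([&log] { log.push_back("accessor"); });
    } // the destructor runs the steps in reverse: accessor first
    return log;
}
```

Because the guard runs in its destructor, the same order is followed on early returns after a failed call.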
Sample
To build the sample successfully, you need to add the MDAC 2.7 OLEDB include
and lib folders to your project settings. Here is the sample code:
--------------Include File--------------------------------------
#ifndef _FASTLOAD_H_
#define _FASTLOAD_H_
#include <iostream>
#include <string>
#include <cstdio>   // printf
#include <cstdlib>  // exit
#include <cstring>  // memcpy
//#define UNICODE
#define _WIN32_DCOM//For CoInitializeEx
#define DBINITCONSTANTS //For CLSIDToProgID
//#define INITGUID
#include <atldbcli.h>
#include <sqloledb.h>
using namespace std;
class CFastLoad:public CComModule
{
public:
    CFastLoad() : pvData(NULL)
    {}
    void InitFastLoadATL();
    void UnInitFastLoadATL();
    HRESULT OpenDataSource(const string& instance,
                           const string& database,
                           const string& tablename);
    BOOL BindData(void* colValue,int colLength,ULONG colOrdinal);
    HRESULT InsertData();
    HRESULT FinishLoad();
    CDataSource m_cds;
    CSession m_session;
    DBID m_TableId;
    CComPtr<IRowsetFastLoad> m_pIFastLoad;
    CDynamicAccessor cda;
    BYTE *pvData;
};
#endif
--------------Source File--------------------------------------
#include "fastload.h"
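// Converts an ANSI string to a wide string allocated with CoTaskMemAlloc().
// The caller releases the result with CoTaskMemFree().
OLECHAR* ConvertToWCHAR(char *psz)
{
    if(!psz)
        return NULL;
    // With -1 as the input length, the returned size includes the null terminator
    int wideLen = MultiByteToWideChar(CP_ACP, 0, psz, -1, NULL, 0);
    if(wideLen == 0)
        return NULL;
    WCHAR* pwszBuffer = (WCHAR*)CoTaskMemAlloc(wideLen*sizeof(WCHAR));
    if(!pwszBuffer)
        return NULL;
    MultiByteToWideChar(CP_ACP, 0, psz, -1, pwszBuffer, wideLen);
    return pwszBuffer;
}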
void CFastLoad::InitFastLoadATL()
{
#if _WIN32_WINNT >= 0x0400
    HRESULT hRes = CoInitializeEx(NULL, COINIT_MULTITHREADED);
#else
    HRESULT hRes = CoInitialize(NULL);
#endif
}
void CFastLoad::UnInitFastLoadATL()
{
    cda.Close();
    m_pIFastLoad.Release();
    m_session.Close();
    m_cds.Close();
    CoUninitialize();
}
HRESULT CFastLoad::OpenDataSource(const string& instance,
                                  const string& database,
                                  const string& tablename)
{
    CDBPropSet dbconnection;
    HRESULT hr;
    // c_str() guarantees the null terminator that the conversion relies on
    OLECHAR* w_instance=ConvertToWCHAR((char*)instance.c_str());
    OLECHAR* w_database=ConvertToWCHAR((char*)database.c_str());
    dbconnection.SetGUID(DBPROPSET_DBINIT);
    dbconnection.AddProperty(DBPROP_AUTH_INTEGRATED, OLESTR("SSPI"));
    dbconnection.AddProperty(DBPROP_INIT_DATASOURCE,w_instance);
    dbconnection.AddProperty(DBPROP_INIT_CATALOG,w_database);
    hr = m_cds.Open(_T("SQLOLEDB.1"), &dbconnection);
    // Free the converted strings on both the success and the failure path
    CoTaskMemFree(w_instance);
    CoTaskMemFree(w_database);
    if(FAILED(hr))
    {
        printf("Unable to get the connection");
        return hr;
    }
    // Enable Fast Load
    CComPtr<IDBProperties> m_spProps;
    hr = m_cds.m_spInit->QueryInterface(IID_IDBProperties, (void**)&m_spProps);
    if (FAILED(hr)) return hr;
    CDBPropSet m_dbinitProps;
    m_dbinitProps.SetGUID(DBPROPSET_SQLSERVERDATASOURCE);
    m_dbinitProps.AddProperty(SSPROP_ENABLEFASTLOAD, true);
    hr = m_spProps->SetProperties(1, &m_dbinitProps);
    if (FAILED(hr)) return hr;
    hr = m_session.Open(m_cds);
    if (FAILED(hr)) return hr;
    // Open the table
    m_TableId.eKind = DBKIND_NAME;
    m_TableId.uName.pwszName=ConvertToWCHAR((char*)tablename.c_str());
    hr = m_session.m_spOpenRowset->OpenRowset(NULL,&m_TableId,NULL,
                                              IID_IRowsetFastLoad,
                                              0, NULL,
                                              (LPUNKNOWN*) &m_pIFastLoad);
    if (FAILED(hr)) return hr;
    // Bind every column of the target table and create the accessor
    hr=cda.BindColumns(m_pIFastLoad);
    return hr;
}
BOOL CFastLoad::BindData(void* colValue,int colLength,ULONG colOrdinal)
{
    if(!cda.SetLength(colOrdinal,colLength)) return FALSE;
    // Blanks and NULLs need to be distinguished: a NULL pointer means a NULL column
    if(colLength == 0 && colValue == NULL)
    {
        if(!cda.SetStatus(colOrdinal,DBSTATUS_S_ISNULL)) return FALSE;
        return TRUE; // nothing to copy for a NULL value
    }
    if(!cda.SetStatus(colOrdinal,DBSTATUS_S_OK)) return FALSE;
    // SetValue() is avoided here; see KB 201390
    // if(!cda.SetValue(colOrdinal,colValue))return FALSE;
    if(!cda.TranslateColumnNo(colOrdinal)) return FALSE; // ordinal -> array index
    void *pDataBuffer=(void*)cda._GetDataPtr(colOrdinal);
    memcpy(pDataBuffer,colValue,colLength);
    return TRUE;
}
HRESULT CFastLoad::InsertData()
{
    // Fail cleanly if OpenDataSource() did not obtain the fast load interface
    if (m_pIFastLoad == NULL) return E_POINTER;
    HRESULT hr = m_pIFastLoad->InsertRow(cda.GetHAccessor(0),
                                         cda.GetBuffer());
    if (FAILED(hr)) return hr;
    // Commit(FALSE) leaves the rowset open for further inserts
    hr=m_pIFastLoad->Commit(FALSE);
    return hr;
}
HRESULT CFastLoad::FinishLoad()
{
    if (m_pIFastLoad == NULL) return E_POINTER;
    HRESULT hr;
    // Commit(TRUE) ends the bulk load
    hr=m_pIFastLoad->Commit(TRUE);
    if (FAILED(hr)) return hr;
    hr=cda.ReleaseAccessors(m_pIFastLoad);
    return hr;
}
int main(int argc, char* argv[])
{
    string database;
    string instance;
    string tablename;
    // Start at 1 to skip the program name
    for(int i=1;i<argc;i++)
    {
        string token = argv[i];
        if(token=="-h")
        {
            printf("Enter your options: For example:\n"
                   "fastload.exe -s <instance name> -d <dbname> -t <table name>\n");
            return 0;
        }
        // The remaining options take a value, so a following argument must exist
        if(i+1>=argc) break;
        if(token=="-d") database=argv[++i];
        else if(token=="-s") instance=argv[++i];
        else if(token=="-t") tablename=argv[++i];
    }
    CFastLoad cfl;
    cfl.InitFastLoadATL();
    if(FAILED(cfl.OpenDataSource(instance,database,tablename)))
    {
        cfl.UnInitFastLoadATL();
        return 1;
    }
    // First hardwired row
    int x=20;
    cfl.BindData(&x,sizeof(x),1);
    cfl.BindData((void*)"Hello World",11,2);
    cfl.InsertData();
    // Second hardwired row
    x=30;
    cfl.BindData(&x,sizeof(x),1);
    cfl.BindData((void*)"Hello OLEDB",11,2);
    cfl.InsertData();
    cfl.FinishLoad();
    cfl.UnInitFastLoadATL();
    return 0;
}