Technical Article

PivotByDate clr stored procedure

,

This clr stored procedure will pivot any temp table by date.

The temp table must contain 1 to N nvarchar, varchar or char columns, which the output will be grouped by.

The temp table must contain 1 to N numeric columns (not any char type and not any date type).

The temp table must contain exactly one column of date type. Probably any date type would work but I've only tested it with datetime.

Just include the .cs file in your existing clr library and deploy.

I haven't included a project because I figure most people will just add the file to their own library.

To run:

exec PivotByDate '#temp'

Note the quotes around the table name.

Enjoy!

//------------------------------------------------------------------------------
// <copyright file="CSSqlStoredProcedure.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.Collections.Generic;
using System.Collections;

public partial class StoredProcedures
{
    /// <summary>
    /// Compares two dates in descending order, so later dates sort first.
    /// </summary>
    public class ReverseDateComparer : IComparer<DateTime>
    {
        /// <summary>
        /// Returns a negative value when <paramref name="x"/> is later than
        /// <paramref name="y"/>, zero when they are equal, and a positive value otherwise.
        /// </summary>
        public int Compare(DateTime x, DateTime y)
        {
            // Swapping the operands inverts the natural ascending order.
            return DateTime.Compare(y, x);
        }
    }

    /// <summary>
    /// One output row: the values of the grouping (string) columns plus the running
    /// totals of every numeric column, keyed by the date they belong to.
    /// </summary>
    private sealed class PivotGroup
    {
        /// <summary>Grouping column values in column order; each is a string or DBNull.</summary>
        public readonly object[] KeyValues;

        /// <summary>Per-date totals, one array slot per numeric column.</summary>
        public readonly Dictionary<DateTime, decimal[]> TotalsByDate = new Dictionary<DateTime, decimal[]>();

        public PivotGroup(object[] keyValues)
        {
            KeyValues = keyValues;
        }
    }

    /// <summary>
    /// Reads every row of the named table over the context connection and sends back a
    /// pivoted result set: one row per distinct combination of string-column values and,
    /// for every distinct date, one text column per numeric column holding the summed value.
    /// </summary>
    /// <param name="tempTableName">
    /// Name of the table to pivot. It must have exactly one DateTime column and at least one
    /// column that is neither string nor DateTime (treated as numeric).
    /// </param>
    /// <exception cref="ArgumentException">
    /// The name is NULL, blank, or not a plain (optionally bracketed, optionally multi-part) identifier.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The table does not have exactly one DateTime column, or has no numeric columns.
    /// </exception>
    /// <remarks>
    /// Rows whose date is NULL are skipped. NULL numeric values add nothing to a total.
    /// Totals equal to zero are emitted as empty strings.
    /// </remarks>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void PivotByDate(SqlString tempTableName)
    {
        const string fmt = "{0} {1:MM/dd/yyyy}";

        // SqlString.Value throws on SQL NULL, so IsNull has to be tested first.
        if (tempTableName.IsNull || tempTableName.Value.Trim().Length == 0)
        {
            throw new ArgumentException("Must specify a temporary table name to use PivotByDate." + Environment.NewLine + "The table must contain one column of DateTime type, N columns of non-numeric type and N columns of numeric type. The numeric columns will be pivoted by the date in the datetime column.");
        }

        // A table name cannot be passed as a SQL parameter, so it is validated before
        // being concatenated into the query text.
        string tableName = tempTableName.Value.Trim();
        if (!IsSafeTableName(tableName))
        {
            throw new ArgumentException("'" + tableName + "' is not a valid table name for PivotByDate.");
        }

        var stringFields = new List<string>();
        var stringOrdinals = new List<int>();
        var numericFields = new List<string>();
        var numericOrdinals = new List<int>();
        var typesOfNumericFields = new List<Type>();
        var groups = new List<PivotGroup>();                     // keeps first-seen order for output
        var groupsByKey = new Dictionary<string, PivotGroup>();
        var seenDates = new Dictionary<DateTime, bool>();        // used as a set of distinct dates

        using (SqlConnection conn = new SqlConnection("context connection=true"))
        {
            conn.Open();
            using (SqlCommand selectCmd = new SqlCommand("Select * from " + tableName, conn))
            using (SqlDataReader dataReader = selectCmd.ExecuteReader())
            {
                // Classify columns once from the reader's column types. Classifying from
                // row values would misread every NULL as a numeric value.
                int dateOrdinal = -1;
                int dateColumnCount = 0;
                for (int ordinal = 0; ordinal < dataReader.FieldCount; ordinal++)
                {
                    Type fieldType = dataReader.GetFieldType(ordinal);
                    if (fieldType == typeof(string))
                    {
                        stringOrdinals.Add(ordinal);
                        stringFields.Add(dataReader.GetName(ordinal));
                    }
                    else if (fieldType == typeof(DateTime))
                    {
                        dateOrdinal = ordinal;
                        dateColumnCount++;
                    }
                    else
                    {
                        numericOrdinals.Add(ordinal);
                        numericFields.Add(dataReader.GetName(ordinal));
                        typesOfNumericFields.Add(fieldType);
                    }
                }

                if (dateColumnCount != 1)
                {
                    throw new InvalidOperationException("The table must contain exactly one column of DateTime type.");
                }
                if (numericOrdinals.Count == 0)
                {
                    throw new InvalidOperationException("No Numeric fields to pivot by!");
                }

                while (dataReader.Read())
                {
                    // A row without a date cannot be placed under any date column.
                    if (dataReader.IsDBNull(dateOrdinal))
                    {
                        continue;
                    }
                    DateTime date = dataReader.GetDateTime(dateOrdinal);

                    object[] keyValues = new object[stringOrdinals.Count];
                    for (int k = 0; k < keyValues.Length; k++)
                    {
                        keyValues[k] = dataReader.GetValue(stringOrdinals[k]);
                    }
                    string groupKey = BuildGroupKey(keyValues);

                    PivotGroup group;
                    if (!groupsByKey.TryGetValue(groupKey, out group))
                    {
                        group = new PivotGroup(keyValues);
                        groupsByKey.Add(groupKey, group);
                        groups.Add(group);
                    }

                    decimal[] totals;
                    if (!group.TotalsByDate.TryGetValue(date, out totals))
                    {
                        totals = new decimal[numericOrdinals.Count];
                        group.TotalsByDate.Add(date, totals);
                        seenDates[date] = true;
                    }

                    // Accumulate as decimal so every numeric column type can be summed
                    // without unboxing to one specific type.
                    for (int n = 0; n < totals.Length; n++)
                    {
                        int ordinal = numericOrdinals[n];
                        if (!dataReader.IsDBNull(ordinal))
                        {
                            totals[n] += Convert.ToDecimal(dataReader.GetValue(ordinal));
                        }
                    }
                }
            }
        }

        var distinctDates = new List<DateTime>(seenDates.Keys);
        distinctDates.Sort();

        List<SqlMetaData> sqlMetaData = BuildMetaData(stringFields, numericFields, distinctDates, fmt);
        if (sqlMetaData.Count == 0)
        {
            // No grouping columns and no rows: there is no column to describe or send.
            return;
        }
        PipeResultsToOutput(sqlMetaData, groups, typesOfNumericFields, distinctDates);
    }

    /// <summary>
    /// Returns true when <paramref name="tableName"/> consists of one to four identifier parts
    /// separated by dots, each part being either a bracketed name or a regular identifier
    /// (letters, digits, '_', '#', '@', '$'; not starting with a digit or '$').
    /// </summary>
    private static bool IsSafeTableName(string tableName)
    {
        const string part = @"(?:\[[^\]]+\]|[\p{L}_#@][\p{L}\p{N}_#@$]*)";
        // Up to three consecutive dots are allowed so that omitted parts (tempdb..#temp) still pass.
        const string pattern = @"\A" + part + @"(?:\.{1,3}" + part + @"){0,3}\z";
        return System.Text.RegularExpressions.Regex.IsMatch(tableName, pattern);
    }

    /// <summary>
    /// Builds a dictionary key for a set of grouping values. Each value is written
    /// length-prefixed, and non-string values (DBNull) get their own marker, so two
    /// different value lists cannot produce the same key.
    /// </summary>
    private static string BuildGroupKey(object[] keyValues)
    {
        var key = new System.Text.StringBuilder();
        foreach (object value in keyValues)
        {
            string text = value as string;
            if (text == null)
            {
                key.Append("N;");
            }
            else
            {
                key.Append(text.Length).Append(':').Append(text);
            }
        }
        return key.ToString();
    }

    /// <summary>
    /// Returns true for the CLR integer types, whose totals are printed without decimals.
    /// </summary>
    private static bool IsIntegralType(Type type)
    {
        return type == typeof(Int64) || type == typeof(Int32) || type == typeof(Int16) || type == typeof(Byte)
            || type == typeof(SByte) || type == typeof(UInt16) || type == typeof(UInt32) || type == typeof(UInt64);
    }

    /// <summary>
    /// Describes the output columns: the grouping columns first (nvarchar(max)), then for each
    /// date in <paramref name="distinctDates"/> one nvarchar(40) column per numeric field,
    /// named by applying <paramref name="fmt"/> to the field name and the date.
    /// </summary>
    private static List<SqlMetaData> BuildMetaData(List<string> stringFields, List<string> numericFields,
                                                   List<DateTime> distinctDates, string fmt)
    {
        var sqlMetaData = new List<SqlMetaData>();
        foreach (string stringField in stringFields)
        {
            sqlMetaData.Add(new SqlMetaData(stringField, SqlDbType.NVarChar, -1));
        }

        // NOTE(review): dates are keyed with their time component but the header format
        // prints only the date, so rows on the same day at different times produce
        // separate columns with identical names. Confirm whether that is intended.
        foreach (DateTime date in distinctDates)
        {
            foreach (string numericField in numericFields)
            {
                sqlMetaData.Add(new SqlMetaData(string.Format(fmt, numericField, date), SqlDbType.NVarChar, 40));
            }
        }
        return sqlMetaData;
    }

    /// <summary>
    /// Sends one result row per group. Every group writes a cell for every date in
    /// <paramref name="distinctDates"/> (empty when the group has no total for that date),
    /// so values always land under the column whose header names their date.
    /// </summary>
    private static void PipeResultsToOutput(List<SqlMetaData> sqlMetaData, List<PivotGroup> groups,
                                            List<Type> typesOfNumericFields, List<DateTime> distinctDates)
    {
        int numericCount = typesOfNumericFields.Count;
        bool[] isIntegral = new bool[numericCount];
        for (int r = 0; r < numericCount; r++)
        {
            isIntegral[r] = IsIntegralType(typesOfNumericFields[r]);
        }

        SqlPipe pipe = SqlContext.Pipe;
        SqlDataRecord rec = new SqlDataRecord(sqlMetaData.ToArray());
        pipe.SendResultsStart(rec);
        foreach (PivotGroup group in groups)
        {
            int column = 0;
            foreach (object keyValue in group.KeyValues)
            {
                string text = keyValue as string;
                if (text == null)
                {
                    rec.SetDBNull(column);
                }
                else
                {
                    rec.SetString(column, text);
                }
                column++;
            }

            // The record is reused between rows, so every date cell is rewritten each time.
            foreach (DateTime date in distinctDates)
            {
                decimal[] totals;
                bool hasDate = group.TotalsByDate.TryGetValue(date, out totals);
                for (int r = 0; r < numericCount; r++)
                {
                    string cell;
                    if (!hasDate || totals[r] == 0m)
                    {
                        cell = string.Empty;
                    }
                    else
                    {
                        cell = isIntegral[r] ? totals[r].ToString("0") : totals[r].ToString("0.00");
                    }
                    rec.SetString(column, cell);
                    column++;
                }
            }
            pipe.SendResultsRow(rec);
        }
        pipe.SendResultsEnd();
    }
}

Rate

You rated this post out of 5. Change rating

Share

Share

Rate

You rated this post out of 5. Change rating