UDA parallelism issue

  • I have this simple UDA:

    using System;

    using System.Data;

    using System.Data.SqlClient;

    using System.Data.SqlTypes;

    using Microsoft.SqlServer.Server;

    using System.IO;

    using System.Text;

    [Serializable]

    [Microsoft.SqlServer.Server.SqlUserDefinedAggregate(

    Format.UserDefined,

    MaxByteSize = 8000

    )

    ]

    public struct MY_AVG : IBinarySerialize

    {

    private int m_uCount;

    private SqlDecimal m_uAvg;

    private SqlDecimal m_uValue;

    public void Init()

    {

    m_uCount = 0;

    m_uValue = 0;

    }

    public void Accumulate(SqlDecimal Value)

    {

    if (!Value.IsNull)

    {

    ++m_uCount; //number of values

    // m_uValue = (SqlDecimal)Value;

    m_uValue += Value;

    //m_uValue += (SqlDecimal)Value;

    }

    }

    public void Merge(MY_AVG Group)

    {

    this.m_uValue = Group.m_uValue;

    this.m_uCount = Group.m_uCount;

    }

    public SqlString Terminate()

    {

    return m_uAvg.ToSqlString();

    }

    public void Read(BinaryReader r)

    {

    m_uAvg = r.ReadDecimal();

    //precision 1 means m_sMedian = NULL

    if (m_uAvg.Precision == 1)

    {

    m_uAvg = new SqlDecimal();

    }

    }

    public void Write(BinaryWriter w)

    {

    if (m_uCount != 0)

    {

    //m_uAvg = SqlDecimal.Divide(this.m_uValue, this.m_uCount);

    m_uAvg = this.m_uValue / this.m_uCount;

    w.Write((Decimal)this.m_uAvg);

    }

    }

    }

    I execute the following query:

    select dbo.MY_AVG(F585850) FROM T585879_1900_1

    which returns NULL. The input table doesn't have NULL values in F585850 column. This query will use Merge method to gather all streams and produce the final result.

    If I use the MAXDOP hint the result returned is correct (200001)

    select dbo.MY_AVG(F585850) FROM T585879_1900_1 OPTION (MAXDOP 1)

    Obviously when using MAXDOP, the uda's Merge method is not called. Now, I set some breakpoints in the Merge method and I could notice that the m_uValue of the Group object is always NULL. The code is very simple.

    Any idea how can i fix this (without maxdop).

    Any suggestion would be appreciated.

  • IIRC, the Merge call is usually preceded by one or more Writes and Reads, and what I can't help but notice in your code is this possible sequence:

    Accumulate: sets Count, Value, but neither uses nor sets Avg

    Write: writes Avg, using Count, Value, but not using Avg

    Read: reads & sets Avg, but not Count or Value

    Merge: copies in Count, Value, but not Avg...

    At this point all previous Avg has been lost...

    and because only Avg was initially saved in Write, Count & Value have been lost also...

    In other words, during transitions, you are writing to Avg, but then you never use it along with any new Counts & Values again.

    [font="Times New Roman"]-- RBarryYoung[/font], [font="Times New Roman"] (302)375-0451[/font] blog: MovingSQL.com, Twitter: @RBarryYoung[font="Arial Black"]
    Proactive Performance Solutions, Inc.
    [/font]
    [font="Verdana"] "Performance is our middle name."[/font]

  • Barry,

    Thanks for your input.

    I managed to change the UDA based on your suggestions ant it appears to be working. The code looks like this now (basically m_uAvg is only calculated in the Terminate method, while m_uCount and m_uValue are handled in both Write and Read methods:

    using System;

    using System.Data;

    using System.Data.SqlClient;

    using System.Data.SqlTypes;

    using Microsoft.SqlServer.Server;

    using System.IO;

    using System.Text;

    [Serializable]

    [Microsoft.SqlServer.Server.SqlUserDefinedAggregate(

    Format.UserDefined,

    MaxByteSize = 8000

    )

    ]

    public struct MY_AVG : IBinarySerialize

    {

    private int m_uCount;

    private SqlDecimal m_uAvg;

    private SqlDecimal m_uValue;

    public void Init()

    {

    m_uCount = 0;

    m_uValue = 0;

    }

    public void Accumulate(SqlDecimal Value)

    {

    if (!Value.IsNull)

    {

    ++m_uCount; //number of values

    m_uValue += Value;

    }

    }

    public void Merge(MY_AVG Group)

    {

    this.m_uValue += Group.m_uValue;

    this.m_uCount += Group.m_uCount;

    }

    public SqlString Terminate()

    {

    m_uAvg = SqlDecimal.Divide(this.m_uValue, this.m_uCount);

    //precision 1 means m_sMedian = NULL

    if (m_uAvg.Precision == 1)

    {

    m_uAvg = new SqlDecimal();

    }

    return m_uAvg.ToSqlString();

    }

    public void Read(BinaryReader r)

    {

    m_uValue = r.ReadDecimal();

    m_uCount = r.ReadInt32();

    }

    public void Write(BinaryWriter w)

    {

    if (m_uCount != 0)

    {

    w.Write((Decimal)this.m_uValue);

    w.Write((Int32)this.m_uCount);

    }

    }

    }

  • Great Hrubaru, I'm glad it worked out! 🙂

    [font="Times New Roman"]-- RBarryYoung[/font], [font="Times New Roman"] (302)375-0451[/font] blog: MovingSQL.com, Twitter: @RBarryYoung[font="Arial Black"]
    Proactive Performance Solutions, Inc.
    [/font]
    [font="Verdana"] "Performance is our middle name."[/font]

  • There are still a few bugs to be worked out in that code:

  • The Write method does not write anything if the aggregate is empty. The Read method will still try to read from the stream in this case. You should always serialize the state of the aggregate, even if it has processed no rows. As written, the Read method will throw an exception when trying to de-serialize an empty aggregate.
  • You should return NULL if no rows are aggregated to be consistent with the built-in aggregates. This means returning SqlDecimal.Null - the existing code seems to check the precision for some reason and returns an empty object (which is not the same thing at all).
  • It seems odd to return a String as the result of aggregating Decimals - should it not return a Decimal?
  • The Merge method should be coded to cope with receiving a NULL partial aggregate.
  • Paul

  • Paul,

    Thanks for your observations. Let me try to answer them one by one:

    1)The Write method does not write anything if the aggregate is empty. The Read method will still try to read from the stream in this case. You should always serialize the state of the aggregate, even if it has processed no rows. As written, the Read method will throw an exception when trying to de-serialize an empty aggregate.

    Actually I checked something like

    create table my(col1 int)

    insert into my values(null)

    select dbo.MY_AVG(col1) AS F586117 FROM my

    The Read method will not throw any error (m_Count will be 0). However, you are correct, there is a bug, in the sense that a Divide by zero error is thrown. An extra check for m_uCount ==0 in the Terminate() was added.

    2)You should return NULL if no rows are aggregated to be consistent with the built-in aggregates. This means returning SqlDecimal.Null - the existing code seems to check the precision for some reason and returns an empty object (which is not the same thing at all).

    If no rows are aggregated, then the Accumulate method will not do anything (Value is NULL), and the m_uCount and m_uValue will remain 0 (as initialized in the Init method). The result off the UDA will be NULL (see 1)

    3)It seems odd to return a String as the result of aggregating Decimals - should it not return a Decimal?

    Actually one reason for returning a string is that when I deploy it, VS will by default put numeric(18,0) as a return value (if I use decimal). The main reason is that I do not want to hard code a NUMERIC(p,s) in the return value. I just want it to return a string (max precision and scale as it comes from C#, which is 28-29) and then CAST it to whatever I need in Sql Server.

    Specifically I do not want something like:

    Create aggregate dbo.SYNYGY_AVG(@Value numeric(24,10))

    RETURNS [numeric](p,s)

    external name [SYNYGY_AVG].[SYNYGY_AVG]

    4)The Merge method should be coded to cope with receiving a NULL partial aggregate.

    I believe in this case 1) will apply.

    Ionut

  • Ionut,

    You misread my post. I am saying that your aggregate will throw an exception in the Read method if no rows have been aggregated. This is different from aggregating one row that contains NULL.

    Before I address your other comments, one by one, please run your aggregate in place of the standard AVG, in the following:

    DECLARE @T

    TABLE (decimal_value DECIMAL(9,4) NOT NULL);

    INSERT @T (decimal_value) VALUES (94.67);

    INSERT @T (decimal_value) VALUES (45.62);

    INSERT @T (decimal_value) VALUES (77.49);

    INSERT @T (decimal_value) VALUES (36.61);

    SELECT AVG(decimal_value)

    FROM @T

    WHERE decimal_value <= 30;

    Notice that no rows from table variable @T match the predicate.

    Paul

  • Paul,

    Based on your initial feedback, I tried two scenarios:

    1)input table with only NULL values

    2)input table with no rows.

    Both failed, but the error did not occur in the Read(), but instead in the Terminate() (divide by 0).

    I changed the Terminate method so that divide by zero error does not occur.

    Now, both scenarios return NULL.

    I tried also the code you sent, and it works fine now (NULL is returned)

    This is the UDA code:

    using System;

    using System.Data;

    using System.Data.SqlClient;

    using System.Data.SqlTypes;

    using Microsoft.SqlServer.Server;

    using System.IO;

    using System.Text;

    [Serializable]

    [Microsoft.SqlServer.Server.SqlUserDefinedAggregate(

    Format.UserDefined,

    MaxByteSize = 8000

    )

    ]

    public struct MY_AVG : IBinarySerialize

    {

    private int m_uCount;

    private SqlDecimal m_uAvg;

    private SqlDecimal m_uValue;

    public void Init()

    {

    m_uCount = 0;

    m_uValue = 0;

    }

    public void Accumulate(SqlDecimal Value)

    {

    if (!Value.IsNull)

    {

    ++m_uCount; //number of values

    m_uValue += Value;

    }

    }

    public void Merge(SYNYGY_AVG Group)

    {

    this.m_uValue += Group.m_uValue;

    this.m_uCount += Group.m_uCount;

    }

    public SqlString Terminate()

    {

    if (this.m_uCount == 0)

    {

    m_uAvg = new SqlDecimal();

    }

    else

    {

    m_uAvg = SqlDecimal.Divide(this.m_uValue, this.m_uCount);

    }

    return m_uAvg.ToSqlString();

    }

    public void Read(BinaryReader r)

    {

    m_uValue = r.ReadDecimal();

    m_uCount = r.ReadInt32();

    }

    public void Write(BinaryWriter w)

    {

    if (m_uCount != 0)

    {

    w.Write((Decimal)this.m_uValue);

    w.Write((Int32)this.m_uCount);

    }

    }

    }

    Ionut

  • Ionut,

    I have ignored point 1 for the moment, since your response was based on an incorrect assumption.

    Your original comments in black. My response in blue...

    If no rows are aggregated, then the Accumulate method will not do anything (Value is NULL), and the m_uCount and m_uValue will remain 0 (as initialized in the Init method). The result off the UDA will be NULL.

    No. If no rows are aggregated, the Accumulate method is never called. You must code to cope with that. The proper way to return a database NULL from C# is to return the static SqlDecimal.Null or SqlString.Null - not an empty object or C# null reference.

    Actually one reason for returning a string is that when I deploy it, VS will by default put numeric(18,0) as a return value (if I use decimal). The main reason is that I do not want to hard code a NUMERIC(p,s) in the return value. I just want it to return a string (max precision and scale as it comes from C#, which is 28-29) and then CAST it to whatever I need in Sql Server.

    How is the parameter to the aggregate defined in the CREATE AGGREGATE statement? You're not aggregating string representations of numbers are you?

    Paul

  • Ionut Hrubaru


    Both failed, but the error did not occur in the Read(), but instead in the Terminate() (divide by 0). I changed the Terminate method so that divide by zero error does not occur.

    Now, both scenarios return NULL. I tried also the code you sent, and it works fine now (NULL is returned)

    I will have to try that - it surprises me. Nevertheless, do you agree with the following statements:

    1. You cannot control when SQL Server calls the Read(BinaryReader r) or Write(BinaryWriter w) methods.

    2. If Write is called when m_uCount == 0, nothing is written to the BinaryWriter stream.

    3. If Read is called when m_uCount == 0, you will try to read m_uValue and m_uCount from the stream.

    4. The stream you are trying to read from, is the one written in step 2.

    5. You wrote nothing to that stream.

    6. Reading past the end of a stream will throw an exception.

    7. The fact that SQL Server seems not to call Read or Write, if it knows no values were aggregated, is an implementation detail in SQL Server that you have no control over.

    8. Defensive coding is a good idea.

    ...?

    Paul

  • What you say makes perfect sense.

    What I was saying is simply that I couldn't find your scenario to fail as you pointed out. But I do agree that it's better to be safe and code in such a way that you minimize the possibility for future issues.

    Now, about point 7 (The fact that SQL Server seems not to call Read or Write, if it knows no values were aggregated, is an implementation detail in SQL Server that you have no control over), my logic tells me that the Write method is called, nothing is written to the stream, at which point I believe Read method is not called anymore (since nothing was written). Could I be correct in my assumption?

    IOnut

  • Ionut Hrubaru (3/9/2010)


    My logic tells me that the Write method is called, nothing is written to the stream, at which point I believe Read method is not called anymore (since nothing was written). Could I be correct in my assumption?

    The point is that you have no control of when or if SQL Server calls Read or Write - so it makes sense to write the code to cope with that unknown factor. In fact, it involves writing less code, not more...see my code below (just a quick edit of yours).

    using System;

    using System.Data;

    using System.Data.SqlClient;

    using System.Data.SqlTypes;

    using Microsoft.SqlServer.Server;

    using System.IO;

    using System.Text;

    [Serializable]

    [Microsoft.SqlServer.Server.SqlUserDefinedAggregate

    (

    Format.UserDefined, // User-defined serialization

    MaxByteSize = 8000, // Maximum serialized size

    IsInvariantToDuplicates = false, // Duplicate values affect the result

    IsInvariantToNulls = true, // NULLs do not affect the result

    IsInvariantToOrder = true, // Order does not affect the result

    IsNullIfEmpty = true, // Returns NULL if no values aggregated

    Name = "MY_AVG" // Aggregate name

    )

    ]

    public struct MY_AVG : IBinarySerialize

    {

    private long count; // Number of items aggregated

    private decimal sum; // Running total of items aggregated

    public void Init()

    {

    // Reset private fields

    this.count = 0L;

    this.sum = 0M;

    }

    public void Accumulate(SqlDecimal Param)

    {

    // Ignore NULLs

    if (!Param.IsNull)

    {

    // Add one to count

    count++;

    // Add this value to the running sum

    sum += Param.Value;

    }

    }

    public void Merge(MY_AVG PartialResult)

    {

    // Received a partial result from another instance of this struct

    // (Parallelism or partial aggregates in the query plan)

    if (PartialResult.count > 0L)

    {

    // Add count from the partial result to this

    this.count += PartialResult.count;

    // Add sum from the partial result to this

    this.sum += PartialResult.sum;

    }

    }

    public SqlString Terminate()

    {

    if (this.count > 0L)

    {

    try

    {

    // Compute the average by dividing the sum by the count

    return Convert.ToString(Decimal.Divide(this.sum, this.count));

    }

    catch (DivideByZeroException)

    {

    throw;

    }

    catch (OverflowException)

    {

    throw;

    }

    }

    else

    {

    // Return a NULL

    return SqlString.Null;

    }

    }

    // Called by SQL Server to serialize this instance

    public void Write(BinaryWriter w)

    {

    w.Write(this.count);

    w.Write(this.sum);

    }

    // Called by SQL Server to de-serialize this instance

    public void Read(BinaryReader r)

    {

    this.count = r.ReadInt64();

    this.sum = r.ReadDecimal();

    }

    }

    Paul

    edit: comments added

  • Looks cleaner 🙂

    Thanks for all the feedback Paul.

    Ionut

  • Oh good, I was hoping Paul would take a look at this Ionut, he knows a lot more about this than I do. 🙂

    [font="Times New Roman"]-- RBarryYoung[/font], [font="Times New Roman"] (302)375-0451[/font] blog: MovingSQL.com, Twitter: @RBarryYoung[font="Arial Black"]
    Proactive Performance Solutions, Inc.
    [/font]
    [font="Verdana"] "Performance is our middle name."[/font]

  • Viewing 14 posts - 1 through 13 (of 13 total)

    You must be logged in to reply to this topic. Login to reply