SQLServerCentral Article

Service Broker Error Handling


Below I have outlined a scenario that happened to one of my Service Broker applications in production. I use Service Broker to process bulk data. Put simply, I drop data into a heap table, with the rows tied together by a session identifier. I then write this session identifier to the relevant Service Broker queue and let Service Broker deal with the data transfer from the heap table to production.

Imagine my surprise when I found the queue had disabled itself automatically overnight, and would not start up again!
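For readers who have not met this behaviour before: Service Broker's poison message handling disables a queue after five consecutive rollbacks of a transaction that received from it. The statements below are a minimal sketch of how the queue state could be inspected and the queue switched back on, using the anonymised queue name from this article. Re-enabling on its own is unlikely to help while the offending message is still being received and rolled back, which matches what I saw.

```sql
-- Inspect the state of the queue (name taken from the anonymised examples in this article)
SELECT
  name,
  is_receive_enabled,
  is_enqueue_enabled,
  is_activation_enabled
FROM
  sys.service_queues
WHERE
  name = N'SomeQueue';

-- Switch the queue back on once the cause of the rollbacks has been dealt with
ALTER QUEUE MySchema.SomeQueue WITH STATUS = ON;
```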

The problem was with one particular set of data that had a conversion issue from varchar to int. SQL Server cannot cast a string such as "10,000" to an integer (notice the comma separator). However, the error recorded in the SQL Server error log was not what I expected. What I saw was:

The activated proc '[MySchema].[uspMyQueueProcessingProc]' running on queue 'DBName.MySchema.SomeQueue' output the following:  'The current transaction cannot be committed and cannot support operations that write to the log file. Roll back the transaction.'

What could be causing this? I was sure that I handled errors correctly, and that any message that could not be processed would be removed from the queue. This was not so. Although I had followed Microsoft's Service Broker example, the way I was handling transactions turned out to be incorrect when an error occurred.

The pattern I followed was to have an outer stored procedure (uspMyQueueProcessingProc) that deals with the main transaction and reads from the Service Broker queue. This in turn calls an inner stored procedure (uspProcessSession) to process the data.

Old Pattern

The previous transaction handling pattern was:

1) Call the outer stored procedure uspMyQueueProcessingProc

2) Begin a transaction

3) Read Message from Service Broker queue

4) Get the session identifier from the XML message

5) Call the inner stored procedure uspProcessSession with the session identifier

      a) Bookmark the current transaction (SavePoint1)

      b) Do any necessary SQL statements

      c) If an error occurs check XACT_STATE.  If not equal to -1 (i.e. the transaction is still committable) rollback to SavePoint1

      d) Re-raise the error

6) Catch the error thrown from uspProcessSession and then:

      a) If the message is one we recognise then commit the transaction if transaction count is > 0.

      b) If we do not recognise the message then rollback the transaction if transaction count is > 0.

So far so good, but what happens when SQL Server decides that an error in uspProcessSession requires a full rollback, regardless of the save point? This is exactly where my problem was. SQL Server was behaving correctly, but I was not handling the situation correctly: we now have an uncommittable transaction (the clue is in the error message above). This means I can no longer call commit in 6a above, but I still need to get the message off the queue.

At the point of failure, i.e. 5c above, what happens when the transaction is uncommittable? I had not handled this scenario in the inner stored proc.
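The doomed state can be reproduced without Service Broker at all. The script below is a minimal sketch that touches no tables; the variable names are made up for illustration. To my understanding a conversion failure inside a TRY block leaves the transaction uncommittable, so XACT_STATE() should report -1 in the CATCH block, and rolling back to the save point is no longer an option.

```sql
SET XACT_ABORT OFF;  -- the default setting; with ON, run-time errors doom the transaction anyway

BEGIN TRANSACTION;
SAVE TRANSACTION SavePoint1;

BEGIN TRY
  -- Illustrative values only: the comma makes the conversion fail
  DECLARE @Raw varchar(20) = '10,000';
  DECLARE @Value int = CAST(@Raw AS int);
END TRY
BEGIN CATCH
  -- Show what state the error has left the transaction in
  SELECT
    XACT_STATE()    AS XactState,
    @@TRANCOUNT     AS TranCount,
    ERROR_MESSAGE() AS ErrorMessage;

  IF (XACT_STATE() = -1)
    ROLLBACK TRANSACTION;             -- doomed: only a full rollback is permitted
  ELSE IF (XACT_STATE() = 1)
    ROLLBACK TRANSACTION SavePoint1;  -- still committable: a partial rollback is fine
END CATCH;

-- Only commit if a transaction is still open
IF (@@TRANCOUNT > 0)
  COMMIT TRANSACTION;
```

Swapping the CAST for TRY_CAST (available from SQL Server 2012) would return NULL instead of raising an error, which is another way to keep a bad value from dooming the transaction in the first place.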

Below is the original (anonymised) code that follows the steps outlined above.  Pay attention to the second COMMIT TRANSACTION in uspMyQueueProcessingProc, and the error handling block in uspProcessSession as these will be updated later.

Original Stored Procedures

The Outer Stored Proc

-- 1) The outer stored proc
CREATE PROCEDURE [MySchema].[uspMyQueueProcessingProc]
AS
BEGIN
  SET NOCOUNT ON;
  
  DECLARE
    @ConversationHandle uniqueidentifier = NULL,
    @MessageBody        varbinary(MAX),
    @MessageTypeName    nvarchar(256),
    @Xml                xml, -- the queue is set up to receive well formed XML <Sessions><SessionIdentifier>1</SessionIdentifier></Sessions>
    @MessageRecognised  bit;
  BEGIN TRANSACTION; -- 2)

  BEGIN TRY
    -- 3) Read message from the queue
    WAITFOR(
      RECEIVE TOP(1)
        @MessageTypeName    = message_type_name,
        @MessageBody        = message_body,
        @ConversationHandle = conversation_handle
      FROM
        MySchema.SomeQueue
    ), TIMEOUT 3000;

    -- 4) Get the session identifier
    IF (@MessageTypeName = '//domain.com/services/MyMessage')
    BEGIN
      SET @MessageRecognised = 1;
      SET @Xml = CAST(@MessageBody AS xml);

      DECLARE @SessionIdentifier int;
      SELECT @SessionIdentifier = @Xml.value('(/Sessions/SessionIdentifier)[1]', 'int');

      -- 5) Call the inner stored procedure
      EXEC MySchema.uspProcessSession
        @SessionIdentifier = @SessionIdentifier;
    END
    ELSE IF (@MessageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error' OR
             @MessageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
    BEGIN
      -- this is from the MS documentation for service broker!
      END CONVERSATION @ConversationHandle;
    END;

    COMMIT TRANSACTION;
  END TRY
  BEGIN CATCH
    -- 6) Catch the error
    DECLARE
      @XACT_STATE int = XACT_STATE();
      
    IF (@MessageRecognised = 1)
    BEGIN
      -- 6a) This is a message I recognise        
      IF (@@TRANCOUNT > 0)
      BEGIN
        COMMIT TRANSACTION;
      END;
    END
    ELSE BEGIN
      -- 6b) This is a message I do not recognise   
      IF (@XACT_STATE = -1 OR @XACT_STATE = 1)
      BEGIN
        IF (@@TRANCOUNT > 0)
        BEGIN
          ROLLBACK TRANSACTION;
        END;
      END;
    END;
  END CATCH;
END;

The Inner Stored Proc

-- 5) The inner stored proc
CREATE PROCEDURE [MySchema].[uspProcessSession]
(
  @SessionIdentifier  int
)
AS
BEGIN
  SET NOCOUNT ON;
  
  BEGIN TRY
    -- 5a) Bookmark the transaction
    SAVE TRANSACTION SavePoint1;
    
    -- 5b) Do necessary SQL statements
  END TRY
  BEGIN CATCH
    DECLARE
      @ErrorSeverity  int,
      @ErrorState   int,
      @ErrorMessage nvarchar(2048),
      @XACT_STATE   int;
    SELECT
      @ErrorSeverity  = ERROR_SEVERITY(),
      @ErrorState   = ERROR_STATE(),
      @ErrorMessage = ERROR_MESSAGE(),
      @XACT_STATE   = XACT_STATE();
    -- 5c) Check XACT_STATE()
    IF (@XACT_STATE <> -1)
      ROLLBACK TRAN SavePoint1;

    -- 5d) Re-raise the error
    RAISERROR (
      @ErrorMessage,
      @ErrorSeverity,
      @ErrorState ) WITH LOG;     
  END CATCH;
END;

Although the above approach seemed to work most of the time (ironically, I have used this pattern in the past with no problems), it was doomed as soon as an uncommittable transaction came about.

I thought I would share my solution with the SQL Community. Firstly to help anyone out there who is new to Service Broker, and secondly to see if this is the correct approach. Constructive criticism is welcome.

New Pattern

Here is the new pattern for my process. Steps that differ from the old pattern are marked with **.

1) Call the outer stored procedure uspMyQueueProcessingProc

2) Begin a transaction

3) Read Message from Service Broker queue

4) Get the session identifier from the XML message

5) Call the inner stored procedure uspProcessSession with the session identifier

      a) Bookmark the current transaction (SavePoint1)

      b) Do any necessary SQL statements

      c) If an error occurs check XACT_STATE.

      d) If equal to -1 the transaction is doomed so it must be rolled back completely **

      e) If equal to 1 the transaction is committable, so we can happily roll back to SavePoint1

      f) Re-raise the error

6) Catch the error thrown from uspProcessSession and then:

      a) If the message is one we recognise then commit the transaction if transaction count is > 0.  If transaction count is 0 end the conversation **

      b) If we do not recognise the message then rollback the transaction if transaction count is > 0.

Note: 6a - this will get the poisoned message off the queue either through a commit, or ending the conversation.

The modified stored procedures are given below, following the new steps above.

Modified Stored Procedures

The Outer Stored Proc

      -- 1) The outer stored proc
      CREATE PROCEDURE [MySchema].[uspMyQueueProcessingProc]
      AS
      BEGIN
        SET NOCOUNT ON;
        
        DECLARE
          @ConversationHandle uniqueidentifier = NULL,
          @MessageBody    varbinary(MAX),
          @MessageTypeName  nvarchar(256),
          @Xml        xml, -- the queue is set up to receive well formed XML <Sessions><SessionIdentifier>1</SessionIdentifier></Sessions>
          @MessageRecognised  bit;
          
        -- 2) Begin the transaction
        BEGIN TRANSACTION;
        
        BEGIN TRY
          -- 3) Read message from the queue
          WAITFOR(
            RECEIVE TOP(1)
              @MessageTypeName    = message_type_name,
              @MessageBody        = message_body,
              @ConversationHandle = conversation_handle
            FROM
              MySchema.SomeQueue
          ), TIMEOUT 3000;

          -- 4) Get the session identifier
          IF (@MessageTypeName = '//domain.com/services/MyMessage')
          BEGIN
            SET @MessageRecognised = 1;
            SET @Xml = CAST(@MessageBody AS xml);

            DECLARE @SessionIdentifier int;
            SELECT @SessionIdentifier = @Xml.value('(/Sessions/SessionIdentifier)[1]', 'int');

            -- 5) Call the inner stored procedure
            EXEC MySchema.uspProcessSession
              @SessionIdentifier = @SessionIdentifier;
          END
          ELSE IF (@MessageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error' OR
                   @MessageTypeName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
          BEGIN
            -- this is from the MS documentation for service broker!
            END CONVERSATION @ConversationHandle;
          END;

          IF (@@TRANCOUNT > 0)
          BEGIN
            COMMIT TRANSACTION;
          END;
        END TRY
        BEGIN CATCH 
          DECLARE
            @XACT_STATE int = XACT_STATE();
            
          IF (@MessageRecognised = 1)
          BEGIN             
            -- 6a) Commit, otherwise end the conversation to get the message off the queue **
            IF (@@TRANCOUNT > 0)
            BEGIN
              COMMIT TRANSACTION;
            END
            ELSE BEGIN
              END CONVERSATION @ConversationHandle WITH ERROR = 127 DESCRIPTION = 'Unable to process message'
            END;
          END
          ELSE BEGIN
            -- 6b) Rollback
            IF (@XACT_STATE = -1 OR @XACT_STATE = 1)
            BEGIN
              IF (@@TRANCOUNT > 0)
              BEGIN
                ROLLBACK TRANSACTION;
              END;
            END;
          END;
        END CATCH;
      END;
    

The Inner Stored Proc

      CREATE PROCEDURE [MySchema].[uspProcessSession]
      (
        @SessionIdentifier  int
      )
      AS
      BEGIN
        SET NOCOUNT ON;
        
        BEGIN TRY
          -- 5a) Bookmark the transaction
          SAVE TRANSACTION SavePoint1;

          -- 5b) Do any necessary SQL statements
        END TRY
        BEGIN CATCH
          DECLARE
            @ErrorSeverity  int,
            @ErrorState   int,
            @ErrorMessage nvarchar(2048),
            @XACT_STATE   int;
          SELECT
            @ErrorSeverity  = ERROR_SEVERITY(),
            @ErrorState   = ERROR_STATE(),
            @ErrorMessage = ERROR_MESSAGE(),
            @XACT_STATE   = XACT_STATE();

          -- 5c) Check XACT_STATE()
          IF (@XACT_STATE = -1)
            -- 5d) Transaction is doomed, so roll it back completely **
            ROLLBACK TRANSACTION;
          IF (@XACT_STATE = 1)
            -- 5e) Transaction is committable, so roll back to the save point
            ROLLBACK TRANSACTION SavePoint1;

          -- 5f) Re-raise the error.
          RAISERROR (
            @ErrorMessage,
            @ErrorSeverity,
            @ErrorState ) WITH LOG;     
        END CATCH;
      END;
    

In development I simulated a poison message that gave the original error, then implemented the new pattern above. The message was handled gracefully and removed from the queue. I also added a table to record failed session identifiers so that I can analyse the problems that caused them to fail.
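For anyone wanting to do something similar, here is a rough sketch of what such a failure table and a logging procedure could look like. The names MySchema.FailedSession and MySchema.uspLogFailedSession, and all of the columns, are hypothetical and are not the objects from my production system. The one point that matters is that the insert must not run inside a doomed transaction, because a doomed transaction cannot write to the log.

```sql
-- Hypothetical table for recording sessions that could not be processed
CREATE TABLE MySchema.FailedSession
(
  FailedSessionId   int IDENTITY(1,1) NOT NULL PRIMARY KEY,
  SessionIdentifier int               NULL,
  ErrorNumber       int               NULL,
  ErrorMessage      nvarchar(2048)    NULL,
  FailedAtUtc       datetime2(0)      NOT NULL DEFAULT (SYSUTCDATETIME())
);
GO

-- Hypothetical helper, intended to be called from the outer CATCH block
CREATE PROCEDURE MySchema.uspLogFailedSession
(
  @SessionIdentifier  int,
  @ErrorNumber        int,
  @ErrorMessage       nvarchar(2048)
)
AS
BEGIN
  SET NOCOUNT ON;

  -- Refuse to write while the caller is still inside an uncommittable transaction
  IF (XACT_STATE() = -1)
  BEGIN
    RAISERROR('uspLogFailedSession was called inside an uncommittable transaction.', 16, 1);
    RETURN;
  END;

  INSERT INTO MySchema.FailedSession (SessionIdentifier, ErrorNumber, ErrorMessage)
  VALUES (@SessionIdentifier, @ErrorNumber, @ErrorMessage);
END;
GO
```

In the outer CATCH block the values of ERROR_NUMBER() and ERROR_MESSAGE() would need to be copied into variables first, since function calls cannot be passed directly as EXEC parameters. Calling the helper after the commit or END CONVERSATION in step 6a means the row is written outside the message transaction, so it should survive whichever path was taken.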

And so far nothing has been stuck on the queue!

Kind Regards, Paul M.
