RECEIVE TOP 1000 Not retrieving 1000 messages

  • Hi All - I've had no luck asking this in Ask SSC, so I thought I'd try here.

    I have a queue with 10,000+ messages. But when I run the stored proc which does a Receive TOP(1000), it is only retrieving 30-60 records at a time? What am I missing here?

    I've tried playing with the TOP(x) and the timeout but it doesn't seem to alter how many records are retrieved. Here's the currenct Receive sql

    WAITFOR (

    RECEIVE TOP(1000)

    queuing_order,

    conversation_handle,

    message_type_name,

    message_body

    FROM [Transfer].[LogMessageQueue]

    INTO @receive_table

    ), TIMEOUT 100000;

  • Is this in Service Broker? That's usually when I see that WAITFOR construct.


    - Craig Farrell

    Never stop learning, even if it hurts. Ego bruises are practically mandatory as you learn unless you've never risked enough to make a mistake.

    For better assistance in answering your questions[/url] | Forum Netiquette
    For index/tuning help, follow these directions.[/url] |Tally Tables[/url]

    Twitter: @AnyWayDBA

  • Yes, the Receive command is only used against Service Broker queues.

  • Just doublechecking. I've seen odd uses of things before and I hate making assumptions, so I tend to ask a lot of clarifying questions. Saves time in the end.

    In your queue of 10,000+ items, are these previously picked up conversations that aren't closed yet, or is this a test queue that's pre-loaded and has never been worked against? Activation thread procedures will only pick up un-communicated/responding messages. If these conversations don't receive a conversation end message at the end, they hang forever. Also the queue doesn't immediately clear out closed conversations either, so this may depend on volume vs. the cleanup structure.

    The first time your activation procedure comes online into a queue of that size with new messages, it should pick up its 1,000 messages. Otherwise it should only get (up to 1000) your new entries. This would be my most likely guess as to why you're not getting what you're expecting without further information.

    Is there anything particular about this queue that might be considered non-default? I may be making some assumptions that are poor here as well if it's set up in an odd way.


    - Craig Farrell

    Never stop learning, even if it hurts. Ego bruises are practically mandatory as you learn unless you've never risked enough to make a mistake.

    For better assistance in answering your questions[/url] | Forum Netiquette
    For index/tuning help, follow these directions.[/url] |Tally Tables[/url]

    Twitter: @AnyWayDBA

  • Evil Kraig F (8/28/2014)


    Just doublechecking. I've seen odd uses of things before and I hate making assumptions, so I tend to ask a lot of clarifying questions. Saves time in the end.

    Yeah, it's sometimes difficult with brief question to gain an understanding of the OPs level of understanding of the technologies they're trying to use.

    In your queue of 10,000+ items, are these previously picked up conversations that aren't closed yet, or is this a test queue that's pre-loaded and has never been worked against? Activation thread procedures will only pick up un-communicated/responding messages. If these conversations don't receive a conversation end message at the end, they hang forever. Also the queue doesn't immediately clear out closed conversations either, so this may depend on volume vs. the cleanup structure.

    Basically, this was in production and I needed to disable the activation because the activated proc had a bug in it (in the business logic side - not message processing side). I was in the process of testing my change by manually running the activated proc (prior to reactivating it) when I noticed that the queue count only decreased by a small amount (not 1000). So I put some debug messages in the proc to output the count on the table vars at certain points to track down why the 1000 messages were being thrown away - only to find it traced back to the original Receive only getting 30-60 messages at a time.

    This is part of a "one way" Service broker logging mechanism. The one thing that would suggest that this wasn't because of un-communicated/responding messages is that when I didn't get a timely response to my AskSSC question, I had to turn the activation back on. I watched the message count in the queue and it counted down to zero after a minute or two which to me indicates that it picked up ALL the messages in the queue in increments.

    The first time your activation procedure comes online into a queue of that size with new messages, it should pick up its 1,000 messages. Otherwise it should only get (up to 1000) your new entries. This would be my most likely guess as to why you're not getting what you're expecting without further information.

    Is there anything particular about this queue that might be considered non-default? I may be making some assumptions that are poor here as well if it's set up in an odd way.

    As above this is not what I saw, it was pretty plain to see that the activated stored proc was grabbing the messages in small numbers (no where near 1000).

    I don't think there is anything particularly non-default about the queue. When I set this up, I read a variety of blogs etc about how to setup "one-way" messaging properly (meaning not fire and forget). I also implemented conversation session re-use as this was recommended for high traffic.

    You're probably going to need the script I think to help further. I'll try to sanitise it and post it up.

  • Here is the script

    IF NOT EXISTS(SELECT * FROM sys.schemas where name = 'ServiceBroker')

    EXEC ('CREATE SCHEMA [ServiceBroker]')

    GO

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ServiceBroker].[SessionConversations]') AND type in (N'U'))

    DROP TABLE [ServiceBroker].[SessionConversations]

    GO

    CREATE TABLE [ServiceBroker].[SessionConversations] (

    SPID INT NOT NULL

    , FromService SYSNAME NOT NULL

    , ToService SYSNAME NOT NULL

    , OnContract SYSNAME NOT NULL

    , Handle UNIQUEIDENTIFIER NOT NULL

    , PRIMARY KEY (SPID, FromService, ToService, OnContract)

    , UNIQUE (Handle));

    GO

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ServiceBroker].[UnSentMessages]') AND type in (N'U'))

    DROP TABLE [ServiceBroker].[UnSentMessages]

    GO

    CREATE TABLE [ServiceBroker].[UnSentMessages] ( message_type_name SYSNAME, message_body VARCHAR(MAX) );

    GO

    -- Table to store processing errors.

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ServiceBroker].[Exceptions]') AND type in (N'U'))

    DROP TABLE [ServiceBroker].[Exceptions];

    GO

    CREATE TABLE [ServiceBroker].[Exceptions] (

    error_conversation UNIQUEIDENTIFIER,

    error_number INT,

    error_message VARCHAR(4000),

    error_severity INT,

    error_state INT,

    error_procedure SYSNAME NULL,

    error_line INT,

    doomed_transaction TINYINT,

    error_time datetime default(getdate())

    )

    GO

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ServiceBroker].[Send]') AND type in (N'P', N'PC'))

    DROP PROCEDURE [ServiceBroker].[Send]

    GO

    CREATE PROCEDURE [ServiceBroker].[Send] (

    @fromService SYSNAME,

    @toService SYSNAME,

    @onContract SYSNAME,

    @messageType SYSNAME,

    @messageBody VARBINARY(MAX))

    AS

    BEGIN

    SET NOCOUNT ON;

    DECLARE @handle UNIQUEIDENTIFIER;

    DECLARE @counter INT;

    DECLARE @error INT;

    SELECT @counter = 1;

    BEGIN TRANSACTION;

    -- Will need a loop to retry in case the conversation is

    -- in a state that does not allow transmission

    --

    WHILE (1=1)

    BEGIN

    -- Seek an eligible conversation in [ServiceBroker].[SessionConversations]

    --

    SELECT @handle = Handle

    FROM [ServiceBroker].[SessionConversations] WITH (UPDLOCK)

    WHERE SPID = @@SPID

    AND FromService = @fromService

    AND ToService = @toService

    AND OnContract = @OnContract;

    IF @handle IS NULL

    BEGIN

    -- Need to start a new conversation for the current @@spid

    --

    BEGIN DIALOG CONVERSATION @handle

    FROM SERVICE @fromService

    TO SERVICE @toService

    ON CONTRACT @onContract

    WITH ENCRYPTION = OFF;

    -- Set an one hour timer on the conversation

    --

    BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 1200;

    INSERT INTO [ServiceBroker].[SessionConversations]

    (SPID, FromService, ToService, OnContract, Handle)

    VALUES

    (@@SPID, @fromService, @toService, @onContract, @handle);

    END;

    -- Attempt to SEND on the associated conversation

    --

    SEND ON CONVERSATION @handle

    MESSAGE TYPE @messageType

    (@messageBody);

    SELECT @error = @@ERROR;

    IF @error = 0

    BEGIN

    -- Successful send, just exit the loop

    --

    BREAK;

    END

    SELECT @counter = @counter+1;

    IF @counter > 10

    BEGIN

    -- We failed 10 times in a row, something must be broken

    --

    RAISERROR (

    N'Failed to SEND on a conversation for more than 10 times. Error %i.'

    , 16, 1, @error) WITH LOG;

    BREAK;

    END

    -- Delete the associated conversation from the table and try again

    --

    DELETE FROM [ServiceBroker].[SessionConversations]

    WHERE Handle = @handle;

    SELECT @handle = NULL;

    END

    COMMIT;

    END

    GO

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ServiceBroker].[ResendPending]') AND type in (N'P', N'PC'))

    DROP PROCEDURE [ServiceBroker].[ResendPending]

    GO

    -- Resends all pending messages in sys.transmission_queue

    -- belonging to an old colversation on a new conversation.

    CREATE PROCEDURE [ServiceBroker].[ResendPending] (

    @old_handle UNIQUEIDENTIFIER,

    @initiator_service sysname,

    @target_service sysname,

    @contract sysname)

    AS

    BEGIN

    SET NOCOUNT ON;

    DECLARE @message_type_name SYSNAME;

    DECLARE @message_body VARCHAR(MAX);

    -- Get a new dialog.

    DECLARE @handle UNIQUEIDENTIFIER;

    BEGIN DIALOG CONVERSATION @handle

    FROM SERVICE @initiator_service

    TO SERVICE @target_service

    ON CONTRACT @contract

    WITH ENCRYPTION = OFF;

    -- Declare a cursor to iterate over all the pending messages.

    -- It is important to keep the message order and to keep the original message type.

    DECLARE cursor_pending CURSOR LOCAL FORWARD_ONLY READ_ONLY

    FOR SELECT message_type_name, message_body

    FROM sys.transmission_queue

    WHERE conversation_handle = @old_handle

    ORDER BY message_sequence_number;

    OPEN cursorPending;

    FETCH NEXT FROM cursor_pending INTO @message_type_name, @message_body;

    WHILE (@@FETCH_STATUS = 0)

    BEGIN

    -- Resend the message on the new conversation

    SEND ON CONVERSATION @handle MESSAGE TYPE @message_type_name (@message_body);

    FETCH NEXT FROM cursor_pending INTO @message_type_name, @message_body;

    END

    CLOSE cursor_pending;

    DEALLOCATE cursor_pending;

    -- Signal end of stream to target.

    SEND ON CONVERSATION @handle MESSAGE TYPE [ServiceBroker_EndOfStream];

    END

    GO

    IF EXISTS (SELECT * FROM sys.services WHERE name = N'LogInitiatorService')

    DROP SERVICE [LogInitiatorService]

    GO

    IF EXISTS (SELECT * FROM sys.services WHERE name = N'LogService')

    DROP SERVICE [LogService]

    GO

    IF EXISTS (SELECT * FROM sys.service_queues WHERE name = N'LogMessageInitiatorQueue')

    DROP QUEUE [LogMessageInitiatorQueue]

    GO

    IF EXISTS (SELECT * FROM sys.service_queues WHERE name = N'LogMessageQueue')

    DROP QUEUE [LogMessageQueue]

    GO

    IF EXISTS (SELECT * FROM sys.service_contracts WHERE name = N'LogContract')

    DROP CONTRACT [LogContract]

    GO

    IF EXISTS (SELECT * FROM sys.service_message_types WHERE name = N'LogMessage')

    DROP MESSAGE TYPE [LogMessage]

    GO

    IF EXISTS (SELECT * FROM sys.service_message_types WHERE name = N'ServiceBroker_EndOfStream')

    DROP MESSAGE TYPE [ServiceBroker_EndOfStream]

    GO

    CREATE MESSAGE TYPE

    [LogMessage]

    VALIDATION = NONE;

    --VALIDATION = WELL_FORMED_XML;

    CREATE MESSAGE TYPE

    [ServiceBroker_EndOfStream]

    VALIDATION = NONE;

    --VALIDATION = WELL_FORMED_XML;

    CREATE CONTRACT [LogContract]

    AUTHORIZATION dbo

    ([LogMessage] SENT BY ANY, -- SENT BY INITIATOR,

    [ServiceBroker_EndOfStream] SENT BY ANY -- SENT BY INITIATOR

    );

    GO

    CREATE QUEUE [LogMessageQueue]

    WITH

    STATUS = ON,

    RETENTION = OFF

    GO

    CREATE QUEUE [LogMessageInitiatorQueue]

    WITH

    STATUS = ON,

    RETENTION = OFF;

    GO

    CREATE SERVICE [LogService]

    ON QUEUE [LogMessageQueue]([LogContract]);

    GO

    CREATE SERVICE [LogInitiatorService]

    ON QUEUE [LogMessageInitiatorQueue]([LogContract]);

    GO

    IF EXISTS (SELECT * FROM sys.routes WHERE name = N'RPMTransferLogRoute')

    DROP ROUTE [RPMTransferLogRoute]

    GO

    CREATE ROUTE RPMTransferLogRoute

    WITH

    SERVICE_NAME = 'LogService',

    ADDRESS = 'LOCAL'

    GO

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[LogMessage]') AND type in (N'P', N'PC'))

    DROP PROCEDURE [LogMessage]

    GO

    CREATE PROCEDURE [LogMessage]

    AS

    BEGIN

    SET NOCOUNT ON;

    INSERT INTO tempLog (message) values ('[LogMessage]- Called');

    DECLARE @messageStageTable TABLE (

    JobId int,

    JobTypeName varchar(50),

    EventId int,

    LocalDTS datetime,

    WorkStation varchar(255),

    StatusID int,

    [Message] varchar(max),

    MessageDate datetime,

    XMLMessage xml

    )

    DECLARE @newEvents TABLE (

    JobId int,

    EventId int,

    PRIMARY KEY (JobId, EventId)

    )

    -- Variable table for received messages.

    DECLARE @receive_table TABLE(

    queuing_order BIGINT,

    conversation_handle UNIQUEIDENTIFIER,

    message_type_name SYSNAME,

    message_body VARCHAR(MAX));

    -- Cursor for processing non-data related messages

    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY

    FOR SELECT

    conversation_handle,

    message_type_name,

    message_body

    FROM @receive_table

    WHERE message_type_name in ('http://schemas.microsoft.com/SQL/ServiceBroker/Error', 'ServiceBroker_EndOfStream')

    ORDER BY queuing_order;

    DECLARE @conversation_handle UNIQUEIDENTIFIER,

    @message_type_name SYSNAME,

    @message_body VARCHAR(MAX);

    -- Count processed messages.

    DECLARE @message_counter BIGINT;

    SET @message_counter = 0;

    -- Error variables.

    DECLARE @error_number INT;

    DECLARE @error_message VARCHAR(4000);

    DECLARE @error_severity INT;

    DECLARE @error_state INT;

    DECLARE @error_procedure SYSNAME;

    DECLARE @error_line INT;

    -- Receive messages for available conversation groups.

    BEGIN TRY

    BEGIN TRANSACTION;

    -- Receive 1000 available messages into the table.

    -- Wait 5 seconds for messages.

    WAITFOR (

    RECEIVE TOP(1000)

    queuing_order,

    conversation_handle,

    message_type_name,

    message_body

    FROM [LogMessageQueue]

    INTO @receive_table

    ), TIMEOUT 5000;

    IF @@ROWCOUNT <> 0

    BEGIN

    -- Process the messages.

    OPEN message_cursor;

    WHILE (1=1)

    BEGIN

    FETCH NEXT FROM message_cursor

    INTO @conversation_handle,

    @message_type_name,

    @message_body;

    IF (@@FETCH_STATUS != 0) BREAK;

    -- Process a message.

    -- If an exception occurs, catch and attempt to recover.

    BEGIN TRY

    IF @message_type_name = 'ServiceBroker_EndOfStream'

    BEGIN

    -- Initiator is signaling end of message stream: end the dialog.

    END CONVERSATION @conversation_handle;

    END

    ELSE IF @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'

    BEGIN

    -- If the message_type_name indicates that the message is an error,

    -- record the error and end the conversation.

    WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)

    SELECT

    @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),

    @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(@conversation_handle, @error_number, @error_message, NULL, NULL, NULL, NULL, 0);

    END CONVERSATION @conversation_handle;

    END

    END TRY

    BEGIN CATCH

    SET @error_number = ERROR_NUMBER();

    SET @error_message = ERROR_MESSAGE();

    SET @error_severity = ERROR_SEVERITY();

    SET @error_state = ERROR_STATE();

    SET @error_procedure = ERROR_PROCEDURE();

    SET @error_line = ERROR_LINE();

    IF XACT_STATE() = -1

    BEGIN

    -- The transaction is doomed. Only rollback possible.

    -- This could disable the queue if done 5 times consecutively!

    ROLLBACK TRANSACTION;

    -- Record the error.

    BEGIN TRANSACTION;

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 1);

    COMMIT;

    -- For this level of error, it is best to exit the proc

    -- and give the queue monitor control.

    -- Breaking to the outer catch will accomplish this.

    RAISERROR ('Message processing error', 16, 1);

    END

    ELSE IF XACT_STATE() = 1

    BEGIN

    -- Record error and continue processing messages.

    -- Failing message could also be put aside for later processing here.

    -- Otherwise it will be discarded.

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 0);

    END

    END CATCH

    END

    CLOSE message_cursor;

    -- Business Logic Removed here

    END

    COMMIT;

    DELETE @receive_table;

    END TRY

    BEGIN CATCH

    -- Process the error and exit the proc to give the queue monitor control

    SET @error_number = ERROR_NUMBER();

    SET @error_message = ERROR_MESSAGE();

    SET @error_severity = ERROR_SEVERITY();

    SET @error_state = ERROR_STATE();

    SET @error_procedure = ERROR_PROCEDURE();

    SET @error_line = ERROR_LINE();

    IF XACT_STATE() = -1

    BEGIN

    -- The transaction is doomed. Only rollback possible.

    -- This could disable the queue if done 5 times consecutively!

    ROLLBACK TRANSACTION;

    -- Record the error.

    BEGIN TRANSACTION;

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 1);

    COMMIT;

    END

    ELSE IF XACT_STATE() = 1

    BEGIN

    -- Record error and commit transaction.

    -- Here you could also save anything else you want before exiting.

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message, @error_severity, @error_state, @error_procedure, @error_line, 0);

    COMMIT;

    END

    END CATCH

    END;

    GO

    -- Activated store proc for the initiator to receive messages.

    -- Dialogs are gracefully ended by the target after receiving

    -- an end_of_stream message from the initiator; the end dialog

    -- message is then processed here. This method is recommended

    -- to avoid "fire and forget" message loss. One message per

    -- invocation is OK here for expected low-volume load.

    IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[InitiatorQueueProcessor]') AND type in (N'P', N'PC'))

    DROP PROCEDURE [InitiatorQueueProcessor]

    GO

    CREATE PROCEDURE [InitiatorQueueProcessor]

    AS

    BEGIN

    SET NOCOUNT ON;

    DECLARE @conversation_handle UNIQUEIDENTIFIER,

    @message_type_name SYSNAME,

    @message_body VARCHAR(MAX);

    -- Error variables.

    DECLARE @error_number INT;

    DECLARE @error_message VARCHAR(4000);

    DECLARE @error_severity INT;

    DECLARE @error_state INT;

    DECLARE @error_procedure SYSNAME;

    DECLARE @error_line INT;

    BEGIN TRY

    BEGIN TRANSACTION;

    -- Wait 5 seconds for a message.

    WAITFOR (

    RECEIVE TOP(1)

    @conversation_handle = conversation_handle,

    @message_type_name = message_type_name,

    @message_body = message_body

    FROM [LogMessageInitiatorQueue]), TIMEOUT 5000;

    IF @conversation_handle IS NOT NULL

    BEGIN

    DELETE FROM [ServiceBroker].[SessionConversations]

    WHERE [Handle] = @conversation_handle;

    IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'

    BEGIN;

    SEND ON CONVERSATION @conversation_handle

    MESSAGE TYPE [EndOfStream];

    END

    IF @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'

    BEGIN

    -- Target is ending dialog normally.

    END CONVERSATION @conversation_handle;

    END

    ELSE IF @message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'

    BEGIN

    -- Record the error.

    WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)

    SELECT

    @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),

    @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(@conversation_handle, @error_number,@error_message, NULL, NULL, NULL, NULL, 0);

    -- Can messages be resent?

    IF (@error_number IN (-8489, -8462, -9719, -28052))

    BEGIN

    -- Resend the messages on a new dialog.

    EXEC [ServiceBroker].[ResendPending]

    @conversation_handle,

    [LogInitiatorService],

    [LogService],

    [LogContract]

    END

    ELSE

    BEGIN

    -- Save the messages in a side table to be processed later.

    INSERT INTO [ServiceBroker].[UnSentMessages]

    SELECT message_type_name, message_body

    FROM sys.transmission_queue

    WHERE conversation_handle = @conversation_handle;

    END

    -- End the conversation.

    END CONVERSATION @conversation_handle;

    END

    END

    COMMIT;

    END TRY

    BEGIN CATCH

    SET @error_number = ERROR_NUMBER();

    SET @error_message = ERROR_MESSAGE();

    SET @error_severity = ERROR_SEVERITY();

    SET @error_state = ERROR_STATE();

    SET @error_procedure = ERROR_PROCEDURE();

    SET @error_line = ERROR_LINE();

    IF XACT_STATE() = -1

    BEGIN

    -- The transaction is doomed. Only rollback possible.

    -- Note: 5 consecutive rollbacks will disable the queue!

    ROLLBACK TRANSACTION;

    -- Record the error.

    BEGIN TRANSACTION;

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message,@error_severity, @error_state, @error_procedure, @error_line, 1);

    COMMIT;

    END

    ELSE IF XACT_STATE() = 1

    BEGIN

    -- Record error and commit transaction.

    INSERT INTO [ServiceBroker].[Exceptions]

    (error_conversation, error_number,error_message, error_severity, error_state, error_procedure,error_line, doomed_transaction)

    VALUES(NULL, @error_number, @error_message,

    @error_severity, @error_state, @error_procedure, @error_line, 0);

    COMMIT;

    END

    END CATCH

    END;

    GO

    ALTER QUEUE [LogMessageQueue]

    WITH STATUS=ON,

    ACTIVATION (

    STATUS=ON,

    PROCEDURE_NAME = [LogMessage],

    MAX_QUEUE_READERS = 1,

    EXECUTE AS SELF) ;

    GO

    ALTER QUEUE [LogMessageInitiatorQueue]

    WITH STATUS=ON,

    ACTIVATION (

    STATUS=ON,

    PROCEDURE_NAME = [InitiatorQueueProcessor],

    MAX_QUEUE_READERS = 1,

    EXECUTE AS SELF) ;

    GO

  • Adam Hardy-416657 (8/28/2014)


    [snip] only to find it traced back to the original Receive only getting 30-60 messages at a time.

    So you're definately not losing them due to communication loss or poison message storage. Good, that means you've nailed it down to the pickup. Excellent troubleshooting.

    The one thing that would suggest that this wasn't because of un-communicated/responding messages is that when I didn't get a timely response to my AskSSC question, I had to turn the activation back on.

    Most folks won't go near troubleshooting Service Broker simply because of the few million moving parts making it tough to do without direct access to the machine. Add to that the majority of people playing with it are typically new to messaging in general and it turns into a mess. AskSSC lends itself to quick, one-shot answers. I'm unfortunately not surprised you didn't get a response there.

    Hopefully even if I can't help you others on this board with more expertise will pop into the conversation with what we clarify and get you to a solution, or at least an understanding.

    I watched the message count in the queue and it counted down to zero after a minute or two which to me indicates that it picked up ALL the messages in the queue in increments.

    That's odd. At first glance, and a skim of the code you posted later (that's going to take me a bit to grok, obviously, in my spare time), I can't see any direct reason you should be getting chunked pickup. A seemingly random question, what's the server and/or database's default isolation level? I'd assume it's read committed but want to confirm, and I didn't see a directly set one in your activation proc above.

    I don't think there is anything particularly non-default about the queue. When I set this up, I read a variety of blogs etc about how to setup "one-way" messaging properly (meaning not fire and forget). I also implemented conversation session re-use as this was recommended for high traffic.

    Nothing particularly strange then. Good.

    You're probably going to need the script I think to help further. I'll try to sanitise it and post it up.

    After skimming down it, I'm going to need to sink my teeth into it before I can intelligently discuss your setup further. Unfortunately, I'm not going to get the chance to do that this evening. I know you're feeling pressure on this, so I'll try to sit down with it in the morning if business doesn't end up with another one of its emergencies and really dig into it.


    - Craig Farrell

    Never stop learning, even if it hurts. Ego bruises are practically mandatory as you learn unless you've never risked enough to make a mistake.

    For better assistance in answering your questions[/url] | Forum Netiquette
    For index/tuning help, follow these directions.[/url] |Tally Tables[/url]

    Twitter: @AnyWayDBA

  • Hi Craig,

    Thanks heaps for the discussion. Don't bust your balls in getting an answer back, currently the messaging is running fine in production, the small batches are not causing a huge issue other than causing more rows being added that are necessary.

    It's mainly, if I see something that doesn't look right, even when things look like they are operating fine, I investigate further rather than leaving it.

    I'm going to take my sanitised script and see if I can't create the same behaviour I'm seeing in production.

  • The answer is deceptively simple. RECEIVE will only pull back up to 1000 messages from a single Conversation from a queue. You can check by doing a

    SELECT conversation_handle, COUNT(*) FROM dbo.mymessagequeue GROUP BY conversation_handle

    (replace "conversation" and "mymessagequeue" with the appropriate names

    From BoL at this link http://msdn.microsoft.com/en-us/library/ms186963.aspx

    All messages that are returned by a RECEIVE statement belong the same conversation group. The RECEIVE statement locks the conversation group for the messages that are returned until the transaction that contains the statement finishes. A RECEIVE statement returns messages that have a status of 1. The result set returned by a RECEIVE statement is implicitly ordered:

    -If messages from multiple conversations meet the WHERE clause conditions, the RECEIVE statement returns all messages from one conversation before it returns messages for any other conversation. The conversations are processed in descending priority level order.

    -For a given conversation, a RECEIVE statement returns messages in ascending message_sequence_number order.

  • I must also smile when I read that bit of code. It seems that Remus's example code is what everyone learns Service Broker on. And it's also the most straightforward of examples.

  • Thanks heaps Venoym,

    When Craig mentioned conversations, I had a gut feeling the answer was going to be something like that, but didn't have enough experience with Service broker to know where to start looking.

    Remus' series was so well written and informative. By pulling his case studies together into a solution I learnt a lot about Service Broker in a short time. 🙂

  • Hi again Venoym,

    Could I get your advice on an appropriate fix?

    The functionality behind this service broker implementation is to log messages occuring from remote sites. Basically the remote sites log information about processing particular jobs back to the controlling server (via a webservice). These messages get stored in the database along with each job. I've tried with this 1000 batching mechanism to reduce the number of message records stored for each job, essentially the messages get concatenated and stored on one record, rather than one message per record.

    But because of this behaviour, the batching it is not having the desired effect.

    I was thinking, If I changed the conversation store, to store the ID of each job and grabbed the conversation for the job, rather than for the SPID, could that be a fix for this situation?

  • Adam Hardy-416657 (9/1/2014)


    Hi again Venoym,

    Could I get your advice on an appropriate fix?

    The functionality behind this service broker implementation is to log messages occuring from remote sites. Basically the remote sites log information about processing particular jobs back to the controlling server (via a webservice). These messages get stored in the database along with each job. I've tried with this 1000 batching mechanism to reduce the number of message records stored for each job, essentially the messages get concatenated and stored on one record, rather than one message per record.

    But because of this behaviour, the batching it is not having the desired effect.

    I was thinking, If I changed the conversation store, to store the ID of each job and grabbed the conversation for the job, rather than for the SPID, could that be a fix for this situation?

    The best way I found to handle service broker is to fire the messages across as soon as you can, then store them in a table on the receive side. That makes it far easier on the send side. On the receive side, store each message in its own row in the table. You can even pair it with the Conversation_handle, but keep in mind that those can be reused, or duplicated at a different date.

    My own usage was an auditing setup for SQL 2005 to 2012 that would record table audits for Insert/Update/Delete. What I would do is to take each row being audited and package it in XML with the Table and row information (each element was a column name). Then on the Audit side, I would shred the XML and insert into a Key-Value table the Column Name, the Pre value and the Post value. By the way, XQuery is tons slower than OPENQUERY when you need to shred unknown XML elements. I was assured that the XML was well formed by adding the "XML" tag to the Messages and Contracts. There are various details of my implementation that changed over time (a separate Pre and post message vs. 1 message that had both) based on business needs.

    A few notes from personal experience on the Remus standard code:

    1. Conversations have a tendency of closing while you are trying to send when using a timeout on the conversation. Do a check to see if the conversation is Open and not about to expire before attempting to use it.

    2. Setup a log table on the Send side, that way if an error happens you can log the message and the error and keep going.

    3. Service Broker exceptions will always terminate any open transaction (make the transaction uncommittable by being a severity 16), so be mindful and attempt to handle errors BEFORE they exception.

  • Thanks Venoym. I admit, I had forgotten that, if I'd ever known it. So, I've learned a good amount here as well. 🙂


    - Craig Farrell

    Never stop learning, even if it hurts. Ego bruises are practically mandatory as you learn unless you've never risked enough to make a mistake.

    For better assistance in answering your questions[/url] | Forum Netiquette
    For index/tuning help, follow these directions.[/url] |Tally Tables[/url]

    Twitter: @AnyWayDBA

  • venoym (9/2/2014)


    By the way, XQuery is tons slower than OPENQUERY when you need to shred unknown XML elements. I was assured that the XML was well formed by adding the "XML" tag to the Messages and Contracts. There are various details of my implementation that changed over time (a separate Pre and post message vs. 1 message that had both) based on business needs.

    I'm assuming you meant OpenXML? as as far as I can see OPENQUERY doesn't shred XML?

Viewing 15 posts - 1 through 15 (of 18 total)

You must be logged in to reply to this topic. Login to reply