Delay in Asynchronous Procedure Calls

  • I would like to call stored procs asynchronously, but there seems to be a delay in service broker so that not all of the procedures finish at around the same time. Actually, the delay is significant at several seconds. I have max readers higher than the amount of procedures I am calling. I have tried using the same conversation, multiple conversations, receiving more than top(1) --- although for high concurrency top(1) should be best --- and using waitfor with a timeout. Here's what it looks like:

    create table AsyncExecResults (

    token uniqueidentifier primary key,

    start_time datetime null,

    finish_time datetime null)

    go

    create queue AsyncExecQueue;

    go

    create service AsyncExecService on queue AsyncExecQueue (DEFAULT);

    GO

    if object_id('usp_AsyncExecActivated') is not null

    drop procedure usp_AsyncExecActivated;

    go

    create procedure usp_AsyncExecActivated

    as

    set nocount on;

    declare @h uniqueidentifier,

    @messageTypeName sysname,

    @messageBody varbinary(max),

    @xmlBody xml,

    @procedureName sysname,

    @startTime datetime,

    @finishTime datetime,

    @token uniqueidentifier;

    waitfor(

    receive top(1)

    @h = conversation_handle,

    @messageTypeName = message_type_name,

    @messageBody = message_body

    from AsyncExecQueue

    ), TIMEOUT 5000

    if (@h is not null)

    begin

    if (@messageTypeName = N'DEFAULT')

    begin

    select @xmlBody = CAST(@messageBody as xml);

    select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

    select @startTime = CURRENT_TIMESTAMP;

    exec(@procedureName);

    select @finishTime = CURRENT_TIMESTAMP;

    select @token = conversation_id

    from sys.conversation_endpoints

    where conversation_handle = @h;

    insert AsyncExecResults (start_time, finish_time, token)

    values(

    @startTime,

    @finishTime,

    @token);

    end

    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')

    begin

    end conversation @h;

    end

    end

    go

    alter queue AsyncExecQueue

    with activation (

    procedure_name = usp_AsyncExecActivated

    , max_queue_readers = 25

    , execute as owner

    , status = on

    ), status = on, retention = off;

    go

    if object_id('usp_AsyncExecInvoke') is not null

    drop procedure usp_AsyncExecInvoke;

    go

    create procedure usp_AsyncExecInvoke

    @procedureName sysname,

    @token uniqueidentifier output

    as

    declare @h uniqueidentifier,

    @xmlBody xml

    set nocount on;

    begin dialog conversation @h

    from service AsyncExecService

    to service N'AsyncExecService', 'current database'

    with encryption = off;

    select @xmlBody = (

    select @procedureName as name

    for xml path('procedure'), type);

    send on conversation @h (@xmlBody);

    go

    if object_id('usp_MyLongRunningProcedure') is not null

    drop procedure usp_MyLongRunningProcedure;

    go

    create procedure usp_MyLongRunningProcedure

    as

    begin

    waitfor delay '00:00:10';

    end

    go

    truncate table AsyncExecResults;

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    select * from AsyncExecResults

    order by finish_time;

    Code borrowed heavily from here[/url] for reference.

    I have really taken it down to just the bare-bones on purpose here so that I can test performance. What I would like is for there to be as little delay between the first start-time and last finish-time when results are sorted in ascending order. I have done some research and it seems the issue is that service broker is not constantly scanning the queue and then activating whatever is on it. It waits for a bit and then scans again. However, after I run all of the waitfor delay procedures, they should all enter the queue. So by the time SB checks a second time, they should surely all be there for SB to dispatch activation procedures.

    Thanks for any help!

  • Your async handler routine should not exit after each message: Service broker expects its handlers to continue processing messages for as long as there are more messages available. As you discovered, it only rechecks after 5 seconds to see if the queue has emptied. Only if it hasn't then it starts another handler, for as far as the max_queue_readers setting permits it.

    So the problem you need to fix is first of all in your handler procedure. Try this for example (not tested):

    if object_id('usp_AsyncExecActivated') is not null

    drop procedure usp_AsyncExecActivated;

    go

    create procedure usp_AsyncExecActivated

    as

    set nocount on;

    declare @h uniqueidentifier,

    @messageTypeName sysname,

    @messageBody varbinary(max),

    @xmlBody xml,

    @procedureName sysname,

    @startTime datetime,

    @finishTime datetime,

    @token uniqueidentifier;

    while 1 = 1

    begin

    begin tran;

    waitfor(

    receive top(1)

    @h = conversation_handle,

    @messageTypeName = message_type_name,

    @messageBody = message_body

    from AsyncExecQueue

    ), TIMEOUT 5000;

    if not @@rowcount > 0

    begin

    commit tran;

    break; -- exit while 1 = 1

    end

    if (@messageTypeName = N'DEFAULT')

    begin

    select @xmlBody = CAST(@messageBody as xml);

    select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

    select @startTime = CURRENT_TIMESTAMP;

    exec(@procedureName);

    select @finishTime = CURRENT_TIMESTAMP;

    select @token = conversation_id

    from sys.conversation_endpoints

    where conversation_handle = @h;

    insert AsyncExecResults (start_time, finish_time, token)

    values(

    @startTime,

    @finishTime,

    @token);

    end

    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')

    begin

    end conversation @h;

    end

    commit tran;

    end

    go

    This will process your messages sequentially, until for more than 5 seconds the queue could not be emptied by a single instance. If the queue is not empty 5 seconds after the 1st instance got started a 2nd instance will be started, and 2 messages will be processed simultaneously. Then after 10 seconds and the queue is still not empty, a 3rd instance is started and 3 messages are processed at the same time (provided your server has at least 3 cpu's, otherwise you'll just be context switching) and so on, until either the maximum number of max_queue_readers is reached or the queue becomes empty for at least 5 seconds (= the timeout value on your waitfor( receive) statement).

    In practice 25 queue readers is probably way to high.



    Posting Data Etiquette - Jeff Moden[/url]
    Posting Performance Based Questions - Gail Shaw[/url]
    Hidden RBAR - Jeff Moden[/url]
    Cross Tabs and Pivots - Jeff Moden[/url]
    Catch-all queries - Gail Shaw[/url]


    If you don't have time to do it right, when will you have time to do it over?

  • For those trying to duplicate this experiment, for example to learn some Service broker basics from it, here is the same experimental code with documented improvements in it:

    create database t2

    go

    use master

    go

    alter database t2 set enable_broker

    go

    use t2

    go

    create table AsyncExecResults (

    token uniqueidentifier primary key,

    start_time datetime null,

    finish_time datetime null)

    go

    create queue AsyncExecQueue;

    go

    create service AsyncExecService

    authorization dbo

    on queue AsyncExecQueue

    ( [DEFAULT] -- default is NOT a keyword here. i.e. it MUST be

    -- enclosed by square brackets.

    );

    GO

    if object_id('usp_AsyncExecActivated') is not null

    drop procedure dbo.usp_AsyncExecActivated;

    go

    if object_id('usp_AsyncExecActivated') is not null

    drop procedure dbo.usp_AsyncExecActivated;

    go

    alter procedure dbo.usp_AsyncExecActivated

    as

    set nocount on;

    declare @h uniqueidentifier,

    @messageTypeName sysname,

    @messageBody varbinary(max),

    @xmlBody xml,

    @procedureName sysname,

    @startTime datetime,

    @finishTime datetime,

    @token uniqueidentifier;

    while 1 = 1

    begin

    begin tran;

    waitfor(

    receive top(1)

    @h = conversation_handle,

    @messageTypeName = message_type_name,

    @messageBody = message_body

    from AsyncExecQueue

    ), TIMEOUT 5000;

    if not @@rowcount > 0

    begin

    commit tran;

    break; -- exit while 1 = 1

    end

    if (@messageTypeName = N'DEFAULT')

    begin

    select @xmlBody = CAST(@messageBody as xml);

    select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

    select @startTime = CURRENT_TIMESTAMP;

    exec(@procedureName);

    select @finishTime = CURRENT_TIMESTAMP;

    select @token = conversation_id

    from sys.conversation_endpoints

    where conversation_handle = @h;

    insert AsyncExecResults (start_time, finish_time, token)

    values(

    @startTime,

    @finishTime,

    @token);

    -- The chosen protocol says the conversation ends after the 1st

    -- message is received. So let SSSB know we're done by ending the

    -- conversation after each received message. Not ending the conversation

    -- will leave all conversations open. When you get into the millions

    -- of open conversations (see sys.conversation_endpoints), your server

    -- will go slower and slower.

    end conversation @h;

    end

    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')

    begin

    end conversation @h;

    end

    -- Don't forget to handle error messages too.

    -- These also indicate the end of a conversation, just like EndDialog.

    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')

    begin

    -- Read the xml document that we were sent in the message body.

    declare @nCode int;

    declare @vchDescription nvarchar(2000);

    declare @xmlError xml; -- (document

    select @xmlError = convert(xml, @messageBody);

    with xmlnamespaces( default 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')

    select

    @nCode = Errors.error.value( 'Code[1]', 'int'),

    @vchDescription = Errors.error.value( 'Description[1]', 'nvarchar(max)')

    from @xmlError.nodes( '/Error[1]') as Errors(error);

    -- Any text output by a SSSB queue reader will by default end up in the sql log,

    -- so I just write out the output with nowait to get errors shown in the sql log.

    -- Using "with log" would put the error in the sql log twice: once because of

    -- the with log, plus once more because all output from an auto activated procedure

    -- is written into the log. So don't specify with log here.

    raiserror('Service broker error received. Error %d:%s. (%s)', 0, 0, @nCode, @vchDescription, '%PROC%') with nowait;

    -- An Error message means the conversation has ended, just like with a EndDialog message.

    -- So we tell SSSB we got the message and have it clean up our end of the conversation too.

    end conversation @h;

    end

    commit tran;

    end

    go

    alter queue dbo.AsyncExecQueue

    with activation (

    procedure_name = usp_AsyncExecActivated

    , max_queue_readers = 25

    , execute as owner

    , status = on

    ), status = on, retention = off;

    go

    if object_id('usp_AsyncExecInvoke') is not null

    drop procedure dbo.usp_AsyncExecInvoke;

    go

    alter procedure dbo.usp_AsyncExecInvoke

    @procedureName sysname,

    @token uniqueidentifier output

    as

    declare @h uniqueidentifier,

    @xmlBody xml

    set nocount on;

    begin dialog conversation @h

    from service AsyncExecService

    to service N'AsyncExecService'--, 'current database'

    with encryption = off;

    select @xmlBody = (

    select @procedureName as name

    for xml path('procedure'), type);

    send on conversation @h (@xmlBody);

    go

    if object_id('usp_MyLongRunningProcedure') is not null

    drop procedure dbo.usp_MyLongRunningProcedure;

    go

    create procedure dbo.usp_MyLongRunningProcedure

    as

    begin

    waitfor delay '00:00:10';

    end

    go

    truncate table AsyncExecResults;

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    declare @token uniqueidentifier;

    exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;

    go

    select * from AsyncExecResults

    order by start_time, finish_time;



    Posting Data Etiquette - Jeff Moden[/url]
    Posting Performance Based Questions - Gail Shaw[/url]
    Hidden RBAR - Jeff Moden[/url]
    Cross Tabs and Pivots - Jeff Moden[/url]
    Catch-all queries - Gail Shaw[/url]


    If you don't have time to do it right, when will you have time to do it over?

Viewing 3 posts - 1 through 2 (of 2 total)

You must be logged in to reply to this topic. Login to reply