Delay in Asynchronous Procedure Calls


blueazul
I would like to call stored procedures asynchronously, but there seems to be a delay in Service Broker, so the procedures do not all finish at around the same time. The delay is significant: several seconds. I have set max_queue_readers higher than the number of procedures I am calling. I have tried using the same conversation, using multiple conversations, receiving more than top(1) (although for high concurrency top(1) should be best), and using waitfor with a timeout. Here's what it looks like:




create table AsyncExecResults (
token uniqueidentifier primary key,
start_time datetime null,
finish_time datetime null)
go

create queue AsyncExecQueue;
go

create service AsyncExecService on queue AsyncExecQueue ([DEFAULT]);
GO

if object_id('usp_AsyncExecActivated') is not null
drop procedure usp_AsyncExecActivated;
go

create procedure usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;

waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000

if (@h is not null)
begin
if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;

select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;

insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);
end

else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end
end
go

alter queue AsyncExecQueue
with activation (
procedure_name = usp_AsyncExecActivated
, max_queue_readers = 25
, execute as owner
, status = on
), status = on, retention = off;
go

if object_id('usp_AsyncExecInvoke') is not null
drop procedure usp_AsyncExecInvoke;
go

create procedure usp_AsyncExecInvoke
@procedureName sysname,
@token uniqueidentifier output
as
declare @h uniqueidentifier,
@xmlBody xml
set nocount on;

begin dialog conversation @h
from service AsyncExecService
to service N'AsyncExecService', 'current database'
with encryption = off;

select @xmlBody = (
select @procedureName as name
for xml path('procedure'), type);

send on conversation @h (@xmlBody);

go

if object_id('usp_MyLongRunningProcedure') is not null
drop procedure usp_MyLongRunningProcedure;
go

create procedure usp_MyLongRunningProcedure
as
begin
waitfor delay '00:00:10';
end
go

truncate table AsyncExecResults;

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

select * from AsyncExecResults
order by finish_time;




Code borrowed heavily from here for reference.
I have deliberately stripped this down to the bare bones so that I can test performance. What I would like is as little delay as possible between the first start time and the last finish time when the results are sorted in ascending order. From some research, the issue seems to be that Service Broker does not constantly scan the queue and activate readers for whatever is on it; it waits for a bit and then scans again. However, once I have run all of the invoke calls, the messages should all be in the queue. So by the time SB checks a second time, they should surely all be there for SB to dispatch activation procedures.

Thanks for any help!
R.P.Rozema
Your activated handler procedure should not exit after each message: Service Broker expects its handlers to keep processing messages for as long as more messages are available. As you discovered, it only rechecks after about 5 seconds to see whether the queue has been emptied. Only if it has not does it start another handler, as far as the max_queue_readers setting permits.

So the problem you need to fix is first of all in your handler procedure. Try this for example (not tested):
if object_id('usp_AsyncExecActivated') is not null
drop procedure usp_AsyncExecActivated;
go

create procedure usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;

while 1 = 1
begin

begin tran;

waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000;

if not @@rowcount > 0
begin
commit tran;
break; -- exit while 1 = 1
end

if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;

select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;

insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);
end

else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end
commit tran;
end
go


This will process your messages sequentially until a single instance has failed to empty the queue for more than 5 seconds. If the queue is not empty 5 seconds after the 1st instance was started, a 2nd instance is started and 2 messages are processed simultaneously. If the queue is still not empty after 10 seconds, a 3rd instance is started and 3 messages are processed at the same time (provided your server has at least 3 CPUs; otherwise you'll just be context switching), and so on, until either max_queue_readers is reached or the queue stays empty for at least 5 seconds (the timeout value on your waitfor(receive) statement).
In practice, 25 queue readers is probably way too high.
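
To watch this ramp-up while the test is running, something like the following could be run from a second session. It is only a sketch: it assumes the queue is called AsyncExecQueue as in the code above, that it is run in the database that owns the queue, and that the login is allowed to read the Service Broker DMVs (VIEW SERVER STATE). The column alias active_readers is my own.

```sql
-- How many activated readers are currently running for the queue,
-- and when did the queue monitor last start one?
select q.name as queue_name,
       m.state,
       m.last_activated_time,
       m.last_empty_rowset_time,
       m.tasks_waiting,
       (select count(*)
          from sys.dm_broker_activated_tasks t
         where t.database_id = m.database_id
           and t.queue_id = m.queue_id) as active_readers
from sys.dm_broker_queue_monitors m
join sys.service_queues q
  on q.object_id = m.queue_id
where m.database_id = db_id()
  and q.name = N'AsyncExecQueue';
```

Running it every second or so while the ten messages are being processed should show active_readers climbing one at a time rather than jumping straight to ten, if the behaviour is as described above.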



R.P.Rozema
For those trying to reproduce this experiment, for example to learn some Service Broker basics from it, here is the same experimental code with documented improvements:

create database t2
go

use master
go

alter database t2 set enable_broker
go

use t2
go


create table AsyncExecResults (
token uniqueidentifier primary key,
start_time datetime null,
finish_time datetime null)
go

create queue AsyncExecQueue;
go

create service AsyncExecService
authorization dbo
on queue AsyncExecQueue
( [DEFAULT] -- default is NOT a keyword here. i.e. it MUST be
-- enclosed by square brackets.
);
GO

if object_id('usp_AsyncExecActivated') is not null
drop procedure dbo.usp_AsyncExecActivated;
go


create procedure dbo.usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;

while 1 = 1
begin

begin tran;

waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000;

if not @@rowcount > 0
begin
commit tran;
break; -- exit while 1 = 1
end

if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;

select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;

insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);

-- The chosen protocol says the conversation ends after the 1st
-- message is received. So let SSSB know we're done by ending the
-- conversation after each received message. Not ending the conversation
-- will leave all conversations open. When you get into the millions
-- of open conversations (see sys.conversation_endpoints), your server
-- will go slower and slower.
end conversation @h;
end

else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end

-- Don't forget to handle error messages too.
-- These also indicate the end of a conversation, just like EndDialog.
else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
begin
-- Read the xml document that we were sent in the message body.
declare @nCode int;
declare @vchDescription nvarchar(2000);
declare @xmlError xml; -- the error document taken from the message body

select @xmlError = convert(xml, @messageBody);

with xmlnamespaces( default 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
select
@nCode = Errors.error.value( 'Code[1]', 'int'),
@vchDescription = Errors.error.value( 'Description[1]', 'nvarchar(max)')
from @xmlError.nodes( '/Error[1]') as Errors(error);

-- Any text output by a SSSB queue reader will by default end up in the sql log,
-- so I just write out the output with nowait to get errors shown in the sql log.
-- Using "with log" would put the error in the sql log twice: once because of
-- the with log, plus once more because all output from an auto activated procedure
-- is written into the log. So don't specify with log here.
raiserror('Service broker error received. Error %d:%s. (%s)', 0, 0, @nCode, @vchDescription, '%PROC%') with nowait;

-- An Error message means the conversation has ended, just like with a EndDialog message.
-- So we tell SSSB we got the message and have it clean up our end of the conversation too.
end conversation @h;
end
commit tran;
end
go

alter queue dbo.AsyncExecQueue
with activation (
procedure_name = usp_AsyncExecActivated
, max_queue_readers = 25
, execute as owner
, status = on
), status = on, retention = off;
go

if object_id('usp_AsyncExecInvoke') is not null
drop procedure dbo.usp_AsyncExecInvoke;
go

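create procedure dbo.usp_AsyncExecInvoke
@procedureName sysname,
@token uniqueidentifier output
as
declare @h uniqueidentifier,
@xmlBody xml;
set nocount on;

begin dialog conversation @h
from service AsyncExecService
to service N'AsyncExecService'--, 'current database'
with encryption = off;

-- Return the conversation_id as the token; the activated procedure stores
-- the same value in AsyncExecResults.token, so the caller can look it up.
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;

select @xmlBody = (
select @procedureName as name
for xml path('procedure'), type);

send on conversation @h (@xmlBody);

go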

if object_id('usp_MyLongRunningProcedure') is not null
drop procedure dbo.usp_MyLongRunningProcedure;
go

create procedure dbo.usp_MyLongRunningProcedure
as
begin
waitfor delay '00:00:10';
end
go

truncate table AsyncExecResults;

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go

select * from AsyncExecResults
order by start_time, finish_time;
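
To put a single number on the delay being discussed (the gap between the first start time and the last finish time), a summary query along these lines may help. It is a minimal sketch that only assumes the AsyncExecResults table defined above; the column aliases are my own.

```sql
-- Overall spread of the test run: first start, last finish,
-- and the elapsed time between them in milliseconds.
select count(*) as procedures_completed,
       min(start_time) as first_start,
       max(finish_time) as last_finish,
       datediff(millisecond, min(start_time), max(finish_time)) as total_elapsed_ms
from AsyncExecResults;
```

With ten calls to a procedure that waits 10 seconds, total_elapsed_ms would be close to 10000 if all ten ran fully in parallel and close to 100000 if they ran strictly one after another, so the value shows roughly where a given run sits between those two extremes.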




