February 23, 2012 at 8:57 pm
I would like to call stored procs asynchronously, but there seems to be a delay in service broker so that not all of the procedures finish at around the same time. Actually, the delay is significant at several seconds. I have max readers higher than the amount of procedures I am calling. I have tried using the same conversation, multiple conversations, receiving more than top(1) --- although for high concurrency top(1) should be best --- and using waitfor with a timeout. Here's what it looks like:
create table AsyncExecResults (
token uniqueidentifier primary key,
start_time datetime null,
finish_time datetime null)
go
create queue AsyncExecQueue;
go
create service AsyncExecService on queue AsyncExecQueue (DEFAULT);
GO
if object_id('usp_AsyncExecActivated') is not null
drop procedure usp_AsyncExecActivated;
go
create procedure usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;
waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000
if (@h is not null)
begin
if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');
select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;
insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);
end
else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end
end
go
alter queue AsyncExecQueue
with activation (
procedure_name = usp_AsyncExecActivated
, max_queue_readers = 25
, execute as owner
, status = on
), status = on, retention = off;
go
if object_id('usp_AsyncExecInvoke') is not null
drop procedure usp_AsyncExecInvoke;
go
create procedure usp_AsyncExecInvoke
@procedureName sysname,
@token uniqueidentifier output
as
declare @h uniqueidentifier,
@xmlBody xml
set nocount on;
begin dialog conversation @h
from service AsyncExecService
to service N'AsyncExecService', 'current database'
with encryption = off;
select @xmlBody = (
select @procedureName as name
for xml path('procedure'), type);
send on conversation @h (@xmlBody);
go
if object_id('usp_MyLongRunningProcedure') is not null
drop procedure usp_MyLongRunningProcedure;
go
create procedure usp_MyLongRunningProcedure
as
begin
waitfor delay '00:00:10';
end
go
truncate table AsyncExecResults;
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
select * from AsyncExecResults
order by finish_time;
Code borrowed heavily from here[/url] for reference.
I have really taken it down to just the bare-bones on purpose here so that I can test performance. What I would like is for there to be as little delay between the first start-time and last finish-time when results are sorted in ascending order. I have done some research and it seems the issue is that service broker is not constantly scanning the queue and then activating whatever is on it. It waits for a bit and then scans again. However, after I run all of the waitfor delay procedures, they should all enter the queue. So by the time SB checks a second time, they should surely all be there for SB to dispatch activation procedures.
Thanks for any help!
September 18, 2012 at 8:03 am
Your async handler routine should not exit after each message: Service broker expects its handlers to continue processing messages for as long as there are more messages available. As you discovered, it only rechecks after 5 seconds to see if the queue has emptied. Only if it hasn't then it starts another handler, for as far as the max_queue_readers setting permits it.
So the problem you need to fix is first of all in your handler procedure. Try this for example (not tested):
if object_id('usp_AsyncExecActivated') is not null
drop procedure usp_AsyncExecActivated;
go
create procedure usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;
while 1 = 1
begin
begin tran;
waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000;
if not @@rowcount > 0
begin
commit tran;
break; -- exit while 1 = 1
end
if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');
select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;
insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);
end
else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end
commit tran;
end
go
This will process your messages sequentially, until for more than 5 seconds the queue could not be emptied by a single instance. If the queue is not empty 5 seconds after the 1st instance got started a 2nd instance will be started, and 2 messages will be processed simultaneously. Then after 10 seconds and the queue is still not empty, a 3rd instance is started and 3 messages are processed at the same time (provided your server has at least 3 cpu's, otherwise you'll just be context switching) and so on, until either the maximum number of max_queue_readers is reached or the queue becomes empty for at least 5 seconds (= the timeout value on your waitfor( receive) statement).
In practice 25 queue readers is probably way to high.
September 19, 2012 at 1:47 am
For those trying to duplicate this experiment, for example to learn some Service broker basics from it, here is the same experimental code with documented improvements in it:
create database t2
go
use master
go
alter database t2 set enable_broker
go
use t2
go
create table AsyncExecResults (
token uniqueidentifier primary key,
start_time datetime null,
finish_time datetime null)
go
create queue AsyncExecQueue;
go
create service AsyncExecService
authorization dbo
on queue AsyncExecQueue
( [DEFAULT] -- default is NOT a keyword here. i.e. it MUST be
-- enclosed by square brackets.
);
GO
if object_id('usp_AsyncExecActivated') is not null
drop procedure dbo.usp_AsyncExecActivated;
go
if object_id('usp_AsyncExecActivated') is not null
drop procedure dbo.usp_AsyncExecActivated;
go
alter procedure dbo.usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
@messageTypeName sysname,
@messageBody varbinary(max),
@xmlBody xml,
@procedureName sysname,
@startTime datetime,
@finishTime datetime,
@token uniqueidentifier;
while 1 = 1
begin
begin tran;
waitfor(
receive top(1)
@h = conversation_handle,
@messageTypeName = message_type_name,
@messageBody = message_body
from AsyncExecQueue
), TIMEOUT 5000;
if not @@rowcount > 0
begin
commit tran;
break; -- exit while 1 = 1
end
if (@messageTypeName = N'DEFAULT')
begin
select @xmlBody = CAST(@messageBody as xml);
select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');
select @startTime = CURRENT_TIMESTAMP;
exec(@procedureName);
select @finishTime = CURRENT_TIMESTAMP;
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;
insert AsyncExecResults (start_time, finish_time, token)
values(
@startTime,
@finishTime,
@token);
-- The chosen protocol says the conversation ends after the 1st
-- message is received. So let SSSB know we're done by ending the
-- conversation after each received message. Not ending the conversation
-- will leave all conversations open. When you get into the millions
-- of open conversations (see sys.conversation_endpoints), your server
-- will go slower and slower.
end conversation @h;
end
else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
begin
end conversation @h;
end
-- Don't forget to handle error messages too.
-- These also indicate the end of a conversation, just like EndDialog.
else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
begin
-- Read the xml document that we were sent in the message body.
declare @nCode int;
declare @vchDescription nvarchar(2000);
declare @xmlError xml; -- (document
select @xmlError = convert(xml, @messageBody);
with xmlnamespaces( default 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
select
@nCode = Errors.error.value( 'Code[1]', 'int'),
@vchDescription = Errors.error.value( 'Description[1]', 'nvarchar(max)')
from @xmlError.nodes( '/Error[1]') as Errors(error);
-- Any text output by a SSSB queue reader will by default end up in the sql log,
-- so I just write out the output with nowait to get errors shown in the sql log.
-- Using "with log" would put the error in the sql log twice: once because of
-- the with log, plus once more because all output from an auto activated procedure
-- is written into the log. So don't specify with log here.
raiserror('Service broker error received. Error %d:%s. (%s)', 0, 0, @nCode, @vchDescription, '%PROC%') with nowait;
-- An Error message means the conversation has ended, just like with a EndDialog message.
-- So we tell SSSB we got the message and have it clean up our end of the conversation too.
end conversation @h;
end
commit tran;
end
go
alter queue dbo.AsyncExecQueue
with activation (
procedure_name = usp_AsyncExecActivated
, max_queue_readers = 25
, execute as owner
, status = on
), status = on, retention = off;
go
if object_id('usp_AsyncExecInvoke') is not null
drop procedure dbo.usp_AsyncExecInvoke;
go
alter procedure dbo.usp_AsyncExecInvoke
@procedureName sysname,
@token uniqueidentifier output
as
declare @h uniqueidentifier,
@xmlBody xml
set nocount on;
begin dialog conversation @h
from service AsyncExecService
to service N'AsyncExecService'--, 'current database'
with encryption = off;
select @xmlBody = (
select @procedureName as name
for xml path('procedure'), type);
send on conversation @h (@xmlBody);
go
if object_id('usp_MyLongRunningProcedure') is not null
drop procedure dbo.usp_MyLongRunningProcedure;
go
create procedure dbo.usp_MyLongRunningProcedure
as
begin
waitfor delay '00:00:10';
end
go
truncate table AsyncExecResults;
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier;
exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
select * from AsyncExecResults
order by start_time, finish_time;
Viewing 3 posts - 1 through 3 (of 3 total)
You must be logged in to reply to this topic. Login to reply
This website stores cookies on your computer.
These cookies are used to improve your website experience and provide more personalized services to you, both on this website and through other media.
To find out more about the cookies we use, see our Privacy Policy