SSC Rookie
      
I would like to call stored procedures asynchronously, but Service Broker seems to introduce a delay, so the procedures do not all finish at around the same time. The delay is significant: several seconds. I have set max_queue_readers higher than the number of procedures I am calling. I have tried using the same conversation, multiple conversations, receiving more than top(1) (although for high concurrency top(1) should be best), and using waitfor with a timeout. Here's what it looks like:
create table AsyncExecResults (
    token uniqueidentifier primary key,
    start_time datetime null,
    finish_time datetime null)
go
create queue AsyncExecQueue;
go
create service AsyncExecService on queue AsyncExecQueue (DEFAULT);
go
if object_id('usp_AsyncExecActivated') is not null
    drop procedure usp_AsyncExecActivated;
go
create procedure usp_AsyncExecActivated
as
    set nocount on;
    declare @h uniqueidentifier,
            @messageTypeName sysname,
            @messageBody varbinary(max),
            @xmlBody xml,
            @procedureName sysname,
            @startTime datetime,
            @finishTime datetime,
            @token uniqueidentifier;

    waitfor(
        receive top(1)
            @h = conversation_handle,
            @messageTypeName = message_type_name,
            @messageBody = message_body
        from AsyncExecQueue
    ), TIMEOUT 5000;

    if (@h is not null)
    begin
        if (@messageTypeName = N'DEFAULT')
        begin
            select @xmlBody = CAST(@messageBody as xml);
            select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

            select @startTime = CURRENT_TIMESTAMP;
            exec(@procedureName);
            select @finishTime = CURRENT_TIMESTAMP;
            select @token = conversation_id
            from sys.conversation_endpoints
            where conversation_handle = @h;

            insert AsyncExecResults (start_time, finish_time, token)
            values (@startTime, @finishTime, @token);
        end
        else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            end conversation @h;
        end
    end
go
alter queue AsyncExecQueue
    with activation (
        procedure_name = usp_AsyncExecActivated,
        max_queue_readers = 25,
        execute as owner,
        status = on),
    status = on,
    retention = off;
go
if object_id('usp_AsyncExecInvoke') is not null
    drop procedure usp_AsyncExecInvoke;
go
create procedure usp_AsyncExecInvoke
    @procedureName sysname,
    @token uniqueidentifier output
as
    declare @h uniqueidentifier,
            @xmlBody xml;
    set nocount on;

    begin dialog conversation @h
        from service AsyncExecService
        to service N'AsyncExecService', 'current database'
        with encryption = off;

    select @xmlBody = (
        select @procedureName as name
        for xml path('procedure'), type);

    send on conversation @h (@xmlBody);
go
if object_id('usp_MyLongRunningProcedure') is not null
    drop procedure usp_MyLongRunningProcedure;
go
create procedure usp_MyLongRunningProcedure
as
begin
    waitfor delay '00:00:10';
end
go
truncate table AsyncExecResults;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
select * from AsyncExecResults order by finish_time;
Code borrowed heavily from here for reference. I have deliberately stripped it down to the bare bones so that I can test performance. What I would like is as little delay as possible between the first start time and the last finish time when the results are sorted in ascending order. I have done some research, and it seems the issue is that Service Broker does not constantly scan the queue and activate whatever is on it; it waits for a bit and then scans again. However, once I have issued all of the calls to the waitfor delay procedure, all of the messages should be in the queue, so by the time Service Broker checks a second time they should surely all be there for it to dispatch activation procedures.
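To put a number on that spread rather than eyeballing the sorted rows, something like the following could be run once the test has finished. It is only a sketch that assumes the AsyncExecResults table from the script above has been populated; the column aliases are made up for illustration.

```sql
-- Summarise one test run: how far apart the calls started and
-- how long the whole batch took from first start to last finish.
-- Assumes AsyncExecResults (from the script above) has rows in it.
select count(*)                                                  as completed_calls,
       min(start_time)                                           as first_start,
       max(finish_time)                                          as last_finish,
       datediff(millisecond, min(start_time), max(start_time))   as start_skew_ms,
       datediff(millisecond, min(start_time), max(finish_time))  as total_spread_ms
from AsyncExecResults;
```

With ten 10-second procedures running fully in parallel, total_spread_ms would ideally be close to 10,000 and start_skew_ms close to zero.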
Thanks for any help!
SSC-Addicted
      
Your activated handler procedure should not exit after each message: Service Broker expects its handlers to keep processing messages for as long as more are available. As you discovered, it only rechecks after 5 seconds to see whether the queue has been emptied. Only if it has not does it start another handler, up to the limit set by max_queue_readers.
So the problem you need to fix is first of all in your handler procedure. Try this for example (not tested):
if object_id('usp_AsyncExecActivated') is not null
    drop procedure usp_AsyncExecActivated;
go
create procedure usp_AsyncExecActivated
as
    set nocount on;
    declare @h uniqueidentifier,
            @messageTypeName sysname,
            @messageBody varbinary(max),
            @xmlBody xml,
            @procedureName sysname,
            @startTime datetime,
            @finishTime datetime,
            @token uniqueidentifier;

    -- Keep receiving until the queue has been empty for the full timeout.
    while 1 = 1
    begin
        begin tran;

        waitfor(
            receive top(1)
                @h = conversation_handle,
                @messageTypeName = message_type_name,
                @messageBody = message_body
            from AsyncExecQueue
        ), TIMEOUT 5000;

        -- No message arrived within the timeout: let this reader exit.
        if @@rowcount = 0
        begin
            commit tran;
            break; -- exit while 1 = 1
        end

        if (@messageTypeName = N'DEFAULT')
        begin
            select @xmlBody = CAST(@messageBody as xml);
            select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

            select @startTime = CURRENT_TIMESTAMP;
            exec(@procedureName);
            select @finishTime = CURRENT_TIMESTAMP;
            select @token = conversation_id
            from sys.conversation_endpoints
            where conversation_handle = @h;

            insert AsyncExecResults (start_time, finish_time, token)
            values (@startTime, @finishTime, @token);
        end
        else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            end conversation @h;
        end

        commit tran;
    end
go
This processes your messages sequentially until a single instance has failed to empty the queue for more than 5 seconds. If the queue is not empty 5 seconds after the first instance started, a second instance is started and 2 messages are processed simultaneously. If the queue is still not empty after 10 seconds, a third instance is started and 3 messages are processed at the same time (provided your server has at least 3 CPUs; otherwise you'll just be context switching), and so on, until either max_queue_readers is reached or the queue stays empty for at least 5 seconds (the timeout value on your waitfor(receive) statement). In practice, 25 queue readers is probably far too high.
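To watch that ramp-up happen rather than take my word for it, you could poll the Service Broker DMVs from a second session while the test runs. This is an untested sketch: it assumes you run it in the database that owns AsyncExecQueue and that your login has VIEW SERVER STATE permission; the column aliases are my own.

```sql
-- Run repeatedly in a second session while the test is in flight.
-- active_readers should grow as Service Broker starts more
-- instances of the activated procedure.
select q.name                as queue_name,
       m.state               as monitor_state,
       m.last_activated_time,
       m.tasks_waiting,
       (select count(*)
        from sys.dm_broker_activated_tasks as t
        where t.database_id = m.database_id
          and t.queue_id = m.queue_id) as active_readers
from sys.dm_broker_queue_monitors as m
join sys.service_queues as q
  on q.object_id = m.queue_id
where m.database_id = db_id();
```

If active_readers never climbs above 1 even though messages are waiting, check that activation is switched on for the queue and that the handler is not failing on start-up.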
Posting Data Etiquette - Jeff Moden Posting Performance Based Questions - Gail Shaw Hidden RBAR - Jeff Moden Cross Tabs and Pivots - Jeff Moden Catch-all queries - Gail Shaw
If you don't have time to do it right, when will you have time to do it over?
SSC-Addicted
      
For those trying to reproduce this experiment, for example to learn some Service Broker basics from it, here is the same experimental code with documented improvements:
create database t2
go
use master
go
alter database t2 set enable_broker
go
use t2
go
create table AsyncExecResults (
    token uniqueidentifier primary key,
    start_time datetime null,
    finish_time datetime null)
go
create queue AsyncExecQueue;
go
create service AsyncExecService
    authorization dbo
    on queue AsyncExecQueue (
        [DEFAULT] -- default is NOT a keyword here. i.e. it MUST be
                  -- enclosed by square brackets.
    );
go
if object_id('usp_AsyncExecActivated') is not null
    drop procedure dbo.usp_AsyncExecActivated;
go
create procedure dbo.usp_AsyncExecActivated
as
    set nocount on;
    declare @h uniqueidentifier,
            @messageTypeName sysname,
            @messageBody varbinary(max),
            @xmlBody xml,
            @procedureName sysname,
            @startTime datetime,
            @finishTime datetime,
            @token uniqueidentifier;

    while 1 = 1
    begin
        begin tran;

        waitfor(
            receive top(1)
                @h = conversation_handle,
                @messageTypeName = message_type_name,
                @messageBody = message_body
            from AsyncExecQueue
        ), TIMEOUT 5000;

        if @@rowcount = 0
        begin
            commit tran;
            break; -- exit while 1 = 1
        end

        if (@messageTypeName = N'DEFAULT')
        begin
            select @xmlBody = CAST(@messageBody as xml);
            select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');

            select @startTime = CURRENT_TIMESTAMP;
            exec(@procedureName);
            select @finishTime = CURRENT_TIMESTAMP;
            select @token = conversation_id
            from sys.conversation_endpoints
            where conversation_handle = @h;

            insert AsyncExecResults (start_time, finish_time, token)
            values (@startTime, @finishTime, @token);

            -- The chosen protocol says the conversation ends after the 1st
            -- message is received. So let SSSB know we're done by ending the
            -- conversation after each received message. Not ending the conversation
            -- will leave all conversations open. When you get into the millions
            -- of open conversations (see sys.conversation_endpoints), your server
            -- will go slower and slower.
            end conversation @h;
        end
        else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            end conversation @h;
        end
        -- Don't forget to handle error messages too.
        -- These also indicate the end of a conversation, just like EndDialog.
        else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        begin
            -- Read the xml document that we were sent in the message body.
            declare @nCode int;
            declare @vchDescription nvarchar(2000);
            declare @xmlError xml; -- (document

            select @xmlError = convert(xml, @messageBody);

            with xmlnamespaces(
                default 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            select @nCode = Errors.error.value('Code[1]', 'int'),
                   @vchDescription = Errors.error.value('Description[1]', 'nvarchar(max)')
            from @xmlError.nodes('/Error[1]') as Errors(error);

            -- Any text output by a SSSB queue reader will by default end up in the sql log,
            -- so I just write out the output with nowait to get errors shown in the sql log.
            -- Using "with log" would put the error in the sql log twice: once because of
            -- the with log, plus once more because all output from an auto activated procedure
            -- is written into the log. So don't specify with log here.
            raiserror('Service broker error received. Error %d:%s. (%s)', 0, 0,
                      @nCode, @vchDescription, '%PROC%') with nowait;

            -- An Error message means the conversation has ended, just like with a EndDialog message.
            -- So we tell SSSB we got the message and have it clean up our end of the conversation too.
            end conversation @h;
        end

        commit tran;
    end
go
alter queue dbo.AsyncExecQueue
    with activation (
        procedure_name = usp_AsyncExecActivated,
        max_queue_readers = 25,
        execute as owner,
        status = on),
    status = on,
    retention = off;
go
if object_id('usp_AsyncExecInvoke') is not null
    drop procedure dbo.usp_AsyncExecInvoke;
go
create procedure dbo.usp_AsyncExecInvoke
    @procedureName sysname,
    @token uniqueidentifier output
as
    declare @h uniqueidentifier,
            @xmlBody xml;
    set nocount on;

    begin dialog conversation @h
        from service AsyncExecService
        to service N'AsyncExecService'--, 'current database'
        with encryption = off;

    select @xmlBody = (
        select @procedureName as name
        for xml path('procedure'), type);

    send on conversation @h (@xmlBody);
go
if object_id('usp_MyLongRunningProcedure') is not null
    drop procedure dbo.usp_MyLongRunningProcedure;
go
create procedure dbo.usp_MyLongRunningProcedure
as
begin
    waitfor delay '00:00:10';
end
go
truncate table AsyncExecResults;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
declare @token uniqueidentifier; exec usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token output;
go
select * from AsyncExecResults order by start_time, finish_time;
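After a run, it may also be worth checking that conversations really are being ended instead of piling up in sys.conversation_endpoints. The following is a rough, untested sketch that assumes it is run in the same database as the experiment; the column aliases are only illustrative.

```sql
-- Count conversation endpoints per state. A steadily growing number
-- of endpoints that never reach a closed state suggests that some
-- code path is missing an "end conversation".
select state_desc,
       count(*) as conversations
from sys.conversation_endpoints
group by state_desc
order by conversations desc;

-- Messages still waiting in the queue (a queue can be read with a
-- plain select without removing anything from it).
select count(*) as messages_waiting
from AsyncExecQueue;
```

Closed endpoints can remain visible for a while before Service Broker cleans them up, so a handful of closed rows shortly after the test is not by itself a sign of a leak.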