How to implement a work queue in SQL server

How to implement a work queue in SQL server script is attached just below. Any comment is welcome.

/******************************************************************************
 * Creates WorkQueue table                                                    *
 ******************************************************************************/
CREATE TABLE dbo.WorkQueue
 (
 WQID int NOT NULL,
 Field1 varchar(10) NULL,
 Field2 varchar(10) NULL,
 Field3 varchar(10) NULL,
 Field4 varchar(50) NULL,
 Field5 varchar(50) NULL,
 FieldN varchar(50) NULL,
 Stage1Flag tinyint NOT NULL,
 Stage2Flag tinyint NOT NULL,
 Stage3Flag tinyint NOT NULL
 )  ON [PRIMARY]
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage1Flag DEFAULT 0 FOR Stage1Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage2Flag DEFAULT 0 FOR Stage2Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 DF_WorkQueue_Stage3Flag DEFAULT 0 FOR Stage3Flag
GO

ALTER TABLE dbo.WorkQueue ADD CONSTRAINT
 PK_WorkQueue PRIMARY KEY CLUSTERED
 (
 WQID
 ) ON [PRIMARY]
GO

/******************************************************************************
 * Creates WorkQueue index which will be used by TOP procedures               *
 * ATTENTION: Do not delete this index, since it is masterkey to the process  *
 ******************************************************************************/
CREATE  INDEX [IX_WorkQueue_Stages] ON [dbo].[WorkQueue]([Stage1Flag], [Stage2Flag], [Stage3Flag]) ON [PRIMARY]
GO

/******************************************************************************
 * Creates WorkQueueErrors to hold errors which might occurs eventually       *
 ******************************************************************************/
CREATE TABLE dbo.WorkQueueErrors
 (
 WQEID int NOT NULL IDENTITY (1, 1),
 WQID int NOT NULL,
 Stage tinyint NOT NULL,
 Error varchar(255) NOT NULL
 )  ON [PRIMARY]
GO

ALTER TABLE dbo.WorkQueueErrors ADD CONSTRAINT
 PK_WorkQueueErrors PRIMARY KEY CLUSTERED
 (
 WQEID
 ) ON [PRIMARY]

GO

ALTER TABLE dbo.WorkQueueErrors ADD CONSTRAINT
 FK_WorkQueueErrors_WorkQueue FOREIGN KEY
 (
 WQID
 ) REFERENCES dbo.WorkQueue
 (
 WQID
 )
GO

/******************************************************************************
 * Creates procedure WorkQueueStage1Top                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage1Top  -- Last Input First Output (LIFO)
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 0) -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage1Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage1Pop
(@WQID int,
 @Stage1Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage1Flag = @Stage1Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage2Top                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage2Top  -- Last Input First Output (LIFO)
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 1) AND -- 1 = processed without error
      (Stage2Flag = 0)     -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage2Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage2Pop
(@WQID int,
 @Stage2Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage2Flag = @Stage2Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage3Top                                        *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage3Top  -- Last Input First Output (LIFO)
AS
SELECT TOP 1 dbo.WorkQueue.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages))
WHERE (Stage1Flag = 1) AND -- 1 = processed without error
      (Stage2Flag = 1) AND -- 1 = processed without error
      (Stage3Flag = 0)     -- 0 = not processed
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueStage3Pop                                       *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueStage3Pop
(@WQID int,
 @Stage3Flag tinyint -- ATTENTION: parameter must be set to 1 (processed without error) or 2 (processed with error(s))
)
AS
UPDATE dbo.WorkQueue
SET Stage3Flag = @Stage3Flag
WHERE WQID = @WQID
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsPush                                      *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsPush
(@WQID int,
 @Stage tinyint,
 @Error varchar(255)
)
AS
INSERT INTO dbo.WorkQueueErrors
(WQID, Stage, Error)
VALUES
(@WQID, @Stage, @Error)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage1                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage1
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 1)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage2                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage2
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage2Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 2)
RETURN
GO

/******************************************************************************
 * Creates procedure WorkQueueErrorsGetStage3                                 *
 ******************************************************************************/
CREATE PROCEDURE WorkQueueErrorsGetStage3
AS
SELECT dbo.WorkQueue.*,
       dbo.WorkQueueErrors.*
FROM dbo.WorkQueue WITH (READPAST, ROWLOCK, UPDLOCK, INDEX(IX_WorkQueue_Stages)) INNER JOIN
     dbo.WorkQueueErrors ON dbo.WorkQueue.WQID = dbo.WorkQueueErrors.WQID
WHERE (dbo.WorkQueue.Stage1Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage2Flag = 1) AND -- 1 = processed without error
      (dbo.WorkQueue.Stage3Flag = 2) AND -- 2 = processed with error(s)
      (dbo.WorkQueueErrors.Stage = 3)
RETURN
GO

5 Comments

  • Pardon my ignorance, but what is processing the work queue? Or were your intentions here at just providing a store and retrieval mechanism and letting something else do the actual processing?

  • It would be easier on the eyes to post this as an article and just provide a short post with a link to it. That way the main feed page does not get too large or hard to read :)

  • Aaron,



    I posted just the implementation. As my work queues often involves 1) getting a record, 2) inputing/processing the record in a mainframe system and 3) then setting it as processed, I usually create Visual Basic programs which consumes, processes and sets the work queue through ADO callings to stored procedures similar to the ones I posted.



  • Roy,



    Thanks for your comment. I am new to the blog world. When setting/administering my blog I saw there was tabs for posts and articles, but I did not figure out what the differences between articles and posts and how to use articles at all. Would you mind commenting more on articles in .Text?

  • Luciano,

    It would be nice if in further posts you extend this and show examples of you could do your implementation.



    I do have some questions/suggestions on why you may have done a few things though. First of all, why did you chose only 3 stages? Would it be more extensible if you instead each row was just one stage, however you had another field that identified a previous row that it was a depedency of. Now assuming you did 3 stages per row to cut down on the number of rows that will be in the table, it makes sense as the larger the table the longer it takes to parse. Then speaking of a large table, what type of cleanup work do you recomend doing on the queue? (how long do records stay in there, do you archive them for future reference, etc)

Comments have been disabled for this content.