// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"sync"
"testing"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)
func TestChannelQueue(t *testing.T) {
handleChan := make(chan *testData)
handle := func(data ...Data) []Data {
for _, datum := range data {
testDatum := datum.(*testData)
handleChan <- testDatum
}
return nil
nilFn := func(_ func()) {}
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 0,
MaxWorkers: 10,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
Name: "TestChannelQueue",
},
Workers: 0,
}, &testData{})
assert.NoError(t, err)
assert.Equal(t, 5, queue.(*ChannelQueue).WorkerPool.boostWorkers)
go queue.Run(nilFn, nilFn)
test1 := testData{"A", 1}
go queue.Push(&test1)
result1 := <-handleChan
assert.Equal(t, test1.TestString, result1.TestString)
assert.Equal(t, test1.TestInt, result1.TestInt)
err = queue.Push(test1)
assert.Error(t, err)
func TestChannelQueue_Batch(t *testing.T) {
assert.True(t, len(data) == 2)
QueueLength: 20,
BatchLength: 2,
BlockTimeout: 0,
BoostTimeout: 0,
BoostWorkers: 0,
Workers: 1,
test2 := testData{"B", 2}
queue.Push(&test1)
go queue.Push(&test2)
result2 := <-handleChan
assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt)
func TestChannelQueue_Pause(t *testing.T) {
lock := sync.Mutex{}
var queue Queue
var err error
pushBack := false
lock.Lock()
if pushBack {
if pausable, ok := queue.(Pausable); ok {
pausable.Pause()
lock.Unlock()
return data
queueShutdown := []func(){}
queueTerminate := []func(){}
terminated := make(chan struct{})
queue, err = NewChannelQueue(handle,
BatchLength: 1,
go func() {
queue.Run(func(shutdown func()) {
defer lock.Unlock()
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate)
})
close(terminated)
}()
// Shutdown and Terminate in defer
defer func() {
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
for _, callback := range callbacks {
callback()
log.Info("Finally terminating")
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
pausable, ok := queue.(Pausable)
if !assert.True(t, ok) {
return
paused, _ := pausable.IsPausedIsResumed()
select {
case <-paused:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
queue.Push(&test2)
var result2 *testData
case result2 = <-handleChan:
assert.Fail(t, "handler chan should be empty")
assert.Nil(t, result2)
pausable.Resume()
_, resumed := pausable.IsPausedIsResumed()
case <-resumed:
assert.Fail(t, "Queue should be resumed")
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "handler chan should contain test2")
pushBack = true
_, resumed = pausable.IsPausedIsResumed()
assert.Fail(t, "Queue is not resumed")
paused, _ = pausable.IsPausedIsResumed()
case <-handleChan:
assert.Fail(t, "handler chan should not contain test1")
assert.Fail(t, "queue should be paused")
pushBack = false
case result1 = <-handleChan:
assert.Fail(t, "handler chan should contain test1")
queueShutdown = queueShutdown[:0]
// Now shutdown the queue
// terminate the queue
queueShutdown = queueTerminate[:0]
case <-terminated:
case <-time.After(10 * time.Second):
assert.Fail(t, "Queue should have terminated")