Qt-多线程之Concurrent

概述

在使用多线程时,我们经常会子类化QThread或将QObject``moveToThread来实现。

Qt中提供了一种高级方法,即QtConcurrentQtConcurrent模块提供了包括runmapfilter函数。

Concurrent Run

函数原型:

1
2
3
4
5
6
7
// (1)
template <typename T>
QFuture<T> QtConcurrent::run(Function function, ...)

// (2)
template <typename T>
QFuture<T> QtConcurrent::run(QThreadPool *pool, Function function, ...)

原型(1)的效果即为在另一个线程中运行function函数,缺省的为参数,即:

1
2
3
4
5
6
extern void fun(int i, double d, const QString& str)

int a;
double b;
QString c;
Qt::Concurrent::run(fun, a, b, c);

而原型(2)则在(1)的基础上增加了一个线程池的指针参数,即可以传入一个专有线程池用于执行函数。

返回值

由函数原型可以看到返回值为QFuture模板变量,则可以使用以下方式获取返回值:

1
2
3
4
extern QString fun();

QFuture<QString> future = QtConcurrent::run(fun);
QString str = future.result();

但是注意,QFuture::result()会阻塞当前线程,直到结果可用。

可以使用QFutureWatcher监测该QFuture直到发出finished信号,且QtConcurrent返回的QFuture不支持canceledpaused

其他特性

  1. QtConcurrent::run还可以接收成员函数的指针,常量成员函数一般传递常量引用,而非常量成员函数一般传递指针,如下:
1
2
3
4
5
// run byteArray.split(',') in other thread
QByteArray byteArray = "hello world";
QFuture<QList<QByteArray> > future = QtConcurrent::run(byteArray, &QByteArray::split, ',');

QList<QByteArray> result = future.result();
  1. 也可以使用Lambda表达式作为参数:
1
2
3
4
QFuture<void> future = QtConcurrent::run([=]()->void
{
// Code in this block will run in another thread
});

Concurrent Map and Map-Reduce

map相关的函数主要应用场景是在单独的线程里对容器中的每一项进行操作。这些函数主要分为三类:

  1. QtConcurrent::map:直接操作容器中的每一项。
  2. QtConcurrent::mapped:操作容器中的每一项,将处理结果返回一个新的容器,原容器不变。
  3. QtConcurrent::mappedReduced:在mapped的基础上将处理结果进一步传递给一个函数继续处理。

QtConcurrent::map

函数原型:

1
2
3
4
5
6
7
// (1)
template <typename Sequence, typename MapFunctor>
QFuture<void> QtConcurrent::map(Sequence &sequence, MapFunctor function)

// (2)
template <typename Iterator, typename MapFunctor>
QFuture<void> QtConcurrent::map(Iterator begin, Iterator end, MapFunctor function)

原型(1)直接传入一个容器,对其中每个元素执行function,而原型(2)则是传入迭代器,执行相同操作。

注意

  • 传入的function原型必须是U function(T &t)的形式(UT可以是任何类型,但是T必须与传入的容器存储的类型相同)。
  • 因为是直接修改,所以该函数不会返回任何QFuture结果,但仍然可以使用QFutureWatcher来监视返回的QFuture
1
2
3
4
5
6
7
8
void add(int &number)
{
number += number;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QFuture<void> future = QtConcurrent::map(numberList, add);
// numberList will be {2, 4, 6, 8, 10}

QtConcurrent::mapped

函数原型:

1
2
3
4
5
6
7
// (1)
template <typename Sequence, typename MapFunctor>
QFuture<typename QtPrivate::MapResultType<void, MapFunctor>::ResultType> QtConcurrent::mapped(const Sequence &sequence, MapFunctor function)

// (2)
template <typename Iterator, typename MapFunctor>
QFuture<typename QtPrivate::MapResultType<void, MapFunctor>::ResultType> QtConcurrent::mapped(Iterator begin, Iterator end, MapFunctor function)

两个原型的差别与上面相同,(1)是传入容器,(2)传入的是迭代器。且都是为容器内每个对象执行函数,但不会改变容器的每一项,会将结果返回到一个新的容器。

注意

  1. 传入的function原型必须是U function(const T &t)的形式(UT可以是任何类型,但是T必须与传入的容器存储的类型相同);
  2. 返回的结果将会存储到QFuture中,可以使用QFuture::const_iteratorQFutureIterator访问:
1
2
3
4
5
6
7
8
9
10
11
int add(const int &number)
{
return number + number;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QFuture<int> future = QtConcurrent::map(numberList, add);
// numberList still {1, 2, 3, 4, 5}

QFutureIterator<int> i(future);
// like {2, 4, 6, 8, 10}

QtConcurrent::mappedReduced

函数原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
// (1)
template <typename ResultType, typename Sequence, typename MapFunctor, typename ReduceFunctor>
QFuture<ResultType> QtConcurrent::mappedReduced(const Sequence &sequence,
MapFunctor mapFunction,
ReduceFunctor reduceFunction,
QtConcurrent::ReduceOptions reduceOptions = ReduceOptions(UnorderedReduce | SequentialReduce))

// (2)
template <typename ResultType, typename Iterator, typename MapFunctor, typename ReduceFunctor>
QFuture<ResultType> QtConcurrent::mappedReduced(Iterator begin, Iterator end,
MapFunctor mapFunction,
ReduceFunctor reduceFunction,
QtConcurrent::ReduceOptions reduceOptions = ReduceOptions(UnorderedReduce | SequentialReduce))

原型的差异依旧与上面相同,一个传入容器,一个则是迭代器。且两者的内部操作都是对传入的容器中对象执行完mapFunctor的结果,传入ReduceFunctor中再执行一次。

注意

  1. 传入的mapFunction需满足U function(const T &t)的形式(UT可以是任何类型,但是T必须与传入的容器存储的类型相同);
  2. reduceFunction则需要满足V function(T &result, const U &intermediate)格式,T为最终结果类型,UmapFunction返回的类型,而V则是没有使用到的类型和返回值;
  3. reduceOptions则有三种:
    1. QtConcurrent::UnorderedReduce = 0x1:以任意顺序完成;
    2. QtConcurrent::OrderedReduce = 0x2:按原始序列的顺序完成;
    3. QtConcurrent::SequentialReduce = 0x3:按顺序完成:一次只有一个线程进入reduce函数。

在下面的代码中,原始数据list的每一项都被fun1函数处理,再将结果传到fun2函数继续处理,其中intermedia就是上一步的结果,最终会以result这个变量输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int fun1(const int &number)
{
return number + number;
}

void fun2(int &result, const int &intermedia)
{
result += media;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QFuture<int> future = QtConcurrent::mappedReduced(numberList, fun1, fun2);
// numberList still {1, 2, 3, 4, 5}
// future.result() is 30 = (1 + 1) + (2 + 2) + (3 + 3) + (4 + 4) +(5 + 5)

Concurrent Filter and Filter-Reduce

filter相关的函数也与map相关函数类似,也是对容器中元素进行处理,只是filter注重筛选元素。也是分为三种:

  1. QtConcurrent::filter:直接对容器进行筛选,结果反馈到容器中;
  2. QtConcurrent::filtered:对容器进行筛选,但结果以新容器返回;
  3. QtConcurrent::filteredReduced:在filtered的基础上,将筛选后的结果处理为单一的值。

QtConcurrent::filter

函数原型:

1
2
template <typename Sequence, typename KeepFunctor>
QFuture<void> QtConcurrent::filter(Sequence &sequence, KeepFunctor filterFunction)

filter仅支持传入容器而不支持迭代器(Qt5.15),对每个对象执行filterFunciton,根据返回值决定对象的去留。

注意

传入的filterFunction仅支持bool function(const T &t)格式,T必须与容器存储的对象类型匹配,如果要保留则返回true,反之则返回false

1
2
3
4
5
6
7
8
9
10
11
12
bool fun(const int &number)
{
if(number % 2)
{
return true;
}
return false;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QtConcurrent::filter(numberList, fun);
// numberList will be {1, 3, 5}

QtConcurrent::filtered

函数原型:

1
2
3
4
5
6
7
// (1)
template <typename Sequence, typename KeepFunctor>
QFuture<typename Sequence::value_type> QtConcurrent::filtered(const Sequence &sequence, KeepFunctor filterFunction)

// (2)
template <typename Iterator, typename KeepFunctor>
QFuture<typename qValueType<Iterator>::value_type> QtConcurrent::filtered(Iterator begin, Iterator end, KeepFunctor filterFunction)

与上面类似,filtered的两个函数原型分别是传入容器和迭代器的差异。两者都会将容器内对象进行筛选,并将结果返回到新容器当中。

注意

传入的filterFunctionfilter的一致,都仅支持bool function(const T &t)格式,T必须与容器存储的对象类型匹配,如果要保留到新容器则返回true,反之则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool fun(const int &number)
{
if(number % 2)
{
return true;
}
return false;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QFuture<int> future = QtConcurrent::filtered(numberList, fun);
// numberList still {1, 2, 3, 4, 5}

QFutureIterator<int> i(future);
// like {2, 4, 6, 8, 10}

QtConcurrent::filteredReduced

函数原型:

1
2
3
4
5
6
7
// (1)
template <typename ResultType, typename Sequence, typename KeepFunctor, typename ReduceFunctor>
QFuture<ResultType> QtConcurrent::filteredReduced(const Sequence &sequence, KeepFunctor filterFunction, ReduceFunctor reduceFunction, QtConcurrent::ReduceOptions reduceOptions = ReduceOptions(UnorderedReduce | SequentialReduce))

// (2)
template <typename ResultType, typename Iterator, typename KeepFunctor, typename ReduceFunctor>
QFuture<ResultType> QtConcurrent::filteredReduced(Iterator begin, Iterator end, KeepFunctor filterFunction, ReduceFunctor reduceFunction, QtConcurrent::ReduceOptions reduceOptions = ReduceOptions(UnorderedReduce | SequentialReduce))

filteredReduced也有两个原型,差异在于容器与迭代器,不过多赘述。此函数会将容器中的对象先通过filterFunction,再将这些结果通过reduceFunction,最终反馈到返回的QFuture中。

注意

  1. 传入的filterFunction原型与filterfiltered一致;
  2. 传入的reduceFunction原型为V function(T &result, const U &intermediate)T为最终结果类型,UmapFunction返回的类型,而V则是没有使用到的类型和返回值;
  3. reduceOptionsmappedReduced一致,不过多赘述,下面看示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool fun1(const int &number)
{
if(number % 2)
{
return true;
}
return false;
}

void fun2(int &result, const int &intermedia)
{
result += intermedia;
}

QList<int> numberList = {1, 2, 3, 4, 5};
QFuture<int> future = QtConcurrent::filteredReduced(numberList, fun1, fun2);
// numberList still {1, 2, 3, 4, 5}
// future.result() is 9 = 1 + 3 + 5

Blocking

上述包括mapmappedmappedReducedfilterfilteredfilteredReduced都是在另外一个线程中执行的。因为操作是异步的,所以不会立即返回值,如果立即去使用容器,则会出现:

  1. 无变化:mapfilter还未对原容器执行操作;
  2. 指针越界:mappedmappedReducedfilteredfilteredReduced操作未完成,返回为空。

使用以上函数时需要对返回值进行判断:

1
2
3
4
5
6
7
...
QFuture<int> future = QtConcurrent::map(list, fun);
// use future do something must wait for future is finished
while(!future.isFinished())
{
}
}

上面的函数都有blocking的写法,即等待结果返回后再执行下一步:

1
2
3
QFuture<int> future = QtConcurrent::blockingMap(list, fun);
// use future do something without waiting
}

blocking系列包括:

  • QtConcurrent::blockingMap
  • QtConcurrent::blockingMapped
  • QtConcurrent::blockingMappedReduced
  • QtConcurrent::blockingfilter
  • QtConcurrent::blockingfiltered
  • QtConcurrent::blockingfilteredReduced

使用方法与非blocking一致