OpenMP Programming(1. Basic)

发布于 2023-05-03  132 次阅读


OpenMP

是一个可以(特别)方便写出并行代码的工具。对于非科班简直不要太友好。

!Not! MSVC

MSVC对OpenMP的支持可谓一拖十。所以为了完成我们的OMP教学,建议使用GCC/Clang/ICC.

Example: Hello-OpenMP

我们如果想并行地运行一段代码,那么我们可以简单的使用#pragma omp parallel并加上一个scope来表示我们要并行这个scope内的代码。

下面是并行打出Hello World的例子,其omp_get_thread_num函数用来得到当前线程的ID,这个函数来自头文件\<omp.h>.

#include <cstdio>
#include <omp.h>
int main(int argc, char* argv[]) {
#pragma omp parallel 
    {
        printf("Hello World... from thread = %d\n", omp_get_thread_num());
    }
    return 0;
}

Parallel For

其实更多的时候,我们希望并行for语句。举个例子,对于一个巨大的循环,我们希望把循环平均切割成指定线程数量的个数,这样我们可以充分利用每个CPU核心的缓存和算力。

这在OpenMP中是简单的,我们只需要一行就可以搞定。我们可以通过简单的打印来证明它确实并行了。

#include <cstdio>
#include <omp.h>
int main(int argc, char* argv[]) {
#pragma omp parallel for
    for(int i = 0; i < 1024; ++i){
        printf("%d ", i);
    }
    // ...20 551 21 22 23 552 24 25 26 553 27 554 28...
    return 0;
}

这里有一个更好的例子,就是估算\pi

根据公式

\int_0^1 \frac{4}{1 + x^2} dx = \pi \approx \Delta x
\sum \frac{4}{1 + n^2}

我们就可以对0到1范围采样来估算积分值。

我们可以让每一个线程得到部分和,然后再用主线程把它们加起来,最后乘\Delta x即可。

#include <cstdio>
#include <omp.h>
#define NUM_THREADS 2
static long num_steps = 100000;
double step;

int main()
{
    int i, nthreads;
    double pi,sum[NUM_THREADS],t1,t2;
    step = 1.0/(double) num_steps;  

    t1 = omp_get_wtime();
    omp_set_num_threads(NUM_THREADS); // set num of threads could be launched
    #pragma omp parallel
    {
        int i,id, nthrds;
        double x;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        if (id==0) nthreads = nthrds;
        for (i=id;i<num_steps;i=i+nthrds) {
            x = (i+0.5)*step;
            sum[id] = sum[id] + 4.0/(1.0+x*x);
        }
    }
    for (i=0,pi=0;i<nthreads;i++) pi += sum[i] * step;
    t2 = omp_get_wtime();
    printf("pi: %f \n", pi);
    printf("finished in %f milisecond\n", (t2-t1)*1000);
}

注意到,这里使用了omp_set_num_threads,这个函数的意思是对未指定线程数的parallel块,使用这个函数指定的线程数来进行并行。omp_get_num_threads的意思是得到上述值。

我们使用i += nthrds是为了保证num_steps都可以被遍历到。

False Sharing

在这里我们尝试加速上述代码,最简单的方式就是将宏NUM_THREADS调大。你很快就会发现,当调大到一定程度时,程序消耗的时间居然和线程数不再线性相关了!这是因为发生了False Sharing,中文名叫伪共享。

False Sharing发生的原因是CPU和内存速度差距太大,人们为了解决这一问题发明了Cascaded Cache,Cache用于缓存内存中某一块区域,这一块我们称之为缓存行(Cache Line),当两个CPU核修改同一个缓存行的时候,为了防止发生数据不同步的错误,CPU会强行将内存的数据和缓存行同步,然后另一个CPU核必须重新读取内存的数据放到Cache。

发现问题了没,这就是缓存行重叠导致的,为了避免缓存行重叠,我们只需要让不同的CPU核占据的数据占满整数个缓存行就行了。这一处理方式叫做Padding.

下面是重写的Padding后的代码。

#include <cstdio>
#include <omp.h>
#define NUM_THREADS 2
#define PAD 8 //depend on L1 cache line size
static long num_steps = 100000;
double step;

int main()
{
    int i, nthreads;
    double pi,sum[NUM_THREADS][PAD],t1,t2;
    step = 1.0/(double) num_steps;  

    t1 = omp_get_wtime();
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i,id, nthrds;
        double x;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        if (id==0) nthreads = nthrds;
        for (i=id;i<num_steps;i=i+nthrds) {
            x = (i+0.5)*step;
            sum[id][0] = sum[id][0] + 4.0/(1.0+x*x);
        }
    }
    for (i=0,pi=0;i<nthreads;i++) pi += sum[i][0] * step;
    t2 = omp_get_wtime();
    printf("pi: %f \n", pi);
    printf("finished in %f milisecond\n", (t2-t1)*1000);
}

Critical

Critical是一个只允许一个线程进入临界区的标记。子句通过强制线程在执行临界区域时互斥来避免数据竞争。这意味着,在任何时候,只有一个线程可以进入临界区域。这种方法可能会导致性能下降,因为线程可能需要等待其他线程完成临界区域(后面有更好的方法)。

#include <cstdio>
#include <omp.h>

#define NUM_THREADS 1
static long num_steps = 100000;
double step;

int main()
{
    double pi,sum=0.0,t1,t2;
    step = 1.0/(double) num_steps;  

    t1 = omp_get_wtime();
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i,id, nthrds; double x, s = 0.0;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        for (i=id;i<num_steps;i=i+nthrds) {
            x = (i+0.5)*step;
            s = s + 4.0/(1.0+x*x);
        }
    #pragma omp critical
        sum += s;
    }
    pi = sum * step;
    t2 = omp_get_wtime();
    printf("pi: %f \n", pi);
    printf("finished in %f milisecond\n", (t2-t1)*1000);
}

经过测试,它的速度和Padding后的速度相当。

Atomic

Atomic,就是原子的,这里你可能疑问,和Critical有啥区别?其实主要是做到了速度和功能性的均衡,OpenMP才设计了两种相似的方案。Atomic操作的开销较低,它利用硬件提供的原子操作(如原子增量),因此不需要在进入/退出代码行时锁定/解锁。但是,Atomic支持的操作集受到限制。相比之下,Critical部分完全通用,可以包围任意的代码块。但是,每次线程进入和退出临界区时都会产生显著的开销(除了序列化的固有成本)。

#include <cstdio>
#include <omp.h>

#define NUM_THREADS 1
static long num_steps = 100000;
double step;

int main()
{
    double pi,sum=0.0,t1,t2;
    step = 1.0/(double) num_steps;  

    t1 = omp_get_wtime();
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i,id, nthrds; double x, s = 0.0;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        for (i=id;i<num_steps;i=i+nthrds) {
            x = (i+0.5)*step;
            s = s + 4.0/(1.0+x*x);
        }
    #pragma omp atomic
        sum += s;
    }
    pi = sum * step;
    t2 = omp_get_wtime();
    printf("pi: %f \n", pi);
    printf("finished in %f milisecond\n", (t2-t1)*1000);
}

你可以注意到速度显著提升了。

Reduce

reduce用于将多个线程中的私有变量进行归约操作(如求和、求积等),并将结果存储在共享变量中。与critical不同,它不会阻止多个线程并发执行。

我们可以看看标准库的std::reduce是如何实现的,标准库的std::reduce传入BinaryOp和init_value,然后传入可选的ExecutionPolicy(std::reduce也支持并行哦)。注意到标准库的实现方式是模板Op,只要这个Functor满足交换律且结合律,就可以正确的执行。

#include <iostream>
#include <chrono>
#include <vector>
#include <numeric>
#include <execution>

int main()
{ 
    std::vector<double> v(10'000'007, 0.5);
    auto t1 = std::chrono::high_resolution_clock::now();
    double result = std::reduce(std::execution::par, v.begin(), v.end());
    auto t2 = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> ms = t2 - t1;
    std::cout << "std::reduce result "
              << result << " took " << ms.count() << " ms\n";
}

好消息是:在高版本的OpenMP中也允许用户自定义reduce.

我们把上面Atomic的求PI的代码改成Reduce版本。

#include <cstdio>
#include <omp.h>

#define NUM_THREADS 1
static long num_steps = 100000;
double step;

int main()
{
    double pi,sum=0.0,t1,t2;
    step = 1.0/(double) num_steps;  

    t1 = omp_get_wtime();
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i,id, nthrds; double x, s = 0.0;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        for (i=id;i<num_steps;i=i+nthrds) {
            x = (i+0.5)*step;
            s = s + 4.0/(1.0+x*x);
        }
        #pragma omp reduction(+:sum)
        sum += s;
    }
    pi = sum * step;
    t2 = omp_get_wtime();
    printf("pi: %f \n", pi);
    printf("finished in %f milisecond\n", (t2-t1)*1000);
}

我们可以使用用户自定义的Reduce

#include <stdio.h>
#include <omp.h>

#pragma omp declare reduction(my_max: int: \
    omp_out = omp_in > omp_out ? omp_in : omp_out) \
    initializer(omp_priv = INT_MIN)

int main() {
    int data[] = {1, 2, 3, 4, 5};
    int max_val = 6;

    #pragma omp parallel for reduction(my_max: max_val)
    for (int i = 0; i < 5; i++) {
        max_val = data[i] > max_val ? data[i] : max_val;
    }

    printf("max_val: %d\n", max_val);// 6

    return 0;
}

在这里的(my_max:int...)的类型指定max_val的类型,omp_out是Reduce变量是输出值,这里即max_val;omp_in即每个线程的局部变量,这里的表达式中omp_in之间不需要相互相同(即使它们是同一个变量名omp_in);对于initializer(INT_MIN),它的意思是来将每个线程中的私有max_val变量初始化为INT_MIN。这里请注意,我们的初始变量是6,所以在reduce结束前,OpenMP会和最初的值进行Reduce.最终输出的结果将是6(正确结果)。

Schedule

在OpenMP中,schedule子句用于指定循环迭代的调度方式。它允许控制循环迭代如何分配给线程。

一般来说,schedule子句有三种类型:static、dynamic和guided,还有两种不太常用的类型就是auto和runtime。

  • static调度:在这种调度方式下,循环迭代被均匀地分配给线程。每个线程都会获得相同数量的迭代。如果指定了块大小,那么每个线程都会获得一个大小为块大小的连续迭代块。如果没有指定块大小,那么OpenMP会自动计算一个合适的块大小。

  • dynamic调度:在这种调度方式下,循环迭代被动态地分配给线程。每个线程都会获得一个大小为块大小的连续迭代块。当一个线程完成了它当前的迭代块后,它会请求更多的迭代。如果指定了块大小,那么每个线程都会获得一个大小为块大小的连续迭代块。如果没有指定块大小,那么默认的块大小为1。

  • guided调度:在这种调度方式下,循环迭代也被动态地分配给线程。但是,与dynamic调度不同的- 是,块大小会随着时间而减小。初始块大小等于剩余迭代次数除以线程数的商。每次分配一个新的迭代块时,块大小都会减半,直到达到指定的最小值。如果指定了最小值,那么最小值就是最小块大小。如果没有指定最小值,那么默认的最小值为1。

  • auto调度:在这种调度方式下,循环迭代的分配方式由编译器或运行时系统自动决定。这意味着无法控制循环迭代的分配方式。这种调度方式适用于那些不确定哪种调度方式最优的情况。

  • runtime调度:在这种调度方式下,循环迭代的分配方式由环境变量OMP_SCHEDULE决定。可以在运行时设置这个环境变量来指定循环迭代的调度方式。

因为上述的逻辑非常简单,下面只做语法上的示范。

#include <stdio.h>
#include <omp.h>

int main() {
    printf("static:\n");
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < 10; i++) {
        printf("Thread %d: %d\n", omp_get_thread_num(), i);
    }

    printf("\ndynamic:\n");
    #pragma omp parallel for schedule(dynamic)
    for (int i = 0; i < 10; i++) {
        printf("Thread %d: %d\n", omp_get_thread_num(), i);
    }

    printf("\nguided:\n");
    #pragma omp parallel for schedule(guided)
    for (int i = 0; i < 10; i++) {
        printf("Thread %d: %d\n", omp_get_thread_num(), i);
    }

    return 0;
}

后记

OpenMP是非常强大的工具,这里只介绍简单的应用,后面我们会讨论到它真正强大的功能。

最后更新于 2023-05-03