导图社区 afid代码部分思维导图
荷兰特温特大学2015年在GitHub上开源的代码,求解三维RB对流的Fortran代码,介绍详细,描述全面,希望对感兴趣的小伙伴有所帮助!
编辑于2025-01-10 10:15:30main.f90
几个模块:param 一些参数 local_arrays, only: vx,vy,vz,temp,pr 一些数组 hdf5 文本库 decomp_2d 一些初始操作 decomp_2d_fft 和快速傅里叶有关 stat_arrays, only: nstatsamples
integer :: errorcode, nthreads 错误代码和 real :: instCFL,dmax 和CFL条件数有关的 real :: ti(2), tin(3), minwtdt 好像是时间有关的 real :: ts integer :: prow=0,pcol=0 二维铅笔域分解的行列进程个数 integer :: lfactor,lfactor2 快速傅里叶变换的因子 character(100) :: arg debug用的好像
call ReadInputFile
character(len=4) :: dummy 阅读文件时的标注变量的那一行的字符串被跳过 integer flagstat,flagbal,stst3flag logical fexist
nxm, nym, nzm,nsst, nread 三个方向网格数,时间推进开关, 流场数据读取开关 64 64 128 3 0 ntst, walltimemax, tout, tmax, ireset 模拟的最大时间步,模拟最大时间,统计数据的时间间隔,最大无量纲时间,时间重置开关 5000 83000 1e-2 50.0 0 alx3, istr3, str3 x长度,x方向网格拉伸开关,网格剪切系数 1.0000 6 12.0 ylen, zlen y和z方向实际长度 0.5 0.5 ray, pra, dt, resid, limitCFL Ra数,Pr数,初始时刻的时间步长,最大剩余速度散度和最大CFL数 1e+6 1.0 1e-8 1.e-2 1.2 flagstat, flagbal, tsta, starea 逻辑变量,都是统计量开关 1 1 0 0 inslws, inslwn 下壁面速度边界条件,上壁面速度边界条件,无滑移 1 1 idtv, dtmin, dtmax, limitVel 变和不变时间步长开关和最大最小值和最大速度 1 1e-8 0.01 1e3 stst3flag 不等于0代表查询stst3.in文件是否存在 0
nx=nxm+1 x方向网格点个数 ny=nym+1 y方向网格点个数 nz=nzm+1 z方向网格点个数 ren = dsqrt(ray/pra) Re数定义 pec = dsqrt(pra*ray) 温度粘性项前面的系数的倒数 pi=2.d0*dasin(1.d0) statcal = .true. disscal = .true.
通过command_argument_count得到命令行的行列进程prow和pcol,如果没有给也没关系,后面有自动给
call decomp_2d_init(nxm,nym,nzm,prow,pcol, (/ .false.,.true.,.true. /)) nxm, nym, nzm为网格数
call decomp_2d_init(nxm,nym,nzm,prow,pcol, (/ .false.,.true.,.true. /)) nxm, nym, nzm为网格数 subroutine decomp_2d_init(nx,ny,nz,p_row,p_col,periodic_bc) nx, ny, nz为网格数,p_row,p_col为行列进程,periodic_bc为进程和计算域的周期开关 integer, intent(IN) :: nx,ny,nz,p_row,p_col 输入xyz网格数和行列进程 logical, dimension(3), intent(IN), optional :: periodic_bc 可选参数,可输入可不输入 integer :: errorcode, ierror, row, col 子程序内部使用的参数 nx_global = nx ny_global = ny 还是网格数 nz_global = nz periodic_x = periodic_bc(1) periodic_y = periodic_bc(2) (F, T, T) periodic_z = periodic_bc(3) call MPI_INIT(ierror) call MPI_COMM_RANK(MPI_COMM_WORLD,nrank,ierror) nrank当前进程的标识符 call MPI_COMM_SIZE(MPI_COMM_WORLD,nproc,ierror) nproc进程总个数 if (p_row==0 .and. p_col==0) then 行列进程命令行没给的话,寻找最佳的进程分配 call best_2d_grid(nproc, row, col) else if (nproc /= p_row*p_col) then 报错 errorcode = 1 call decomp_2d_abort(errorcode, & 'Invalid 2D processor grid - nproc /= p_row*p_col') else row = p_row 给定的进程分布 col = p_col if (nrank==0) then 输出到屏幕上 write(*,*) 'the used processor grid is ', row,' by ', col end if end if end if dims(1) = row dims(2) = col periodic(1) = periodic_y X铅笔的二维笛卡尔拓扑 (T,T) periodic(2) = periodic_z call MPI_CART_CREATE(MPI_COMM_WORLD,2,dims,periodic, .false., DECOMP_2D_COMM_CART_X, ierror) periodic(1) = periodic_x Y铅笔的二维笛卡尔拓扑 (F,T) periodic(2) = periodic_z call MPI_CART_CREATE(MPI_COMM_WORLD,2,dims,periodic, .false., DECOMP_2D_COMM_CART_Y, ierror) periodic(1) = periodic_x Z铅笔的二维笛卡尔拓扑 (F,T) periodic(2) = periodic_y call MPI_CART_CREATE(MPI_COMM_WORLD,2,dims,periodic, .false., DECOMP_2D_COMM_CART_Z, ierror) call MPI_CART_COORDS(DECOMP_2D_COMM_CART_X,nrank,2,coord,ierror) 获得X铅笔的进程坐标 call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.true.,.false./), DECOMP_2D_COMM_COL,ierror) 列通信子组 call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.false.,.true./), DECOMP_2D_COMM_ROW,ierror) 行通信子组 call init_neighbour 获得邻居进程 call decomp_info_init(nx,ny,nz,decomp_main) 上面用过,不展示 xstart = decomp_main%xst decomp_main就是decomp ystart = decomp_main%yst 存储的都是三个分量的一维数组 zstart = decomp_main%zst xend = decomp_main%xen yend = decomp_main%yen zend = decomp_main%zen xsize = decomp_main%xsz ysize = decomp_main%ysz zsize = decomp_main%zsz decomp_ph=decomp_main 这边都是decomp call MPI_TYPE_SIZE(real_type,mytype_bytes,ierror) 存储real_type的字节大小给mytype_bytes return
call best_2d_grid(nproc, row, col) 总进程和行列进程 subroutine best_2d_grid(iproc, best_p_row,best_p_col) iproc总进程, best_p_row,best_p_col行列进程 integer, intent(IN) :: iproc 输入总进程个数 integer, intent(OUT) :: best_p_row, best_p_col 输出最好的行列进程分布 integer, allocatable, dimension(:) :: factors 所有因子 double precision :: t1, t2, best_time 时间 integer :: nfact, i, row, col, ierror, errorcode 因子个数和中间使用的参数(不重要) real(mytype), allocatable, dimension(:,:,:) :: u1, u2, u3 mytype 表示双精度浮点数,用 于后续代码中一致性地定义数据类型。 TYPE(DECOMP_INFO) :: decomp 一个数据体 if (nrank==0) write(*,*) 'In auto-tuning mode......' best_time = huge(t1) 初始化为一个极大值,用于后续比较 best_p_row = -1 -1是为了选不到最好的分布而报错 best_p_col = -1 i = int(sqrt(real(iproc))) + 10 表示因子的上限,因为任何数的因子对称地分布在 1 到其平方根之间。 allocate(factors(i)) 存储因子的数组 call findfactor(iproc, factors, nfact) 寻找所有的因子的子程序 if (nrank==0) write(*,*) 'factors: ', (factors(i), i=1,nfact) 主进程输出所有的因子 row = factors(1) 行进程数 板分解 col = iproc / row 列进程数 if (min(nx_global,ny_global)>=row .and. & 得保证网格数比进程数多 min(ny_global,nz_global)>=col) then dims(1) = row 行列进程分布 dims(2) = col periodic(1) = .false. cpu分布在两个方向是不周期的 periodic(2) = .false. cpu分布在两个方向是不周期的 call MPI_CART_CREATE(MPI_COMM_WORLD,2,dims,periodic, .false.,DECOMP_2D_COMM_CART_X, ierror) 创建一个笛卡尔网格拓扑结构 call MPI_CART_COORDS(DECOMP_2D_COMM_CART_X,nrank,2,coord,ierror) 当前进程在 2D 拓扑结构中的坐标 call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.true.,.false./), & 创建通信子组 DECOMP_2D_COMM_COL,ierror) DECOMP_2D_COMM_COL:列通信器,每一列上的所有进程在这个通信器中。 call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.false.,.true./), & 行通信器 DECOMP_2D_COMM_ROW,ierror) call decomp_info_init(nx_global,ny_global,nz_global,decomp) allocate(u1(decomp%xsz(1),decomp%xsz(2),decomp%xsz(3))) 分配内存,从1开始 allocate(u2(decomp%ysz(1),decomp%ysz(2),decomp%ysz(3))) x笔和y笔的铅笔块 t1 = MPI_WTIME() 这边为铅笔之间的转化计时 call transpose_x_to_y(u1,u2,decomp) call transpose_y_to_x(u2,u1,decomp) t2 = MPI_WTIME() - t1 统计的时长 deallocate(u1,u2) 释放u1,u2 call decomp_info_finalize(decomp) 释放所有的定义的数组 call MPI_Comm_free(DECOMP_2D_COMM_COL,ierror) 释放定义的通信组 call MPI_Comm_free(DECOMP_2D_COMM_ROW,ierror) 释放定义的通信组 call MPI_Comm_free(DECOMP_2D_COMM_CART_X, ierror) 释放定义的通信组 end if do i=1, nfact nfact是因子个数,是遍历所有的因子组合 row = factors(i) 行列进程分布 col = iproc / row if (min(nx_global,ny_global)>=row .and. min(ny_global,nz_global)>=col) then 确保能分配 dims(1) = row dims(2) = col periodic(1) = .false. periodic(2) = .false. call MPI_CART_CREATE(MPI_COMM_WORLD,2,dims,periodic, .false.,DECOMP_2D_COMM_CART_X, ierror) 二维笛卡尔拓扑 call MPI_CART_COORDS(DECOMP_2D_COMM_CART_X,nrank,2,coord,ierror) 对应的coord call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.true.,.false./), DECOMP_2D_COMM_COL,ierror) 通信子组 call MPI_CART_SUB(DECOMP_2D_COMM_CART_X,(/.false.,.true./), DECOMP_2D_COMM_ROW,ierror) 通信子组 call decomp_info_init(nx_global,ny_global,nz_global,decomp) allocate(u1(decomp%xsz(1),decomp%xsz(2),decomp%xsz(3))) 分配内存 allocate(u2(decomp%ysz(1),decomp%ysz(2),decomp%ysz(3))) allocate(u3(decomp%zsz(1),decomp%zsz(2),decomp%zsz(3))) t1 = MPI_WTIME() 计时 call transpose_x_to_y(u1,u2,decomp) call transpose_y_to_z(u2,u3,decomp) call transpose_z_to_y(u3,u2,decomp) call transpose_y_to_x(u2,u1,decomp) t2 = MPI_WTIME() - t1 deallocate(u1,u2,u3) call decomp_info_finalize(decomp) call MPI_Comm_free(DECOMP_2D_COMM_COL,ierror) call MPI_Comm_free(DECOMP_2D_COMM_ROW,ierror) call MPI_Comm_free(DECOMP_2D_COMM_CART_X, ierror) call MPI_ALLREDUCE(t2,t1,1,MPI_DOUBLE_PRECISION,MPI_SUM, MPI_COMM_WORLD,ierror) 归约操作,所有进程的时间求和 t1 = t1 / dble(nproc) 求每个进程平均时间 if (nrank==0) then write(*,*) 'processor grid', row, ' by ', col, ' time=', t1 屏幕上输出不同进程分布的平均时间 end if if (best_time > t1) then 寻找最小的平均时间以及最优的进程分布 best_time = t1 best_p_row = row best_p_col = col end if end if 在保证进程能分配到网格的判断条件下 end do 遍历所有进程分布 deallocate(factors) 释放内存 if (best_p_row/=-1) then 找到最优进程 if (nrank==0) then write(*,*) 'the best processor grid is probably ', & best_p_row, ' by ', best_p_col end if else errorcode = 9 没找到,报错 call decomp_2d_abort(errorcode, & 'The processor-grid auto-tuning code failed. ' // & 'The number of processes requested is probably too large.') end if return
TYPE, public :: DECOMP_INFO integer, dimension(3) :: xst, xen, xsz x代表铅笔,每个进程在三个方向的开始终止序列以及大小 integer, dimension(3) :: yst, yen, ysz y代表铅笔,每个进程在三个方向的开始终止序列以及大小 integer, dimension(3) :: zst, zen, zsz z代表铅笔,每个进程在三个方向的开始终止序列以及大小 integer, allocatable, dimension(:) :: & x1dist, y1dist, y2dist, z2dist, x2dist, z1dist, & x1st, y1st, y2st, z2st, x2st, z1st, & x代表维度,1代表行进程分解,2代表列进程分解 x1en, y1en, y2en, z2en, x2en, z1en 每个方向的不同的进程分解下的的开始终止序列以及大小 integer, allocatable, dimension(:) :: & x代表铅笔, 1代表铅笔那个维度的进程分解 x1cnts, y1cnts, y2cnts, z2cnts, x2cnts, z1cnts 每个铅笔在铅笔那个维度分解之后的每个块的大小 integer, allocatable, dimension(:) :: & x1disp, y1disp, y2disp, z2disp, x2disp, z1disp 每个铅笔在铅笔那个维度分解之后的每个块的偏移大小 integer :: x1count, y1count, y2count, z2count, x2count, z1count 每个铅笔在铅笔那个维度分解之后最大块的大小 integer,dimension(:),allocatable::zcnts_xz,ztypes_xz integer,dimension(:),allocatable::xcnts_xz,xtypes_xz x和z铅笔之间的交换 integer,dimension(:),allocatable::zdispls_xz,xdispls_xz logical :: even END TYPE DECOMP_INFO
call findfactor(iproc, factors, nfact) 总进程个数, 因子数组和因子个数 subroutine findfactor(num, factors, nfact) num总进程个数,factors, nfact因子数组和因子个数 integer, intent(IN) :: num 输入总进程大小 integer, intent(OUT), dimension(*) :: factors 输出所有的因子分解 integer, intent(OUT) :: nfact 因子数量 integer :: i, m 接下来以sqrt(num)为界限,算出所有的因子,存储在factors中,因子个数为nfact,比较简单,不展示
call decomp_info_init(nx_global,ny_global,nz_global,decomp)三个方向网格数和数据体 subroutine decomp_info_init(nx,ny,nz,decomp) nx,ny,nz,decomp 三个方向网格数和数据体 integer, intent(IN) :: nx,ny,nz xyz网格数 TYPE(DECOMP_INFO), intent(INOUT) :: decomp 数据体 integer :: buf_size, status, errorcode if (nx<dims(1) .or. ny<dims(1) .or. ny<dims(2) .or. nz<dims(2)) then 每个进程个数不能超过网格数 errorcode = 6 call decomp_2d_abort(errorcode, & 'Invalid 2D processor grid. ' // & 'Make sure that min(nx,ny) >= p_row and ' // & 'min(ny,nz) >= p_col') end if if (mod(nx,dims(1))==0 .and. mod(ny,dims(1))==0 .and. & mod(ny,dims(2))==0 .and. mod(nz,dims(2))==0) then decomp%even = .true. 能否被进程均匀分配,能就ture else decomp%even = .false. end if allocate(decomp%x1dist(0:dims(1)-1),decomp%y1dist(0:dims(1)-1), & 分配内存,注意数组从0开始 decomp%y2dist(0:dims(2)-1),decomp%z2dist(0:dims(2)-1), & decomp%x2dist(0:dims(2)-1),decomp%z1dist(0:dims(1)-1)) allocate(decomp%x1st(0:dims(1)-1),decomp%x1en(0:dims(1)-1), & decomp%y1st(0:dims(1)-1),decomp%y1en(0:dims(1)-1), & decomp%z1st(0:dims(1)-1),decomp%z1en(0:dims(1)-1)) allocate(decomp%x2st(0:dims(2)-1),decomp%x2en(0:dims(2)-1), & decomp%y2st(0:dims(2)-1),decomp%y2en(0:dims(2)-1), & decomp%z2st(0:dims(2)-1),decomp%z2en(0:dims(2)-1)) call get_dist(nx,ny,nz,decomp) call partition(nx, ny, nz, (/ 1,2,3 /), & x不分解,y行分解,z列分解 decomp%xst, decomp%xen, decomp%xsz) 每个进程有自己的xst,xen,xsz,第一个分量是对应x维度的范围,第二个是y的,第三个是z的 call partition(nx, ny, nz, (/ 2,1,3 /), & x行分解,y不分解,z列分解, decomp%yst, decomp%yen, decomp%ysz) call partition(nx, ny, nz, (/ 2,3,1 /), & x行分解,y列分解,z不分解 decomp%zst, decomp%zen, decomp%zsz) allocate(decomp%x1cnts(0:dims(1)-1),decomp%y1cnts(0:dims(1)-1), & 分配内存,注意数组从0开始 decomp%y2cnts(0:dims(2)-1),decomp%z2cnts(0:dims(2)-1), & decomp%z1cnts(0:dims(1)-1),decomp%x2cnts(0:dims(2)-1)) x代表铅笔, 1代表铅笔那个维度的进程分解 allocate(decomp%x1disp(0:dims(1)-1),decomp%y1disp(0:dims(1)-1), & 每个铅笔在铅笔那个维度分解之后的每个块的大小 decomp%y2disp(0:dims(2)-1),decomp%z2disp(0:dims(2)-1), & 每个铅笔在铅笔那个维度分解之后的每个块的偏移大小 decomp%x2disp(0:dims(2)-1),decomp%z1disp(0:dims(1)-1)) allocate(decomp%xcnts_xz(nproc),decomp%zcnts_xz(nproc)) x和z铅笔之间的交换,注意和上面数组不一样 allocate(decomp%xtypes_xz(nproc),decomp%ztypes_xz(nproc)) allocate(decomp%xdispls_xz(nproc)) allocate(decomp%zdispls_xz(nproc)) call prepare_buffer(decomp) buf_size = max(decomp%xsz(1)*decomp%xsz(2)*decomp%xsz(3), & max(decomp%ysz(1)*decomp%ysz(2)*decomp%ysz(3), & decomp%zsz(1)*decomp%zsz(2)*decomp%zsz(3)) ) 每一个进程在三个铅笔域中最大的笔 if (buf_size > decomp_buf_size) then decomp_buf_size = buf_size decomp_buf_size取最大的笔 if (allocated(work1_r)) deallocate(work1_r) if (allocated(work2_r)) deallocate(work2_r) if (allocated(work1_c)) deallocate(work1_c) if (allocated(work2_c)) deallocate(work2_c) allocate(work1_r(buf_size), STAT=status) allocate(work2_r(buf_size), STAT=status) 分配内存,这边数组从1开始,大小取最大的笔 allocate(work1_c(buf_size), STAT=status) allocate(work2_c(buf_size), STAT=status) end if return
call get_dist(nx,ny,nz,decomp) 网格数和数据体 subroutine get_dist(nx,ny,nz,decomp) 网格数和数据体 integer, intent(IN) :: nx, ny, nz TYPE(DECOMP_INFO), intent(INOUT) :: decomp call distribute(nx,dims(1),decomp%x1st,decomp%x1en,decomp%x1dist) !x1st,x1en分别为x方向第一个维度网格数开始和结束索引数,x1dist为第一个维度网格数在每个进程中的数据块的大小 call distribute(ny,dims(1),decomp%y1st,decomp%y1en,decomp%y1dist) !y1st,y1en分别为y方向第一个维度网格数开始和结束索引数,y1dist为第一个维度网格数在每个进程中的数据块的大小 call distribute(ny,dims(2),decomp%y2st,decomp%y2en,decomp%y2dist) !y2st,y2en分别为y方向第二个维度网格数开始和结束索引数,y2dist为第二个维度网格数在每个进程中的数据块的大小 call distribute(nz,dims(2),decomp%z2st,decomp%z2en,decomp%z2dist) end
call distribute(nx,dims(1),decomp%x1st,decomp%x1en,decomp%x1dist) x方向网格数,进程第一个维度大小,以及x方向在行进程下的每一段开始结束索引和大小 subroutine distribute(data1,proc,st,en,sz) data1是网格数,proc是行进程个数,st,en,sz每个方向在进程上的每一段开始结束索引和大小 integer data1,proc,st(0:proc-1),en(0:proc-1),sz(0:proc-1) 数组从0开始,st(0)从1开始 integer i,size1,nl,nu 如果能均匀分配,每一段大小一样 如果不能均匀分配,多的几个网格,从后往前,每个进程多一个,所以靠后的进程的size要比前面的大1 比较简单,不展示 end
call partition(nx, ny, nz, (/ 1,2,3 /), decomp%xst, decomp%xen, decomp%xsz) nx, ny, nz网格数,(/ 1,2,3 /)进程在每个维度上的分配策略,以及x代表铅笔,每个进程在三个方向的开始终止序列以及大小 subroutine partition(nx, ny, nz, pdim, lstart, lend, lsize) nx, ny, nz网格数,pdimpdim,lstart, lend, lsize每个进程在三个方向的开始终止序列以及大小 integer, intent(IN) :: nx, ny, nz integer, dimension(3), intent(IN) :: pdim !指定每个维度的分配策略,值为 1 表示不分配,2 表示沿某个方向的第一维度(行)分配,3 表示沿某个方向的第二维度(列)分配。 integer, dimension(3), intent(OUT) :: lstart, lend, lsize integer, allocatable, dimension(:) :: st,en,sz integer :: i, gsize 给出了三种铅笔分解的每个维度在不同的进程分配下的,每个进程在三个方向的开始终止序列以及大小 比较简单,不展示 end
call prepare_buffer(decomp) 数据体 subroutine prepare_buffer(decomp) 数据体 TYPE(DECOMP_INFO), intent(INOUT) :: decomp integer :: i, k integer :: rank_x, rank_z integer :: subsize_y, offset_y integer :: ierror do i=0, dims(1)-1 decomp%x1cnts(i) = decomp%x1dist(i)*decomp%xsz(2)*decomp%xsz(3) x笔下,x方向采用行分解,对应的每个进程在笔方向的各个块的大小 decomp%y1cnts(i) = decomp%ysz(1)*decomp%y1dist(i)*decomp%ysz(3) y笔下,y方向采用行分解,对应的每个进程在笔方向的各个块的大小 decomp%z1cnts(i) = decomp%zsz(1)*decomp%zsz(2)*decomp%z1dist(i) z笔下,z方向采用行分解,对应的每个进程在笔方向的各个块的大小 if (i==0) then decomp%x1disp(i) = 0 decomp%y1disp(i) = 0 第一块偏移为0 decomp%z1disp(i) = 0 else decomp%x1disp(i) = decomp%x1disp(i-1) + decomp%x1cnts(i-1) decomp%y1disp(i) = decomp%y1disp(i-1) + decomp%y1cnts(i-1) 其余块分别为前面块的总和 decomp%z1disp(i) = decomp%z1disp(i-1) + decomp%z1cnts(i-1) end decomp%x2cnts(i) x笔下,x方向采用列分解,对应的每个进程在笔方向的各个块的大小 decomp%y2cnts(i) 和前面一样,y笔下,y方向采用列分解,对应的每个进程在笔方向的各个块的大小 decomp%z2cnts(i) z笔下,z方向采用列分解,对应的每个进程在笔方向的各个块的大小 decomp%x2disp(i) decomp%y2disp(i) 和前面一样 decomp%z2disp(i) x笔和y笔之间的消息传递 decomp%x1count = decomp%x1dist(dims(1)-1) * decomp%y1dist(dims(1)-1) * decomp%xsz(3) x笔中,x方向行分解最后一个块和y行分解最后一个块以及z的列分解的所有块 decomp%y1count = decomp%x1count x行,y行,z列 y笔和z笔之间的消息传递 decomp%y2count = decomp%y2dist(dims(2)-1) *decomp%z2dist(dims(2)-1) * decomp%zsz(1) z笔中,x方向行分解的所有块和y列分解最后一个块以及z的列分解的最后一个块 decomp%z2count = decomp%y2count x行,y列,z列 在傅里叶空间,x笔和z笔之间的消息传递 decomp%xdispls_xz(:)=0 decomp%zdispls_xz(:)=0 decomp%xcnts_xz(:)=0 decomp%zcnts_xz(:)=0 decomp%xtypes_xz(:)=MPI_INTEGER decomp%ztypes_xz(:)=MPI_INTEGER do k=0,dims(1)-1 do i=0,dims(2)-1 call MPI_Cart_rank(DECOMP_2D_COMM_CART_X,(/k,i/),rank_x,ierror) 根据(/k,i/),找到CART_X中该进程的rank rank_z=rank_x if (decomp%zst(2).le.decomp%y1en(k) .and. decomp%zen(2).ge.decomp%y1st(k)) then z笔的y方向列分解的所有块的开始索引小于等于y行分解的第一个块的结束索引 以及z笔的y方向列分解的所有块的结束索引大于等于y行分解的第一个块的开始索引 decomp%zcnts_xz(rank_z+1)=1 subsize_y=min(decomp%zen(2),decomp%y1en(k))-max(decomp%zst(2),decomp%y1st(k))+1 两个区间的覆盖区域 offset_y =max(decomp%zst(2),decomp%y1st(k))-decomp%zst(2) 偏移,对于第一个块,无偏移或者两个区间的前置区域 call MPI_Type_create_subarray(3,decomp%zsz, (/decomp%zsz(1),subsize_y,decomp%z2dist(i)/), (/0,offset_y,decomp%z2st(i)-decomp%zst(3)/), MPI_ORDER_FORTRAN,complex_type,decomp%ztypes_xz(rank_z+1),ierror) 在decomp%zsz的数组下,创建子数组,用decomp%ztypes_xz作为句柄,目的是为了z笔给x笔传输消息 call MPI_Type_commit(decomp%ztypes_xz(rank_z+1),ierror) 提交到 MPI 系统,使其可以用于通信 endif if (decomp%xst(2).le.decomp%y2en(i) .and. decomp%xen(2).ge.decomp%y2st(i)) then 和上面类似 decomp%xcnts_xz(rank_x+1)=1 subsize_y=min(decomp%xen(2),decomp%y2en(i))-max(decomp%xst(2),decomp%y2st(i))+1 offset_y =max(decomp%xst(2),decomp%y2st(i))-decomp%xst(2) call MPI_Type_create_subarray(3,decomp%xsz, (/decomp%x1dist(k),subsize_y,decomp%xsz(3)/), (/decomp%x1st(k)-decomp%xst(1),offset_y,0/), & MPI_ORDER_FORTRAN,complex_type,decomp%xtypes_xz(rank_x+1),ierror) call MPI_Type_commit(decomp%xtypes_xz(rank_x+1),ierror) endif enddo enddo return
call transpose_x_to_y(u1,u2,decomp) u1输入x笔的铅笔快,u2输出y铅笔的铅笔块,数据体 interface transpose_x_to_y 接口 module procedure transpose_x_to_y_real module procedure transpose_x_to_y_complex end interface transpose_x_to_y subroutine transpose_x_to_y_real(src, dst, opt_decomp) 输入src,输出dst,可选的数据体opt_decomp real(mytype), dimension(:,:,:), intent(IN) :: src 双精度 real(mytype), dimension(:,:,:), intent(OUT) :: dst 双精度 TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp 可选参数,可以选择传递该参数,也可以不传递。 TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then 确保数据体为decomp decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) 两个三维数组的各个维度的大小,或者说是每个进程下铅笔块的大小 s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_xy_real(src, s1, s2, s3, work1_r, dims(1), decomp%x1dist, decomp) 将src转换成一个一维向量 call MPI_ALLTOALLV(work1_r, decomp%x1cnts, decomp%x1disp, real_type, work2_r, decomp%y1cnts, decomp%y1disp, real_type, DECOMP_2D_COMM_COL, ierror) 采用全收发从x铅笔到y铅笔,是非规则的,传输数据大小可以不一样 call mem_merge_xy_real(work2_r, d1, d2, d3, dst, dims(1), decomp%y1dist, decomp) 将work2_r转换成一个铅笔块 return subroutine transpose_x_to_y_complex(src, dst, opt_decomp) complex(mytype), dimension(:,:,:), intent(IN) :: src complex(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_xy_complex(src, s1, s2, s3, work1_c, dims(1), decomp%x1dist, decomp) call MPI_ALLTOALLV(work1_c, decomp%x1cnts, decomp%x1disp, complex_type, work2_c, decomp%y1cnts, decomp%y1disp, complex_type, DECOMP_2D_COMM_COL, ierror) call mem_merge_xy_complex(work2_c, d1, d2, d3, dst, dims(1), decomp%y1dist, decomp) return
call mem_split_xy_real(src, s1, s2, s3, work1_r, dims(1), decomp%x1dist, decomp) 输入数组src,s123是src的三个方向维度,work1_r输出数组,一个一维向量,dims(1)为x方向的行进程分布,x1dist为x行进程分布的各个段的大小,数据体 subroutine mem_split_xy_real(in,n1,n2,n3,out,iproc,dist,decomp) in输入数组,n123是in的三个方向维度,out输出数组,一个一维向量,iproc为x方向的行进程分布,dist为x行进程分布的各个段的大小,数据体 integer, intent(IN) :: n1,n2,n3 real(mytype), dimension(1:n1,1:n2,1:n3), intent(IN) :: in real(mytype), dimension(*), intent(OUT) :: out integer, intent(IN) :: iproc integer, dimension(0:iproc-1), intent(IN) :: dist TYPE(DECOMP_INFO), intent(IN) :: decomp integer :: i,j,k, m,i1,i2,pos do m=0,iproc-1 if (m==0) then i1和i2代表x铅笔中,x方向每一个块的开始结束索引 i1 = 1 i2 = dist(0) else i1 = i2+1 i2 = i1+dist(m)-1 end if pos = decomp%x1disp(m) + 1 每一个块的偏移量,加上1是为了确保从下个索引开始 do k=1,n3 do j=1,n2 do i=i1,i2 out(pos) = in(i,j,k) 把in的数据按照xyz的顺序提取和存储,顺序采用铅笔块的顺序,相当于每一个三维数组的铅笔被存储成了一个一维向量, pos = pos + 1 但不是按照整个铅笔的xyz顺序,而是大顺序采用一个个铅笔块,每个铅笔块采用xyz的顺序 end do end do end do end do return
call mem_merge_xy_real(work2_r, d1, d2, d3, dst, dims(1), decomp%y1dist, decomp) 输入数组work2_r,d123是dst的三个方向维度,dst输出数组,一个三维矩阵,铅笔块,dims(1)为y方向的行进程分布,y1dist为y行进程分布的各个段的大小,数据体 subroutine mem_merge_xy_real(in,n1,n2,n3,out,iproc,dist,decomp) 输入数组in,n123是out的三个方向维度,out输出数组,一个三维矩阵,铅笔块,iproc为y方向的行进程分布,dist为y行进程分布的各个段的大小,数据体 integer, intent(IN) :: n1,n2,n3 real(mytype), dimension(*), intent(IN) :: in real(mytype), dimension(1:n1,1:n2,1:n3), intent(OUT) :: out integer, intent(IN) :: iproc integer, dimension(0:iproc-1), intent(IN) :: dist TYPE(DECOMP_INFO), intent(IN) :: decomp integer :: i,j,k, m,i1,i2, pos do m=0,iproc-1 和上面几乎一样,不解释 if (m==0) then i1 = 1 i2 = dist(0) else i1 = i2+1 i2 = i1+dist(m)-1 end if pos = decomp%y1disp(m) + 1 do k=1,n3 do j=i1,i2 do i=1,n1 out(i,j,k) = in(pos) pos = pos + 1 end do end do end do end do return
call transpose_y_to_x(u2,u1,decomp) u2输入y笔的铅笔快,u1输出x铅笔的铅笔块,数据体 interface transpose_y_to_x module procedure transpose_y_to_x_real module procedure transpose_y_to_x_complex end interface transpose_y_to_x subroutine transpose_y_to_x_real(src, dst, opt_decomp) 输入src,输出dst,可选的数据体opt_decomp real(mytype), dimension(:,:,:), intent(IN) :: src real(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_yx_real(src, s1, s2, s3, work1_r, dims(1), decomp%y1dist, decomp) call MPI_ALLTOALLV(work1_r, decomp%y1cnts, decomp%y1disp, real_type, work2_r, decomp%x1cnts, decomp%x1disp, real_type, DECOMP_2D_COMM_COL, ierror) call mem_merge_yx_real(work2_r, d1, d2, d3, dst, dims(1), decomp%x1dist, decomp) return subroutine transpose_y_to_x_complex(src, dst, opt_decomp) complex(mytype), dimension(:,:,:), intent(IN) :: src complex(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_yx_complex(src, s1, s2, s3, work1_c, dims(1), decomp%y1dist, decomp) call MPI_ALLTOALLV(work1_c, decomp%y1cnts, decomp%y1disp, complex_type, work2_c, decomp%x1cnts, decomp%x1disp, complex_type, DECOMP_2D_COMM_COL, ierror) call mem_merge_yx_complex(work2_c, d1, d2, d3, dst, dims(1), decomp%x1dist, decomp) return
call decomp_info_finalize(decomp) 数据体 subroutine decomp_info_finalize(decomp) 数据体 integer :: i integer :: ierror TYPE(DECOMP_INFO), intent(INOUT) :: decomp deallocate(decomp%x1dist,decomp%y1dist,decomp%y2dist,decomp%z2dist) deallocate(decomp%z1dist,decomp%x2dist) deallocate(decomp%x1st,decomp%y1st,decomp%y2st,decomp%z2st) deallocate(decomp%z1st,decomp%x2st) deallocate(decomp%x1en,decomp%y1en,decomp%y2en,decomp%z2en) deallocate(decomp%z1en,decomp%x2en) deallocate(decomp%x1cnts,decomp%y1cnts,decomp%y2cnts,decomp%z2cnts) deallocate(decomp%z1cnts,decomp%x2cnts) deallocate(decomp%x1disp,decomp%y1disp,decomp%y2disp,decomp%z2disp) deallocate(decomp%z1disp,decomp%x2disp) do i=1,nproc if (decomp%ztypes_xz(i).ne.MPI_INTEGER) then call MPI_Type_free(decomp%ztypes_xz(i),ierror) endif if (decomp%xtypes_xz(i).ne.MPI_INTEGER) then call MPI_Type_free(decomp%xtypes_xz(i),ierror) endif enddo deallocate(decomp%xcnts_xz,decomp%zcnts_xz) deallocate(decomp%xtypes_xz,decomp%ztypes_xz) deallocate(decomp%xdispls_xz) deallocate(decomp%zdispls_xz) return
call transpose_y_to_z(u2,u3,decomp) interface transpose_y_to_z module procedure transpose_y_to_z_real module procedure transpose_y_to_z_complex end interface transpose_y_to_z subroutine transpose_y_to_z_real(src, dst, opt_decomp) real(mytype), dimension(:,:,:), intent(IN) :: src real(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_yz_real(src, s1, s2, s3, work1_r, dims(2), decomp%y2dist, decomp) call MPI_ALLTOALLV(work1_r, decomp%y2cnts, decomp%y2disp, real_type, dst, decomp%z2cnts, decomp%z2disp, real_type, DECOMP_2D_COMM_ROW, ierror) 注意这边和上面有点区别是因为直接存储在dst中了,并不再需要重新排列 return subroutine transpose_y_to_z_complex(src, dst, opt_decomp) complex(mytype), dimension(:,:,:), intent(IN) :: src complex(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call mem_split_yz_complex(src, s1, s2, s3, work1_c, dims(2), decomp%y2dist, decomp) call MPI_ALLTOALLV(work1_c, decomp%y2cnts, decomp%y2disp, complex_type, dst, decomp%z2cnts, decomp%z2disp, complex_type, DECOMP_2D_COMM_ROW, ierror) return
call transpose_z_to_y(u3,u2,decomp) interface transpose_z_to_y module procedure transpose_z_to_y_real module procedure transpose_z_to_y_complex end interface transpose_z_to_y subroutine transpose_z_to_y_real(src, dst, opt_decomp) real(mytype), dimension(:,:,:), intent(IN) :: src real(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call MPI_ALLTOALLV(src, decomp%z2cnts, decomp%z2disp, real_type, work2_r, decomp%y2cnts, decomp%y2disp, real_type, DECOMP_2D_COMM_ROW, ierror) 和上面不一样,不需要重排数组 call mem_merge_zy_real(work2_r, d1, d2, d3, dst, dims(2), decomp%y2dist, decomp) return subroutine transpose_z_to_y_complex(src, dst, opt_decomp) complex(mytype), dimension(:,:,:), intent(IN) :: src complex(mytype), dimension(:,:,:), intent(OUT) :: dst TYPE(DECOMP_INFO), intent(IN), optional :: opt_decomp TYPE(DECOMP_INFO) :: decomp integer :: s1,s2,s3,d1,d2,d3 integer :: ierror if (present(opt_decomp)) then decomp = opt_decomp else decomp = decomp_main end if s1 = SIZE(src,1) s2 = SIZE(src,2) s3 = SIZE(src,3) d1 = SIZE(dst,1) d2 = SIZE(dst,2) d3 = SIZE(dst,3) call MPI_ALLTOALLV(src, decomp%z2cnts, decomp%z2disp, complex_type, work2_c, decomp%y2cnts, decomp%y2disp, complex_type, DECOMP_2D_COMM_ROW, ierror) call mem_merge_zy_complex(work2_c, d1, d2, d3, dst, dims(2), decomp%y2dist, decomp) return
call init_neighbour subroutine init_neighbour 存储邻居进程,由于是二维进程分布,所以是东西南北四个邻居 integer :: ierror neighbour(1,1) = MPI_PROC_NULL 表示进程在东边没有邻居 neighbour(1,2) = MPI_PROC_NULL 表示进程在西边没有邻居 call MPI_CART_SHIFT(DECOMP_2D_COMM_CART_X, 0, 1, neighbour(1,4), neighbour(1,3), ierror) 在第一个维度上移动一步,存储进程向东移动(1,4)和向西移动(1,3)后的邻居 call MPI_CART_SHIFT(DECOMP_2D_COMM_CART_X, 1, 1, neighbour(1,6), neighbour(1,5), ierror) 在第二个维度上移动一步,存储进程向北移动(1,6)和向南移动(1,5)后的邻居 return
ts=MPI_WTIME() 初始计时 tin(1) = MPI_WTIME() 初始计时 call MpiBarrier 全部进程的屏障 call HdfStart 初始化HDF5库 if (nrank.eq.master) ismaster = .true. if (ismaster) write(6,*) 'MPI tasks=', nproc 主进程写总的计算使用核数 if(ismaster) then call ResetLogs write(6,112)ylen/alx3,zlen/alx3 长高比和宽高比 112 format(//,20x,'R A Y B E N ',//,10x, '3D Cell with aspect-ratio: D1/H = ',f5.2,' D2/H = ',f5.2) write(6,142) 142 format(//,8x,'Periodic lateral wall boundary condition') 横向周期边界 write(6,202) ray,pra Ra数和Pr数 202 format(/,5x,'Parameters: ',' Ra=',e10.3,' Pr= ',e10.3) if(variabletstep) then 是否是变时间步长的 write(6,204) limitCFL 204 format(/,5x,'Variable dt and fixed cfl= ', e11.4,/ ) else write(6,205) dtmax,limitCFL 205 format(/,5x,'Fixed dt= ',e11.4,' and maximum cfl=', e11.4,/ ) endif endif call InitTimeMarchScheme call InitVariables call CreateGrid
call MpiBarrier subroutine MpiBarrier call MPI_BARRIER(MPI_COMM_WORLD,ierr) return
call HdfStart subroutine HdfStart use hdf5 implicit none integer :: hdf_error call h5open_f(hdf_error) return
call ResetLogs subroutine ResetLogs use param implicit none if(resetlogstime) then open(95,file='nu_vol.out',status='unknown',access='sequential', position='append') 临时文件处理 close(95,status='delete') open(97,file="nu_plate.out",status='unknown',access='sequential', position='append') 临时文件处理 close(97,status='delete') if (disscal) then open(92,file='nu_diss.out',status='unknown',access='sequential', position='append') 临时文件处理 close(92,status='delete') end if open(94,file='rms_vel.out',status='unknown',position='append', access='sequential') 临时文件处理 close(94,status='delete') endif return
call InitTimeMarchScheme subroutine InitTimeMarchScheme integer ns if(nsst.gt.1) then 三阶RK前面的系数 gam(1)=8.d0/15.d0 gam(2)=5.d0/12.d0 gam(3)=3.d0/4.d0 rom(1)=0.d0 rom(2)=-17.d0/60.d0 rom(3)=-5.d0/12.d0 if(ismaster) then write(6,100) (gam(ns),ns=1,nsst),(rom(ns),ns=1,nsst) 100 format(/,5x,'The time scheme is a III order Runge-Kutta' ,4x,'gam= ',3f8.3,4x,'ro= ',3f8.3) endif else gam(1)=1.5d0 AB2方案 gam(2)=0.d0 gam(3)=0.d0 rom(1)=-0.5d0 rom(2)=0.d0 rom(3)=0.d0 if(ismaster) then write(6,110) gam(1),rom(1) 110 format(/,5x,'The time scheme is the Adams-Bashfort',4x, 'gam= ',f8.3,4x,'ro= ',f8.3) endif endif do ns=1,nsst 这是RK前面的系数,两个参数加起来的那一块 alm(ns)=(gam(ns)+rom(ns)) end do return
call InitVariables subroutine InitVariables call AllocateReal1DArray(zc,1,nz) zc大小为(1,nz),值全为0 实数数组,nz是网格点数 call AllocateReal1DArray(zm,1,nz) zm大小为(1,nz),值全为0 call AllocateReal1DArray(ak1,1,nz) ak1大小为(1,nz),值全为0 call AllocateReal1DArray(ao,1,nz) ao大小为(1,nz),值全为0 call AllocateReal1DArray(yc,1,ny) yc大小为(1,ny),值全为0 call AllocateReal1DArray(ym,1,ny) ym大小为(1,ny),值全为0 call AllocateReal1DArray(ak2,1,ny) ak2大小为(1,ny),值全为0 call AllocateReal1DArray(ap,1,ny) ap小为(1,ny),值全为0 call AllocateReal1DArray(xc,1,nx) xc大小为(1,nx),值全为0 call AllocateReal1DArray(xm,1,nx) xm大小为(1,nx),值全为0 call AllocateReal1DArray(g3rc,1,nx) g3rc大小为(1,nx),值全为0 call AllocateReal1DArray(g3rm,1,nx) g3rm大小为(1,nx),值全为0 call AllocateReal1DArray(udx3c,1,nx) udx3c大小为(1,nx),值全为0 call AllocateReal1DArray(udx3m,1,nx) udx3m大小为(1,nx),值全为0 call AllocateReal1DArray(ap3ck,1,nx) ap3ck大小为(1,nx),值全为0 call AllocateReal1DArray(ac3ck,1,nx) ac3ck大小为(1,nx),值全为0 call AllocateReal1DArray(am3ck,1,nx) am3ck大小为(1,nx),值全为0 call AllocateReal1DArray(ap3sk,1,nx) ap3sk大小为(1,nx),值全为0 call AllocateReal1DArray(ac3sk,1,nx) ac3sk大小为(1,nx),值全为0 call AllocateReal1DArray(am3sk,1,nx) am3sk大小为(1,nx),值全为0 call AllocateReal1DArray(ap3ssk,1,nx) ap3ssk大小为(1,nx),值全为0 call AllocateReal1DArray(ac3ssk,1,nx) ac3ssk大小为(1,nx),值全为0 call AllocateReal1DArray(am3ssk,1,nx) am3ssk大小为(1,nx),值全为0 call AllocateReal1DArray(amphk,1,nx) amphk大小为(1,nx),值全为0 call AllocateReal1DArray(acphk,1,nx) acphk大小为(1,nx),值全为0 call AllocateReal1DArray(apphk,1,nx) apphk大小为(1,nx),值全为0 call AllocateInt1dArray(kmc,1,nx) kmc大小为(1,nx),值全为0 整数数组 call AllocateInt1dArray(kpc,1,nx) kpc大小为(1,nx),值全为0 call AllocateInt1dArray(kmv,1,nx) kmv大小为(1,nx),值全为0 call AllocateInt1dArray(kpv,1,nx) kpv大小为(1,nx),值全为0 call AllocateReal2DArray(tempbp,1,ny,1,nz) tempbp大小为(1:ny, 1:nz),值全为0 实数数组,和温度相关 call AllocateReal2DArray(temptp,1,ny,1,nz) temptp大小为(1:ny, 1:nz),值全为0 if (statcal) then 和统计数据相关的数组 call AllocateReal1DArray(vx_me,1,nxm) vx_me大小为(1,nxm),值全为0 实数数组,nxm是网格数 call AllocateReal1DArray(vy_me,1,nxm) vy_me大小为(1,nxm),值全为0 call AllocateReal1DArray(vz_me,1,nxm) vz_me大小为(1,nxm),值全为0 call AllocateReal1DArray(vx_rms,1,nxm) vx_rms大小为(1,nxm),值全为0 call AllocateReal1DArray(vy_rms,1,nxm) vy_rms大小为(1,nxm),值全为0 call AllocateReal1DArray(vz_rms,1,nxm) vz_rms大小为(1,nxm),值全为0 call AllocateReal1DArray(temp_me,1,nxm) temp_me大小为(1,nxm),值全为0 call AllocateReal1DArray(temp_rms,1,nxm) temp_rms大小为(1,nxm),值全为0 call AllocateReal1DArray(tempvx_me,1,nxm) tempvx_me大小为(1,nxm),值全为0 if (disscal) then call AllocateReal1DArray(disste,1,nxm) disste大小为(1,nxm),值全为0 call AllocateReal1DArray(dissth,1,nxm) dissth大小为(1,nxm),值全为0 end if end if call AllocateReal3DArray(vy,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) 有辅助边界的数组,实数数组,nx是网格点数,lvlhalo=1 call AllocateReal3DArray(vz,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) nxm是网格数 call AllocateReal3DArray(vx,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) call AllocateReal3DArray(pr,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) call AllocateReal3DArray(temp,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) call AllocateReal3DArray(dphhalo,1,nxm,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) call AllocateReal3DArray(rhs,1,nx,xstart(2),xend(2),xstart(3),xend(3)) rhs大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 实数数组 call AllocateReal3DArray(dph,1,nxm,xstart(2),xend(2),xstart(3),xend(3)) dph大小为(1:nxm,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(dq,1,nx,xstart(2),xend(2),xstart(3),xend(3)) dq大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(qcap,1,nx,xstart(2),xend(2),xstart(3),xend(3)) qcap大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(rux,1,nx,xstart(2),xend(2),xstart(3),xend(3)) rux大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(ruy,1,nx,xstart(2),xend(2),xstart(3),xend(3)) ruy大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(ruz,1,nx,xstart(2),xend(2),xstart(3),xend(3)) ruz大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(hro,1,nx,xstart(2),xend(2),xstart(3),xend(3)) hro大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 call AllocateReal3DArray(rutemp,1,nx,xstart(2),xend(2),xstart(3),xend(3)) rutemp大小为(1:nx,xstart(2):xend(2), xstart(3):xend(3)),值全为0 return
call AllocateReal1DArray(zc,1,nz) 输入zc,1,nz,输出zc subroutine AllocateReal1DArray(var,st1,en1) 输入var,st1,en1,输出var integer, intent(in) :: st1,en1 real,allocatable,dimension(:),intent(inout) :: var integer :: alloc_stat, errorcode if (.not. allocated(var)) allocate(var(st1:en1), stat=alloc_stat) var没有被分配内存,则分配为(1,nz),数组从1开始 if (alloc_stat /= 0) then call decomp_2d_abort(errorcode, 'Memory allocation failed when creating new arrays') end if var =0.0d0 return6
call AllocateInt1dArray(kmc,1,nx) 输入kmc,1,nx,输出kmc subroutine AllocateInt1DArray(var,st1,en1) 输入var,st1,en1,输出var integer, intent(in) :: st1,en1 integer,allocatable,dimension(:),intent(inout) :: var integer :: alloc_stat, errorcode if (.not. allocated(var)) allocate(var(st1:en1), stat=alloc_stat) var没有被分配内存,则分配为(1,nx),数组从1开始 if (alloc_stat /= 0) then errorcode = 8 call decomp_2d_abort(errorcode, 'Memory allocation failed when creating new arrays') end if var =0 return
call AllocateReal2DArray(tempbp,1,ny,1,nz) 输入tempbp,1,ny,1,nz,输出tempbp subroutine AllocateReal2DArray(var,st1,en1,st2,en2) 输入var,st1,en1,,st2,en2,输出var integer, intent(in) :: st1,en1,st2,en2 real,allocatable,dimension(:,:),intent(inout) :: var integer :: alloc_stat, errorcode if (.not. allocated(var)) allocate(var(st1:en1,st2:en2), stat=alloc_stat) var没有被分配内存,则分配为(1:ny,1:nz),数组从1开始 if (alloc_stat /= 0) then errorcode = 8 call decomp_2d_abort(errorcode, 'Memory allocation failed when creating new arrays') end if var =0.0 return
call AllocateReal3DArray(vy,1,nx,xstart(2)-lvlhalo,xend(2)+lvlhalo,xstart(3)-lvlhalo,xend(3)+lvlhalo) subroutine AllocateReal3DArray(var,st1,en1,st2,en2,st3,en3) integer, intent(in) :: st1,en1,st2,en2,st3,en3 real,allocatable,dimension(:,:,:),intent(inout) :: var integer :: alloc_stat, errorcode if (.not. allocated(var)) allocate(var(st1:en1,st2:en2,st3:en3), stat=alloc_stat) if (alloc_stat /= 0) then errorcode = 8 call decomp_2d_abort(errorcode, 'Memory allocation failed when creating new arrays') end if var =0.0 return
call CreateGrid subroutine CreateGrid real :: x1,x2,x3 real :: a33, a33m, a33p real :: delet, etain, tstr3 real :: z2dp real, allocatable, dimension(:) :: etaz, etazm integer :: i, j, kc, km, kp integer :: nxmo, nclip do kc=1,nxm nxm是网格数,x方向 kmv(kc)=kc-1 比当前节点小1的索引 kpv(kc)=kc+1 比当前节点大1的索引 if(kc.eq.1) kmv(kc)=kc 边界处,第一个节点的后向索引仍然是自身 if(kc.eq.nxm) kpv(kc)=kc 边界处,最后一个节点的前向索引仍然是自身 end do do kc=1,nxm nxm是网格数,x方向 kpc(kc)=kpv(kc)-kc 计算前向节点相对于当前节点的偏移量,边界处不一样 kmc(kc)=kc-kmv(kc) 计算后向节点相对于当前节点的偏移量,边界处不一样 end do do i=1,nz nz是网格点数,z方向,均匀网格 x1=real(i-1)/real(nzm) 计算第几个网格 zc(i)= zlen*x1 计算实际网格的大小,下标从1开始 end do do i=1,nzm nzm是网格数 zm(i)=(zc(i)+zc(i+1))*0.5d0 中间的平均点,中心点 end do do j=1,ny ny是网格点数,y方向,均匀网格 x2=real(j-1)/real(nym) 计算第几个网格 yc(j)= ylen*x2 计算实际网格的大小,下标从1开始 end do do j=1,nym nym是网格数 ym(j)=(yc(j)+yc(j+1))*0.5d0 中间的平均点,中心点 end do call AllocateReal1DArray(etaz,1,nx+500) etaz大小为(1:nx+500),值全为0,实数数组 call AllocateReal1DArray(etazm,1,nx+500) etazm大小为(1:nx+500),值全为0 if (istr3.eq.0) then 均匀网格,x方向 do kc=1,nx x3=real(kc-1)/real(nxm) etaz(kc)=alx3*x3 xc(kc)=etaz(kc) enddo endif tstr3=tanh(str3) 非均匀网格,x方向,双曲正切网格 if (istr3.eq.4) then xc(1)=0.0d0 do kc=2,nx z2dp=float(2*kc-nx-1)/float(nxm) xc(kc)=(1+tanh(str3*z2dp)/tstr3)*0.5*alx3 if(xc(kc).lt.0.or.xc(kc).gt.alx3)then 检查网格是否超过限制 write(*,*)'Grid is too streched: ','zc(',kc,')=',xc(kc) stop endif end do end if if(istr3.eq.6) then 非均匀网格,x方向,截断的切比雪夫网格 nclip = int(str3) 定义剪切程度 nxmo = nx+nclip+nclip do kc=1,nxmo etazm(kc)=+cos(pi*(float(kc)-0.5)/float(nxmo)) end do do kc=1,nx etaz(kc)=etazm(kc+nclip) etazm 的前 nclip 和后 nclip 节点被裁剪掉,保留中间的 nx 个节点 end do delet = etaz(1)-etaz(nx) 计算第一个和最后一个节点之间的差值 etain = etaz(1) do kc=1,nx etaz(kc)=etaz(kc)/(0.5*delet) 将值缩放到(1,-1),因为前面剪切了,所以值的范围不在(1,-1) end do xc(1) = 0. do kc=2,nxm xc(kc) = alx3*(1.-etaz(kc))*0.5 再将值插回到原来的计算区域中 end do xc(nx) = alx3 endif call DestroyReal1DArray(etaz) 释放内存 call DestroyReal1DArray(etazm) 释放内存 dx=real(nxm)/alx3 由于x是非均匀网格,最终还是需要插值回到均匀网格上来计算(只是多一个拉伸系数),所以给出了三个方向均匀网格的步长的倒数,用dx,dy,dz表示 dy=real(nym)/ylen dz=real(nzm)/zlen dxq=dx*dx 步长的倒数的平方 dyq=dy*dy dzq=dz*dz do kc=1,nxm xm(kc)=(xc(kc)+xc(kc+1))*0.5d0 中间的平均点,中心点 g3rm(kc)=(xc(kc+1)-xc(kc))*dx 每一个切比雪夫区间段与均匀下的区间段的比值,为了插值回到均匀网格,1/dx之前的拉伸系数 enddo do kc=2,nxm g3rc(kc)=(xc(kc+1)-xc(kc-1))*dx*0.5d0 每两个相邻的切比雪夫区间段的平均与均匀下的区间段的比值,为了插值回到均匀网格, 1/dx^2之前的拉伸系数 enddo g3rc(1)=(xc(2)-xc(1))*dx 边界 g3rc(nx)= (xc(nx)-xc(nxm))*dx 边界 do kc=1,nxm udx3m(kc) = dx/g3rm(kc) 每一个切比雪夫区间段的倒数 udx3c(kc) = dx/g3rc(kc) 每两个相邻的切比雪夫区间段的平均的倒数 end do udx3c(nx) = dx/g3rc(nx) 边界 if(ismaster) then 相关坐标写入文件 open(unit=78,file='axicor.out',status='unknown') do kc=1,nx write(78,345) kc,xc(kc),xm(kc),g3rc(kc),g3rm(kc) end do close(78) 345 format(i4,4(2x,e23.15)) open(unit=78,file='fact3.out',status='unknown') 相关坐标写入文件 do kc=1,nxm write(78,*) kc,udx3m(kc),udx3c(kc) end do write(78,*) nx,udx3m(nxm),udx3c(nx) close(78) endif am3ck(1)=0.d0 设置三对角前面的系数 ap3ck(1)=0.d0 ac3ck(1)=1.d0 am3ck(nx)=0.d0 ap3ck(nx)=0.d0 ac3ck(nx)=1.d0 do kc=2,nxm km=kc-1 kp=kc+1 a33=dxq/g3rc(kc) a33p=1.d0/g3rm(kc) a33m=1.d0/g3rm(km) ap3ck(kc)=a33*a33p am3ck(kc)=a33*a33m ac3ck(kc)=-(ap3ck(kc)+am3ck(kc)) enddo