fields.py
Perform GPU-based streamline integration using OpenCL.
Provides PyOpenCL-accelerated functions to integrate streamlines using 2nd-order Runge-Kutta and (streamline tail-step only) Euler methods. Basins of interest can be delimited by masking.
Requires PyOpenCL.
Imports streamlines module pocl.py. Imports functions from streamlines module useful.py.
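
A minimal usage sketch (the `info` and `data` bundles here are hypothetical
placeholders; in the real workflow they are assembled by the streamlines
trace machinery before Fields is invoked):

    from streamlines.fields import Fields

    # info: Numpy structured parameter bundle; data: shared-array bundle
    # (both assumed to exist with the attributes that fields.py dereferences)
    fields = Fields(which_cl_platform=0,   # first OpenCL platform
                    which_cl_device=0,     # first device on that platform
                    cl_src_path='streamlines/opencl/',  # assumed kernel dir
                    info=info, data=data, verbose=True)
    fields.integrate()
    # Results land in data.slc_array, data.slt_array and data.sla_array.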
Code
"""
---------------------------------------------------------------------
Perform GPU-based streamline integration using `OpenCL`_.
Provides PyOpenCL-accelerated functions to integrate streamlines using
2nd-order Runge-Kutta and (streamline tail-step only) Euler methods.
Basins of interest can be delimited by masking.
Requires `PyOpenCL`_.
Imports streamlines module :doc:`pocl`.
Imports functions from streamlines module :doc:`useful`.
---------------------------------------------------------------------
.. _OpenCL: https://www.khronos.org/opencl
.. _PyOpenCL: https://documen.tician.de/pyopencl/index.html
"""
import pyopencl as cl
import numpy as np
from os import environ
environ['PYTHONUNBUFFERED']='True'
environ['PYOPENCL_NO_CACHE']='True'
environ['PYOPENCL_COMPILER_OUTPUT']='0'
from streamlines import pocl
from streamlines.useful import neatly, vprint, create_seeds
import warnings

__all__ = ['Fields']
class Fields():
def __init__( self,
which_cl_platform,
which_cl_device,
cl_src_path = None,
info = None,
data = None,
verbose = False,
gpu_verbose = False ):
"""
Initialize.
Args:
cl_src_path (str):
which_cl_platform (int):
which_cl_device (int):
info (obj):
data (obj):
verbose (bool):
gpu_verbose (bool):
"""
self.platform, self.device, self.context \
= pocl.prepare_cl_context(which_cl_platform, which_cl_device)
self.queue = cl.CommandQueue(self.context,
properties=cl.command_queue_properties.PROFILING_ENABLE)
self.cl_src_path = cl_src_path
self.info = info
self.data = data
self.verbose = verbose
self.gpu_verbose = gpu_verbose
self.vprogress = verbose
def integrate(self):
"""
Trace each streamline from its corresponding seed point using 2nd-order
Runge-Kutta integration of the topographic gradient vector field.
This function is a master wrapper connecting the streamlines object and its
trace() method to the GPU/OpenCL wrapper function gpu_integrate(). As such it
acts on a set of parameters passed as arguments here rather than by accessing
object variables.
Workflow parameters are transferred here bundled in the Numpy structure array
info, which is parsed as well as passed on to gpu_integrate.
The tasks undertaken by this function are to:
1) prepare the OpenCL context, device and kernel source string
2) calculate how to split the streamline tracing into chunks
3) invoke the GPU/OpenCL device computation
4) post-process the streamline total length (slt) array (scale, sqrt)
and compute streamline trajectories statistics
"""
vprint(self.verbose,'Integrating streamline fields...')
# Shorthand
cl_src_path = self.cl_src_path
info = self.info
mask_array = self.data.mask_array
uv_array = self.data.uv_array
mapping_array = self.data.mapping_array
traj_stats_df = self.data.traj_stats_df
# Prepare CL essentials
device = self.device
context = self.context
queue = self.queue
cl_kernel_source \
= pocl.read_kernel_source(cl_src_path,['rng.cl','essentials.cl',
'writearray.cl','updatetraj.cl','computestep.cl',
'rungekutta.cl','jittertrajectory.cl','integratefields.cl'])
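        # The kernel source is the in-order concatenation of these .cl files;
        # integratefields.cl is assumed to supply the integrate_fields entry
        # point that gpu_integrate() builds and launches below.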
        # Memory check - not really needed
gpu_traj_memory_limit = (device.get_info(cl.device_info.GLOBAL_MEM_SIZE)
*info.gpu_memory_limit_pc)//100
full_traj_memory_request = (mask_array.shape[0]*mask_array.shape[1]
*np.dtype(np.float32).itemsize*2*3)
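        # i.e. a budget of itemsize*2*3 = 24 bytes per ROI pixel; the 2*3
        # factor presumably allows for (x,y) float32 pairs across three
        # trajectory-related buffers (interpretation, not from the source)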
vprint(self.verbose,
'GPU/OpenCL device global memory limit for streamline trajectories: {}'
.format(neatly(gpu_traj_memory_limit)))
vprint(self.verbose,
'GPU/OpenCL device memory required for streamline trajectories: {}'
.format(neatly(full_traj_memory_request)))
# Seed point selection, padding and shuffling
pad_width = info.pad_width
n_work_items = info.n_work_items
do_shuffle = info.do_shuffle
shuffle_rng_seed = info.shuffle_rng_seed
(self.data.seed_point_array, info.n_seed_points, info.n_padded_seed_points) \
= create_seeds(mask_array, pad_width, n_work_items,
do_shuffle=do_shuffle, rng_seed=shuffle_rng_seed,
verbose=self.verbose)
# Prep for GPU compute
n_global = info.n_padded_seed_points
        pad_length = (np.uint32(np.ceil(n_global/n_work_items))*n_work_items
                      - n_global)
        if pad_length>0:
            vprint(self.verbose,
                   'Chunk size adjustment for {0} CL work items/group: {1}->{2}...'
                       .format(n_work_items, n_global, n_global+pad_length))
            n_global += pad_length
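        # Worked example: n_global=1000 seeds with n_work_items=64 pads up to
        # ceil(1000/64)*64 = 1024, i.e. pad_length=24 dummy work items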
# Do integrations on the GPU
self.gpu_integrate(device, context, queue, cl_kernel_source, n_global)
# Streamline stats
pixel_size = info.pixel_size
dds = traj_stats_df['ds']['downstream','mean']
uds = traj_stats_df['ds']['upstream','mean']
        # slc: simple count of streamline points falling in a pixel
        # slt: total streamline length running across a pixel
        #      = sum of line lengths crossing the pixel
        #        * number of line-points per pixel
        #      <=> sum of line-points per pixel * mean integrator step ds
self.data.slt_array[:,:,0] = self.data.slt_array[:,:,0]*(dds/pixel_size)
self.data.slt_array[:,:,1] = self.data.slt_array[:,:,1]*(uds/pixel_size)
# slt: => sum of line-points per meter
self.data.slt_array = np.sqrt(self.data.slt_array)
# slt: => sqrt(area)
self.data.slc_array \
= np.sqrt(2.0)*np.power(self.data.slc_array
/info.subpixel_seed_point_density**2,2/3)
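        # slc: => rescaled from a raw per-subpixel-seed hit count to a
        #      sqrt(2)-weighted density; the 2/3 exponent appears to be an
        #      empirical normalization tied to the seeding density (assumed)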
# Done
vprint(self.verbose,'...done')
def gpu_integrate(self, device, context, queue, cl_kernel_source, n_global):
"""
Carry out GPU/OpenCL device computations in chunks.
This function is the basic wrapper interfacing Python with the GPU/OpenCL device.
The tasks undertaken by this function are to:
1) prepare Numpy data arrays and their corresponding PyOpenCL buffers
2) iterate over each OpenCL computation chunk
3) postprocess the streamline count and length arrays
Chunk iteration involves:
1) building the CL program
2) preparing the CL kernel
        3) enqueueing kernel events (including passing data to the CL device
           via buffers)
        4) reporting kernel status
        5) enqueueing data returns from the CL device to the CPU
        6) accumulating the returned per-pass streamline count and length
           (slc, slt) arrays into the master Numpy arrays
Args:
device (pyopencl.Device):
context (pyopencl.Context):
queue (pyopencl.CommandQueue):
cl_kernel_source (str):
n_global (int):
"""
# Shorthand
info = self.info
seed_point_array = self.data.seed_point_array
mask_array = self.data.mask_array
uv_array = self.data.uv_array
mapping_array = self.data.mapping_array
# Prepare memory, buffers
roi_nxy = mapping_array.shape
slc_array = np.zeros((roi_nxy[0],roi_nxy[1]), dtype=np.uint32)
slt_array = np.zeros((roi_nxy[0],roi_nxy[1]), dtype=np.uint32)
self.data.slc_array = np.zeros((roi_nxy[0],roi_nxy[1],2), dtype=np.uint32)
# Note the returned slt array is FLOAT32 but the GPU computes are done on UINT32
self.data.slt_array = np.zeros((roi_nxy[0],roi_nxy[1],2), dtype=np.float32)
self.data.sla_array = np.zeros((roi_nxy[0],roi_nxy[1],2), dtype=np.float32)
array_dict = { 'seed_point': {'array': seed_point_array, 'rwf': 'RO'},
'mask': {'array': mask_array, 'rwf': 'RO'},
'uv': {'array': uv_array, 'rwf': 'RO'},
'mapping': {'array': mapping_array, 'rwf': 'RW'},
'slc': {'array': slc_array, 'rwf': 'RW'},
'slt': {'array': slt_array, 'rwf': 'RW'} }
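        # The 'rwf' tag records how each device buffer should be created:
        # 'RO' for read-only inputs, 'RW' for read-write arrays that are
        # copied back to the host after the kernel runs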
buffer_dict = pocl.prepare_buffers(context, array_dict, self.verbose)
# Downstream and upstream passes aka streamline integrations from
# chunks of seed points aka subsets of the total set
global_size = [n_global,1]
local_size = [info.n_work_items,1]
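        # OpenCL requires the global work size to be an integer multiple of
        # the work-group (local) size; the padding done in integrate()
        # guarantees this for n_global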
n_work_items = info.n_work_items
chunk_size_factor = info.chunk_size_factor
max_time_per_kernel = info.max_time_per_kernel
        vprint(self.verbose,
               'Seed point buffer size = {}*8 bytes'
                   .format(buffer_dict['seed_point'].size//8))
# Downstream then upstream loop
downup_list = [[0,+1.0],[1,-1.0]]
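        # Two passes over the same seed points: index 0 integrates downstream
        # (sign +1.0), index 1 upstream (sign -1.0); the sign is baked into
        # the kernel at build time via its compile options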
for downup_idx, downup_sign in downup_list:
info.downup_sign = downup_sign
##################################
# Compile the CL code
compile_options = pocl.set_compile_options(info, 'INTEGRATE_FIELDS',
downup_sign=downup_sign)
vprint(self.gpu_verbose,'Compile options:\n',compile_options)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
program = cl.Program(context,
cl_kernel_source).build(options=compile_options)
pocl.report_build_log(program, device, self.gpu_verbose)
# Set the GPU kernel
kernel = program.integrate_fields
            # Designate buffered arrays (all kernel args are buffers, so no
            # scalar dtypes are needed)
            kernel.set_scalar_arg_dtypes( [None]*len(buffer_dict) )
            kernel.set_args(*list(buffer_dict.values()))
# Trace the streamlines on the GPU
vprint(self.gpu_verbose,
'#### GPU/OpenCL computation: {0} work items... ####'
.format(global_size[0]))
pocl.report_kernel_info(device,kernel,self.gpu_verbose)
downup_step = (downup_idx,len(downup_list))
elapsed_time \
= pocl.adaptive_enqueue_nd_range_kernel(
queue, kernel, global_size,
local_size, n_work_items,
chunk_size_factor=chunk_size_factor,
max_time_per_kernel=max_time_per_kernel,
downup_step=downup_step,
verbose=self.gpu_verbose,
vprogress=self.vprogress)
vprint(self.gpu_verbose,
'#### ...elapsed time for {1} work items: {0:.3f}s ####'
.format(elapsed_time,global_size[0]))
queue.finish()
# Copy back the streamline length, distance density grid
cl.enqueue_copy(queue, slc_array, buffer_dict['slc'])
queue.finish()
cl.enqueue_copy(queue, slt_array, buffer_dict['slt'])
queue.finish()
##################################
# Copy out the slc, slt results for this pass only
# Count of streamlines entering per pixel width: n/meter
self.data.slc_array[:,:,downup_idx] += slc_array
# Average streamline length of streamlines entering each pixel: meters
self.data.slt_array[:,:,downup_idx] += slt_array.astype(np.float32)
if downup_idx==0:
# Zero the GPU slt, slc arrays before using in the next pass
slc_array.fill(0)
slt_array.fill(0.0)
cl.enqueue_copy(queue, buffer_dict['slc'],slc_array)
queue.finish()
cl.enqueue_copy(queue, buffer_dict['slt'],slt_array)
queue.finish()
cl.enqueue_copy(queue, mapping_array, buffer_dict['mapping'])
queue.finish()
# Compute average streamline lengths (sla) from total lengths (slt) & counts (slc)
# Shorthand
(slc,sla,slt) = (self.data.slc_array, self.data.sla_array, self.data.slt_array)
# slc: count of lines crossing a pixel * number of line-points per pixel
# slt: sum of line lengths crossing a pixel * number of line-points per pixel
# sla: sum of line lengths / count of lines
sla[slc==0] = 0.0
sla[slc>0] = np.sqrt(2.0)*slt[slc>0]/slc[slc>0]
slt[slc>0] = slt[slc>0]/info.subpixel_seed_point_density**2
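        # Worked example: a pixel crossed by slc=4 line-points with total
        # length slt=10.0 gives sla = sqrt(2)*10.0/4 ≈ 3.54 (mean length
        # per line, sqrt(2)-weighted)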