pocl.py

Helper tools for working with OpenCL devices via PyOpenCL.
Requires Python packages/modules: PyOpenCL.
Imports the pocl module as well as functions from the useful module.
- class streamlines.pocl.Initialize_cl(cl_src_path, which_cl_platform, which_cl_device)
  Container for the essential PyOpenCL state: platform, device, context, and a
  profiling-enabled command queue.
  Parameters: cl_src_path (str) – path to the OpenCL kernel source files;
  which_cl_platform (int) – OpenCL platform index;
  which_cl_device (int) – OpenCL device index
  - __init__(cl_src_path, which_cl_platform, which_cl_device)
    Prepare the CL platform, device, context and command queue.
- streamlines.pocl.prepare_cl_context(cl_platform=0, cl_device=2)
  Prepare PyOpenCL platform, device and context.
  Parameters: cl_platform (int), cl_device (int)
  Returns: PyOpenCL platform, PyOpenCL device, PyOpenCL context
  Return type: pyopencl.Platform, pyopencl.Device, pyopencl.Context
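  A minimal usage sketch, assuming PyOpenCL is installed and the chosen
  platform/device indices exist on the host (the indices here are illustrative):

      from streamlines.pocl import prepare_cl_context

      platform, device, context = prepare_cl_context(cl_platform=0, cl_device=0)
      print(platform.name, device.name)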
- streamlines.pocl.choose_platform_and_device(cl_platform='env', cl_device='env')
  Get OpenCL platform & device from environment variables if they are set.
  Parameters: cl_platform (int), cl_device (int)
  Returns: CL platform, CL device
  Return type: int, int
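  Because the defaults are 'env', the choice can be steered by PyOpenCL's standard
  PYOPENCL_CTX variable in 'platform:device' form; a sketch, assuming the variable
  is set by the caller:

      import os
      os.environ['PYOPENCL_CTX'] = '0:1'  # platform 0, device 1

      from streamlines.pocl import choose_platform_and_device

      # falls back to platform 0 / device 2 if PYOPENCL_CTX is unset or malformed
      cl_platform, cl_device = choose_platform_and_device()
      print(cl_platform, cl_device)  # -> 0 1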
- streamlines.pocl.prepare_cl_queue(context=None, kernel_source=None, compile_options=[])
  Build PyOpenCL program and prepare command queue.
  Parameters: context (pyopencl.Context) – GPU/OpenCL device context;
  kernel_source (str) – OpenCL kernel code string;
  compile_options (list) – OpenCL compiler options
  Returns: PyOpenCL program, PyOpenCL command queue
  Return type: pyopencl.Program, pyopencl.CommandQueue
- streamlines.pocl.prepare_cl(cl_platform=0, cl_device=2, kernel_source=None, compile_options=[])
  Prepare PyOpenCL platform, device, context, program and command queue in one step.
  Parameters: cl_platform (int), cl_device (int), kernel_source (str) – OpenCL
  kernel code string, compile_options (list)
  Returns: PyOpenCL platform, PyOpenCL device, PyOpenCL context, PyOpenCL program,
  PyOpenCL command queue
  Return type: pyopencl.Platform, pyopencl.Device, pyopencl.Context,
  pyopencl.Program, pyopencl.CommandQueue
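  A sketch of the one-call setup, assuming a valid platform/device pair; the
  kernel shown is a trivial placeholder, not part of the streamlines codebase:

      from streamlines.pocl import prepare_cl

      kernel_source = """
      __kernel void scale(__global float* x) {
          x[get_global_id(0)] *= 2.0f;
      }
      """
      platform, device, context, program, queue = prepare_cl(
          cl_platform=0, cl_device=0, kernel_source=kernel_source)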
- streamlines.pocl.make_cl_dtype(device, name, dtype)
  Generate an OpenCL structure typedef codelet from a numpy structured array dtype.
  Currently unused.
  Parameters: device (pyopencl.Device), name (str), dtype (numpy.dtype)
  Returns: processed dtype, cl dtype, CL typedef codelet
  Return type: numpy.dtype, pyopencl.dtype, str
- streamlines.pocl.set_compile_options(info, kernel_def, downup_sign=1, job_type='integration')
  Convert info obj data into a list of '-D' compiler macros.
  Parameters:
  - info (obj) – container for myriad parameters controlling trace() and mapping()
    workflow and corresponding GPU/OpenCL device operation
  - kernel_def (str) – name of the kernel in the program source string; used by the
    #ifdef kernel-wrapper commands in the OpenCL codes
  - downup_sign (int) – sense of streamline integration: +1 for downstream and
    -1 for upstream ['integration' mode]
  - job_type (str) – switch between 'integration' (default) and 'kde'
  Returns: compile options
  Return type: list
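  The returned list is the flattened ['-D', 'MACRO=value<suffix>'] form accepted
  by pyopencl's build(options=...), with 'u'/'f' suffixes marking unsigned/float
  literals. A minimal sketch of the 'kde' path; the SimpleNamespace stand-in for
  the real info object and the kernel_def name are purely illustrative:

      from types import SimpleNamespace
      from streamlines.pocl import set_compile_options

      # only the attributes read by the 'kde' branch, with illustrative values
      fields = ['bandwidth','n_data','n_hist_bins','n_pdf_points',
                'logx_min','logx_max','logx_range','bin_dx','pdf_dx',
                'kdf_width_x','n_kdf_part_points_x',
                'logy_min','logy_max','logy_range','bin_dy','pdf_dy',
                'kdf_width_y','n_kdf_part_points_y']
      info = SimpleNamespace(kernel='gaussian', **{f: 1 for f in fields})

      options = set_compile_options(info, 'histogram_univariate', job_type='kde')
      # -> ['-D','KERNEL_HISTOGRAM_UNIVARIATE', '-D','KDF_IS_GAUSSIAN',
      #     '-D','BANDWIDTH=1f', '-D','N_DATA=1u', ...]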
- streamlines.pocl.report_kernel_info(device, kernel, verbose)
  Fetch and print GPU/OpenCL kernel info.
  Parameters: device (pyopencl.Device), kernel (pyopencl.Kernel), verbose (bool)
- streamlines.pocl.report_device_info(cl_platform, cl_device, platform, device, verbose)
  Fetch and print GPU/OpenCL device info.
  Parameters: cl_platform (int), cl_device (int), platform (pyopencl.Platform),
  device (pyopencl.Device), verbose (bool)
- streamlines.pocl.report_build_log(program, device, verbose)
  Fetch and print GPU/OpenCL program build log.
  Parameters: program (pyopencl.Program), device (pyopencl.Device), verbose (bool)
- streamlines.pocl.adaptive_enqueue_nd_range_kernel(queue, kernel, global_size, local_size, n_work_items, chunk_size_factor=10, max_time_per_kernel=4.0, downup_step=(1, 1), verbose=True, vprogress=False)
  Enqueue a kernel in adaptively sized chunks of work items, resizing each chunk
  from its measured execution time so that one launch takes roughly
  max_time_per_kernel seconds.
  Parameters: queue (pyopencl.CommandQueue), kernel (pyopencl.Kernel),
  global_size (list), local_size (list), n_work_items (int),
  chunk_size_factor (int), max_time_per_kernel (float), downup_step (tuple),
  verbose (bool), vprogress (bool)
  Returns: cumulative kernel execution time in seconds
  Return type: float
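  The chunking policy, in outline: start with n_work_items * chunk_size_factor
  work items, time each launch via event profiling, then resize the next chunk to
  target max_time_per_kernel seconds. A pure-Python sketch of that resizing rule
  (no GPU required; it mirrors the arithmetic in the function body):

      import numpy as np

      def next_chunk_size(n_work_items, time_per_item, work_left,
                          max_time_per_kernel=4.0):
          # whole multiples of n_work_items, capped by time budget and work left
          return n_work_items * min(
              int(max_time_per_kernel / (time_per_item * n_work_items)),
              int(np.ceil(work_left / n_work_items)))

      # e.g. 32-item work groups, 1 ms per item measured, 10000 items left:
      print(next_chunk_size(32, 1e-3, 10000))  # -> 4000 (125 groups of 32)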
- streamlines.pocl.prepare_buffers(context, array_dict, verbose)
  Create PyOpenCL buffers and np-workalike arrays to allow CPU-GPU data transfer.
  Parameters: context (pyopencl.Context), array_dict (dict), verbose (bool)
  Returns: buffer_dict
  Return type: dict
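  array_dict maps a name to a numpy array plus an 'rwf' flag ('RO', 'RW', or 'WO')
  selecting the buffer flags: 'RO'/'RW' arrays are copied to the device, 'WO'
  buffers are merely allocated at the array's byte size. A sketch with
  illustrative array names and shapes:

      import numpy as np
      from streamlines.pocl import prepare_cl_context, prepare_buffers

      _, _, context = prepare_cl_context(cl_platform=0, cl_device=0)
      array_dict = {
          'seed_points':  {'array': np.zeros((100,2),  dtype=np.float32), 'rwf': 'RO'},
          'mask':         {'array': np.zeros((64,64),  dtype=np.uint8),   'rwf': 'RW'},
          'trajectories': {'array': np.zeros((100,50), dtype=np.float32), 'rwf': 'WO'},
      }
      buffer_dict = prepare_buffers(context, array_dict, verbose=True)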
- streamlines.pocl.gpu_compute(cl_state, info, array_dict, verbose)
  Carry out GPU computation.
  Parameters: cl_state (obj), info (dict), array_dict (dict), verbose (bool)
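  Putting the pieces together, the intended driver pattern looks roughly like
  this; the source path, file names and kernel name are placeholders, and info
  must be a parameter container carrying n_seed_points, n_work_items,
  chunk_size_factor, max_time_per_kernel and every attribute read by
  set_compile_options:

      from streamlines.pocl import Initialize_cl, read_kernel_source, gpu_compute

      # hypothetical paths, file names and kernel name, for illustration only
      cl_state = Initialize_cl('opencl/', which_cl_platform=0, which_cl_device=0)
      cl_state.kernel_source = read_kernel_source(cl_state.src_path,
                                                  ['essentials.cl', 'integration.cl'])
      cl_state.kernel_fn = 'integrate_trajectory'  # kernel name in the source

      # info and array_dict are assumed built by the caller (see prepare_buffers)
      gpu_compute(cl_state, info, array_dict, verbose=True)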
Code
"""
---------------------------------------------------------------------
Helper tools for working with OpenCL devices via PyOpenCL.
Requires Python packages/modules:
- :mod:`PyOpenCL <pyopencl>`
Imports the :mod:`.pocl` module as well as functions from the :mod:`.useful` module.
---------------------------------------------------------------------
.. _OpenCL: https://www.khronos.org/opencl
.. _PyOpenCL: https://documen.tician.de/pyopencl/index.html
"""
import pyopencl as cl
import pyopencl.tools as cltools
import numpy as np
import os
from streamlines.useful import neatly, vprint
import warnings
os.environ['PYOPENCL_COMPILER_OUTPUT']='0'
__all__ = ['Initialize_cl',
'prepare_cl_context','choose_platform_and_device',
'prepare_cl_queue', 'prepare_cl',
'make_cl_dtype',
'set_compile_options',
'report_kernel_info', 'report_device_info', 'report_build_log',
'adaptive_enqueue_nd_range_kernel',
'prepare_buffers', 'gpu_compute']
pdebug = print
class Initialize_cl():
"""
TBD
Args:
TBD (TBD):
TBD
Returns:
TBD:
TBD
"""
def __init__(self, cl_src_path, which_cl_platform, which_cl_device ):
"""
TBD
Args:
TBD (TBD):
TBD
Returns:
TBD:
TBD
"""
# Prepare CL essentials
self.platform, self.device, self.context \
= prepare_cl_context(which_cl_platform,which_cl_device)
self.queue = cl.CommandQueue(self.context,
properties=cl.command_queue_properties.PROFILING_ENABLE)
self.src_path = cl_src_path
self.kernel_source = None
self.kernel_fn = ''
def prepare_cl_context(cl_platform=0, cl_device=2):
"""
Prepare PyOpenCL platform, device and context.
Args:
cl_platform (int):
cl_device (int):
Returns:
pyopencl.Platform, pyopencl.Device, pyopencl.Context:
PyOpenCL platform, PyOpenCL device, PyOpenCL context
"""
cl_platform, cl_device = choose_platform_and_device(cl_platform,cl_device)
platform = cl.get_platforms()[cl_platform]
devices = platform.get_devices()
device = devices[cl_device]
context = cl.Context([device])
return platform, device, context
def choose_platform_and_device(cl_platform='env',cl_device='env'):
"""
Get OpenCL platform & device from environment variables if they are set.
Args:
cl_platform (int):
cl_device (int):
Returns:
int, int:
CL platform, CL device
"""
    if cl_platform=='env':
        try:
            cl_platform = int(os.environ['PYOPENCL_CTX'].split(':')[0])
        except (KeyError, IndexError, ValueError):
            cl_platform = 0
    if cl_device=='env':
        try:
            cl_device = int(os.environ['PYOPENCL_CTX'].split(':')[1])
        except (KeyError, IndexError, ValueError):
            cl_device = 2
return cl_platform, cl_device
def prepare_cl_queue(context=None, kernel_source=None, compile_options=[]):
"""
Build PyOpenCL program and prepare command queue.
Args:
context (pyopencl.Context): GPU/OpenCL device context
kernel_source (str): OpenCL kernel code string
Returns:
pyopencl.Program, pyopencl.CommandQueue:
PyOpenCL program, PyOpenCL command queue
"""
# compile_options = ['-cl-fast-relaxed-math',
# '-cl-single-precision-constant',
# '-cl-unsafe-math-optimizations',
# '-cl-no-signed-zeros',
# '-cl-finite-math-only']
program = cl.Program(context, kernel_source).build(cache_dir=False,
options=compile_options)
queue = cl.CommandQueue(context)
return program, queue
def prepare_cl(cl_platform=0, cl_device=2, kernel_source=None, compile_options=[]):
"""
Prepare PyOpenCL stuff.
Args:
cl_device (int):
kernel_source (str): OpenCL kernel code string
Returns:
pyopencl.Platform, pyopencl.Device, pyopencl.Context, pyopencl.Program, \
pyopencl.CommandQueue:
PyOpenCL platform, PyOpenCL device, PyOpenCL context, PyOpenCL program, \
PyOpenCL command queue
"""
platform,device,context = prepare_cl_context(cl_platform,cl_device)
program,queue = prepare_cl_queue(context,kernel_source,compile_options)
return platform, device, context, program, queue
def make_cl_dtype(device,name,dtype):
"""
Generate an OpenCL structure typedef codelet from a numpy structured
array dtype.
Currently unused.
Args:
        device (pyopencl.Device):
name (str):
dtype (numpy.dtype):
Returns:
numpy.dtype, pyopencl.dtype, str:
processed dtype, cl dtype, CL typedef codelet
"""
processed_dtype, c_decl = cltools.match_dtype_to_c_struct(device, name, dtype)
return processed_dtype, cltools.get_or_register_dtype(name, processed_dtype), c_decl
def set_compile_options(info, kernel_def, downup_sign=1,
job_type='integration'):
"""
Convert info obj data into a list of '-D' compiler macros.
Args:
info (obj): container for myriad parameters controlling
trace() and mapping() workflow and corresponding GPU/OpenCL device operation
kernel_def (str): name of the kernel in the program source string;
is used by #ifdef kernel-wrapper commands in the OpenCL codes
        downup_sign (int): flag used to indicate desired sense of streamline
            integration, with +1 for downstream and -1 for upstream ['integration' mode]
job_type (str): switch between 'integration' (default) and 'kde'
Returns:
list:
compile options
"""
if job_type=='kde':
compile_options_dict = {
'bandwidth' : 'f',
'n_data' : 'u',
'n_hist_bins' : 'u',
'n_pdf_points' : 'u',
'logx_min' : 'f',
'logx_max' : 'f',
'logx_range' : 'f',
'bin_dx' : 'f',
'pdf_dx' : 'f',
'kdf_width_x' : 'f',
'n_kdf_part_points_x' : 'u',
'logy_min' : 'f',
'logy_max' : 'f',
'logy_range' : 'f',
'bin_dy' : 'f',
'pdf_dy' : 'f',
'kdf_width_y' : 'f',
'n_kdf_part_points_y' : 'u'
}
rtn_list = ['-D','KERNEL_{}'.format(kernel_def.upper())]
rtn_list += ['-D','KDF_IS_{}'.format(info.kernel.upper())]
for item in compile_options_dict.items():
value = getattr(info,item[0])
list_item = ['-D','{0}={1}{2}'.format(item[0].upper(),value,item[1])]
rtn_list += list_item
else:
pad = np.uint32(info.pad_width)
nxp = np.uint32(info.nx_padded)
nyp = np.uint32(info.ny_padded)
nxf = np.float32(info.nx)
nyf = np.float32(info.ny)
grid_scale = np.float32(np.sqrt(nxf*nyf))
combo_factor = np.float32(grid_scale*info.integrator_step_factor)
dt_max = np.float32(min(min(1.0/nxf,1.0/nyf),0.1))
compile_options_dict = {
'n_seed_points' : ('u',''),
'downup_sign' : ('u', np.int8(downup_sign)),
'integrator_step_factor' : ('f',''),
'max_integration_step_error' : ('f',''),
'adjusted_max_error' : ('f',''),
'integration_halt_threshold' : ('f',''),
'max_length' : ('f',''),
'pixel_size' : ('f',''),
'pad_width' : ('u', pad),
'pad_width_pp5' : ('f', np.float32(pad)+0.5),
'nx_padded' : ('u', nxp),
'ny_padded' : ('u', nyp),
'nxy_padded' : ('u', np.uint32(nxp*nyp)),
'nxf_mp5' : ('f', nxf-0.5),
'nyf_mp5' : ('f', nyf-0.5),
'grid_scale' : ('f', grid_scale),
'combo_factor' : ('f', combo_factor*downup_sign),
'dt_max' : ('f', dt_max),
'max_n_steps' : ('u',''),
'trajectory_resolution' : ('u',''),
'seeds_chunk_offset' : ('u',''),
'subpixel_seed_point_density' : ('u',''),
'subpixel_seed_halfspan' : ('f',''),
'subpixel_seed_step' : ('f',''),
'jitter_magnitude' : ('f',''),
'interchannel_max_n_steps' : ('u',''),
'left_flank_addition' : ('u',''),
'is_channel' : ('u',''),
'is_thinchannel' : ('u',''),
'is_interchannel' : ('u',''),
'is_channelhead' : ('u',''),
'is_channeltail' : ('u',''),
'is_majorconfluence' : ('u',''),
'is_minorconfluence' : ('u',''),
'is_majorinflow' : ('u',''),
'is_minorinflow' : ('u',''),
'is_leftflank' : ('u',''),
'is_rightflank' : ('u',''),
'is_midslope' : ('u',''),
'is_ridge' : ('u',''),
'was_channelhead' : ('u',''),
'is_subsegmenthead' : ('u',''),
'is_loop' : ('u',''),
'is_blockage' : ('u',''),
'do_measure_hsl_from_ridges' : ('flag',''),
'debug' : ('flag',''),
'verbose' : ('flag','')
}
if info.segmentation_threshold>0:
compile_options_dict.update({'segmentation_threshold' : ('u','')})
rtn_list = ['-D','KERNEL_{}'.format(kernel_def.upper())]
for item in compile_options_dict.items():
if item[1][0]=='flag':
if getattr(info,item[0]):
list_item = ['-D', '{}'.format(item[0].upper())]
elif item[1][1]!='':
list_item = ['-D',
'{0}={1}{2}'.format(item[0].upper(),item[1][1],item[1][0])]
else:
value = getattr(info,item[0])
list_item = ['-D',
'{0}={1}{2}'.format(item[0].upper(),value,item[1][0])]
rtn_list += list_item
return rtn_list
def report_kernel_info(device,kernel,verbose):
"""
Fetch and print GPU/OpenCL kernel info.
Args:
device (pyopencl.Device):
kernel (pyopencl.Kernel):
verbose (bool):
"""
if verbose:
# Report some GPU info
print('Kernel reference count:',
kernel.get_info(
cl.kernel_info.REFERENCE_COUNT))
print('Kernel number of args:',
kernel.get_info(
cl.kernel_info.NUM_ARGS))
print('Kernel function name:',
kernel.get_info(
cl.kernel_info.FUNCTION_NAME))
print('Maximum work group size:',
kernel.get_work_group_info(
cl.kernel_work_group_info.WORK_GROUP_SIZE, device))
print('Recommended work group size:',
kernel.get_work_group_info(
cl.kernel_work_group_info.PREFERRED_WORK_GROUP_SIZE_MULTIPLE, device))
print('Local memory size:',
kernel.get_work_group_info(
cl.kernel_work_group_info.LOCAL_MEM_SIZE, device), 'bytes')
print('Private memory size:',
kernel.get_work_group_info(
cl.kernel_work_group_info.PRIVATE_MEM_SIZE, device), 'bytes')
def report_device_info(cl_platform, cl_device, platform, device, verbose):
"""
Fetch and print GPU/OpenCL device info.
Args:
cl_platform (int):
cl_device (int):
platform (pyopencl.Platform):
device (pyopencl.Device):
verbose (bool):
"""
if verbose:
print('OpenCL platform #{0} = {1}'.format(cl_platform,platform))
print('OpenCL device #{0} = {1}'.format(cl_device,device))
n_bytes = device.get_info(cl.device_info.GLOBAL_MEM_SIZE)
print('Global memory size: {} bytes = {}'.format(n_bytes,neatly(n_bytes)))
n_bytes = device.get_info(cl.device_info.MAX_MEM_ALLOC_SIZE)
print('Max memory alloc size: {} bytes = {}'
.format(n_bytes,neatly(n_bytes)))
    device_info_list = [s for s in dir(cl.device_info) if not s.startswith('__')]
    for s in device_info_list:
        try:
            print('{0} = {1}'.format(s,device.get_info(getattr(cl.device_info,s))))
        except:
            # not every device_info query is supported by every device/driver
            pass
def report_build_log(program, device, verbose):
"""
Fetch and print GPU/OpenCL program build log.
Args:
program (pyopencl.Program):
device (pyopencl.Device):
verbose (bool):
"""
build_log = program.get_build_info(device,cl.program_build_info.LOG)
if len(build_log.replace(' ',''))>0:
vprint(verbose,'\nOpenCL build log: {}'.format(build_log))
def report_progress(vprogress, x, n, downup_step=(1,1), end=''):
    """
    Print percentage progress of a chunked kernel-launch sequence.
    Args:
        vprogress (bool): whether to print progress
        x (int): number of work items completed so far
        n (int): total number of work items
        downup_step (tuple): (current pass, number of passes), used to rescale
            progress when downstream and upstream passes are interleaved
        end (str): end-of-line string passed through to print()
    """
if downup_step[1]==1:
progress = (x/n)*100.0
else:
progress = (downup_step[0]/downup_step[1])*100.0+(x/n)*(100.0/downup_step[1])
if downup_step[1]==1 or progress!=50.0:
vprint(vprogress, '{0:2.1f}% '.format(progress),end=end)
def adaptive_enqueue_nd_range_kernel(queue, kernel, global_size, local_size,
n_work_items, chunk_size_factor=10,
max_time_per_kernel=4.0, downup_step=(1,1),
verbose=True, vprogress=False):
"""
TBD
Args:
TBD (TBD):
TBD
Returns:
TBD:
TBD
"""
work_size = n_work_items*int(np.ceil(global_size[0]/n_work_items))
work_left = work_size
chunk_size = min(n_work_items*chunk_size_factor,work_left)
offset = 0
cumulative_time = 0.0
time_per_item = 0.0
while work_left>0:
# event = cl.enqueue_nd_range_kernel(queue, kernel, [chunk_size+offset,1],
event = cl.enqueue_nd_range_kernel(queue, kernel, [chunk_size,1],
local_size,global_work_offset=[offset,0])
progress = 100.0*(min(work_size,(offset+chunk_size))/work_size)
report_progress(vprogress,work_size-work_left,work_size,downup_step=downup_step)
vprint(verbose,
'{0:.2f}%: enqueued {1}/{2} workitems'
.format(progress,chunk_size,work_size),
'in [{0}-{1}]'.format(offset,offset+chunk_size-1),
'estimated t={0:.3f}s...'.format(time_per_item*chunk_size),
end='')
offset += chunk_size
work_left -= chunk_size
event.wait()
try:
elapsed_time = 1e-9*(event.profile.end-event.profile.start)
vprint(verbose, '...actual t={0:.3f}s'.format(elapsed_time))
except:
vprint(verbose, '...profiling failed')
continue
cumulative_time += elapsed_time
time_per_item = elapsed_time/chunk_size
chunk_size = n_work_items*(min(
int(max_time_per_kernel/(time_per_item*n_work_items)),
int(np.ceil(work_left/n_work_items)) ))
queue.finish()
report_progress(vprogress, work_size-work_left, work_size,
downup_step=downup_step, end='\n')
return cumulative_time
def read_kernel_source(cl_src_path, cl_files):
"""
TBD
Args:
TBD (TBD):
TBD
Returns:
TBD:
TBD
"""
kernel_source = ''
for cl_file in cl_files:
with open(os.path.join(cl_src_path,cl_file), 'r') as fp:
kernel_source += fp.read()
return kernel_source
def prepare_buffers(context, array_dict, verbose):
"""
Create PyOpenCL buffers and np-workalike arrays to allow CPU-GPU data transfer.
Args:
context (pyopencl.Context):
array_dict (dict):
verbose (bool):
Returns:
dict: buffer_dict
"""
# Buffers to GPU memory
COPY_READ_ONLY = cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR
COPY_READ_WRITE = cl.mem_flags.READ_WRITE | cl.mem_flags.COPY_HOST_PTR
WRITE_ONLY = cl.mem_flags.WRITE_ONLY
# The following could be packed into a list comprehension but would be
# rather harder to read in that form
buffer_dict = {}
for array_info in array_dict.items():
if 'R' in array_info[1]['rwf']:
if array_info[1]['rwf']=='RO':
flags = COPY_READ_ONLY
elif array_info[1]['rwf']=='RW':
flags = COPY_READ_WRITE
buffer_dict.update({
array_info[0]: cl.Buffer(context, flags, hostbuf=array_info[1]['array'])
})
elif array_info[1]['rwf']=='WO':
flags = WRITE_ONLY
buffer_dict.update({
array_info[0]: cl.Buffer(context, flags, array_info[1]['array'].nbytes)
})
else:
pass
return buffer_dict
def gpu_compute(cl_state, info, array_dict, verbose):
"""
Carry out GPU computation.
Args:
cl_state (obj):
info (dict):
array_dict (dict):
verbose (bool):
"""
# Shorthand
device = cl_state.device
context = cl_state.context
queue = cl_state.queue
kernel_source = cl_state.kernel_source
kernel_fn = cl_state.kernel_fn
# Prepare memory, buffers
buffer_dict = prepare_buffers(context, array_dict, verbose)
# Specify size (# of workitems) and number of workgroups
global_size = [info.n_seed_points,1]
local_size = [info.n_work_items,1]
# Compile the CL code
compile_options = set_compile_options(info, kernel_fn, downup_sign=1)
# vprint(verbose,'Compile options:\n', compile_options)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
program = cl.Program(context, kernel_source).build(options=compile_options)
report_build_log(program, device, verbose)
# Set the GPU kernel
kernel = getattr(program, kernel_fn)
# Designate buffered arrays
kernel.set_args(*list(buffer_dict.values()))
kernel.set_scalar_arg_dtypes( [None]*len(buffer_dict) )
# Do the GPU compute
vprint(verbose,
'#### GPU/OpenCL computation: {0} work items... ####'.format(global_size[0]))
report_kernel_info(device, kernel, verbose)
elapsed_time \
= adaptive_enqueue_nd_range_kernel( queue, kernel, global_size,
local_size, info.n_work_items,
chunk_size_factor=info.chunk_size_factor,
max_time_per_kernel=info.max_time_per_kernel,
verbose=verbose )
vprint(verbose,
'#### ...elapsed time for {1} work items: {0:.3f}s ####'
.format(elapsed_time,global_size[0]))
queue.finish()
# Fetch the data back from the GPU and finish
for array_info in array_dict.items():
if 'W' in array_info[1]['rwf']:
cl.enqueue_copy(queue, array_info[1]['array'], buffer_dict[array_info[0]])
queue.finish()