delayed_image package¶
Submodules¶
- delayed_image.channel_spec module
BaseChannelSpecFusedChannelSpecFusedChannelSpec.concat()FusedChannelSpec.specFusedChannelSpec.unique()FusedChannelSpec.parse()FusedChannelSpec.coerce()FusedChannelSpec.concise()FusedChannelSpec.normalize()FusedChannelSpec.numel()FusedChannelSpec.sizes()FusedChannelSpec.code_list()FusedChannelSpec.as_list()FusedChannelSpec.as_oset()FusedChannelSpec.as_set()FusedChannelSpec.to_set()FusedChannelSpec.to_oset()FusedChannelSpec.to_list()FusedChannelSpec.as_path()FusedChannelSpec.difference()FusedChannelSpec.intersection()FusedChannelSpec.union()FusedChannelSpec.issubset()FusedChannelSpec.issuperset()FusedChannelSpec.component_indices()FusedChannelSpec.streams()FusedChannelSpec.fuse()
ChannelSpecChannelSpec.specChannelSpec.infoChannelSpec.coerce()ChannelSpec.parse()ChannelSpec.concise()ChannelSpec.normalize()ChannelSpec.keys()ChannelSpec.values()ChannelSpec.items()ChannelSpec.fuse()ChannelSpec.streams()ChannelSpec.code_list()ChannelSpec.as_path()ChannelSpec.difference()ChannelSpec.intersection()ChannelSpec.union()ChannelSpec.issubset()ChannelSpec.issuperset()ChannelSpec.numel()ChannelSpec.sizes()ChannelSpec.unique()ChannelSpec.encode()ChannelSpec.decode()ChannelSpec.component_indices()
subsequence_index()oset_insert()oset_delitem()
- delayed_image.delayed_base module
- delayed_image.delayed_leafs module
- delayed_image.delayed_nodes module
- delayed_image.demo module
- delayed_image.helpers module
- delayed_image.lazy_loaders module
- delayed_image.sensorchan_spec module
SENSOR_CHAN_GRAMMARSensorSpecSensorChanSpecFusedSensorChanSpecSensorChanNodeFusedChanNodeSensorChanTransformerSensorChanTransformer.chan_id()SensorChanTransformer.chan_single()SensorChanTransformer.chan_getitem()SensorChanTransformer.chan_getslice_0b()SensorChanTransformer.chan_getslice_ab()SensorChanTransformer.chan_code()SensorChanTransformer.sensor_seq()SensorChanTransformer.fused_seq()SensorChanTransformer.fused()SensorChanTransformer.channel_rhs()SensorChanTransformer.sensor_lhs()SensorChanTransformer.nosensor_chan()SensorChanTransformer.sensor_chan()SensorChanTransformer.stream_item()SensorChanTransformer.stream()
normalize_sensor_chan()concise_sensor_chan()sensorchan_concise_parts()sensorchan_normalized_parts()
Module contents¶
The delayed image module.
Todo
The optimize logic could likely be better expressed as some sort of AST transformer.
Example
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from delayed_image import * # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> quantization = {'quant_max': 255, 'nodata': 0}
>>> #
>>> # Make a complex chain of operations
>>> dimg = dimg.dequantize(quantization)
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg[0:400, 1:400]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg[0:800, 1:800]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg[0:800, 1:800]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 2.1})
>>> dimg = dimg[0:200, 1:200]
>>> dimg = dimg[1:200, 2:200]
>>> dimg.write_network_text()
╙── Crop dsize=(128,130),space_slice=(slice(1,131,None),slice(2,130,None))
╽
Crop dsize=(130,131),space_slice=(slice(0,131,None),slice(1,131,None))
╽
Warp dsize=(131,131),transform={scale=2.1000}
╽
Warp dsize=(62,62),transform={scale=1.1000}
╽
Warp dsize=(56,56),transform={scale=1.1000}
╽
Warp dsize=(50,50),transform={scale=0.5000}
╽
Crop dsize=(99,100),space_slice=(slice(0,100,None),slice(1,100,None))
╽
Warp dsize=(100,100),transform={scale=0.5000}
╽
Crop dsize=(199,200),space_slice=(slice(0,200,None),slice(1,200,None))
╽
Warp dsize=(200,200),transform={scale=0.5000}
╽
Crop dsize=(399,400),space_slice=(slice(0,400,None),slice(1,400,None))
╽
Warp dsize=(621,621),transform={scale=1.1000}
╽
Warp dsize=(564,564),transform={scale=1.1000}
╽
Dequantize dsize=(512,512),quantization={quant_max=255,nodata=0}
╽
Load channels=r|g|b,dsize=(512,512),num_overviews=3,fname=astro_overviews=3.tif
>>> # Optimize the chain
>>> dopt = dimg.optimize()
>>> dopt.write_network_text()
╙── Warp dsize=(128,130),transform={offset=(-0.6115,-1.0000),scale=1.5373}
╽
Dequantize dsize=(80,83),quantization={quant_max=255,nodata=0}
╽
Crop dsize=(80,83),space_slice=(slice(0,83,None),slice(3,83,None))
╽
Overview dsize=(128,128),overview=2
╽
Load channels=r|g|b,dsize=(512,512),num_overviews=3,fname=astro_overviews=3.tif
>>> final0 = dimg.finalize(optimize=False)
>>> final1 = dopt.finalize()
>>> assert final0.shape == final1.shape
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
Example
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from delayed_image import * # NOQA
>>> import ubelt as ub
>>> import kwimage
>>> # Sometimes we want to manipulate data in a space, but then remove all
>>> # warps in order to get a sample without any data artifacts. This is
>>> # handled by adding a new transform that inverts everything and optimizing
>>> # it, which results in all warps canceling each other out.
>>> fpath = kwimage.grab_test_image_fpath()
>>> base = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> warp = kwimage.Affine.random(rng=321, offset=0)
>>> warp = kwimage.Affine.scale(0.5)
>>> orig = base.get_overview(1).warp(warp)[16:96, 24:128]
>>> delayed = orig.optimize()
>>> print('Orig')
>>> orig.write_network_text()
>>> print('Delayed')
>>> delayed.write_network_text()
>>> # Get the transform that would bring us back to the leaf
>>> tf_root_from_leaf = delayed.get_transform_from_leaf()
>>> print('tf_root_from_leaf =\n{}'.format(ub.urepr(tf_root_from_leaf, nl=1)))
>>> undo_all = tf_root_from_leaf.inv()
>>> print('undo_all =\n{}'.format(ub.urepr(undo_all, nl=1)))
>>> undo_scale = kwimage.Affine.coerce(ub.dict_diff(undo_all.concise(), ['offset']))
>>> print('undo_scale =\n{}'.format(ub.urepr(undo_scale, nl=1)))
>>> print('Undone All')
>>> undone_all = delayed.warp(undo_all).optimize()
>>> undone_all.write_network_text()
>>> # Discard translation components
>>> print('Undone Scale')
>>> undone_scale = delayed.warp(undo_scale).optimize()
>>> undone_scale.write_network_text()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> to_stack = []
>>> to_stack.append(base.finalize(optimize=False))
>>> to_stack.append(orig.finalize(optimize=False))
>>> to_stack.append(delayed.finalize(optimize=False))
>>> to_stack.append(undone_all.finalize(optimize=False))
>>> to_stack.append(undone_scale.finalize(optimize=False))
>>> kwplot.autompl()
>>> stack = kwimage.stack_images(to_stack, axis=1, bg_value=(5, 100, 10), pad=10)
>>> kwplot.imshow(stack)
CommandLine
xdoctest -m /home/joncrall/code/kwcoco/delayed_image/__init__.py __doc__:2
Example
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from delayed_image import * # NOQA
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> # Align the bands in "video" space
>>> delayed_vidspace = DelayedChannelConcat([
>>> bandR.scale(6, dsize=(600, 600)).optimize(),
>>> bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(),
>>> bandB.scale(1, dsize=(600, 600)).optimize(),
>>> ]).warp(
>>> #{'scale': 0.35, 'theta': 0.3, 'about': (30, 50), 'offset': (-10, -80)}
>>> {'scale': 0.7}
>>> )
>>> #delayed_vidspace._set_nested_params(border_value=0)
>>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb')
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> crop_vidspace = delayed_vidspace[vidspace_slice]
>>> crop_vidspace._set_nested_params(interpolation='lanczos')
>>> # Note: this only works because the graph is lazily optimized
>>> crop_vidspace_box = vidspace_box.warp(crop_vidspace._transform_from_subdata())
>>> crop_vidspace_poly = vidspace_poly.warp(crop_vidspace._transform_from_subdata())
>>> opt_crop_vidspace = crop_vidspace.optimize()
>>> print('Original: Video Space')
>>> delayed_vidspace.write_network_text()
>>> print('Original Crop: Video Space')
>>> crop_vidspace.write_network_text()
>>> print('Optimized Crop: Video Space')
>>> opt_crop_vidspace.write_network_text()
>>> tostack_grid = []
>>> # Drop boxes in asset space
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='Underlying asset bands (imagine these are on disk)'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> delayed_vidspace_opt = delayed_vidspace.optimize()
>>> tf_vidspace_to_rband = delayed_vidspace_opt.parts[0].get_transform_from_leaf().inv()
>>> tf_vidspace_to_gband = delayed_vidspace_opt.parts[1].get_transform_from_leaf().inv()
>>> tf_vidspace_to_bband = delayed_vidspace_opt.parts[2].get_transform_from_leaf().inv()
>>> rband_box = vidspace_box.warp(tf_vidspace_to_rband)
>>> gband_box = vidspace_box.warp(tf_vidspace_to_gband)
>>> bband_box = vidspace_box.warp(tf_vidspace_to_bband)
>>> rband_poly = vidspace_poly.warp(tf_vidspace_to_rband)
>>> gband_poly = vidspace_poly.warp(tf_vidspace_to_gband)
>>> bband_poly = vidspace_poly.warp(tf_vidspace_to_bband)
>>> row.append(kwimage.draw_header_text(rband_poly.draw_on(rband_box.draw_on(bandR.finalize()), edgecolor='b', fill=0), 'R'))
>>> row.append(kwimage.draw_header_text(gband_poly.draw_on(gband_box.draw_on(bandG.finalize()), edgecolor='b', fill=0), 'asset G-band'))
>>> row.append(kwimage.draw_header_text(bband_poly.draw_on(bband_box.draw_on(bandB.finalize()), edgecolor='b', fill=0), 'asset B-band'))
>>> # Draw the box in image space
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='A Box in Virtual Video Space (This space is conceptually easy to work in)'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> def _tocanvas(img):
... if img.dtype.kind == 'u':
... return img
... return kwimage.ensure_uint255(kwimage.fill_nans_with_checkers(img).clip(0, 1))
>>> row.append(kwimage.draw_header_text(vidspace_box.draw_on(_tocanvas(delayed_vidspace.finalize())), 'vidspace'))
>>> # Draw finalized aligned crops
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='Finalized delayed warp/crop. Left-to-Right: Original, Optimized, Difference'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> crop_opt_final = opt_crop_vidspace.finalize()
>>> crop_raw_final = crop_vidspace.finalize(optimize=False)
>>> row.append(crop_raw_final)
>>> row.append(crop_opt_final)
>>> row.append(kwimage.ensure_uint255(kwarray.normalize(np.linalg.norm(kwimage.ensure_float01(crop_opt_final) - kwimage.ensure_float01(crop_raw_final), axis=2))))
>>> # Get the transform that would bring us back to the leaf
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='The "Unwarped" / "Unscaled" cropped regions'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> for chosen_band in opt_crop_vidspace.parts:
>>> spec = chosen_band.channels.spec
>>> lut = {c[0]: c for c in ['red', 'green', 'blue']}
>>> color = lut[spec]
>>> print(ub.color_text('============', color))
>>> print(ub.color_text(spec, color))
>>> print(ub.color_text('============', color))
>>> chosen_band.write_network_text()
>>> tf_root_from_leaf = chosen_band.get_transform_from_leaf()
>>> tf_leaf_from_root = tf_root_from_leaf.inv()
>>> undo_all = tf_leaf_from_root
>>> undo_scale = kwimage.Affine.coerce(ub.dict_diff(undo_all.concise(), ['offset', 'theta']))
>>> print('tf_root_from_leaf = {}'.format(ub.urepr(tf_root_from_leaf.concise(), nl=1)))
>>> print('undo_all = {}'.format(ub.urepr(undo_all.concise(), nl=1)))
>>> print('undo_scale = {}'.format(ub.urepr(undo_scale.concise(), nl=1)))
>>> print('Undone All')
>>> undone_all = chosen_band.warp(undo_all, interpolation='lanczos').optimize()
>>> undone_all.write_network_text()
>>> # Discard translation components
>>> print('Undone Scale')
>>> undone_scale = chosen_band.warp(undo_scale).optimize()
>>> undone_scale.write_network_text()
>>> undone_all_canvas = undone_all.finalize()
>>> undone_scale_canvas = undone_scale.finalize()
>>> undone_all_canvas = crop_vidspace_box.warp(undo_all).draw_on(undone_all_canvas)
>>> undone_scale_canvas = crop_vidspace_box.warp(undo_scale).draw_on(undone_scale_canvas)
>>> undone_all_canvas = crop_vidspace_poly.warp(undo_all).draw_on(undone_all_canvas, edgecolor='b', fill=0)
>>> undone_scale_canvas = crop_vidspace_poly.warp(undo_scale).draw_on(undone_scale_canvas, edgecolor='b', fill=0)
>>> #row.append(kwimage.stack_images([undone_all_canvas, undone_scale_canvas], axis=0, bg_value=(5, 100, 10), pad=10))
>>> row.append(undone_all_canvas)
>>> row.append(undone_scale_canvas)
>>> print(ub.color_text('============', color))
>>> #
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> tostack_grid = [[_tocanvas(c) for c in cols] for cols in tostack_grid]
>>> tostack_rows = [kwimage.stack_images(cols, axis=1, bg_value=(5, 100, 10), pad=10) for cols in tostack_grid if cols]
>>> stack = kwimage.stack_images(tostack_rows, axis=0, bg_value=(5, 100, 10), pad=10)
>>> kwplot.imshow(stack, title='notice how the "undone all" crops are shifted to the right' + chr(10) + 'such that they align with the original image')
>>> kwplot.show_if_requested()
- class delayed_image.DelayedArray(subdata=None)[source]¶
Bases:
DelayedUnaryOperation
A generic NDArray.
- Parameters:
subdata (DelayedArray)
- property shape¶
Returns: None | Tuple[int | None, …]
- class delayed_image.DelayedAsXarray(subdata=None, dsize=None, channels=None)[source]¶
Bases:
DelayedImage
Casts the data to an xarray object in the finalize step.
- Example:
>>> # xdoctest: +REQUIRES(module:xarray) >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> # without channels >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> self = base.as_xarray() >>> final = self._validate().finalize() >>> assert len(final.coords) == 0 >>> assert final.dims == ('y', 'x', 'c') >>> # with channels >>> base = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> self = base.as_xarray() >>> final = self._validate().finalize() >>> assert final.coords.indexes['c'].tolist() == ['r', 'g', 'b'] >>> assert final.dims == ('y', 'x', 'c')
- Parameters:
subdata (DelayedArray)
dsize (None | Tuple[int | None, int | None]) – overrides subdata dsize
channels (None | int | FusedChannelSpec) – overrides subdata channels
- class delayed_image.DelayedChannelConcat(parts, dsize=None)[source]¶
Bases:
ImageOpsMixin, DelayedConcat
Stacks multiple arrays together.
Example
>>> from delayed_image import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> dsize = (307, 311) >>> c1 = DelayedNans(dsize=dsize, channels='foo') >>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare() >>> cat = DelayedChannelConcat([c1, c2]) >>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332)) >>> warped_cat._validate() >>> warped_cat.finalize()
Example
>>> # Test case that failed in initial implementation >>> # Due to incorrectly pushing channel selection under the concat >>> from delayed_image import * # NOQA >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath() >>> base1 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> base2 = DelayedLoad(fpath, channels='x|y|z').prepare().scale(2) >>> base3 = DelayedLoad(fpath, channels='i|j|k').prepare().scale(2) >>> bands = [base2, base1[:, :, 0].scale(2).evaluate(), >>> base1[:, :, 1].evaluate().scale(2), >>> base1[:, :, 2].evaluate().scale(2), base3] >>> delayed = DelayedChannelConcat(bands) >>> delayed = delayed.warp({'scale': 2}) >>> delayed = delayed[0:100, 0:55, [0, 2, 4]] >>> delayed.write_network_text() >>> delayed.optimize()
- Parameters:
parts (List[DelayedArray]) – data to concat
dsize (Tuple[int, int] | None) – size if known a-priori
- property channels¶
Returns: None | FusedChannelSpec
- property shape¶
Returns: Tuple[int | None, int | None, int | None]
- take_channels(channels)[source]¶
This method returns a subset of the vision data with only the specified bands / channels.
- Parameters:
channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integers indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See
ChannelSpec for more details.
- Returns:
a delayed vision operation that only operates on the following channels.
- Return type:
Example
>>> # xdoctest: +REQUIRES(module:kwcoco) >>> from delayed_image.delayed_nodes import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = delayed = dset.coco_image(1).delay() >>> channels = 'B11|B8|B1|B10' >>> new = self.take_channels(channels)
Example
>>> # xdoctest: +REQUIRES(module:kwcoco) >>> # Complex case >>> import kwcoco >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.coco_image(1).delay() >>> astro = DelayedLoad.demo('astro', channels='r|g|b').prepare() >>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto') >>> self = combo = DelayedChannelConcat(delayed.parts + [aligned]) >>> channels = 'B1|r|B8|g' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> new_opt = new_cropped.optimize() >>> datas = new_opt.finalize() >>> if 1: >>> new_cropped.write_network_text(with_labels='name') >>> new_opt.write_network_text(with_labels='name') >>> vizable = kwimage.normalize_intensity(datas, axis=2) >>> self._validate() >>> new._validate() >>> new_cropped._validate() >>> new_opt._validate() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1)) >>> kwplot.imshow(stacked)
Example
>>> # xdoctest: +REQUIRES(module:kwcoco) >>> # Test case where requested channel does not exist >>> import kwcoco >>> from delayed_image.delayed_nodes import * # NOQA >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100) >>> self = delayed = dset.coco_image(1).delay() >>> channels = 'B1|foobar|bazbiz|B8' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> fused = new_cropped.finalize() >>> assert fused.shape == (190, 338, 4) >>> assert np.all(np.isnan(fused[..., 1:3])) >>> assert not np.any(np.isnan(fused[..., 0])) >>> assert not np.any(np.isnan(fused[..., 3]))
- property num_overviews¶
Returns: int
- undo_warps(remove=None, retain=None, squash_nans=False, return_warps=False)[source]¶
Attempts to “undo” warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions.
Typically you will retain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor.
- Parameters:
remove (List[str]) – if specified, list components of the warping to remove. Can include: “offset”, “scale”, “shearx”, “theta”. Typically set this to [“scale”].
retain (List[str]) – if specified, list components of the warping to retain. Can include: “offset”, “scale”, “shearx”, “theta”. Mutually exclusive with “remove”. If neither remove nor retain is specified, retain is set to
[].
squash_nans (bool) – if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source.
return_warps (bool) – if True, return the transforms we applied. I.e. the transform from the
self to the returned parts. This is useful when you need to warp objects in the original space into the jagged space.
- Returns:
The List[DelayedImage] are the
parts, i.e. the new images with the warping undone. The List[kwimage.Affine] are the transforms from self to each item in parts.
- Return type:
List[DelayedImage] | Tuple[List[DelayedImage] | List[kwimage.Affine]]
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> from delayed_image.delayed_leafs import DelayedNans >>> import ubelt as ub >>> import kwimage >>> import kwarray >>> import numpy as np >>> # Demo case where we have different channels at different resolutions >>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255}) >>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate() >>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate() >>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate() >>> bandN = DelayedNans((600, 600), channels='N') >>> # Make a concatenation of images of different underlying native resolutions >>> delayed_vidspace = DelayedChannelConcat([ >>> bandR.scale(6, dsize=(600, 600)).optimize(), >>> bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(), >>> bandB.scale(1, dsize=(600, 600)).optimize(), >>> bandN, >>> ]).warp({'scale': 0.7}).optimize() >>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb') >>> vidspace_poly = vidspace_box.to_polygons()[0] >>> vidspace_slice = vidspace_box.to_slices()[0] >>> self = delayed_vidspace[vidspace_slice].optimize() >>> print('--- Aligned --- ') >>> self.write_network_text() >>> squash_nans = True >>> undone_all_parts, tfs1 = self.undo_warps(squash_nans=squash_nans, return_warps=True) >>> undone_scale_parts, tfs2 = self.undo_warps(remove=['scale'], squash_nans=squash_nans, return_warps=True) >>> stackable_aligned = self.finalize().transpose(2, 0, 1) >>> stackable_undone_all = [] >>> stackable_undone_scale = [] >>> print('--- Undone All --- ') >>> for undone in undone_all_parts: ... undone.write_network_text() ... stackable_undone_all.append(undone.finalize()) >>> print('--- Undone Scale --- ') >>> for undone in undone_scale_parts: ... undone.write_network_text() ... 
stackable_undone_scale.append(undone.finalize()) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1) >>> canvas1 = kwimage.stack_images(stackable_undone_all, axis=1) >>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1) >>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Aligned Channels') >>> canvas1 = kwimage.draw_header_text(canvas1, 'Unwarped Channels') >>> canvas2 = kwimage.draw_header_text(canvas2, 'Unscaled Channels') >>> canvas = kwimage.stack_images([canvas0, canvas1, canvas2], axis=0) >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas)
- class delayed_image.DelayedConcat(parts, axis)[source]¶
Bases:
DelayedNaryOperation
Stacks multiple arrays together.
- Parameters:
parts (List[DelayedArray]) – data to concat
axis (int) – axes to concat on
- property shape¶
Returns: None | Tuple[int | None, …]
- class delayed_image.DelayedCrop(subdata, space_slice=None, chan_idxs=None)[source]¶
Bases:
DelayedImage
Crops an image along integer pixel coordinates.
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> # Test Fuse Crops Space Only >>> crop1 = base[4:12, 0:16] >>> self = crop1[2:6, 0:8] >>> opt = self._opt_fuse_crops() >>> self.write_network_text() >>> opt.write_network_text() >>> # >>> # Test Channel Select Via Index >>> self = base[:, :, [0]] >>> self.write_network_text() >>> final = self._finalize() >>> assert final.shape == (16, 16, 1) >>> assert base[:, :, [0, 1]].finalize().shape == (16, 16, 2) >>> assert base[:, :, [2, 0, 1]].finalize().shape == (16, 16, 3)
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> # Test Discontiguous Channel Select Via Index >>> self = base[:, :, [0, 2]] >>> self.write_network_text() >>> final = self._finalize() >>> assert final.shape == (16, 16, 2)
- Parameters:
subdata (DelayedArray) – data to operate on
space_slice (Tuple[slice, slice]) – if specified, take this y-slice and x-slice.
chan_idxs (List[int] | None) – if specified, take these channels / bands
- optimize()[source]¶
- Returns:
DelayedImage
Example
>>> # Test optimize nans >>> from delayed_image import DelayedNans >>> import kwimage >>> base = DelayedNans(dsize=(100, 100), channels='a|b|c') >>> self = base[0:10, 0:5] >>> # Should simply return a new nan generator >>> new = self.optimize() >>> self.write_network_text() >>> new.write_network_text() >>> assert len(new.as_graph().nodes) == 1
- class delayed_image.DelayedDequantize(subdata, quantization)[source]¶
Bases:
DelayedImage
Rescales image intensities from int to floats.
The output is usually between 0 and 1. This also handles transforming nodata into nan values.
- Parameters:
subdata (DelayedArray) – data to operate on
quantization (Dict) – see
delayed_image.helpers.dequantize()
- optimize()[source]¶
- Returns:
DelayedImage
Example
>>> # Test a case that caused an error in development >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> fpath = kwimage.grab_test_image_fpath() >>> base = DelayedLoad(fpath, channels='r|g|b').prepare() >>> quantization = {'quant_max': 255, 'nodata': 0} >>> self = base.get_overview(1).dequantize(quantization) >>> self.write_network_text() >>> opt = self.optimize()
- class delayed_image.DelayedFrameStack(parts)[source]¶
Bases:
DelayedStack
Stacks multiple arrays together.
- Parameters:
parts (List[DelayedArray]) – data to stack
- class delayed_image.DelayedIdentity(data, channels=None, dsize=None)[source]¶
Bases:
DelayedImageLeaf
Returns an ndarray as-is.
Example
self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
region_slices = (slice(5, 10), slice(1, 12))
delayed = self.crop(region_slices)
Example
>>> from delayed_image import * # NOQA >>> arr = kwimage.checkerboard() >>> self = DelayedIdentity(arr, channels='gray') >>> warp = self.warp({'scale': 1.07}) >>> warp.optimize().finalize()
- class delayed_image.DelayedImage(subdata=None, dsize=None, channels=None)[source]¶
Bases:
ImageOpsMixin, DelayedArray
For the case where an array represents a 2D image with multiple channels.
- Parameters:
subdata (DelayedArray)
dsize (None | Tuple[int | None, int | None]) – overrides subdata dsize
channels (None | int | FusedChannelSpec) – overrides subdata channels
- property shape¶
Returns: None | Tuple[int | None, int | None, int | None]
- property num_channels¶
Returns: None | int
- property dsize¶
Returns: None | Tuple[int | None, int | None]
- property channels¶
Returns: None | FusedChannelSpec
- property num_overviews¶
Returns: int
- take_channels(channels)[source]¶
This method returns a subset of the vision data with only the specified bands / channels.
- Parameters:
channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integers indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See ChannelSpec for more details.
- Returns:
a new delayed load with a fused take channel operation
- Return type:
Note
The channel subset must exist here or it will raise an error. A better implementation (via pymbolic) might be able to do better
Example
>>> # >>> # Test Channel Select Via Code >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> channels = 'r|b' >>> new = self.take_channels(channels)._validate() >>> new2 = new[:, :, [1, 0]]._validate() >>> new3 = new2[:, :, [1]]._validate()
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo('astro').prepare() >>> channels = [2, 0] >>> new = self.take_channels(channels) >>> new3 = new.take_channels([1, 0]) >>> new._validate() >>> new3._validate()
>>> final1 = self.finalize() >>> final2 = new.finalize() >>> final3 = new3.finalize() >>> assert np.all(final1[..., 2] == final2[..., 0]) >>> assert np.all(final1[..., 0] == final2[..., 1]) >>> assert final2.shape[2] == 2
>>> assert np.all(final1[..., 2] == final3[..., 1]) >>> assert np.all(final1[..., 0] == final3[..., 0]) >>> assert final3.shape[2] == 2
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> # Case where a channel doesn't exist >>> channels = 'r|b|magic' >>> new = self.take_channels(channels) >>> assert len(new.parts) == 2 >>> new._validate()
- undo_warp(remove=None, retain=None, squash_nans=False, return_warp=False)[source]¶
Attempts to “undo” warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions.
Typically you will retain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor.
- Parameters:
remove (List[str]) – if specified, list components of the warping to remove. Can include: “offset”, “scale”, “shearx”, “theta”. Typically set this to [“scale”].
retain (List[str]) – if specified, list components of the warping to retain. Can include: “offset”, “scale”, “shearx”, “theta”. Mutually exclusive with “remove”. If neither remove nor retain is specified, retain is set to
[].
squash_nans (bool) – if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source.
return_warp (bool) – if True, return the transform we applied. This is useful when you need to warp objects in the original space into the jagged space.
- SeeAlso:
DelayedChannelConcat.undo_warps
Example
>>> # Test similar to undo_warps, but on each channel separately >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> from delayed_image.delayed_leafs import DelayedNans >>> import ubelt as ub >>> import kwimage >>> import kwarray >>> import numpy as np >>> # Demo case where we have different channels at different resolutions >>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255}) >>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate() >>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate() >>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate() >>> bandN = DelayedNans((600, 600), channels='N') >>> B0 = bandR.scale(6, dsize=(600, 600)).optimize() >>> B1 = bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize() >>> B2 = bandB.scale(1, dsize=(600, 600)).optimize() >>> vidspace_box = kwimage.Boxes([[-10, -10, 270, 160]], 'ltrb').scale(1 / .7).quantize() >>> vidspace_poly = vidspace_box.to_polygons()[0] >>> vidspace_slice = vidspace_box.to_slices()[0] >>> # Test with the padded crop >>> self0 = B0.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> self1 = B1.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> self2 = B2.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> parts = [self0, self1, self2] >>> # Run the undo on each channel >>> undone_scale_parts = [d.undo_warp(remove=['scale']) for d in parts] >>> print('--- Aligned --- ') >>> stackable_aligned = [] >>> for d in parts: >>> d.write_network_text() >>> stackable_aligned.append(d.finalize()) >>> print('--- Undone Scale --- ') >>> stackable_undone_scale = [] >>> for undone in undone_scale_parts: ... undone.write_network_text() ... 
stackable_undone_scale.append(undone.finalize()) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1, pad=5, bg_value='kw_darkgray') >>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1, pad=5, bg_value='kw_darkgray') >>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Channels') >>> canvas2 = kwimage.draw_header_text(canvas2, 'Native Scale Channels') >>> canvas = kwimage.stack_images([canvas0, canvas2], axis=0, bg_value='kw_darkgray') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas)
- class delayed_image.DelayedImageLeaf(subdata=None, dsize=None, channels=None)[source]¶
Bases:
DelayedImage- Parameters:
subdata (DelayedArray)
dsize (None | Tuple[int | None, int | None]) – overrides subdata dsize
channels (None | int | FusedChannelSpec) – overrides subdata channels
- class delayed_image.DelayedLoad(fpath, channels=None, dsize=None, nodata_method=None)[source]¶
Bases:
DelayedImageLeafPoints to an image on disk to be loaded.
This is the starting point for most delayed operations. Disk IO is avoided until the
finalize operation is called. Calling prepare can read image headers if metadata like the image width, height, and number of channels is not provided, but most operations can be performed while these are still unknown. If a gdal backend is available, and the underlying image is in the appropriate format (e.g. COG), finalize will return a lazy reference that enables fast overviews and crops. For image formats that do not allow for tiling / overviews, there is no way to avoid reading the entire image as an ndarray.
Example
>>> from delayed_image import * # NOQA >>> self = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> data1 = self.finalize()
Example
>>> # xdoctest: +REQUIRES(module:osgeo) >>> # Demo code to develop support for overviews >>> from delayed_image import * # NOQA >>> import kwimage >>> import ubelt as ub >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> self = DelayedLoad(fpath, channels='r|g|b').prepare() >>> print(f'self={self}') >>> print('self.meta = {}'.format(ub.repr2(self.meta, nl=1))) >>> quantization = { >>> 'quant_max': 255, >>> 'nodata': 0, >>> } >>> node0 = self >>> node1 = node0.get_overview(2) >>> node2 = node1[13:900, 11:700] >>> node3 = node2.dequantize(quantization) >>> node4 = node3.warp({'scale': 0.05}) >>> # >>> data0 = node0._validate().finalize() >>> data1 = node1._validate().finalize() >>> data2 = node2._validate().finalize() >>> data3 = node3._validate().finalize() >>> data4 = node4._validate().finalize() >>> node4.write_network_text()
Example
>>> # xdoctest: +REQUIRES(module:osgeo) >>> # Test delayed ops with int16 and nodata values >>> from delayed_image import * # NOQA >>> import kwimage >>> from delayed_image.helpers import quantize_float01 >>> import ubelt as ub >>> dpath = ub.Path.appdir('delayed_image/tests/test_delay_nodata').ensuredir() >>> fpath = dpath / 'data.tif' >>> data = kwimage.ensure_float01(kwimage.grab_test_image()) >>> poly = kwimage.Polygon.random(rng=321032).scale(data.shape[0]) >>> poly.fill(data, np.nan) >>> data_uint16, quantization = quantize_float01(data) >>> nodata = quantization['nodata'] >>> kwimage.imwrite(fpath, data_uint16, nodata=nodata, backend='gdal', overviews=3) >>> # Test loading the data >>> self = DelayedLoad(fpath, channels='r|g|b', nodata_method='float').prepare() >>> node0 = self >>> node1 = node0.dequantize(quantization) >>> node2 = node1.warp({'scale': 0.51}, interpolation='lanczos') >>> node3 = node2[13:900, 11:700] >>> node4 = node3.warp({'scale': 0.9}, interpolation='lanczos') >>> node4.write_network_text() >>> node5 = node4.optimize() >>> node5.write_network_text() >>> node6 = node5.warp({'scale': 8}, interpolation='lanczos').optimize() >>> node6.write_network_text() >>> # >>> data0 = node0._validate().finalize() >>> data1 = node1._validate().finalize() >>> data2 = node2._validate().finalize() >>> data3 = node3._validate().finalize() >>> data4 = node4._validate().finalize() >>> data5 = node5._validate().finalize() >>> data6 = node6._validate().finalize() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> stack1 = kwimage.stack_images([data1, data2, data3, data4, data5]) >>> stack2 = kwimage.stack_images([stack1, data6], axis=1) >>> kwplot.imshow(stack2)
- Parameters:
fpath (str | PathLike) – URI pointing at the image data to load
channels (int | str | FusedChannelSpec | None) – the underlying channels of the image if known a-priori
dsize (Tuple[int, int]) – The width / height of the image if known a-priori
nodata_method (str | None) – How to handle nodata values in the file itself. Can be “auto”, “float”, or “ma”.
- property fpath¶
- classmethod demo(key='astro', channels=None, dsize=None, nodata_method=None, overviews=None)[source]¶
Creates a demo DelayedLoad node that points to a file generated by
kwimage.grab_test_image_fpath(). If metadata like dsize and channels are not provided, then
prepare() can be used to auto-populate them at the cost of the disk IO to read image headers. - Parameters:
key (str) – which test image to grab. Valid choices are: astro - an astronaut; carl - Carl Sagan; paraview - ParaView logo; stars - picture of stars in the sky
channels (str) – if specified, these channels will be stored in the delayed load metadata. Note: these are not auto-populated. Usually the key corresponds to 3-channel data,
dsize (None | Tuple[int, int]) – if specified, we will return a variant of the data with the specific dsize
nodata_method (str | None) – How to handle nodata values in the file itself. Can be “auto”, “float”, or “ma”.
overviews (None | int) – if specified, will return a variant of the data with overviews
- Returns:
DelayedLoad
Example
>>> from delayed_image.delayed_leafs import * # NOQA >>> import delayed_image >>> delayed = delayed_image.DelayedLoad.demo() >>> print(f'delayed={delayed}') >>> delayed.prepare() >>> print(f'delayed={delayed}') >>> delayed = DelayedLoad.demo(channels='r|g|b', nodata_method='float') >>> print(f'delayed={delayed}') >>> delayed.prepare() >>> print(f'delayed={delayed}') >>> delayed.finalize()
- class delayed_image.DelayedNans(dsize=None, channels=None)[source]¶
Bases:
DelayedImageLeafConstructs nan channels as needed
Example
self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce(‘rgb’)) region_slices = (slice(5, 10), slice(1, 12)) delayed = self.crop(region_slices)
Example
>>> from delayed_image.delayed_leafs import * # NOQA >>> from delayed_image import DelayedChannelConcat >>> dsize = (307, 311) >>> c1 = DelayedNans(dsize=dsize, channels='foo') >>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare() >>> cat = DelayedChannelConcat([c1, c2]) >>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332))._validate() >>> warped_cat._validate().optimize().finalize()
- class delayed_image.DelayedNodata(dsize=None, channels=None, nodata_method='float')[source]¶
Bases:
DelayedNansConstructs nan or masked array depending on what is needed
Example
>>> from delayed_image.delayed_leafs import * # NOQA >>> dsize = (307, 311) >>> self1 = DelayedNodata(dsize=dsize, channels='foo', nodata_method='float') >>> self2 = DelayedNodata(dsize=dsize, channels='foo', nodata_method='ma') >>> im1 = self1.finalize() >>> im2 = self2.finalize() >>> assert im1.dtype.kind == 'f' >>> assert not hasattr(im1, 'mask') >>> assert hasattr(im2, 'mask')
- class delayed_image.DelayedNaryOperation(parts)[source]¶
Bases:
DelayedOperationFor operations that have multiple input arrays
- class delayed_image.DelayedOperation[source]¶
Bases:
NiceRepr- as_graph(fields='auto')[source]¶
Builds the underlying graph structure as a networkx graph with human readable labels.
- Parameters:
fields (str | List[str]) – Add the specified fields as labels. If ‘auto’ then does something “reasonable”. If ‘all’ then shows everything. TODO: only implemented for “auto” and “all”, implement general field selection (PR Wanted).
- Returns:
networkx.DiGraph
- print_graph(fields='auto', with_labels=True, rich='auto', vertical_chains=True)[source]¶
Alias for write_network_text
- Parameters:
fields (str | List[str]) – Add the specified fields as labels. If ‘auto’ then does something “reasonable”. If ‘all’ then shows everything. TODO: only implemented for “auto” and “all”, implement general field selection (PR Wanted).
with_labels (bool) – set to false for no label data
rich (bool | str) – defaults to ‘auto’
vertical_chains (bool) – Defaults to True. Set to false to save vertical space at the cost of horizontal space.
- write_network_text(fields='auto', with_labels=True, rich='auto', vertical_chains=True)[source]¶
Alias for
DelayedOperation.print_graph()
- property shape¶
Returns: None | Tuple[int | None, …]
- prepare()[source]¶
If metadata is missing, perform minimal IO operations in order to prepopulate metadata that could help us better optimize the operation tree.
- Returns:
DelayedOperation
- finalize(prepare=True, optimize=True, **kwargs)[source]¶
Evaluate the operation tree in full.
- Parameters:
prepare (bool) – ensure prepare is called to ensure metadata exists if possible before optimizing. Defaults to True.
optimize (bool) – ensure the graph is optimized before loading. Default to True.
**kwargs – for backwards compatibility, these will allow for in-place modification of select nested parameters.
- Returns:
ArrayLike
Notes
Do not overload this method. Overload
DelayedOperation._finalize() instead.
- class delayed_image.DelayedOverview(subdata, overview)[source]¶
Bases:
DelayedImageDownsamples an image by a factor of two.
If the underlying image being loaded has precomputed overviews it simply loads these instead of downsampling the original image, which is more efficient.
Example
>>> # xdoctest: +REQUIRES(module:osgeo) >>> # Make a complex chain of operations and optimize it >>> from delayed_image import * # NOQA >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare() >>> dimg = dimg.get_overview(1) >>> dimg = dimg.get_overview(1) >>> dimg = dimg.get_overview(1) >>> dopt = dimg.optimize() >>> if 1: >>> import networkx as nx >>> dimg.write_network_text() >>> dopt.write_network_text() >>> print(ub.urepr(dopt.nesting(), nl=-1, sort=0)) >>> final0 = dimg._finalize()[:] >>> final1 = dopt._finalize()[:] >>> assert final0.shape == final1.shape >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw') >>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
- Parameters:
subdata (DelayedArray) – data to operate on
overview (int) – the overview to use (assuming it exists)
- property num_overviews¶
Returns: int
- class delayed_image.DelayedStack(parts, axis)[source]¶
Bases:
DelayedNaryOperationStacks multiple arrays together.
- Parameters:
parts (List[DelayedArray]) – data to stack
axis (int) – axis to stack on
- property shape¶
Returns: None | Tuple[int | None, …]
- class delayed_image.DelayedUnaryOperation(subdata)[source]¶
Bases:
DelayedOperationFor operations that have a single input array
- class delayed_image.DelayedWarp(subdata, transform, dsize='auto', antialias=True, interpolation='linear', border_value='auto', noop_eps=0)[source]¶
Bases:
DelayedImageApplies an affine transform to an image.
Example
>>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> warp1 = self.warp({'scale': 3}) >>> warp2 = warp1.warp({'theta': 0.1}) >>> warp3 = warp2._opt_fuse_warps() >>> warp3._validate() >>> print(ub.urepr(warp2.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(warp3.nesting(), nl=-1, sort=0))
- Parameters:
subdata (DelayedArray) – data to operate on
transform (ndarray | dict | kwimage.Affine) – a coercible affine matrix. See
kwimage.Affine for details on what can be coerced. dsize (Tuple[int, int] | str) – The width / height of the output canvas. If ‘auto’, dsize is computed such that the positive coordinates of the warped image will fit in the new canvas. In this case, any pixel that maps to a negative coordinate will be clipped. This has the property that the input transformation is not modified.
antialias (bool) – if True, determines if the transform is downsampling and applies antialiasing via a gaussian blur. Defaults to True.
interpolation (str) – interpolation code or cv2 integer. Interpolation codes are linear, nearest, cubic, lanczos, and area. Defaults to “linear”.
noop_eps (float) – This is the tolerance for optimizing a warp away. If the transform has all of its decomposed parameters (i.e. scale, rotation, translation, shear) less than this value, the warp node can be optimized away. Defaults to 0.
- property transform¶
Returns: kwimage.Affine
- optimize()[source]¶
- Returns:
DelayedImage
Example
>>> # Demo optimization that removes a noop warp >>> from delayed_image import DelayedLoad >>> import kwimage >>> base = DelayedLoad.demo(channels='r|g|b').prepare() >>> self = base.warp(kwimage.Affine.eye()) >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 1
Example
>>> # Test optimize nans >>> from delayed_image import DelayedNans >>> import kwimage >>> base = DelayedNans(dsize=(100, 100), channels='a|b|c') >>> self = base.warp(kwimage.Affine.scale(0.1)) >>> # Should simply return a new nan generator >>> new = self.optimize() >>> assert len(new.as_graph().nodes) == 1
Example
>>> # Test optimize nans >>> from delayed_image import DelayedLoad >>> import kwimage >>> base = DelayedLoad.demo(channels='r|g|b').prepare() >>> transform = kwimage.Affine.scale(1.0 + 1e-7) >>> self = base.warp(transform, dsize=base.dsize) >>> # An optimize will not remove a warp if there is any >>> # doubt if it is the identity. >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 2 >>> # But we can specify a threshold where it will >>> self._set_nested_params(noop_eps=1e-6) >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 1
- class delayed_image.ImageOpsMixin[source]¶
Bases:
object- crop(space_slice=None, chan_idxs=None, clip=True, wrap=True, pad=0)[source]¶
Crops an image along integer pixel coordinates.
- Parameters:
space_slice (Tuple[slice, slice]) – y-slice and x-slice.
chan_idxs (List[int]) – indexes of bands to take
clip (bool) – if True, the slice is interpreted normally, where it won’t go past the image extent, otherwise slicing into negative regions or past the image bounds will result in padding. Defaults to True.
wrap (bool) – if True, negative indexes “wrap around”, otherwise they are treated as is. Defaults to True.
pad (int | List[Tuple[int, int]]) – if specified, applies extra padding
- Returns:
DelayedImage
Example
>>> from delayed_image import DelayedLoad >>> import kwimage >>> self = DelayedLoad.demo().prepare() >>> self = self.dequantize({'quant_max': 255}) >>> self = self.warp({'scale': 1 / 2}) >>> pad = 0 >>> h, w = space_dims = self.dsize[::-1] >>> grid = list(ub.named_product({ >>> 'left': [0, -64], 'right': [0, 64], >>> 'top': [0, -64], 'bot': [0, 64],})) >>> grid += [ >>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0}, >>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0}, >>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64}, >>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64}, >>> ] >>> crops = [] >>> for pads in grid: >>> space_slice = (slice(pads['top'], h + pads['bot']), >>> slice(pads['left'], w + pads['right'])) >>> delayed = self.crop(space_slice) >>> crop = delayed.finalize() >>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0] >>> title = '[{}:{}, {}:{}]'.format(*yyxx) >>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray') >>> crops.append(crop_canvas) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas, title='Normal Slicing: Cropped Images With Wrap+Clipped Slices', doclf=1, fnum=1) >>> kwplot.show_if_requested()
Example
>>> # Demo the case with pads / no-clips / no-wraps >>> from delayed_image import DelayedLoad >>> import kwimage >>> self = DelayedLoad.demo().prepare() >>> self = self.dequantize({'quant_max': 255}) >>> self = self.warp({'scale': 1 / 2}) >>> pad = [(64, 128), (32, 96)] >>> pad = [(0, 20), (0, 0)] >>> pad = 0 >>> pad = 8 >>> h, w = space_dims = self.dsize[::-1] >>> grid = list(ub.named_product({ >>> 'left': [0, -64], 'right': [0, 64], >>> 'top': [0, -64], 'bot': [0, 64],})) >>> grid += [ >>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0}, >>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0}, >>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64}, >>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64}, >>> ] >>> crops = [] >>> for pads in grid: >>> space_slice = (slice(pads['top'], h + pads['bot']), >>> slice(pads['left'], w + pads['right'])) >>> delayed = self._padded_crop(space_slice, pad=pad) >>> crop = delayed.finalize(optimize=1) >>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0] >>> title = '[{}:{}, {}:{}]'.format(*yyxx) >>> if pad: >>> title += f'{chr(10)}pad={pad}' >>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray') >>> crops.append(crop_canvas) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen', resize='smaller') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas, title='Negative Slicing: Cropped Images With clip=False wrap=False', doclf=1, fnum=2) >>> kwplot.show_if_requested()
- warp(transform, dsize='auto', **warp_kwargs)[source]¶
Applies an affine transformation to the image. See
DelayedWarp.- Parameters:
transform (ndarray | dict | kwimage.Affine) – a coercible affine matrix. See
kwimage.Affine for details on what can be coerced. dsize (Tuple[int, int] | str) – The width / height of the output canvas. If ‘auto’, dsize is computed such that the positive coordinates of the warped image will fit in the new canvas. In this case, any pixel that maps to a negative coordinate will be clipped. This has the property that the input transformation is not modified.
antialias (bool) – if True, determines if the transform is downsampling and applies antialiasing via a gaussian blur. Defaults to False
interpolation (str) – interpolation code or cv2 integer. Interpolation codes are linear, nearest, cubic, lanczos, and area. Defaults to “linear”.
border_value (int | float | str) – if auto will be nan for float and 0 for int.
noop_eps (float) – This is the tolerance for optimizing a warp away. If the transform has all of its decomposed parameters (i.e. scale, rotation, translation, shear) less than this value, the warp node can be optimized away. Defaults to 0.
- Returns:
DelayedImage
- dequantize(quantization)[source]¶
Rescales image intensities from int to floats.
- Parameters:
quantization (Dict[str, Any]) – quantization information dictionary to undo. see
delayed_image.helpers.dequantize()Expected keys are: orig_dtype (str) orig_min (float) orig_max (float) quant_min (float) quant_max (float) nodata (None | int)- Returns:
DelayedDequantize
Example
>>> from delayed_image.delayed_leafs import DelayedLoad >>> self = DelayedLoad.demo().prepare() >>> quantization = { >>> 'orig_dtype': 'float32', >>> 'orig_min': 0, >>> 'orig_max': 1, >>> 'quant_min': 0, >>> 'quant_max': 255, >>> 'nodata': None, >>> } >>> new = self.dequantize(quantization) >>> assert self.finalize().max() > 1 >>> assert new.finalize().max() <= 1
- get_overview(overview)[source]¶
Downsamples an image by a factor of two.
- Parameters:
overview (int) – the overview to use (assuming it exists)
- Returns:
DelayedOverview
- get_transform_from(src)[source]¶
Find a transform from a given node (src) to this node (self / dst).
Given two delayed images src and dst that share a common leaf, find the transform from src to dst.
- Parameters:
src (DelayedOperation) – the other view to get a transform to. This must share a leaf with self (which is the dst).
- Returns:
The transform that warps the space of src to the space of self.
- Return type:
Example
>>> from delayed_image import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> base = DelayedLoad.demo().prepare() >>> src = base.scale(2) >>> dst = src.warp({'scale': 4, 'offset': (3, 5)}) >>> transform = dst.get_transform_from(src) >>> tf = transform.decompose() >>> assert tf['scale'] == (4, 4) >>> assert tf['offset'] == (3, 5)
Example
>>> from delayed_image import demo >>> self = demo.non_aligned_leafs() >>> leaf = list(self._leaf_paths())[0][0] >>> tf1 = self.get_transform_from(leaf) >>> tf2 = leaf.get_transform_from(self) >>> np.allclose(np.linalg.inv(tf2), tf1)
- class delayed_image.FusedSensorChanSpec(sensor, chans)[source]¶
Bases:
SensorChanSpec A single sensor and its corresponding fused channels.
- property chans¶
- property spec¶
- class delayed_image.ChannelSpec(spec, parsed=None)[source]¶
Bases:
BaseChannelSpecParse and extract information about network input channel specs for early or late fusion networks.
Behaves like a dictionary of FusedChannelSpec objects
Todo
- [ ] Rename to something that indicates this is a collection of
FusedChannelSpec? MultiChannelSpec?
Note
This class name and API is in flux and subject to change.
Note
The pipe (‘|’) character represents an early-fused input stream, and order matters (it is non-commutative).
The comma (‘,’) character separates different input streams/branches for a multi-stream/branch network which will be late fused. Order does not matter.
Example
>>> from delayed_image.channel_spec import * # NOQA >>> # Integer spec >>> ChannelSpec.coerce(3) <ChannelSpec(u0|u1|u2) ...>
>>> # single mode spec >>> ChannelSpec.coerce('rgb') <ChannelSpec(rgb) ...>
>>> # early fused input spec >>> ChannelSpec.coerce('rgb|disprity') <ChannelSpec(rgb|disprity) ...>
>>> # late fused input spec >>> ChannelSpec.coerce('rgb,disprity') <ChannelSpec(rgb,disprity) ...>
>>> # early and late fused input spec >>> ChannelSpec.coerce('rgb|ir,disprity') <ChannelSpec(rgb|ir,disprity) ...>
Example
>>> self = ChannelSpec('gray') >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1))) >>> self = ChannelSpec('rgb') >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1))) >>> self = ChannelSpec('rgb|disparity') >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1))) >>> self = ChannelSpec('rgb|disparity,disparity') >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1))) >>> self = ChannelSpec('rgb,disparity,flowx|flowy') >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
Example
>>> specs = [ >>> 'rgb', # and rgb input >>> 'rgb|disprity', # rgb early fused with disparity >>> 'rgb,disprity', # rgb early late with disparity >>> 'rgb|ir,disprity', # rgb early fused with ir and late fused with disparity >>> 3, # 3 unknown channels >>> ] >>> for spec in specs: >>> print('=======================') >>> print('spec = {!r}'.format(spec)) >>> # >>> self = ChannelSpec.coerce(spec) >>> print('self = {!r}'.format(self)) >>> sizes = self.sizes() >>> print('sizes = {!r}'.format(sizes)) >>> print('self.info = {}'.format(ub.urepr(self.info, nl=1))) >>> # >>> item = self._demo_item((1, 1), rng=0) >>> inputs = self.encode(item) >>> components = self.decode(inputs) >>> input_shapes = ub.map_vals(lambda x: x.shape, inputs) >>> component_shapes = ub.map_vals(lambda x: x.shape, components) >>> print('item = {}'.format(ub.urepr(item, precision=1))) >>> print('inputs = {}'.format(ub.urepr(inputs, precision=1))) >>> print('input_shapes = {}'.format(ub.urepr(input_shapes))) >>> print('components = {}'.format(ub.urepr(components, precision=1))) >>> print('component_shapes = {}'.format(ub.urepr(component_shapes, nl=1)))
- property spec¶
- property info¶
- classmethod coerce(data)[source]¶
Attempt to interpret the data as a channel specification
- Returns:
ChannelSpec
Example
>>> from delayed_image.channel_spec import * # NOQA >>> data = FusedChannelSpec.coerce(3) >>> assert ChannelSpec.coerce(data).spec == 'u0|u1|u2' >>> data = ChannelSpec.coerce(3) >>> assert data.spec == 'u0|u1|u2' >>> assert ChannelSpec.coerce(data).spec == 'u0|u1|u2' >>> data = ChannelSpec.coerce('u:3') >>> assert data.normalize().spec == 'u.0|u.1|u.2'
- parse()[source]¶
Build internal representation
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = ChannelSpec('b1|b2|b3|rgb,B:3') >>> print(self.parse()) >>> print(self.normalize().parse()) >>> ChannelSpec('').parse()
Example
>>> base = ChannelSpec('rgb|disparity,flowx|r|flowy') >>> other = ChannelSpec('rgb') >>> self = base.intersection(other) >>> assert self.numel() == 4
- concise()[source]¶
Example
>>> self = ChannelSpec('b1|b2,b3|rgb|B.0,B.1|B.2') >>> print(self.concise().spec) b1|b2,b3|r|g|b|B.0,B.1:3
- normalize()[source]¶
Replace aliases with explicit single-band-per-code specs
- Returns:
normalized spec
- Return type:
Example
>>> self = ChannelSpec('b1|b2,b3|rgb,B:3') >>> normed = self.normalize() >>> print('self = {}'.format(self)) >>> print('normed = {}'.format(normed)) self = <ChannelSpec(b1|b2,b3|rgb,B:3)> normed = <ChannelSpec(b1|b2,b3|r|g|b,B.0|B.1|B.2)>
- fuse()[source]¶
Fuse all parts into an early fused channel spec
- Returns:
FusedChannelSpec
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = ChannelSpec.coerce('b1|b2,b3|rgb,B:3') >>> fused = self.fuse() >>> print('self = {}'.format(self)) >>> print('fused = {}'.format(fused)) self = <ChannelSpec(b1|b2,b3|rgb,B:3)> fused = <FusedChannelSpec(b1|b2|b3|rgb|B:3)>
- streams()[source]¶
Breaks this spec up into one spec for each early-fused input stream
Example
self = ChannelSpec.coerce(‘r|g,B1|B2,fx|fy’) list(map(len, self.streams()))
- as_path()[source]¶
Returns a string suitable for use in a path.
Note, this may no longer be a valid channel spec
Example
>>> from delayed_image.channel_spec import * >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy') >>> self.as_path() rgb_disparity,flowx_r_flowy
- difference(other)[source]¶
Set difference. Remove all instances of other channels from this set of channels.
Example
>>> from delayed_image.channel_spec import * >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy') >>> other = ChannelSpec('rgb') >>> print(self.difference(other)) >>> other = ChannelSpec('flowx') >>> print(self.difference(other)) <ChannelSpec(disparity,flowx|flowy)> <ChannelSpec(r|g|b|disparity,r|flowy)>
Example
>>> from delayed_image.channel_spec import * >>> self = ChannelSpec('a|b,c|d') >>> new = self - {'a', 'b'} >>> len(new.sizes()) == 1 >>> empty = new - 'c|d' >>> assert empty.numel() == 0
- intersection(other)[source]¶
Set intersection. Keep only the channels in this set of channels that also appear in other.
Example
>>> from delayed_image.channel_spec import * >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy') >>> other = ChannelSpec('rgb') >>> new = self.intersection(other) >>> print(new) >>> print(new.numel()) >>> other = ChannelSpec('flowx') >>> new = self.intersection(other) >>> print(new) >>> print(new.numel()) <ChannelSpec(r|g|b,r)> 4 <ChannelSpec(flowx)> 1
- union(other)[source]¶
Union simply tags on a second channel spec onto this one. Duplicates are maintained.
Example
>>> from delayed_image.channel_spec import * >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy') >>> other = ChannelSpec('rgb') >>> new = self.union(other) >>> print(new) >>> print(new.numel()) >>> other = ChannelSpec('flowx') >>> new = self.union(other) >>> print(new) >>> print(new.numel()) <ChannelSpec(r|g|b|disparity,flowx|r|flowy,r|g|b)> 10 <ChannelSpec(r|g|b|disparity,flowx|r|flowy,flowx)> 8
- sizes()[source]¶
Number of dimensions for each fused stream channel
IE: The EARLY-FUSED channel sizes
Example
>>> self = ChannelSpec('rgb|disparity,flowx|flowy,B:10') >>> self.normalize().concise() >>> self.sizes()
- encode(item, axis=0, mode=1)[source]¶
Given a dictionary containing preloaded components of the network inputs, build a concatenated (fused) network representations of each input stream.
- Parameters:
item (Dict[str, Tensor]) – a batch item containing unfused parts. each key should be a single-stream (optionally early fused) channel key.
axis (int, default=0) – concatenation dimension
- Returns:
mapping between input stream and its early fused tensor input.
- Return type:
Dict[str, Tensor]
Example
>>> from delayed_image.channel_spec import * # NOQA >>> import numpy as np >>> dims = (4, 4) >>> item = { >>> 'rgb': np.random.rand(3, *dims), >>> 'disparity': np.random.rand(1, *dims), >>> 'flowx': np.random.rand(1, *dims), >>> 'flowy': np.random.rand(1, *dims), >>> } >>> # Complex Case >>> self = ChannelSpec('rgb,disparity,rgb|disparity|flowx|flowy,flowx|flowy') >>> fused = self.encode(item) >>> input_shapes = ub.map_vals(lambda x: x.shape, fused) >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1))) >>> # Simpler case >>> self = ChannelSpec('rgb|disparity') >>> fused = self.encode(item) >>> input_shapes = ub.map_vals(lambda x: x.shape, fused) >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1)))
Example
>>> # Case where we have to break up early fused data >>> import numpy as np >>> dims = (40, 40) >>> item = { >>> 'rgb|disparity': np.random.rand(4, *dims), >>> 'flowx': np.random.rand(1, *dims), >>> 'flowy': np.random.rand(1, *dims), >>> } >>> # Complex Case >>> self = ChannelSpec('rgb,disparity,rgb|disparity,rgb|disparity|flowx|flowy,flowx|flowy,flowx,disparity') >>> inputs = self.encode(item) >>> input_shapes = ub.map_vals(lambda x: x.shape, inputs) >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1)))
>>> # xdoctest: +REQUIRES(--bench) >>> #self = ChannelSpec('rgb|disparity,flowx|flowy') >>> import timerit >>> ti = timerit.Timerit(100, bestof=10, verbose=2) >>> for timer in ti.reset('mode=simple'): >>> with timer: >>> inputs = self.encode(item, mode=0) >>> for timer in ti.reset('mode=minimize-concat'): >>> with timer: >>> inputs = self.encode(item, mode=1)
- decode(inputs, axis=1)[source]¶
break an early fused item into its components
- Parameters:
inputs (Dict[str, Tensor]) – dictionary of components
axis (int, default=1) – channel dimension
Example
>>> from delayed_image.channel_spec import * # NOQA >>> import numpy as np >>> dims = (4, 4) >>> item_components = { >>> 'rgb': np.random.rand(3, *dims), >>> 'ir': np.random.rand(1, *dims), >>> } >>> self = ChannelSpec('rgb|ir') >>> item_encoded = self.encode(item_components) >>> batch = {k: np.concatenate([v[None, :], v[None, :]], axis=0) ... for k, v in item_encoded.items()} >>> components = self.decode(batch)
Example
>>> # xdoctest: +REQUIRES(module:netharn, module:torch) >>> import torch >>> import numpy as np >>> dims = (4, 4) >>> components = { >>> 'rgb': np.random.rand(3, *dims), >>> 'ir': np.random.rand(1, *dims), >>> } >>> components = ub.map_vals(torch.from_numpy, components) >>> self = ChannelSpec('rgb|ir') >>> encoded = self.encode(components) >>> from netharn.data import data_containers >>> item = {k: data_containers.ItemContainer(v, stack=True) >>> for k, v in encoded.items()} >>> batch = data_containers.container_collate([item, item]) >>> components = self.decode(batch)
- component_indices(axis=2)[source]¶
Look up component indices within fused streams
Example
>>> dims = (4, 4) >>> inputs = ['flowx', 'flowy', 'disparity'] >>> self = ChannelSpec('disparity,flowx|flowy') >>> component_indices = self.component_indices() >>> print('component_indices = {}'.format(ub.urepr(component_indices, nl=1))) component_indices = { 'disparity': ('disparity', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))), 'flowx': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))), 'flowy': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(1, 2, None))), }
- class delayed_image.FusedChannelSpec(parsed, _is_normalized=False)[source]¶
Bases:
BaseChannelSpec
A specific type of channel spec with only one early fused stream.
The channels in this stream are non-commutative
Behaves like a list of atomic-channel codes (which may represent more than 1 channel); normalized codes always represent exactly 1 channel.
Note
This class name and API are in flux and subject to change.
Todo
A special code indicating a name and the number of bands that the name contains. This would primarily be used for large numbers of channels produced by a network. Like:
resnet_d35d060_L5:512
or
resnet_d35d060_L5[:512]
might refer to a very specific (hashed) set of resnet parameters with 512 bands
maybe we can do something slice-like, such as:
resnet_d35d060_L5[A:B] resnet_d35d060_L5:A:B
Do we want to “just store the code” and allow for parsing later?
Or do we want to ensure the serialization is parsed before we construct the data structure?
Example
>>> from delayed_image.channel_spec import * # NOQA >>> import pickle >>> self = FusedChannelSpec.coerce(3) >>> recon = pickle.loads(pickle.dumps(self)) >>> self = ChannelSpec.coerce('a|b,c|d') >>> recon = pickle.loads(pickle.dumps(self))
- property spec¶
- classmethod coerce(data)[source]¶
Example
>>> from delayed_image.channel_spec import * # NOQA >>> FusedChannelSpec.coerce(['a', 'b', 'c']) >>> FusedChannelSpec.coerce('a|b|c') >>> FusedChannelSpec.coerce(3) >>> FusedChannelSpec.coerce(FusedChannelSpec(['a'])) >>> assert FusedChannelSpec.coerce('').numel() == 0
- concise()[source]¶
Shorten the channel spec by de-normalizing into slice syntax
- Returns:
concise spec
- Return type:
FusedChannelSpec
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = FusedChannelSpec.coerce( >>> 'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0:3|c.0') >>> short = self.concise() >>> long = short.normalize() >>> numels = [c.numel() for c in [self, short, long]] >>> print('self.spec = {!r}'.format(self.spec)) >>> print('short.spec = {!r}'.format(short.spec)) >>> print('long.spec = {!r}'.format(long.spec)) >>> print('numels = {!r}'.format(numels)) self.spec = 'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0:3|c.0' short.spec = 'b|a|a:3|a.5|c|a.8:10|b:3|c.0' long.spec = 'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0|b.1|b.2|c.0' numels = [13, 13, 13] >>> assert long.concise().spec == short.spec
- normalize()[source]¶
Replace aliases with explicit single-band-per-code specs
- Returns:
normalized spec
- Return type:
FusedChannelSpec
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = FusedChannelSpec.coerce('b1|b2|b3|rgb') >>> normed = self.normalize() >>> print('self = {}'.format(self)) >>> print('normed = {}'.format(normed)) self = <FusedChannelSpec(b1|b2|b3|rgb)> normed = <FusedChannelSpec(b1|b2|b3|r|g|b)> >>> self = FusedChannelSpec.coerce('B:1:11') >>> normed = self.normalize() >>> print('self = {}'.format(self)) >>> print('normed = {}'.format(normed)) self = <FusedChannelSpec(B:1:11)> normed = <FusedChannelSpec(B.1|B.2|B.3|B.4|B.5|B.6|B.7|B.8|B.9|B.10)> >>> self = FusedChannelSpec.coerce('B.1:11') >>> normed = self.normalize() >>> print('self = {}'.format(self)) >>> print('normed = {}'.format(normed)) self = <FusedChannelSpec(B.1:11)> normed = <FusedChannelSpec(B.1|B.2|B.3|B.4|B.5|B.6|B.7|B.8|B.9|B.10)>
- sizes()[source]¶
Returns a list indicating the size of each atomic code
- Returns:
List[int]
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = FusedChannelSpec.coerce('b1|Z:3|b2|b3|rgb') >>> self.sizes() [1, 3, 1, 1, 3] >>> assert(FusedChannelSpec.parse('a.0').numel()) == 1 >>> assert(FusedChannelSpec.parse('a:0').numel()) == 0 >>> assert(FusedChannelSpec.parse('a:1').numel()) == 1
- to_set()¶
- to_oset()¶
- to_list()¶
- as_path()[source]¶
Returns a string suitable for use in a path.
Note, this may no longer be a valid channel spec
Example
>>> from delayed_image.channel_spec import * # NOQA >>> self = FusedChannelSpec.coerce('b1|Z:3|b2|b3|rgb') >>> self.as_path() b1_Z..3_b2_b3_rgb
- difference(other)[source]¶
Set difference
Example
>>> FCS = FusedChannelSpec.coerce >>> self = FCS('rgb|disparity|flowx|flowy') >>> other = FCS('r|b') >>> self.difference(other) >>> other = FCS('flowx') >>> self.difference(other) >>> FCS = FusedChannelSpec.coerce >>> assert len((FCS('a') - {'a'}).parsed) == 0 >>> assert len((FCS('a.0:3') - {'a.0'}).parsed) == 2
- intersection(other)[source]¶
Example
>>> FCS = FusedChannelSpec.coerce >>> self = FCS('rgb|disparity|flowx|flowy') >>> other = FCS('r|b|XX') >>> self.intersection(other)
- union(other)[source]¶
Example
>>> from delayed_image.channel_spec import * # NOQA >>> FCS = FusedChannelSpec.coerce >>> self = FCS('rgb|disparity|flowx|flowy') >>> other = FCS('r|b|XX') >>> self.union(other)
- component_indices(axis=2)[source]¶
Look up component indices within this stream
Example
>>> FCS = FusedChannelSpec.coerce >>> self = FCS('disparity|rgb|flowx|flowy') >>> component_indices = self.component_indices() >>> print('component_indices = {}'.format(ub.urepr(component_indices, nl=1, _dict_sort_behavior='old'))) component_indices = { 'disparity': (slice(...), slice(...), slice(0, 1, None)), 'flowx': (slice(...), slice(...), slice(4, 5, None)), 'flowy': (slice(...), slice(...), slice(5, 6, None)), 'rgb': (slice(...), slice(...), slice(1, 4, None)), }
- streams()[source]¶
Idempotence with
ChannelSpec.streams()
- fuse()[source]¶
Idempotence with
ChannelSpec.fuse()
- class delayed_image.SensorChanSpec(spec: str)[source]¶
Bases:
NiceRepr
The public-facing API for the sensor / channel specification
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> self = SensorChanSpec('(L8,S2):BGR,WV:BGR,S2:nir,L8:land.0:4') >>> s1 = self.normalize() >>> s2 = self.concise() >>> streams = self.streams() >>> print(s1) >>> print(s2) >>> print('streams = {}'.format(ub.urepr(streams, sv=1, nl=1))) L8:BGR,S2:BGR,WV:BGR,S2:nir,L8:land.0|land.1|land.2|land.3 (L8,S2,WV):BGR,L8:land:4,S2:nir streams = [ L8:BGR, S2:BGR, WV:BGR, S2:nir, L8:land.0|land.1|land.2|land.3, ]
Example
>>> # Check with generic sensors >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> import delayed_image >>> self = SensorChanSpec('(*):BGR,*:BGR,*:nir,*:land.0:4') >>> self.concise().normalize() >>> s1 = self.normalize() >>> s2 = self.concise() >>> print(s1) >>> print(s2) *:BGR,*:BGR,*:nir,*:land.0|land.1|land.2|land.3 (*):BGR,*:(nir,land:4) >>> import delayed_image >>> c = delayed_image.ChannelSpec.coerce('BGR,BGR,nir,land.0:8') >>> c1 = c.normalize() >>> c2 = c.concise() >>> print(c1) >>> print(c2)
Example
>>> # Check empty channels >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> import delayed_image >>> print(SensorChanSpec('*:').normalize()) *: >>> print(SensorChanSpec('sen:').normalize()) sen: >>> print(SensorChanSpec('sen:').normalize().concise()) sen: >>> print(SensorChanSpec('sen:').concise().normalize().concise()) sen:
- classmethod coerce(data)[source]¶
Attempt to interpret the data as a channel specification
- Returns:
SensorChanSpec
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import * # NOQA >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> data = SensorChanSpec.coerce(3) >>> assert SensorChanSpec.coerce(data).normalize().spec == '*:u0|u1|u2' >>> data = SensorChanSpec.coerce(3) >>> assert data.spec == 'u0|u1|u2' >>> assert SensorChanSpec.coerce(data).spec == 'u0|u1|u2' >>> data = SensorChanSpec.coerce('u:3') >>> assert data.normalize().spec == '*:u.0|u.1|u.2'
- concise()[source]¶
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image import SensorChanSpec >>> a = SensorChanSpec.coerce('Cam1:(red,blue)') >>> b = SensorChanSpec.coerce('Cam2:(blue,green)') >>> c = (a + b).concise() >>> print(c) (Cam1,Cam2):blue,Cam1:red,Cam2:green >>> # Note the importance of parenthesis in the previous example >>> # otherwise channels will be assigned to `*` the generic sensor. >>> a = SensorChanSpec.coerce('Cam1:red,blue') >>> b = SensorChanSpec.coerce('Cam2:blue,green') >>> c = (a + b).concise() >>> print(c) (*,Cam2):blue,*:green,Cam1:red
- streams()[source]¶
- Returns:
List of sensor-names and fused channel specs
- Return type:
List[FusedSensorChanSpec]
- late_fuse(*others)[source]¶
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> from delayed_image import sensorchan_spec >>> import delayed_image >>> delayed_image.SensorChanSpec = sensorchan_spec.SensorChanSpec # hack for 3.6 >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf') >>> b = delayed_image.SensorChanSpec.coerce('A12') >>> c = delayed_image.SensorChanSpec.coerce('') >>> d = delayed_image.SensorChanSpec.coerce('rgb') >>> print(a.late_fuse(b).spec) >>> print((a + b).spec) >>> print((b + a).spec) >>> print((a + b + c).spec) >>> print(sum([a, b, c, d]).spec) A|B|C,edf,A12 A|B|C,edf,A12 A12,A|B|C,edf A|B|C,edf,A12 A|B|C,edf,A12,rgb >>> import delayed_image >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf').normalize() >>> b = delayed_image.SensorChanSpec.coerce('A12').normalize() >>> c = delayed_image.SensorChanSpec.coerce('').normalize() >>> d = delayed_image.SensorChanSpec.coerce('rgb').normalize() >>> print(a.late_fuse(b).spec) >>> print((a + b).spec) >>> print((b + a).spec) >>> print((a + b + c).spec) >>> print(sum([a, b, c, d]).spec) *:A|B|C,*:edf,*:A12 *:A|B|C,*:edf,*:A12 *:A12,*:A|B|C,*:edf *:A|B|C,*:edf,*:A12,*: *:A|B|C,*:edf,*:A12,*:,*:rgb >>> print((a.late_fuse(b)).concise()) >>> print(((a + b)).concise()) >>> print(((b + a)).concise()) >>> print(((a + b + c)).concise()) >>> print((sum([a, b, c, d])).concise()) *:(A|B|C,edf,A12) *:(A|B|C,edf,A12) *:(A12,A|B|C,edf) *:(A|B|C,edf,A12,) *:(A|B|C,edf,A12,,r|g|b)
Example
>>> # Test multi-arg case >>> import delayed_image >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf') >>> b = delayed_image.SensorChanSpec.coerce('A12') >>> c = delayed_image.SensorChanSpec.coerce('') >>> d = delayed_image.SensorChanSpec.coerce('rgb') >>> others = [b, c, d] >>> print(a.late_fuse(*others).spec) >>> print(delayed_image.SensorChanSpec.late_fuse(a, b, c, d).spec) A|B|C,edf,A12,rgb A|B|C,edf,A12,rgb
- matching_sensor(sensor)[source]¶
Get the components corresponding to a specific sensor
- Parameters:
sensor (str) – the name of the sensor to match
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> self = delayed_image.SensorChanSpec.coerce('(S1,S2):(a|b|c),S2:c|d|e') >>> sensor = 'S2' >>> new = self.matching_sensor(sensor) >>> print(f'new={new}') new=S2:a|b|c,S2:c|d|e >>> print(self.matching_sensor('S1')) S1:a|b|c >>> print(self.matching_sensor('S3')) S3:
- property chans¶
Returns the channel-only spec, ONLY if all of the sensors are the same
Example
>>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> self = delayed_image.SensorChanSpec.coerce('(S1,S2):(a|b|c),S2:c|d|e') >>> import pytest >>> with pytest.raises(Exception): >>> self.chans >>> print(self.matching_sensor('S1').chans.spec) >>> print(self.matching_sensor('S2').chans.spec) a|b|c a|b|c,c|d|e