import torch
from torch import nn
import numpy as np
import math
from .. import numerical_filters as nf
from .. import variables
from ..base import Layer
from .. import _get_default_resolution
TIME_DIMENSION = 2
X_DIMENSION = 3
Y_DIMENSION = 4
__all__ = ['TimePadding','Delay','VariableDelay','Conv3d','Conv2d','Conv1d','RF','L','LN',
'TemporalLowPassFilterRecursive','TemporalHighPassFilterRecursive','SpatialRecursiveFilter',
'SmoothConv','NLRectify','NLSquare','NLRectifyScale','NLRectifySquare',
'Sum','sum','Diff']
[docs]class TimePadding(Layer):
"""
Remembers references to previous time slices
and prepends the input with `length` many
time steps from previous calls.
If the size of the image is changed without
removing the state first, an Exception is
raised.
To avoid this, call :meth:`~convis.base.Layer.clear_state()`. This method is recursive
on all :class:`convis.base.Layer` s, so you only have to call it on the
outermost :class:`~convis.base.Layer`.
If you want to store your history for one set of images,
do some computation on other images and then return to
the previous one, you can use :meth:`~convis.base.Layer.push_state()` and :meth:`~convis.base.Layer.pop_state()`.
Parameters
----------
length : int
The number of frames that should be prepended to each slice
mode : str
The behaviour if the buffer does not contain enough frames:
- `'mirror'` (default) appends the time reversed input until buffer is filled enough
- `'full_copy'` appends the input until buffer is full enough
- `'first_frame'` appends copies of the first frame of the input
- `'mean'` fills the buffer with the mean value of the input
- `'ones'` fills the buffer with ones
- `'zeros'` fills the buffer with zeros
"""
def __init__(self,length=0,mode='mirror'):
self.dim = 5
self.length = length
super(TimePadding, self).__init__()
self.register_state('saved_inputs',[])
self.saved_inputs = []
self.mode = 'first_frame'
@property
def available_length(self):
return np.sum([i.size()[TIME_DIMENSION] for i in self.saved_inputs])
def forward(self, x):
if len(self.saved_inputs) > 0:
if x.size()[-2:] != self.saved_inputs[-1].size()[-2:]:
raise Exception('input size '+str(x.size()[-2:])+' does not match state size ('+str(self.saved_inputs[-1].size()[-2:])+')! Call `.clear_state()` on your model first!')
while self.available_length < self.length:
if self.mode is 'full_copy':
self.saved_inputs.append(x)
elif self.mode is 'mirror':
self.saved_inputs.append(variables.Variable(x.numpy()[:,:,::-1,:,:]))
elif self.mode is 'first_frame':
self.saved_inputs.append(x[:,:,:1,:,:])
elif self.mode is 'mean':
self.saved_inputs.append(torch.ones_like(x)*x.mean())
elif self.mode is 'ones':
self.saved_inputs.append(torch.ones_like(x))
elif self.mode is 'zeros':
self.saved_inputs.append(torch.zeros_like(x))
else:
raise Exception("TimePadding argument `mode`='%s' not recognized!."%(self.mode,))
if self._use_cuda:
x_pad = torch.cat([i.cuda().detach() for i in self.saved_inputs] + [x.cuda()], dim=TIME_DIMENSION)
else:
x_pad = torch.cat([i.cpu().detach() for i in self.saved_inputs] +[x.cpu()], dim=TIME_DIMENSION)
while self.available_length > self.length + x.size(TIME_DIMENSION):
self.saved_inputs.pop(0)
self.saved_inputs.append(x)
return x_pad[:,:,-(self.length + x.size(TIME_DIMENSION)):,:,:]
[docs]class Delay(Layer):
"""
Causes the input to be delayed by a set
number of time steps.
d = Delay(delay=100)
d.run(some_input,10)
Optionally, a length of input can also be prependet
similar to the TimePadding Layer.
d = Delay(delay=100,length=10) # additionally preprends 10 timesteps of each previous chunk
d.run(some_input,10)
When the size of the image is changed, the previous inputs
do not match, so an Exception is raised.
To avoid this, call `.clear_state()`. This method is recursive
on all `convis.Layers`, so you only have to call it on the
outermost `Layer`.
If you want to store your history for one set of images,
do some computation on other images and then return to
the previous one, you can use `.push_state()` and `.pop_state()`.
"""
def __init__(self,delay=0,length=0):
self.dim = 5
self.length = length
self.delay = delay
super(Delay, self).__init__()
self.register_state('saved_inputs',[])
self.saved_inputs = []
@property
def available_length(self):
return np.sum([i.size()[TIME_DIMENSION] for i in self.saved_inputs])
def forward(self, x):
if len(self.saved_inputs) > 0:
if x.size()[-2:] != self.saved_inputs[-1].size()[-2:]:
raise Exception('input size does not match state size! Call `.clear_state()` on your model first!')
while self.available_length < self.length + self.delay:
self.saved_inputs.append(torch.zeros_like(x))
if self._use_cuda:
x_pad = torch.cat([i.cuda().detach() for i in self.saved_inputs] + [x.cuda()], dim=TIME_DIMENSION)
else:
x_pad = torch.cat([i.cpu().detach() for i in self.saved_inputs] +[x.cpu()], dim=TIME_DIMENSION)
while self.available_length > self.length + x.size(TIME_DIMENSION) + self.delay:
self.saved_inputs.pop(0)
self.saved_inputs.append(x)
to = -self.delay if self.delay > 0 else None
return x_pad[:,:,-(self.length + x.size(TIME_DIMENSION) + self.delay):to,:,:]
[docs]class VariableDelay(Layer):
"""
This Layer applies variable delays to each
pixel of the input.
Example::
v = VariableDelay(delays = d)
At the moment, the delays do *not* provide a gradient.
Possible future feature if requested:
variable delay per pixel, channel and batch dimension.
"""
def __init__(self, delays = None):
super(VariableDelay, self).__init__()
if delays is None:
delays = torch.zeros((1,1,1,1,1))
self.delays = variables.Parameter(delays)
self.all_delay = Delay()
def forward(self, x):
if self.delays is None or self.delays.size()[-2:] != x.size()[-2:]:
self.delays = variables.Parameter(variables.ones(x.size()[-2:]))
self.all_delay.delay = int(torch.min(self.delays))
self.all_delay.length = int(torch.max(self.delays)-torch.min(self.delays))
x_delayed = self.all_delay(x)
ind_to = -(self.delays - int(torch.min(self.delays)))
ind_from = -(-ind_to + x.size()[2])
x_out = []
for i,x_row in enumerate(x_delayed.split(1,-1)):
new_row = []
for j,x_pixel in enumerate(x_row.split(1,-2)):
#print int(ind_from[i,j]),int(ind_to[i,j])
to = int(ind_to[i,j])
if to == 0:
to = None
new_row.append(x_pixel[:,:,int(ind_from[i,j]):to,:,:])
x_out.append(torch.cat(new_row,dim=-2))
x_out = torch.cat(x_out,dim=-1)
return x_out
[docs]class Conv3d(torch.nn.Conv3d,Layer):
"""
Does a convolution, but pads the input in time
with previous input and in space by replicating
the edge.
Arguments:
* in_channels
* out_channels
* kernel_size
* bias (bool)
Additional PyTorch Conv3d keyword arguments:
* padding (should not be used)
* stride
* dilation
* groups
Additional convis Conv3d keyword arguments:
* time_pad: True (enables padding in time)
* autopad: True (enables padding in space)
To change the weight, use the method `set_weight()`
which also accepts numpy arguments.
See Also
--------
torch.nn.Conv3d
Conv1d
Conv2d
RF
"""
def __init__(self,in_channels=1,out_channels=1,kernel_size=(1,1,1),bias=True,*args,**kwargs):
self.do_adjust_padding = kwargs.get('adjust_padding',False)
self.do_time_pad = kwargs.get('time_pad',True)
self.autopad = kwargs.get('autopad',True)
self.autopad_mode = 'replicate'
if 'adjust_padding' in kwargs.keys():
del kwargs['adjust_padding']
if 'time_pad' in kwargs.keys():
del kwargs['time_pad']
if 'autopad' in kwargs.keys():
del kwargs['autopad']
super(Conv3d, self).__init__(in_channels,out_channels,kernel_size,bias=bias,*args,**kwargs)
if hasattr(self,'bias') and self.bias is not None:
self.bias.data[0] = 0.0
#self.weight.data = torch.zeros(self.weight.data.shape)
#self.w = self.weight
self.weight = variables.Parameter(np.zeros(self.weight.data.shape),
doc="""The weight tensor of the convolution""")
if bias is True:
self.bias = variables.Parameter(np.zeros(self.bias.data.shape),
doc="""The bias of the convolution""")
self.time_pad = TimePadding(self.weight.size()[TIME_DIMENSION])
[docs] def set_weight(self,w,normalize=False,preserve_channels=False,flip=True):
"""
Sets a new weight for the convolution.
Parameters
----------
w: numpy array or PyTorch Tensor
The new kernel `w` should have 1,2,3 or 5 dimensions.
1 dimensions: temporal kernel
2 dimensions: spatial kernel
3 dimensions: spatio-temporal kernel (time,x,y)
5 dimensions: spatio-temporal kernels for multiple channels
(out_channels, in_channels, time, x, y)
If the new kernel has 1, 2 or 3 dimensions and
`preserve_channels` is `True`, the input and output
channels will be preserved and the same kernel
will be applied to all channel combinations.
(ie. each output channel recieves the sum of all
input channels).
This makes sense if the kernel is further optimized,
otherwise, the same effect can be achieved with a
single input and output channel more effectively.
normalize: bool (default: False)
Whether or not the sum of the kernel values
should be normalized to 1, such that the
sum over all input values and all output
values is the approximately same.
preserve_channels: bool (default: False)
Whether or not to copy smaller kernels
to all input-output channel combinations.
flip: bool (default: True)
If `True`, the weight will be flipped, so that it corresponds
1:1 to patterns it matches (ie. 0,0,0 is the first frame, top left pixel)
and the impulse response will be exactly `w`.
If `False`, the weight will not be flipped.
.. versionadded:: 0.6.4
"""
if type(w) in [int,float]:
#self.weight.data = variables.ones(self.weight.data.shape) * w
self.weight.data = torch.ones(self.weight.data.shape) * w
else:
if len(w.shape) == 5:
self.out_channels = w.shape[0]
self.in_channels = w.shape[1]
else:
if hasattr(w,'__array__'):
# convert to numpy if possible
w = w.__array__()
if len(w.shape) == 1:
w = w[None,None,:,None,None]
elif len(w.shape) == 2:
w = w[None,None,None,:,:]
elif len(w.shape) == 3:
w = w[None,None,:,:,:]
if preserve_channels is True:
w = w * np.ones((self.out_channels, self.in_channels, 1, 1, 1))
w = torch.Tensor(w)
if flip:
if hasattr(w,'flip'):
# in newer PyTorch versions
w = w.flip(2,3,4)
else:
# fallback, see https://github.com/pytorch/pytorch/issues/229
def flip(x, dim):
xsize = x.size()
dim = x.dim() + dim if dim < 0 else dim
x = x.view(-1, *xsize[dim:])
x = x.view(x.size(0), x.size(1), -1)[:, getattr(torch.arange(x.size(1)-1,
-1, -1), ('cpu','cuda')[x.is_cuda])().long(), :]
return x.view(xsize)
w = flip(w,2)
w = flip(w,3)
w = flip(w,4)
self.weight.data = w
if self._use_cuda:
self.weight.data = self.weight.data.cuda()
self.kernel_size = self.weight.data.shape[2:]
if normalize:
self.weight.data = self.weight.data / self.weight.data.sum()
if self.do_adjust_padding:
self.adjust_padding()
def adjust_padding(self):
self.padding = (int(math.ceil((self.kernel_size[0])/2)),
int(math.ceil((self.kernel_size[1])/2)),
int(math.ceil((self.kernel_size[1])/2)))
@property
def filter_length(self):
return self.weight.data.shape[TIME_DIMENSION] - 1
@property
def kernel_padding(self):
k = np.array(self.weight.data.shape[2:])
return (int(math.floor((k[2])/2.0)),
int(math.ceil(k[2]))-int(math.floor((k[2])/2.0))-1,
int(math.floor((k[1])/2.0)),
int(math.ceil(k[1]))-int(math.floor((k[1])/2.0))-1,
0,0)
@property
def kernel_padding_all(self):
k = np.array(self.weight.data.shape[2:])
print('new code!')
return (int(math.floor((k[2])/2.0))-1,
int(math.ceil(k[2]))-int(math.floor((k[2])/2.0)),
int(math.floor((k[1])/2.0))-1,
int(math.ceil(k[1]))-int(math.floor((k[1])/2.0)),
int(math.floor((k[0])/2.0))-1,
int(math.ceil(k[0]))-int(math.floor((k[0])/2)))
[docs] def exponential(self,tau=0.0,adjust_padding=False,*args,**kwargs):
"""Sets the weight to be a 1d temporal lowpass filter with time constant `tau`."""
self.set_weight(nf.exponential_filter_1d(tau,*args,**kwargs)[::-1].copy(),normalize=False,flip=False)
if adjust_padding:
self.adjust_padding()
[docs] def highpass_exponential(self,tau=0.0,adjust_padding=False,*args,**kwargs):
"""Sets the weight to be a 1d temporal highpass filter with time constant `tau`."""
self.set_weight(nf.exponential_highpass_filter_1d(tau,*args,**kwargs)[::-1].copy(),normalize=False,flip=False)
if adjust_padding:
self.adjust_padding()
[docs] def gaussian(self,sig,adjust_padding=False,resolution=None):
"""Sets the weight to be a 2d gaussian filter with width `sig`."""
self.set_weight(nf.gauss_filter_5d(sig,sig,resolution=resolution),normalize=False)
if adjust_padding:
self.adjust_padding()
def __len__(self):
return self.kernel_size[0]*self.kernel_size[1]*self.kernel_size[2]
def forward(self,x):
if self.do_time_pad:
self.time_pad.length = self.filter_length
x = self.time_pad(x)
if self.autopad:
x = torch.nn.functional.pad(x,self.kernel_padding, self.autopad_mode)
return super(Conv3d, self).forward(x)
[docs]class RF(Conv3d):
"""
A Receptive Field Layer
Does a convolution and pads the input in time
with previous input, just like Conv3d, but with
no spatial padding, resulting in a single output
pixel.
To use it correctly, the weight should be set to
the same spatial dimensions as the input.
However, if the weight is larger than the input
or the input is larger than the weight,
the input is padded or cut. The parameter `rf_mode`
controls the placement of the receptive field
on the image.
Currently, only rf_mode='corner' is implemented,
which keeps the top left pixel identical and only
extends or cuts the right and bottom portions
of the input.
.. warning::
The spatial extent of your weight should match your input images to
get meaningful receptive fields. Otherwise the receptive field is placed
at the top left corner of the input.
If the weight was not set manually, the first time the filter sees input
it creates an empty weight of the matching size. However when
the input size is changed, the weight does not change automatically
to match new input. Use :meth:`reset_weight()` to reset the weight
or change the size manually.
Any receptive field of size 1 by 1 pixel is considered
empty and will be replaced with a uniform
weight of the size of the input the next time
the filter is used.
Examples
--------
Simple usage example processing a grating stimulus from `convis.samples`::
>>> m = convis.filters.RF()
>>> inp = convis.samples.moving_gratings()
>>> o = m.run(inp, dt=200)
>>> o.plot()
Or as a part of a cascade model::
>>> m = convis.models.LNCascade()
>>> m.add_layer(convis.filters.Conv3d(1,5,(1,10,10)))
>>> m.add_layer(convis.filters.RF(5,1,(10,1,1)))
# this RF will take into account 10 timesteps, it's width and height will be set by the input
>>> inp = convis.samples.moving_grating()
>>> o = m.run(inp, dt=200)
See Also
--------
torch.nn.Conv3d
Conv3d
"""
def __init__(self,in_channels=1,out_channels=1,kernel_size=(1,1,1),bias=True,rf_mode='corner',*args,**kwargs):
autopad = kwargs.get('autopad',False)
kwargs['autopad'] = autopad
self.rf_placement_mode = rf_mode
super(RF, self).__init__(in_channels,out_channels,kernel_size,bias=bias, *args,**kwargs)
def reset_weight(self):
self.set_weight(np.zeros((self.weight.size()[2],1,1)))
def forward(self,x):
if self.weight.size()[3] == 1 and self.weight.size()[4] == 1:
self.set_weight(np.ones((self.weight.size()[2],x.size()[3],x.size()[4])),normalize=True)
if self.rf_placement_mode is 'corner':
if x.size()[3] < self.weight.size()[3]:
x = torch.nn.functional.pad(x,(0,0,0,self.weight.size()[3],0,0))
if x.size()[4] < self.weight.size()[4]:
x = torch.nn.functional.pad(x,(0,self.weight.size()[4],0,0,0,0))
if x.size()[3] > self.weight.size()[3]:
x = x[:,:,:,:self.weight.size()[3],:]
if x.size()[4] > self.weight.size()[4]:
x = x[:,:,:,:,:self.weight.size()[4]]
else:
raise Exception('RF placements other than \'corner\' are not implemented yet!')
return super(RF, self).forward(x)
[docs]class Conv2d(nn.Conv2d, Layer):
"""Performs a 2d convolution.
Filter size can be 2d (spatial filter: `x,y`) or 3d (`channels,x,y`)
or 4d (`batches,channels,x,y`).
A filter can be set by supplying a `torch.Tensor` or `np.array` to `.set_weight()` and is expanded to a 4d Tensor.
**Note:** The filter is flipped during the convolution with respect to the image.
Convolutions in convis do automatic padding in space, unless told other wise by supplying
the keyword argument `autopad=False`. The input will be padded to create output of the same size.
Uneven weights (eg. `9x9`) will be perfectly centered, such that the center pixel of the weight, the
input pixel and output pixel all align. For even weights, this holds for the last pixel
after the center (`[6,6]` for a `10x10` weight).
The attribute `self.autopad_mode` can be set to a string that is passed to
:func:`torch.nn.functional.pad`. The default is `'replicate`'
See Also
--------
torch.nn.Conv2d
Conv1d
Conv3d
RF
"""
def __init__(self,in_channels=1,out_channels=1,kernel_size=(1,1),*args,**kwargs):
self.dims = 5
self.autopad = kwargs.get('autopad',True)
self.autopad_mode = 'replicate'
if 'autopad' in kwargs.keys():
del kwargs['autopad']
super(Conv2d, self).__init__(in_channels,out_channels,kernel_size,*args,**kwargs)
if hasattr(self,'bias') and self.bias is not None:
self.bias = variables.Parameter(0.0)
self.weight = variables.Parameter(torch.zeros(self.weight.data.shape))
[docs] def set_weight(self,w,normalize=False,flip=True):
"""
Sets a new weight for the convolution.
Parameters
----------
w: numpy array or PyTorch Tensor
The new kernel `w` should have 2,3 or 4 dimensions:
**2d:** (x, y)
**3d:** (out_channels, x, y)
**4d:** (in_channels, out_channels, x, y)
Missing dimensions are added at the front.
normalize: bool (default: False)
Whether or not the sum of the kernel values
should be normalized to 1, such that the
sum over all input values and all output
values is the approximately same.
flip: bool (default: True)
If `True`, the weight will be flipped, so that it corresponds
1:1 to patterns it matches (ie. 0,0 is the top left pixel)
and the impulse response will be exactly `w`.
If `False`, the weight will not be flipped.
.. versionadded:: 0.6.4
"""
if type(w) in [int,float]:
self.weight.data = torch.ones(self.weight.data.shape).data * w
else:
if self.weight.data.shape == w.shape:
self.weight.data = torch.Tensor(w)
else:
if len(w.shape) == 4:
self.out_channels = w.shape[0]
self.in_channels = w.shape[1]
w_h = w.shape[2]
w_w = w.shape[3]
w = torch.Tensor(w)
elif len(w.shape) == 3:
w_h = w.shape[1]
w_w = w.shape[2]
w = torch.Tensor(w)[None]
elif len(w.shape) == 2:
w_h = w.shape[0]
w_w = w.shape[1]
w= torch.Tensor(w)[None][None]
else:
raise Exception('Conv2d accepts weights with 2,3 or 4 dimensions. Weight has shape '+str(w.shape)+'!')
if flip:
if hasattr(w,'flip'):
# in newer PyTorch versions
w = w.flip(2,3)
else:
# fallback, see https://github.com/pytorch/pytorch/issues/229
def flip(x, dim):
xsize = x.size()
dim = x.dim() + dim if dim < 0 else dim
x = x.view(-1, *xsize[dim:])
x = x.view(x.size(0), x.size(1), -1)[:, getattr(torch.arange(x.size(1)-1,
-1, -1), ('cpu','cuda')[x.is_cuda])().long(), :]
return x.view(xsize)
w = flip(w,2)
w = flip(w,3)
self.weight.data = w
if normalize:
self.weight.data = self.weight.data / self.weight.data.sum()
@property
def kernel_padding(self):
k = np.array(self.weight.data.shape[-2:])
return (int(math.floor((k[1])/2.0)),
int(math.ceil(k[1]))-int(math.floor((k[1])/2.0))-1,
int(math.floor((k[0])/2.0)),
int(math.ceil(k[0]))-int(math.floor((k[0])/2.0))-1,
0,0)
def forward(self,x):
if self.autopad:
x = torch.nn.functional.pad(x,self.kernel_padding, self.autopad_mode)
outs = []
for i in range(x.size()[0]):
outs.append(super(Conv2d, self).forward(x[i].transpose(0,1)).transpose(0,1)[None,:,:,:,:])
return torch.cat(outs,dim=0)
def gaussian(self,sig):
self.set_weight(nf.gauss_filter_2d(sig,sig)[None,None,:,:],normalize=False)
[docs]class Conv1d(nn.Conv1d, Layer):
"""1d convolution with optional in-out-channels.
Weights can be set with `set_weight` and will be automatically flipped
to keep the weight and the impulse response identical.
The weight can be 1d (no channels/only time) or 3d (in-channels, out-channels,time).
.. note::
During the processing, all spatial dimensions will be collapsed into the batch dimension.
See Also
--------
torch.nn.Conv1d
Conv2d
Conv3d
RF
"""
def __init__(self,in_channels=1,out_channels=1,kernel_size=1,*args,**kwargs):
self.do_time_pad = kwargs.get('time_pad',True)
if 'time_pad' in kwargs.keys():
del kwargs['time_pad']
super(Conv1d, self).__init__(in_channels,out_channels,kernel_size,*args,**kwargs)
if hasattr(self,'bias') and self.bias is not None:
self.bias.data[0] = 0.0
self.weight.data = torch.zeros(self.weight.data.shape)
self.time_pad = TimePadding(self.weight.size()[TIME_DIMENSION])
@property
def filter_length(self):
return self.weight.data.shape[-1] - 1
[docs] def set_weight(self,w,normalize=False,flip=True):
"""
Sets a new weight for the convolution.
Parameters
----------
w: numpy array or PyTorch Tensor
The new kernel `w` should have 1 or 3 dimensions:
**1d:** (time)
**3d:** (in_channels, out_channels, time)
normalize: bool (default: False)
Whether or not the sum of the kernel values
should be normalized to 1, such that the
sum over all input values and all output
values is the approximately same.
flip: bool (default: True)
If `True`, the weight will be flipped, so that it corresponds
1:1 to patterns it matches (ie. 0 is the first frame)
and the impulse response will be exactly `w`.
If `False`, the weight will not be flipped.
.. versionadded:: 0.6.4
"""
if type(w) in [int,float]:
self.weight.data = torch.ones(self.weight.data.shape).data * w
else:
#self.weight.data = torch.Tensor(w)
w = torch.Tensor(w)
if len(w.shape) == 3:
self.out_channels = w.shape[0]
self.in_channels = w.shape[1]
elif len(w.shape) == 1:
w = w[None,None,:]
else:
raise Exception('Conv1d weights have to be 1d or 3d, not '+str(len(w.shape))+'! Please refer to the doc string.')
if flip:
if hasattr(w,'flip'):
# in newer PyTorch versions
w = w.flip(2)
else:
# fallback, see https://github.com/pytorch/pytorch/issues/229
def flip(x, dim):
xsize = x.size()
dim = x.dim() + dim if dim < 0 else dim
x = x.view(-1, *xsize[dim:])
x = x.view(x.size(0), x.size(1), -1)[:, getattr(torch.arange(x.size(1)-1,
-1, -1), ('cpu','cuda')[x.is_cuda])().long(), :]
return x.view(xsize)
w = flip(w,2)
self.weight.data = w
if normalize:
self.weight.data = self.weight.data / self.weight.data.sum()
def forward(self,x):
if self.do_time_pad:
self.time_pad.length = self.filter_length
x = self.time_pad(x)
# we move both space dimensions into the batch dimension
s = list(x.size())
x = x.transpose(1,3).transpose(2,4).reshape((s[0]*s[3]*s[4],s[1],s[2]))
y = super(Conv1d, self).forward(x)
s_y = list(y.size())
return y.reshape((s[0],s[3],s[4],s[1],s_y[2])).transpose(4,2).transpose(3,1)
[docs] def exponential(self,tau,*args,**kwargs):
"""Sets the weight to be an exponential filter (low-pass filter) with time constant `tau`.
"""
self.set_weight(nf.exponential_filter_1d(tau,*args,**kwargs),normalize=False,flip=True)
class L(Layer):
def __init__(self,kernel_dim=(1,1,1), bias = False):
self.dim = 5
super(L, self).__init__()
self.conv = Conv3d(1, 1, kernel_dim, bias = bias)
if hasattr(self,'bias') and self.bias is not None:
self.conv.bias.data[0] = 0.0
self.conv.weight.data[:,:,:,:,:] = 0.0
def forward(self, x):
return self.conv(x)
class LN(Layer):
def __init__(self,kernel_dim=(1,1,1), bias = False):
self.dim = 5
super(LN, self).__init__()
self.conv = Conv3d(1, 1, kernel_dim, bias = bias)
if hasattr(self,'bias') and self.bias is not None:
self.conv.bias.data[0] = 0.0
self.conv.weight.data[:,:,:,:,:] = 0.0
def forward(self, x):
return self.conv(x).clamp(min=0.0)
class TemporalLowPassFilterRecursive(Layer):
def __init__(self,kernel_dim=(1,1,1),requires_grad=True):
self.dim = 5
super(TemporalLowPassFilterRecursive, self).__init__()
#self.tau = Parameter(0.01,requires_grad=True)
self.tau = variables.Parameter(torch.Tensor([0.01]),requires_grad=requires_grad)
self.register_state('last_y',None)
def clear(self):
if hasattr(self,'last_y'):
self.last_y = None
def forward(self, x):
steps = variables.Parameter(1.0/_get_default_resolution().steps_per_second,requires_grad=False)
if self._use_cuda:
steps = steps.cuda()
a_0 = 1.0
a_1 = -torch.exp(-steps/self.tau)
b_0 = 1.0 - a_1
if self.last_y is not None:
y = self.last_y
else:
y = variables.zeros(1,1,1,x.data.shape[3],x.data.shape[4])
if self._use_cuda:
y = y.cuda()
o = []
for i in range(x.data.shape[TIME_DIMENSION]):
y = (x[:,:,i,:,:] * b_0 - y * a_1) / a_0
o.append(y)
self.last_y = y.detach()
norm = 2.0*self.tau/steps#(self.tau/(self.tau+0.5))*steps
return torch.cat(o,dim=TIME_DIMENSION)/norm
class TemporalHighPassFilterRecursive(Layer):
def __init__(self,kernel_dim=(1,1,1),requires_grad=True):
self.dim = 5
super(TemporalHighPassFilterRecursive, self).__init__()
#self.tau = Parameter(0.01,requires_grad=True)
self.tau = variables.Parameter(torch.Tensor([0.01]),requires_grad=requires_grad)
self.k = variables.Parameter(torch.Tensor([0.5]),requires_grad=requires_grad)
self.register_state('last_y',None)
def clear(self):
if hasattr(self,'last_y'):
self.last_y = None
def forward(self, x):
steps = variables.Parameter(1.0/_get_default_resolution().steps_per_second,requires_grad=False)
if self._use_cuda:
steps = steps.cuda()
a_0 = 1.0
a_1 = -torch.exp(-steps/self.tau)
b_0 = 1.0 - a_1
if self.last_y is not None:
y = self.last_y
else:
y = variables.zeros(1,1,1,x.data.shape[3],x.data.shape[4])
if self._use_cuda:
y = y.cuda()
o = []
x1 = x[:,:,0,:,:]
for i in range(x.data.shape[TIME_DIMENSION]):
y = (x1 * b_0 - y * a_1) / a_0
x1 = x[:,:,i,:,:]
o.append(y)
self.last_y = y
norm = 2.0*self.tau/steps#(self.tau/(self.tau+0.5))*steps
return x - (self.k)*torch.cat(o,dim=TIME_DIMENSION)/norm
def _select_(x,dim,i):
if dim == 0:
return x[i,:,:,:,:,][None,:,:,:,:]
if dim == 1:
return x[:,i,:,:,:][:,None,:,:,:]
if dim == 2:
return x[:,:,i,:,:][:,:,None,:,:]
if dim == 3:
return x[:,:,:,i,:][:,:,:,None,:]
if dim == 4:
return x[:,:,:,:,i][:,:,:,:,None]
class SpatialRecursiveFilter(Layer):
def __init__(self,kernel_dim=(1,1,1),requires_grad=True):
self.dim = 5
super(SpatialRecursiveFilter, self).__init__()
self.density = variables.Parameter(torch.Tensor([1.0]))
def forward(self, x):
config = {}
alpha = 1.695 * self.density
ema = torch.exp(-alpha)
ek = (1.0-ema)*(1.0-ema) / (1.0+2.0*alpha*ema - ema*ema)
A1 = ek
A2 = ek * ema * (alpha-1.0)
A3 = ek * ema * (alpha+1.0)
A4 = -ek*ema*ema
B1 = 2.0*ema
B2 = -ema*ema
def smooth_forward(x,a1,a2,b1,b2,dim):
x1 = _select_(x,dim,0)
o = []
y1 = variables.zeros_like(x1.data)
y2 = variables.zeros_like(x1.data)
x2 = variables.zeros_like(x1.data)
for i in range(x.data.shape[dim]):
x1,x2 = _select_(x,dim,i),x1
y = (a1 * x1 + a2 * x2 + b1 * y1 + b2 * y2)
y1, y2 = y, y1
o.append(y)
o = torch.cat(o,dim=dim)
return o
def smooth_backward(x,a1,a2,b1,b2,dim):
x1 = _select_(x,dim,0)
o = []
y1 = variables.zeros_like(x1.data)
y2 = variables.zeros_like(x1.data)
x2 = variables.zeros_like(x1.data)
for i in range(x.data.shape[dim]-1,-1,-1):
y = (a1 * x1 + a2 * x2 + b1 * y1 + b2 * y2)
x1,x2 = _select_(x,dim,i),x1
y1, y2 = y, y1
o.append(y)
o = torch.cat(o[::-1],dim=dim)
return o
x_ = smooth_forward(x,A1,A2,B1,B2,dim=X_DIMENSION)
x = smooth_backward(x,A3,A4,B1,B2,dim=X_DIMENSION) + x_
x_ = smooth_forward(x,A1,A2,B1,B2,dim=Y_DIMENSION)
x = smooth_backward(x,A3,A4,B1,B2,dim=Y_DIMENSION) + x_
return x
def gaussian(self, sigma):
"""
sets the filter density to
approximate a gaussian filter with
sigma standard deviation.
"""
self.density.data[0] = 1.0/(sigma*_get_default_resolution().pixel_per_degree)
[docs]class SmoothConv(Layer):
"""
A convolution with temporally smoothed filters.
It can cover a long temporal period, but is a lot more
efficient than a convlution filter of the same length.
Each spatial filter `.g[n]` is applied to a temporally filtered
signal with increasing delays by convolving multiple recursive
exponential filters.
The length of the filter depends on the number of temporal
components and the time constant used for the delays.
Each exponential filter `.e[n]` can have an individual
time constant, giving variable spacing between the filters.
By default, the time constants are set to not create a gradient,
so that they are not fittable.
To show each component, use `get_all_components(some_input)`
.. plot::
:include-source:
import matplotlib.pyplot as plt
import numpy as np
import convis
s = convis.filters.SmoothConv(n=6,tau=0.05)
inp = np.zeros((1000,1,1))
inp[50,0,0] = 1.0
inp = convis.prepare_input(inp)
c = s.get_all_components(inp)
convis.plot_5d_time(c,mean=(3,4))
c = c.data.cpu().numpy()
Attributes
----------
Methods
-------
See Also
--------
convis.filters.Conv3d : A full convolution layer
"""
def __init__(self,n=3,tau=0.1,spatial_filter=(10,10)):
super(SmoothConv, self).__init__()
self.dims=5
self.e = []
self.g = []
for i in range(n):
self.e.append(TemporalLowPassFilterRecursive(requires_grad=False))
self.e[i].tau.data[0] = tau
self.g.append(Conv3d(1,1,(1,spatial_filter[0],spatial_filter[1]),autopad=True, bias=False))
self.g[i].set_weight(np.random.randn(1,spatial_filter[0],spatial_filter[1]))
self.e = torch.nn.ModuleList(self.e)
self.g = torch.nn.ModuleList(self.g)
def forward(self,the_input):
o = []
y = the_input
for i in range(len(self.e)):
y = self.e[i](y)
o.append(self.g[i](y))
return torch.sum(torch.cat(o,dim=0),dim=0)[None,:,:,:,:]
def get_all_components(self,the_input):
o = []
y = the_input
for i in range(len(self.e)):
y = self.e[i](y)
o.append(self.g[i](y))
return torch.cat(o,dim=1)
[docs]class NLRectify(Layer):
"""Rectifies the input (ie. sets values < 0 to 0)
Note
----
To implement simple nonlinearities, you can also
use lambda expressions::
model = convis.mdoels.LN()
model.nonlinearity = lambda inp: (inp).clamp(min=0.0,max=1000000.0)
"""
def __init__(self):
super(NLRectify, self).__init__()
def forward(self, inp):
return (inp).clamp(min=0.0,max=1000000.0)
[docs]class NLRectifyScale(Layer):
"""Rectifies the input, but transforms the input with a scale and a bias.
Pseudocode::
out = bias + the_input * scale
out[out < 0] = 0
"""
def __init__(self):
super(NLRectifyScale, self).__init__()
self.scale = variables.Parameter(1.0)
self.bias = variables.Parameter(0.0)
def forward(self, inp):
return (self.bias+inp*self.scale).clamp(min=0.0,max=1000000.0)
[docs]class NLSquare(Layer):
"""A square nonlinearity with a scalable input weight and bias.
"""
def __init__(self):
super(NLSquare, self).__init__()
self.scale = variables.Parameter(1.0)
self.bias = variables.Parameter(0.0)
def forward(self, inp):
return (self.bias+inp*self.scale)**2
[docs]class NLRectifySquare(Layer):
"""A square nonlinearity with a scalable input weight and bias
that cuts off negative values after adding the bias.
"""
def __init__(self):
super(NLRectifySquare, self).__init__()
self.scale = variables.Parameter(1.0)
self.bias = variables.Parameter(0.0)
def forward(self, inp):
return ((self.bias+inp*self.scale).clamp(min=0.0,max=1000000.0))**2
[docs]def sum(*args, **kwargs):
"""concatenates and sums tensors over a given dimension `dim`.
Examples
--------
>>> inp = convis.prepare_input(np.ones((2,2,100,10,10)))
>>> o = convis.filters.sum(inp,inp,inp,dim=1)
See Also
--------
Sum
"""
dim = kwargs.get('dim',0)
return torch.cat(args,dim=dim).sum(dim=dim,keepdim=True)
[docs]class Sum(Layer):
"""A Layer that combines all inputs into one tensor and
sums over a given dimension.
Can be used to collapse batch or filter dimensions.
Examples
--------
>>> s = Sum(1)
>>> inp = convis.prepare_input(np.ones((2,2,100,10,10)))
>>> o = s(inp,inp,inp)
See Also
--------
sum
"""
def __init__(self, dim=0):
self.dims = 5
self.dim = dim
super(Sum, self).__init__()
def forward(self, *args):
return sum(args,dim=self.dim)
[docs]class Diff(Layer):
"""Takes the difference between two consecutive frames.
Example
-------
.. plot::
:include-source:
import convis
d = Diff()
inp = convis.samples.moving_grating()
o = d.run(inp,dt=200)
o.plot()
"""
def __init__(self):
self.dim = 5
super(DVS2,self).__init__()
self.register_state('last_frame',None)
def forward(self,inp):
if self.last_frame is not None:
first_frame = inp[:,:,:1,:,:] - self.last_frame
else:
first_frame = torch.zeros((inp.size()[0],inp.size()[1],1,inp.size()[3],inp.size()[4]))
self.last_frame = inp[:,:,-1:,:,:]
return torch.cat([first_frame, inp[:,:,1:,:,:] - inp[:,:,:-1,:,:]],dim=2)
from . import simple
from . import retina
from . import spiking