# Spaces: Runtime error
import functools

import numpy as np
import scipy.stats as st
import tensorflow

tensorflow.compat.v1.disable_v2_behavior()
tf = tensorflow.compat.v1
def layer(op):
    """Decorator turning a network method into a chainable layer.

    The wrapped method ``op(self, layer_input, *args, **kwargs)`` receives
    the current terminal node(s) of ``self`` as ``layer_input``.  Its
    result is stored in ``self.layers`` under the layer name and fed back
    as the new terminal, and ``self`` is returned so calls can be chained.

    Raises:
        RuntimeError: if ``self.terminals`` is empty when the layer is
            called.
    """

    @functools.wraps(op)
    def layer_decorated(self, *args, **kwargs):
        # Automatically set a name if not provided.
        name = kwargs.setdefault('name', self.get_unique_name(op.__name__))
        # Figure out the layer inputs.
        if not self.terminals:
            raise RuntimeError('No input variables found for layer %s.' % name)
        if len(self.terminals) == 1:
            layer_input = self.terminals[0]
        else:
            layer_input = list(self.terminals)
        # Perform the operation and get the output.
        layer_output = op(self, layer_input, *args, **kwargs)
        # Add to layer LUT.
        self.layers[name] = layer_output
        # This output is now the input for the next layer.
        self.feed(layer_output)
        # Return self for chained calls.
        return self

    return layer_decorated


class Smoother(object):
    """Applies a fixed-size Gaussian blur to the input named ``'data'``.

    ``inputs`` is copied into the layer lookup table with ``dict(inputs)``;
    ``setup()`` then feeds ``'data'`` through a single depthwise
    convolution whose kernel is built from ``filter_size`` and ``sigma``.
    """

    def __init__(self, inputs, filter_size, sigma):
        self.inputs = inputs
        # Nodes that the next layer call will consume.
        self.terminals = []
        # Name -> node lookup table, seeded with the network inputs.
        self.layers = dict(inputs)
        self.filter_size = filter_size
        self.sigma = sigma
        self.setup()

    def setup(self):
        """Build the graph: blur the ``'data'`` input with one conv layer."""
        self.feed('data').conv(name='smoothing')

    def get_unique_name(self, prefix):
        """Return ``'<prefix>_<n>'``, n being 1 + the count of layer names
        that already start with ``prefix``."""
        ident = sum(key.startswith(prefix) for key in self.layers) + 1
        return '%s_%d' % (prefix, ident)

    def feed(self, *args):
        """Replace the current terminals with the given layers.

        Each argument is either a layer name (looked up in ``self.layers``)
        or a node object, which is used as-is.  Returns ``self`` so that
        layer calls can be chained.

        Raises:
            KeyError: if a name is not present in ``self.layers``.
        """
        assert len(args) != 0
        self.terminals = []
        for fed_layer in args:
            if isinstance(fed_layer, str):
                try:
                    fed_layer = self.layers[fed_layer]
                except KeyError:
                    # The lookup failed, so fed_layer still holds the name.
                    raise KeyError(
                        'Unknown layer name fed: %s' % fed_layer) from None
            self.terminals.append(fed_layer)
        return self

    def gauss_kernel(self, kernlen=21, nsig=3, channels=1):
        """Return a float32 array of shape (kernlen, kernlen, channels, 1).

        A 1-D profile is taken as differences of the normal CDF over
        ``kernlen`` equal bins; the 2-D kernel is the element-wise square
        root of its outer product, normalised to sum to one, and then
        repeated ``channels`` times along axis 2.
        """
        interval = (2 * nsig + 1.) / kernlen
        x = np.linspace(-nsig - interval / 2., nsig + interval / 2.,
                        kernlen + 1)
        kern1d = np.diff(st.norm.cdf(x))
        kernel_raw = np.sqrt(np.outer(kern1d, kern1d))
        kernel = kernel_raw / kernel_raw.sum()
        out_filter = np.array(kernel, dtype=np.float32)
        out_filter = out_filter.reshape((int(kernlen), int(kernlen), 1, 1))
        return np.repeat(out_filter, channels, axis=2)

    def make_gauss_var(self, name, size, sigma, c_i):
        """Wrap the Gaussian kernel for ``c_i`` channels in a ``tf.Variable``."""
        kernel = self.gauss_kernel(size, sigma, c_i)
        return tf.Variable(tf.convert_to_tensor(kernel), name=name)

    def get_output(self):
        """Returns the smoother output."""
        return self.terminals[-1]

    # Without @layer, the chained ``.conv(name=...)`` call in setup() has no
    # ``input`` argument; the decorator supplies the current terminal.
    @layer
    def conv(self, input, name, padding='SAME'):
        """Depthwise-convolve ``input`` with the Gaussian kernel.

        The channel count is read from dimension 3 of the input's static
        shape, so a rank-4 input is required (presumably channels-last;
        confirm against the caller).  Strides are fixed at 1.
        """
        # Get the number of channels in the input.
        c_i = input.get_shape().as_list()[3]
        with tf.variable_scope(name):
            kernel = self.make_gauss_var('gauss_weight', self.filter_size,
                                         self.sigma, c_i)
            output = tf.nn.depthwise_conv2d(input, kernel, [1, 1, 1, 1],
                                            padding=padding)
        return output