"""Perception (Chapter 24)""" import numpy as np import scipy.signal import matplotlib.pyplot as plt from utils4e import gaussian_kernel_2d import keras from keras.datasets import mnist from keras.models import Sequential from keras.layers import Dense, Activation, Flatten, InputLayer from keras.layers import Conv2D, MaxPooling2D import cv2 # ____________________________________________________ # 24.3 Early Image Processing Operators # 24.3.1 Edge Detection def array_normalization(array, range_min, range_max): """normalize an array in the range of (range_min, range_max)""" if not isinstance(array, np.ndarray): array = np.asarray(array) array = array - np.min(array) array = array * (range_max - range_min) / np.max(array) + range_min return array def gradient_edge_detector(image): """ Image edge detection by calculating gradients in the image :param image: numpy ndarray or an iterable object :return: numpy ndarray, representing a gray scale image """ if not isinstance(image, np.ndarray): img = np.asarray(image) # gradient filters of x and y direction edges x_filter, y_filter = np.array([[1, -1]]), np.array([[1], [-1]]) # convolution between filter and image to get edges y_edges = scipy.signal.convolve2d(img, x_filter, 'same') x_edges = scipy.signal.convolve2d(img, y_filter, 'same') edges = array_normalization(x_edges+y_edges, 0, 255) return edges def gaussian_derivative_edge_detector(image): """Image edge detector using derivative of gaussian kernels""" if not isinstance(image, np.ndarray): img = np.asarray(image) gaussian_filter = gaussian_kernel_2d() # init derivative of gaussian filters x_filter = scipy.signal.convolve2d(gaussian_filter, np.asarray([[1, -1]]), 'same') y_filter = scipy.signal.convolve2d(gaussian_filter, np.asarray([[1], [-1]]), 'same') # extract edges using convolution y_edges = scipy.signal.convolve2d(img, x_filter, 'same') x_edges = scipy.signal.convolve2d(img, y_filter, 'same') edges = array_normalization(x_edges+y_edges, 0, 255) return edges 


def laplacian_edge_detector(image):
    """
    Extract image edges with a laplacian filter
    :param image: numpy ndarray or an iterable object (2-D, as required by
                  scipy.signal.convolve2d)
    :return: numpy ndarray, representing a gray scale image in range (0, 255)
    """
    # always bind img; np.asarray returns an ndarray input unchanged
    img = np.asarray(image)
    # init laplacian filter
    laplacian_kernel = np.asarray([[0, -1, 0],
                                   [-1, 4, -1],
                                   [0, -1, 0]])
    # extract edges with convolution
    edges = scipy.signal.convolve2d(img, laplacian_kernel, 'same')
    return array_normalization(edges, 0, 255)


def show_edges(edges):
    """
    Helper function to show an edge picture with matplotlib
    :param edges: 2-D array displayed as a gray scale image, values are
                  mapped with vmin=0 and vmax=255
    """
    plt.imshow(edges, cmap='gray', vmin=0, vmax=255)
    plt.axis('off')
    plt.show()


# __________________________________________________
# 24.3.3 Optical flow


def sum_squared_difference(pic1, pic2):
    """
    Find the shift of pic2 that minimizes the sum of squared differences
    (ssd) against pic1
    :param pic1: first frame, 2-D array-like
    :param pic2: second frame, same shape as pic1
    :return: ((Dx, Dy), min_ssd) where Dx is the roll applied along axis 0
             and Dy the roll applied along axis 1, both searched in [-30, 30]
    """
    pic1 = np.asarray(pic1)
    pic2 = np.asarray(pic2)
    assert pic1.shape == pic2.shape
    # widen bool/integer pixels so the subtraction below cannot wrap around
    # (e.g. uint8: 3 - 5 would otherwise become 254)
    if pic1.dtype.kind in 'bui':
        pic1 = pic1.astype(np.int64)
    if pic2.dtype.kind in 'bui':
        pic2 = pic2.astype(np.int64)
    min_ssd = float('inf')
    min_dxy = (float('inf'), float('inf'))
    # consider picture shift from -30 to 30
    for dx in range(-30, 31):
        # the axis-0 roll only depends on dx, so compute it once per row shift
        rolled_rows = np.roll(pic2, dx, axis=0)
        for dy in range(-30, 31):
            # shift the image (np.roll wraps pixels around the border)
            shifted_pic = np.roll(rolled_rows, dy, axis=1)
            # calculate the difference
            diff = np.sum((pic1 - shifted_pic) ** 2)
            if diff < min_ssd:
                min_dxy = (dx, dy)
                min_ssd = diff
    return min_dxy, min_ssd


# ____________________________________________________
# segmentation


def gen_gray_scale_picture(size, level=3):
    """
    Generate a picture with different gray scale levels
    :param size: side length of the generated (size x size) picture
    :param level: the number of gray scale levels in the picture; pixel
                  (x, y) gets (250 // (level - 1)) * (max(x, y) * level // size)
    :return: image in numpy ndarray type (float)
    """
    assert level > 0
    # init an empty image
    image = np.zeros((size, size))
    if level == 1:
        # a single level means an all-zero picture (also avoids dividing by 0)
        return image
    # band index of each pixel: grows with max(x, y), giving nested squares
    # anchored at the upper left corner
    idx = np.arange(size)
    band = np.maximum.outer(idx, idx) * level // size
    return image + (250 // (level - 1)) * band


gray_scale_image = gen_gray_scale_picture(3)


def probability_contour_detection(image, discs, threshold=0):
    """
    Detect edges/contours by applying a set of discs to an image
    :param image: an image in type of numpy ndarray
    :param discs: a flat list of discs/filters applied to pixels of the image;
                  consecutive entries (0, 1), (2, 3), ... are treated as pairs
                  and all are expected to be square with the side of discs[0]
    :param threshold: threshold to tell whether the pixel at (x, y) is on an edge
    :return: image showing edges in numpy ndarray type (255 on edges, else 0)
    """
    # init an empty output image
    res = np.zeros(image.shape)
    step = discs[0].shape[0]
    for x_i in range(image.shape[0] - step + 1):
        for y_i in range(image.shape[1] - step + 1):
            # crop the region of interest once for all disc pairs
            region = image[x_i: x_i + step, y_i: y_i + step]
            # apply each pair of discs and calculate the difference
            diff = [np.sum(np.multiply(region, discs[d])) - np.sum(np.multiply(region, discs[d + 1]))
                    for d in range(0, len(discs), 2)]
            if max(diff) > threshold:
                # change color of the center of region
                res[x_i + step // 2, y_i + step // 2] = 255
    return res


def group_contour_detection(image, cluster_num=2):
    """
    Detect contours in an image with k-means clustering and display the
    clustered picture in an OpenCV window (blocks until a key is pressed)
    :param image: an image in numpy ndarray type
    :param cluster_num: number of clusters in k-means
    """
    img = image
    samples = np.float32(img)
    # stop after 10 iterations or when the centers move by less than 1.0
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    # use kmeans in opencv-python (10 attempts with random initial centers)
    ret, label, center = cv2.kmeans(samples, cluster_num, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
    center = np.uint8(center)
    # replace every sample by the center of the cluster it was assigned to
    res = center[label.flatten()]
    res2 = res.reshape(img.shape)
    # show the image
    cv2.imshow('res2', res2)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def image_to_graph(image):
    """
    Convert an image to a graph stored as an adjacency dictionary
    :param image: image in numpy ndarray type
    :return: dict mapping each pixel (x, y) to a two-element list
             [lower neighbor (x+1, y), right neighbor (x, y+1)];
             a neighbor outside the image is stored as None
    """
    rows, cols = image.shape[0], image.shape[1]
    graph_dict = {}
    for x in range(rows):
        for y in range(cols):
            graph_dict[(x, y)] = [(x + 1, y) if x + 1 < rows else None,
                                  (x, y + 1) if y + 1 < cols else None]
    return graph_dict


def generate_edge_weight(image, v1, v2):
    """
    Find edge weight between two vertices in an image
    :param image: image in numpy ndarray type
    :param v1, v2: vertices in the image in form of (x index, y index)
    :return: 255 minus the absolute intensity difference of the two pixels
    """
    # compare in float so unsigned pixel types (e.g. uint8) cannot wrap around
    first = np.asarray(image[v1[0], v1[1]], dtype=np.float64)
    second = np.asarray(image[v2[0], v2[1]], dtype=np.float64)
    diff = np.abs(first - second)
    return 255 - diff


class Graph:
    """Graph built from an image (adjacency dictionary) with per-edge flow values"""

    def __init__(self, image):
        """image: ndarray"""
        self.graph = image_to_graph(image)
        # number of vertices and number of neighbor slots stored per vertex
        self.ROW = len(self.graph)
        self.COL = 2
        self.image = image
        # dictionary to save the maximum flow of each edge,
        # initialized with the edge weight; missing (None) neighbors are skipped
        self.flow = {s: {t: generate_edge_weight(image, s, t) for t in neighbors if t}
                     for s, neighbors in self.graph.items()}

    def bfs(self, s, t, parent):
        """
        Breadth first search to tell whether t can be reached from s through
        edges with positive flow
        :param s: source vertex (x index, y index)
        :param t: sink vertex (x index, y index)
        :param parent: a list that every traversed edge (u, node) is appended
                       to, in visiting order
        :return: True if t was reached, False otherwise
        """
        from collections import deque

        # queue to save the current searching frontier
        queue = deque([s])
        visited = set()
        while queue:
            u = queue.popleft()
            for node in self.graph[u]:
                # only select existing edges with positive flow
                if node and node not in visited and self.flow[u][node] > 0:
                    queue.append(node)
                    visited.add(node)
                    parent.append((u, node))
        return t in visited

    def min_cut(self, source, sink):
        """
        Find the cut of the graph between source and sink
        :param source: source vertex (x index, y index)
        :param sink: sink vertex (x index, y index)
        :return: list of edges (i, j) whose flow was reduced to exactly 0
                 although their original edge weight is positive

        NOTE(review): the reduction below uses the minimum over ALL edges
        traversed by bfs (not a single s-t path) and subtracts it from every
        edge ending inside the rectangle spanned by sink, so this is not a
        textbook augmenting-path max-flow; behavior is kept as written.
        """
        parent = []
        while self.bfs(source, sink, parent):
            # find the minimum flow among the edges traversed by the search
            path_flow = min(self.flow[s][t] for s, t in parent)
            # update all edges between source and sink
            for s in self.flow:
                for t in self.flow[s]:
                    if t[0] <= sink[0] and t[1] <= sink[1]:
                        self.flow[s][t] -= path_flow
            parent = []
        # saturated edges (flow exactly 0, positive original weight) form the cut
        return [(i, j)
                for i in self.flow
                for j in self.flow[i]
                if self.flow[i][j] == 0 and generate_edge_weight(self.image, i, j) > 0]


def gen_discs(init_scale, scales=1):
    """
    Generate a collection of disc pairs by splitting a round disc with different angles
    :param init_scale: the initial size (side length in pixels) of each disc
    :param scales: number of scales to generate; the m-th scale (0-based) has
                   size init_scale * (m + 1)
    :return: the collection of generated discs: [discs of scale1, discs of scale2...],
             each entry holding 8 half discs ordered as 4 consecutive pairs
    """
    discs = []
    for m in range(scales):
        scale = init_scale * (m + 1)
        # make the full disc: 255 inside the circle, 0 outside
        center = (scale - 1) / 2
        rows, cols = np.indices((scale, scale))
        inside = (rows - center) ** 2 + (cols - center) ** 2 <= center ** 2
        white = np.where(inside, 255.0, 0.0)
        # generate lower half and upper half
        lower_half = np.copy(white)
        lower_half[:(scale - 1) // 2, :] = 0
        upper_half = lower_half[::-1, ::-1]
        # generate left half and right half
        disc = [lower_half, upper_half, np.transpose(lower_half), np.transpose(upper_half)]
        # generate upper-left, lower-right, upper-right, lower-left half discs
        disc += [np.tril(white, 0), np.triu(white, 0),
                 np.flip(np.tril(white, 0), axis=0), np.flip(np.triu(white, 0), axis=0)]
        discs.append(disc)
    return discs


# __________________________________________________
# 24.4 Classifying Images


def load_MINST(train_size, val_size, test_size):
    """
    Load the MNIST dataset from keras
    :param train_size: number of training samples to return
    :param val_size: number of validation samples, taken right after the training samples
    :param test_size: number of test samples to return
    :return: (train_x, train_y), (val_x, val_y), (test_x, test_y); images are
             float32 in [0, 1] with shape (n, 1, 28, 28), labels are one-hot
             vectors of length 10
    """
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    total_size = len(x_train)
    # shrink the training part so training + validation fit in the data
    if train_size + val_size > total_size:
        train_size = total_size - val_size
    x_train = x_train.reshape(x_train.shape[0], 1, 28, 28).astype('float32') / 255
    # scale the test images the same way as the training images so the
    # returned test set matches what the model was trained on
    x_test = x_test.reshape(x_test.shape[0], 1, 28, 28).astype('float32') / 255
    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)
    val_end = train_size + val_size
    return (x_train[:train_size], y_train[:train_size]), \
           (x_train[train_size:val_end], y_train[train_size:val_end]), \
           (x_test[:test_size], y_test[:test_size])


def simple_convnet(size=3, num_classes=10):
    """
    Simple convolutional network for digit recognition
    :param size: number of convolution layers
    :param num_classes: number of output classes
    :return: a compiled convolution network in keras model type
    """
    model = Sequential()
    # add input layer for images of size (28, 28)
    model.add(InputLayer(input_shape=(1, 28, 28)))
    # add convolution layers and max pooling layers
    for _ in range(size):
        model.add(Conv2D(32, (2, 2), padding='same', kernel_initializer='random_uniform'))
        model.add(MaxPooling2D(padding='same'))
    # add flatten layer and output layers
    model.add(Flatten())
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))
    # compile model
    # NOTE(review): lowercase `rmsprop` with `lr` looks like the legacy Keras
    # optimizer API - confirm against the installed Keras version
    opt = keras.optimizers.rmsprop(lr=0.0001, decay=1e-6)
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    # summary() prints the table itself and returns None, so it is not
    # wrapped in print() (which would emit an extra "None" line)
    model.summary()
    return model


def train_model(model):
    """
    Train the simple convolution network on a small MNIST subset and print
    its evaluation scores
    :param model: compiled keras model, e.g. the result of simple_convnet()
    :return: the trained model
    """
    # load dataset
    (train_x, train_y), (val_x, val_y), (test_x, test_y) = load_MINST(1000, 100, 100)
    model.fit(train_x, train_y, validation_data=(val_x, val_y), epochs=5, verbose=2, batch_size=32)
    scores = model.evaluate(test_x, test_y, verbose=1)
    print(scores)
    return model


# _____________________________________________________
# 24.5 DETECTING OBJECTS


def selective_search(image):
    """
    Selective search for object detection; the first 100 boxes are printed
    and drawn in an OpenCV window (blocks until a key is pressed)
    :param image: str, the path of an image, or an image in ndarray type
                  (a 2-D gray scale image is expanded to 3 identical channels);
                  None or an empty value loads the bundled sample image
    :return: list of bounding boxes as returned by OpenCV, each element is
             unpacked below as (x, y, w, h)
    """
    # ndarrays must not be truth-tested (ambiguous for more than one element)
    if image is None or (not isinstance(image, np.ndarray) and not image):
        im = cv2.imread("./images/stapler1-test.png")
    elif isinstance(image, str):
        im = cv2.imread(image)
    else:
        im = np.asarray(image)
        if im.ndim == 2:
            # replicate the single channel three times; note the one-element
            # tuple: (im,) * 3 repeats the array, whereas (im) * 3 would
            # multiply the pixel values by 3
            im = np.stack((im,) * 3, axis=-1)

    # use opencv python to extract bounding box with selective search
    ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    ss.setBaseImage(im)
    ss.switchToSelectiveSearchQuality()
    rects = ss.process()

    # show bounding boxes with the input image
    image_out = im.copy()
    for rect in rects[:100]:
        print(rect)
        x, y, w, h = rect
        cv2.rectangle(image_out, (x, y), (x + w, y + h), (0, 255, 0), 1, cv2.LINE_AA)
    cv2.imshow("Output", image_out)
    cv2.waitKey(0)
    return rects


# faster RCNN


def pool_rois(feature_map, rois, pooled_height, pooled_width):
    """
    Applies ROI pooling for a single image and various ROIs
    :param feature_map: ndarray with 3 axes; axis 0 is scaled by the first and
                        third roi entries, axis 1 by the second and fourth,
                        axis 2 holds the channels
    :param rois: list of roi, see pool_roi for the format of one roi
    :param pooled_height: height of pooled area
    :param pooled_width: width of pooled area
    :return: list of pooled features, one entry per roi
    """
    return [pool_roi(feature_map, roi, pooled_height, pooled_width) for roi in rois]


def pool_roi(feature_map, roi, pooled_height, pooled_width):
    """
    Applies a single ROI pooling to a single image
    :param feature_map: ndarray with 3 axes (axis 0, axis 1, channel)
    :param roi: region of interest given as ratios in [0, 1], in form of
                [axis-0 start, axis-1 start, axis-0 end, axis-1 end]
    :param pooled_height: number of cells along axis 0
    :param pooled_width: number of cells along axis 1
    :return: feature of pooling output, in shape of (pooled_height, pooled_width);
             each cell is the maximum over its area and over all channels
    """
    # Compute the region of interest
    feature_map_height = int(feature_map.shape[0])
    feature_map_width = int(feature_map.shape[1])

    h_start = int(feature_map_height * roi[0])
    w_start = int(feature_map_width * roi[1])
    h_end = int(feature_map_height * roi[2])
    w_end = int(feature_map_width * roi[3])

    region = feature_map[h_start:h_end, w_start:w_end, :]

    # Divide the region into non overlapping areas
    region_height = h_end - h_start
    region_width = w_end - w_start
    h_step = region_height // pooled_height
    w_step = region_width // pooled_width

    # each area is (row start, col start, row end, col end); the last row and
    # column of areas absorb the remainder left by the integer division
    areas = [[(i * h_step,
               j * w_step,
               (i + 1) * h_step if i + 1 < pooled_height else region_height,
               (j + 1) * w_step if j + 1 < pooled_width else region_width)
              for j in range(pooled_width)]
             for i in range(pooled_height)]

    # take the maximum of each area and stack the result
    def pool_area(x):
        return np.max(region[x[0]:x[2], x[1]:x[3], :])

    pooled_features = np.stack([[pool_area(x) for x in row] for row in areas])
    return pooled_features


# faster rcnn demo can be installed and shown in jupyter notebook

# def faster_rcnn_demo(directory):
#     """
#     show the demo of rcnn, the model is from
#     @inproceedings{renNIPS15fasterrcnn,
#         Author = {Shaoqing Ren and Kaiming He and Ross Girshick and Jian Sun},
#         Title = {Faster {R-CNN}: Towards Real-Time Object Detection
#                  with Region Proposal Networks},
#         Booktitle = {Advances in Neural Information Processing Systems ({NIPS})},
#         Year = {2015}}
#     :param directory: the directory where the faster rcnn model is installed
#     """
#     os.chdir(directory + '/lib')
#     # make file
#     os.system("make clean")
#     os.system("make")
#     # run demo
#     os.chdir(directory)
#     os.system("./tools/demo.py")
#     return 0