# Copyright 2023 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import numpy as np
from absl import logging
from tflite_micro.tensorflow.lite.python.schema_py_generated import TensorType
# Map flatbuffer tensor type codes to numpy data types. See the TensorType enum in tensorflow/lite/schema/schema.fbs
# TODO(b/269487423): use a common util function instead
TENSOR_CODE_TYPE = {
TensorType.FLOAT32: np.float32,
TensorType.FLOAT16: np.float16,
TensorType.INT32: np.int32,
TensorType.UINT8: np.uint8,
TensorType.INT64: np.int64,
TensorType.STRING: np.string_,
TensorType.BOOL: np.bool_,
TensorType.INT16: np.int16,
TensorType.COMPLEX64: np.complex64,
TensorType.INT8: np.int8,
TensorType.FLOAT64: np.float64,
TensorType.COMPLEX128: np.complex128,
TensorType.UINT64: np.uint64,
TensorType.RESOURCE: "RESOURCE",
TensorType.VARIANT: "VARIANT",
TensorType.UINT32: np.uint32,
TensorType.UINT16: np.uint16,
TensorType.INT4: "INT4",
}
# TODO(b/269487423): use a common util function instead
TENSOR_TYPE_CODE = {value: key for key, value in TENSOR_CODE_TYPE.items()}
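# e.g., TENSOR_TYPE_CODE[np.int8] == TensorType.INT8; used below to read and
# set tensor.type fields.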
def clip_range(vals, bit_width):
"""Mimic integer calculation.
Clip the range of vals based on bit width.
  e.g., clip_range([300], 8) = [127] since int8 has range [-128, 127]
Args:
vals (np.array): float representation of the integer values
bit_width (int): number of desired bits for vals
Returns:
np.array : clipped vals
"""
  # Numpy integer arithmetic does not saturate on overflow, so clip explicitly here
min_val = -2**(bit_width - 1)
max_val = 2**(bit_width - 1) - 1
if vals.max() > max_val or vals.min() < min_val:
logging.info(f"WARNING: integer overflow!")
return np.clip(vals, min_val, max_val)
def quantize_data(data, scale, zero_point=0, bit_width=8):
"""Quantize the data to integer type with desired bit width.
The quantized data is represented using float since integer calculation in
numpy may differ from other implementations (e.g., no integer saturation
protection in numpy)
Args:
data (np.array): float data
scale (float): quantization scale of the data
zero_point (integer): quantization zero point of the data
    bit_width (int): number of bits used to represent vals
Returns:
np.array : quantized data in float but clipped range
"""
vals = np.round(data / scale) + zero_point
return clip_range(vals, bit_width)
def dequantize_data(quantized_data, scale, zero_point=0):
"""Dequantize the data to integer type with desired bit width.
Args:
quantized_data (np.array): quantized data
scale (float): quantization scale of the data
zero_point (integer): quantization zero point of the data
Returns:
np.array : dequantized data
"""
return scale * (quantized_data - zero_point)
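# Illustrative round trip with the two helpers above (numbers chosen purely for
# illustration): with scale=0.5 and zero_point=1,
# quantize_data(np.array([2.0]), 0.5, 1) yields round(2.0 / 0.5) + 1 = 5.0, and
# dequantize_data(5.0, 0.5, 1) recovers 0.5 * (5.0 - 1) = 2.0.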
def change_quantization_settings_8to16(tensor, buffers):
"""Change the quantization seeting of the tensor from int8 to int16"""
if (tensor.quantization.quantizedDimension != 0):
    raise RuntimeError(
        "Only per-layer quantization is supported; per-channel quantization is "
        "not currently supported.")
scale = tensor.quantization.scale[0]
zero_point = tensor.quantization.zeroPoint[0]
  # Use 128 instead of 127 for MAX_INT8 to compensate for the range precision loss due to int8 quantization
MIN_INT8, MAX_INT8 = -128, 128
# Narrow range (-min == max) is used for symmetrical quantization
MIN_INT16, MAX_INT16 = -32767, 32767
  # Asymmetric quantization: scale * (qmax - zero_point) = rmax
rmax = scale * (MAX_INT8 - zero_point)
rmin = scale * (MIN_INT8 - zero_point)
  # Symmetric quantization: scale * qmax = rmax
scale_16 = max(abs(rmax), abs(rmin)) / abs(MIN_INT16)
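  # For example (illustrative numbers only): scale = 0.1 and zero_point = 5 give
  # rmax = 0.1 * (128 - 5) = 12.3 and rmin = 0.1 * (-128 - 5) = -13.3, so
  # scale_16 = 13.3 / 32767, about 4.06e-4.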
  # Change to symmetric quantization: update scale and zero point
tensor.quantization.scale = [scale_16]
tensor.quantization.zeroPoint = [0]
# requantize the buffer data to int16 if necessary
tensor_buffer = buffers[tensor.buffer]
  if tensor_buffer.data is not None:
expected_buffer_size = np.prod(tensor.shape)
data = np.frombuffer(tensor_buffer.data, dtype=np.int8)
# Different ops may share one buffer. No need to requantize the buffer
# if the buffer has already been processed to int16 (2 bytes)
if data.nbytes == expected_buffer_size * 2:
return
elif data.nbytes != expected_buffer_size:
raise RuntimeError(
f"Bias buffer size {data.nbytes} does not match the expected size {expected_buffer_size * 4}"
)
    dequantized_data = dequantize_data(data, scale, zero_point)
int16_data = quantize_data(dequantized_data, scale_16, 0,
16).astype(np.int16)
tensor_buffer.data = int16_data.tobytes()
def change_activation_tensor_8to16(tensor, buffers):
"""Change the quantization setting of a activation tensor from int8 to int16"""
if tensor.type == TENSOR_TYPE_CODE[np.int8]:
change_quantization_settings_8to16(tensor, buffers)
tensor.type = TENSOR_TYPE_CODE[np.int16]
logging.info(f"Set {tensor.name} from int8 to int16 ")
def requantize_bias_perlayer(buffers, input, weight, bias):
"""Bias is layer wise quantized """
bias_buffer = buffers[bias.buffer]
bias_scale = bias.quantization.scale[0]
bias_zero_pt = bias.quantization.zeroPoint[0]
data = np.frombuffer(bias_buffer.data, dtype=np.int32)
# change scale and zero point
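  # In the integer kernels the bias is added to the accumulator of
  # input * weight products, so its effective scale must be
  # input scale * weight scale with a zero point of 0 (per the TFLite
  # quantization spec).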
bias_scale_int64 = (input.quantization.scale[0] *
weight.quantization.scale[0])
bias_zero_pt_int64 = 0 # symmetrical quantized
bias.type = TENSOR_TYPE_CODE[np.int64]
bias.quantization.scale = [bias_scale_int64]
bias.quantization.zeroPoint = [bias_zero_pt_int64]
expected_buffer_size = bias.shape[0] # bias has only one dimension
# Different ops may share one buffer. No need to requantize the buffer
# if the buffer has already been processed to int64 (8 bytes)
if data.nbytes == expected_buffer_size * 8:
return
elif data.nbytes != expected_buffer_size * 4:
raise RuntimeError(
f"Bias buffer size {data.nbytes} does not match the expected size {expected_buffer_size * 4}"
)
dequantized_data = dequantize_data(data, bias_scale, bias_zero_pt)
int64_data = quantize_data(dequantized_data, bias_scale_int64,
bias_zero_pt_int64, 64).astype(np.int64)
bias_buffer.data = int64_data.tobytes()
def requantize_bias_perchannel(buffers, input, weight, bias):
"""Bias is channel wise quantized. Requantize bias one by one """
bias_buffer = buffers[bias.buffer]
data = np.frombuffer(bias_buffer.data, dtype=np.int32)
expected_buffer_size = bias.shape[0] # bias has only one dimension
# whether to requantize the bias buffer, False if the buffer has already been requantized
requantize_buffer = True
# Different ops may share one buffer. No need to requantize the buffer
# if the buffer has already been processed to int64 (8 bytes)
if data.nbytes == expected_buffer_size * 8:
requantize_buffer = False
elif data.nbytes != expected_buffer_size * 4:
raise RuntimeError(
f"Bias buffer size {data.nbytes} does not match the expected size {expected_buffer_size * 4}"
)
if len(bias.quantization.scale) != len(weight.quantization.scale):
raise RuntimeError(
f" Per channel quantization requires number of bias scales ({len(bias.quantization.scale)}),\
equals to number of weight scales ({len(weight.quantization.scale)}) "
)
requantized_data = []
requantized_scales = []
requantized_zero_points = []
for element_data, bias_scale, weight_scale, bias_zero_point in zip(
data, bias.quantization.scale, weight.quantization.scale,
bias.quantization.zeroPoint):
bias_scale_int64 = (input.quantization.scale[0] * weight_scale)
bias_zero_pt_int64 = 0 # symmetrical quantized
requantized_scales.append(bias_scale_int64)
requantized_zero_points.append(bias_zero_pt_int64)
if requantize_buffer:
dequantized_data = dequantize_data(element_data, bias_scale,
bias_zero_point)
int64_data = quantize_data(dequantized_data, bias_scale_int64,
bias_zero_pt_int64, 64).astype(np.int64)
requantized_data.append(int64_data)
bias.type = TENSOR_TYPE_CODE[np.int64]
bias.quantization.scale = requantized_scales
bias.quantization.zeroPoint = requantized_zero_points
if requantize_buffer:
bias_buffer.data = np.array(requantized_data).tobytes()
def set_bias_type_int64(buffers, input, weight, bias):
"""Set the bias tensor quantization setting from int32 to int64
Args:
buffers (list): buffers for the model
input (Tensor): the corresponding input tensor for the bias
weight (Tensor): the corresponding weight tensor for the bias
bias (Tensor): the bias tensor that need to be modified
"""
if bias.type == TENSOR_TYPE_CODE[np.int32]:
if len(bias.quantization.scale) == 1:
requantize_bias_perlayer(buffers, input, weight, bias)
else:
requantize_bias_perchannel(buffers, input, weight, bias)
def requantize_fully_connected(tensors, buffers, op):
"""Requantize the fully connected op from int8 to int16
Note: CONV_2D and DEPTHWISE_CONV_2D also use this requantize function since they all share the same input/weight/bias configuration.
See tensorflow/lite/micro/kernels/fully_connected_common.cc
tflite_micro/tensorflow/lite/micro/kernels/depthwise_conv_common.cc
tflite_micro/tensorflow/lite/micro/kernels/conv_common.cc
"""
# Indices are from tensorflow/lite/micro/kernels/fully_connected_common.cc
input_tensor = tensors[op.inputs[0]]
# weight stays the same, no change needed
weight_tensor = tensors[op.inputs[1]]
output_tensor = tensors[op.outputs[0]]
change_activation_tensor_8to16(input_tensor, buffers)
change_activation_tensor_8to16(output_tensor, buffers)
# if the bias does not exist, op.inputs[2] == -1
if op.inputs[2] != -1:
bias_tensor = tensors[op.inputs[2]]
set_bias_type_int64(buffers, input_tensor, weight_tensor, bias_tensor)
def requantize_unidirectional_sequence_lstm(tensors, buffers, op):
"""Requantize the unidirectonal sequance lstm op from int8 to int16 """
input_tensor = tensors[op.inputs[0]]
hidden_state_tensor = tensors[op.inputs[18]]
output_tensor = tensors[op.outputs[0]]
# Indices are from tensorflow/lite/micro/kernels/lstm_shared.h
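  # inputs[1:5] are the input-to-{input, forget, cell, output} gate weights,
  # inputs[12:16] the matching gate biases, and inputs[5:9] the recurrent
  # (hidden-to-gate) weights.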
input_weights_idx = [1, 2, 3, 4]
recurrent_weights_idx = [5, 6, 7, 8]
bias_idx = [12, 13, 14, 15]
change_activation_tensor_8to16(input_tensor, buffers)
change_activation_tensor_8to16(hidden_state_tensor, buffers)
change_activation_tensor_8to16(output_tensor, buffers)
for weight_id, bias_id in zip(input_weights_idx, bias_idx):
weight_tensor = tensors[op.inputs[weight_id]]
bias_tensor = tensors[op.inputs[bias_id]]
set_bias_type_int64(buffers, input_tensor, weight_tensor, bias_tensor)
  # The recurrent weights (recurrent_weights_idx) stay int8 and have no
  # associated biases, so they require no change.
def requantize_softmax(tensors, buffers, op):
"""Requantize the softmax op from int8 to int16"""
input_tensor = tensors[op.inputs[0]]
output_tensor = tensors[op.outputs[0]]
# Change input type
change_activation_tensor_8to16(input_tensor, buffers)
# Output range is always [0,1]
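  # With symmetric int16 quantization (zero point 0), scale = 1 / 32768 makes
  # the representable range [-1, 1), which covers the softmax output; an exact
  # 1.0 saturates to 32767 / 32768.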
if output_tensor.type == TENSOR_TYPE_CODE[np.int8]:
# change quantization settings
output_tensor.quantization.scale = [1 / 32768]
output_tensor.quantization.zeroPoint = [0]
# Set tensor type
output_tensor.type = TENSOR_TYPE_CODE[np.int16]
logging.info(f"Set {output_tensor.name} from int8 to int16 ")
def requantize_transpose_conv(tensors, buffers, op):
"""Requantize the transpose conv op from int8 to int16"""
# Indices are from tensorflow/lite/micro/kernels/transpose_conv.cc
input_tensor = tensors[op.inputs[2]]
# weight stays the same, no change needed
weight_tensor = tensors[op.inputs[1]]
output_tensor = tensors[op.outputs[0]]
change_activation_tensor_8to16(input_tensor, buffers)
change_activation_tensor_8to16(output_tensor, buffers)
  # The bias is optional: it may be absent from op.inputs or set to -1 (op.inputs[3])
if len(op.inputs) > 3:
if op.inputs[3] != -1:
bias_tensor = tensors[op.inputs[3]]
set_bias_type_int64(buffers, input_tensor, weight_tensor, bias_tensor)
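# Illustrative driver sketch (an assumption, not part of this module; the real
# dispatch lives in the companion requantize tool). Given a model object parsed
# with the generated schema (e.g., schema_py_generated.ModelT), the requantize_*
# helpers above could be applied roughly as follows:
#
#   from tflite_micro.tensorflow.lite.python.schema_py_generated import BuiltinOperator
#
#   for subgraph in model.subgraphs:
#     for op in subgraph.operators:
#       op_code = model.operatorCodes[op.opcodeIndex].builtinCode
#       if op_code == BuiltinOperator.FULLY_CONNECTED:
#         requantize_fully_connected(subgraph.tensors, model.buffers, op)
#       elif op_code == BuiltinOperator.SOFTMAX:
#         requantize_softmax(subgraph.tensors, model.buffers, op)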