# Lightweight Compression of Intermediate Neural Network Features for Collaborative Intelligence

import numpy as np

def modified_entropy_constrained_quantizer(x, N, b, lambda_, c_min, c_max, max_iter=1000):
    """Design a scalar quantizer whose outermost levels are pinned to the clip range.

    The training samples are clipped to ``[c_min, c_max]`` and the
    reconstruction values are refined iteratively: every sample is assigned
    to the bin ``n`` that minimises ``(x - x_hat[n]) ** 2 - lambda_ * b[n]``,
    then each interior reconstruction value is replaced by the mean of the
    samples in its bin. The first and last reconstruction values always stay
    at ``c_min`` and ``c_max``. Interior bins that receive no samples are
    placed by linear interpolation between the nearest populated (or pinned)
    neighbouring bins.

    Args:
        x: Training samples (array-like of any shape; flattened internally).
        N: Number of quantizer bins; must be at least 2.
        b: Per-bin rate terms; must contain exactly ``N`` entries.
        lambda_: Lagrange multiplier weighting ``b`` against squared error.
        c_min: Lower clipping bound and first reconstruction value.
        c_max: Upper clipping bound and last reconstruction value; must be
            greater than ``c_min``.
        max_iter: Upper bound on the number of refinement iterations. The
            loop stops earlier once the reconstruction values stop changing.

    Returns:
        A tuple ``(x_hat, t)`` with the ``N`` reconstruction values and the
        ``N - 1`` decision thresholds between adjacent bins.

    Raises:
        ValueError: If ``N < 2``, ``b`` does not have shape ``(N,)`` or
            ``c_min`` is not smaller than ``c_max``.

    NOTE(review): the rate term is *subtracted* from the squared error, so a
    larger ``b[n]`` makes bin ``n`` more attractive. If ``b`` is meant to hold
    codeword lengths, a conventional rate-distortion Lagrangian would add
    ``lambda_ * b`` instead -- confirm the sign against the reference
    algorithm. The thresholds below are consistent with the cost as written.
    """
    if N < 2:
        raise ValueError("N must be at least 2")
    if not c_min < c_max:
        raise ValueError("c_min must be smaller than c_max")
    b = np.asarray(b, dtype=float)
    if b.shape != (N,):
        raise ValueError(f"b must have shape ({N},), got {b.shape}")

    # Step 1: Clip the training samples (flattened so any input shape works)
    samples = np.clip(np.asarray(x, dtype=float).ravel(), c_min, c_max)

    # Step 2: Initialize reconstruction values on a uniform grid
    x_hat = np.linspace(c_min, c_max, N)

    rate_term = lambda_ * b  # loop-invariant
    bin_index = np.arange(N)

    # Step 5: Repeat steps 3 and 4 until convergence (bounded by max_iter)
    for _ in range(max_iter):
        # Step 3: Assign each training sample to the cheapest quantizer bin
        cost = (samples[:, np.newaxis] - x_hat) ** 2 - rate_term
        assignments = np.argmin(cost, axis=1)

        # Step 4: Recompute reconstruction values; the two ends stay pinned
        x_hat_new = np.empty_like(x_hat)
        x_hat_new[0] = c_min
        x_hat_new[-1] = c_max
        known = np.zeros(N, dtype=bool)
        known[0] = known[-1] = True

        for n in range(1, N - 1):
            members = samples[assignments == n]
            if members.size:
                x_hat_new[n] = members.mean()
                known[n] = True

        # Empty bins cannot be averaged; interpolate them from neighbours
        # whose values are already final for this iteration.
        if not known.all():
            x_hat_new[~known] = np.interp(
                bin_index[~known], bin_index[known], x_hat_new[known]
            )

        converged = np.allclose(x_hat, x_hat_new)
        x_hat = x_hat_new
        if converged:
            break

    # Step 6: Decision thresholds, i.e. the points where the assignment
    # costs of two adjacent bins are equal.
    gaps = np.diff(x_hat)
    t = (x_hat[:-1] + x_hat[1:]) / 2 + lambda_ * (b[:-1] - b[1:]) / (2 * gaps)

    return x_hat, t

# Example usage:
M = 10000  # Number of training samples
N = 8      # Number of quantizer bins
# Codeword lengths, one per bin: the quantizer combines b element-wise with
# the N reconstruction values, so b must contain exactly N entries.
b = np.arange(N, 0, -1)
lambda_ = 0.1  # Lagrange multiplier
c_min, c_max = 0, 1  # Activation clipping range

# Generate some random training data
np.random.seed(0)
x = np.random.randn(M)

# Run the algorithm
x_hat, t = modified_entropy_constrained_quantizer(x, N, b, lambda_, c_min, c_max)

print("Reconstruction values:", x_hat)
print("Decision thresholds:", t)
import numpy as np
import torch

# Assuming we are working with PyTorch

def clip_feature_tensors(tensors, cmin, cmax):
    """Return ``tensors`` with every element limited to ``[cmin, cmax]``."""
    clipped = torch.clamp(tensors, cmin, cmax)
    return clipped

def quantize(tensors, cmin, cmax, num_levels):
    """Map ``tensors`` to uniform quantizer level indices.

    The values are clipped to ``[cmin, cmax]``, shifted so that ``cmin``
    becomes zero, divided by the step size between adjacent levels and
    rounded. The result holds level indices (as rounded values), not
    reconstructed feature values.
    """
    step = (cmax - cmin) / (num_levels - 1)
    shifted = clip_feature_tensors(tensors, cmin, cmax) - cmin
    return torch.round(shifted / step)

def binarize_quantized(quantized, num_levels):
    """Turn quantizer level indices into truncated unary codewords.

    Level ``q`` becomes ``q`` ones followed by a terminating zero, except
    for the highest level (``num_levels - 1``), which is written without
    the terminating zero. Returns one string per element of ``quantized``,
    in flattened order.
    """
    top_level = num_levels - 1
    codewords = []
    for value in quantized.view(-1).tolist():
        level = int(value)
        ones = '1' * level
        # Only the highest level may drop the terminator.
        codewords.append(ones if level == top_level else ones + '0')
    return codewords

def entropy_coding(binarized_data):
    """Concatenate the codeword strings into a single bitstring.

    This is a placeholder for an entropy coder: it performs no further
    compression beyond joining the codewords.
    """
    bitstream = str.join("", binarized_data)
    return bitstream

def entropy_decoding(encoded_data):
    """Return ``encoded_data`` unchanged.

    Counterpart of ``entropy_coding``; since the encoder only concatenates
    codewords, there is nothing to undo here.
    """
    decoded = encoded_data
    return decoded

def inverse_quantize(quantized, cmin, cmax, num_levels):
    """Convert quantizer level indices back to values in ``[cmin, cmax]``.

    Level ``0`` maps to ``cmin`` and level ``num_levels - 1`` maps to
    ``cmax``; the levels in between are spaced uniformly.
    """
    step = (cmax - cmin) / (num_levels - 1)
    scaled = quantized * step
    return scaled + cmin

def compression_pipeline(feature_tensors, cmin, cmax, num_levels):
    """Quantize, binarize and encode ``feature_tensors`` into a bitstring.

    Chains ``quantize``, ``binarize_quantized`` and ``entropy_coding`` and
    returns the output of the last stage.
    """
    codewords = binarize_quantized(
        quantize(feature_tensors, cmin, cmax, num_levels),
        num_levels,
    )
    return entropy_coding(codewords)

def _decode_truncated_unary(bitstream, num_levels):
    """Split a concatenated truncated unary bitstring into level indices."""
    top_level = num_levels - 1
    levels = []
    ones_seen = 0
    for bit in bitstream:
        if bit == '1':
            ones_seen += 1
            # The highest level carries no terminating '0', so its codeword
            # ends as soon as top_level ones have been read.
            if ones_seen == top_level:
                levels.append(ones_seen)
                ones_seen = 0
        elif bit == '0':
            levels.append(ones_seen)
            ones_seen = 0
        else:
            raise ValueError(f"unexpected symbol {bit!r} in bitstream")
    if ones_seen:
        raise ValueError("bitstream ends in the middle of a codeword")
    return levels


def decompression_pipeline(encoded_data, cmin, cmax, num_levels, shape=None):
    """Decode a bitstring and reconstruct the feature values.

    The encoder concatenates variable-length truncated unary codewords into
    one string, so the stream is parsed codeword by codeword; iterating over
    its characters would yield one value per bit instead of one per symbol.

    Args:
        encoded_data: Output of ``compression_pipeline``. A sequence of
            separate per-symbol codeword strings is accepted as well.
        cmin: Lower bound of the clipping range used during compression.
        cmax: Upper bound of the clipping range used during compression.
        num_levels: Number of quantizer levels used during compression.
        shape: Optional shape to give the reconstructed tensor. When
            omitted, the result is one-dimensional.

    Returns:
        Tensor of reconstructed values, one per encoded symbol.

    Raises:
        ValueError: If the bitstring contains characters other than ``'0'``
            and ``'1'`` or ends in the middle of a codeword.
    """
    decoded = entropy_decoding(encoded_data)
    if isinstance(decoded, str):
        levels = _decode_truncated_unary(decoded, num_levels)
    else:
        # Already split into one codeword per symbol.
        levels = [codeword.count('1') for codeword in decoded]
    quantized = torch.tensor(levels)
    reconstructed = inverse_quantize(quantized, cmin, cmax, num_levels)
    if shape is not None:
        reconstructed = reconstructed.reshape(shape)
    return reconstructed

# Example usage
# Assuming 'model' is a pre-trained DNN model (e.g., ResNet-50)
# NOTE: `model` and `input_data` are not defined in the visible part of this
# file; they must be provided before this example can run.

layer_to_compress = model.layer21 # Example layer
features = layer_to_compress(input_data)  # Input data processed up to layer 21

# Compression phase on edge device
cmin, cmax = 0.0, 10.0  # Example clipping range, ideally determined empirically or via model
num_levels = 4  # Example for 2-bit quantization
# The result is a single string of '0'/'1' characters.
compressed_data = compression_pipeline(features, cmin, cmax, num_levels)

# Transmission to cloud and decompression phase
# NOTE(review): decompression_pipeline builds its tensor from a flat list, so
# the result is presumably 1-D and may need reshaping to features.shape before
# it is passed to the remaining layers -- confirm.
reconstructed_features = decompression_pipeline(compressed_data, cmin, cmax, num_levels)

# Continue with the rest of the model for inference
output = model.layer22_to_end(reconstructed_features)