import json
import os
import re

from huggingface_hub import snapshot_download
import torch
from safetensors import safe_open
from transformers import AutoProcessor, MllamaForConditionalGeneration, AutoTokenizer, AutoModelForCausalLM, AutoConfig

#total_layers = 80  # 70B model has 80 layers
total_layers = 32  # 8B model has 32 layers

#cross_attention_layers = [3, 8, 13, 18, 23, 28, 33, 38, 43, 48, 53, 58, 63, 68, 73, 78, 83, 88, 93, 98]  # 90B
cross_attention_layers = [3, 8, 13, 18, 23, 28, 33, 38]  # 11B

# Update paths - switch source and target
target_model = "meta-llama/Llama-3.1-8B-Instruct"
print(f"Target model: {target_model}")
source_model = "meta-llama/Llama-3.2-11B-Vision-Instruct"
print(f"Source model: {source_model}")


def create_inverse_layer_mapping(total_layers=total_layers, cross_attn_layers=cross_attention_layers):
    """
    Creates a mapping from 90B/11B layer indices to 70B/8B layer indices.
    """
    mapping = {}
    removed_layers = []
    #for i in range(100):  # 90B has 100 layers (80 + 20 cross-attention layers)
    for i in range(40):  # 11B has 40 layers (32 + 8 cross-attention layers)
        if i not in cross_attn_layers and len(mapping) < total_layers:
            mapping[i] = len(mapping)
        else:
            removed_layers.append(i)
    return mapping, removed_layers


def load_sharded_state_dict(model_id):
    """
    Load a sharded state dict from either a local directory or a Hugging Face model ID.

    Args:
        model_id: Either a local path or a Hugging Face model ID (e.g., "meta-llama/Llama-2-7b")

    Returns:
        dict: The loaded state dictionary
    """
    # Check if model_id is a local path
    if os.path.isdir(model_id):
        model_dir = model_id
    else:
        # If not local, assume it's a Hugging Face model ID and download it
        print(f"Downloading model from Hugging Face: {model_id}")
        model_dir = snapshot_download(
            model_id,
            allow_patterns=["*.safetensors*", "*.json"],
            ignore_patterns=["*.bin", "*.md", "*.py"]
        )

    # Load the index file that maps parameter names to shard files
    index_file = os.path.join(model_dir, 'model.safetensors.index.json')
    if not os.path.exists(index_file):
        raise FileNotFoundError(f"Could not find index file: {index_file}")

    with open(index_file, 'r') as f:
        index_data = json.load(f)

    weight_map = index_data['weight_map']
    state_dict = {}
    shard_to_params = {}

    # Group parameters by shard file
    for param_name, shard_file in weight_map.items():
        if shard_file not in shard_to_params:
            shard_to_params[shard_file] = []
        shard_to_params[shard_file].append(param_name)

    # Load parameters from each shard
    for shard_file, params_in_shard in shard_to_params.items():
        shard_path = os.path.join(model_dir, shard_file)
        with safe_open(shard_path, framework="pt", device="cpu") as f:
            for name in params_in_shard:
                state_dict[name] = f.get_tensor(name)

    return state_dict


def compare_model_states(model, new_state_dict):
    current_state = model.state_dict()
    unchanged_params = []
    changed_params = []
    missing_params = []

    for name, param in current_state.items():
        if name not in new_state_dict:
            missing_params.append(name)
        elif torch.equal(param, new_state_dict[name]):
            unchanged_params.append(name)
        else:
            sum_abs_diff = torch.sum(torch.abs(param - new_state_dict[name]))
            changed_params.append({'name': name, 'sum_abs_diff': sum_abs_diff.item()})

    return {
        'unchanged': unchanged_params,
        'changed': changed_params,
        'missing': missing_params
    }

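# --- Optional sanity check (an illustrative addition, safe to remove) ---
# For the 11B layout the mapping skips the 8 cross-attention layers, so it begins
# {0: 0, 1: 1, 2: 2, 4: 3, 5: 4, ...} and the removed indices are exactly
# cross_attention_layers.
_demo_mapping, _demo_removed = create_inverse_layer_mapping()
print(f"Layer mapping (first 5 entries): {dict(list(_demo_mapping.items())[:5])}")
print(f"Removed cross-attention layers: {_demo_removed}")
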
layer_mapping, removed_layers = create_inverse_layer_mapping()

# Load source (11B/90B) state dict
source_state_dict = load_sharded_state_dict(source_model)

# Create new state dict for target model (8B/70B)
target_state_dict = {}

# Convert parameter names and copy tensors
for name, param in source_state_dict.items():
    # Skip parameters that aren't part of the language model layers
    if not (name.startswith('language_model.model.layers.')
            or name == 'language_model.model.embed_tokens.weight'
            or name == 'language_model.lm_head.weight'
            or name == 'language_model.model.norm.weight'):
        continue

    if name.startswith('language_model.model.layers.'):
        # Handle layer parameters
        layer_match = re.match(r'language_model\.model\.layers\.(\d+)\.(.+)', name)
        if layer_match:
            source_layer = int(layer_match.group(1))
            if source_layer in layer_mapping:
                target_layer = layer_mapping[source_layer]
                new_name = f'model.layers.{target_layer}.{layer_match.group(2)}'
                target_state_dict[new_name] = param
    elif name == 'language_model.lm_head.weight':
        # Handle lm_head weight
        target_state_dict['lm_head.weight'] = param
    elif name == 'language_model.model.embed_tokens.weight':
        # Handle embeddings - keep only the original text vocab rows; the vision model's
        # embedding matrix carries extra rows for its image special tokens
        original_embed_size = 128256
        target_state_dict['model.embed_tokens.weight'] = param[:original_embed_size, :]
    elif name == 'language_model.model.norm.weight':
        # Handle model norm weight
        target_state_dict['model.norm.weight'] = param

# Write target_state_dict keys to file for verification
with open('target_state_dict.txt', 'w') as f:
    f.write('\n'.join(target_state_dict.keys()))

# Instantiate the target architecture and load the remapped weights into it
config = AutoConfig.from_pretrained(target_model)
model = AutoModelForCausalLM.from_pretrained(
    None,
    config=config,
    state_dict=target_state_dict,
    torch_dtype=torch.bfloat16,
)

'''
origmodel = AutoModelForCausalLM.from_pretrained(
    target_model,
    torch_dtype=torch.bfloat16,
)
result = compare_model_states(model, origmodel.state_dict())
print("Unchanged parameters:", len(result['unchanged']))
print("Changed parameters:", len(result['changed']))
print("Missing parameters:", len(result['missing']))

# Write result to file
with open('result.txt', 'w') as f:
    f.write(json.dumps(result, indent=2))
'''

processor = AutoTokenizer.from_pretrained(target_model)  # 8B/70B
#processor = AutoProcessor.from_pretrained(source_model)  # 11B/90B

model.save_pretrained("Llama-3.2-8B-extracted")
processor.save_pretrained("Llama-3.2-8B-extracted")
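
# --- Optional smoke test (an illustrative sketch, not part of the original extraction flow) ---
# Reloads the extracted checkpoint and generates a short completion to confirm the remapped
# weights produce coherent text. The prompt and generation settings are arbitrary assumptions;
# device_map="auto" assumes the accelerate package is installed.
test_tokenizer = AutoTokenizer.from_pretrained("Llama-3.2-8B-extracted")
test_model = AutoModelForCausalLM.from_pretrained(
    "Llama-3.2-8B-extracted", torch_dtype=torch.bfloat16, device_map="auto"
)
messages = [{"role": "user", "content": "In one sentence, what is the capital of France?"}]
input_ids = test_tokenizer.apply_chat_template(
    messages, add_generation_prompt=True, return_tensors="pt"
).to(test_model.device)
output_ids = test_model.generate(input_ids, max_new_tokens=64)
print(test_tokenizer.decode(output_ids[0][input_ids.shape[-1]:], skip_special_tokens=True))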